Advertisement
Guest User

Untitled

a guest
Jul 21st, 2017
63
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 15.25 KB | None | 0 0
  1. /*
  2.  *  logger.h
  3.  *  ACIDCache
  4.  *
  5.  *  Created by Bob Walters on 2/24/08.
  6.  *  Copyright 2008 __MyCompanyName__. All rights reserved.
  7.  *
  8.  */
  9.  
  10. #ifndef STLDB_LOGGER_H
  11. #define STLDB_LOGGER_H 1
  12.  
#include <cerrno>
#include <cstdlib>
#include <cstring>
#include <fstream>
#include <iomanip>
#include <ios>
#include <iostream>
#include <sstream>
#include <string>
  19.  
  20. #include <boost/interprocess/containers/deque.hpp>
  21. #include <boost/interprocess/containers/string.hpp>
  22. #include <boost/interprocess/sync/scoped_lock.hpp>
  23. #include <boost/interprocess/streams/vectorstream.hpp>
  24.  
  25. #include <stldb/cachetypes.h>
  26. #include <stldb/commit_buffer.h>
  27. #include <stldb/containers/string_compare.h>
  28. #include <stldb/logging.h>
  29. #include <stldb/detail/os_file_functions_ext.h>
  30. #include <stldb/timing/timer.h>
  31. #include <stldb/statistics.h>
  32. #include <stldb/trace.h>
  33. #include <stldb/detail/db_file_util.h>
  34.  
  35. using boost::interprocess::basic_string;
  36. using boost::interprocess::basic_ovectorstream;
  37.  
  38. using std::cout;
  39. using std::endl;
  40.  
  41. namespace stldb {
  42.  
  43.  
  44. /**
  45.  * The Logging information kept in shared memory
  46.  */
  47. template <class void_alloc_t, class mutex_type>
  48. struct SharedLogInfo
  49. {
  50.     typedef typename void_alloc_t::template rebind<char>::other  shm_char_alloc_t;
  51.     typedef boost::interprocess::basic_string<char, std::char_traits<char>, shm_char_alloc_t>  shm_string;
  52.  
  53.     // For synchronization
  54.     mutex_type      _queue_mutex;
  55.     mutex_type      _file_mutex;
  56.  
  57.     // This sequence value is used to ensure that the order of disk writes
  58.     // reflects the order of transaction commits.
  59.     transaction_id_t _next_lsn;
  60.  
  61.     // Info about what has been written, and synced to disk...
  62.     transaction_id_t _last_write_txn_id;
  63.     transaction_id_t _last_sync_txn_id;
  64.  
  65.     // queue of transactions waitng for disk write.
  66.     typedef stldb::commit_buffer_t<void_alloc_t>*  waiting_write;
  67.     typedef typename void_alloc_t::template rebind<waiting_write>::other deque_alloc_t;
  68.     boost::interprocess::deque<waiting_write,deque_alloc_t> waiting_txns;   // transactions waiting to enter log().
  69.  
  70.     // Configured log directory and current filename
  71.     shm_string  log_dir;        // config - directory to contain log files
  72.     shm_string  log_filename;   // form: LXXXXXXXXXXXXXXXX.log, where 16X = the 64-bit value of the first txn_id in the log
  73.  
  74.     boost::interprocess::offset_t   log_len;        // current - current len.
  75.     boost::interprocess::offset_t   log_max_len;    // config - max len of any one log file
  76.  
  77.     bool        log_sync;       // config - should disk sync be done after write?
  78.  
  79.     struct log_stats  stats;    // stats
  80.  
  81.     // Meant to be constructed within a shared region.
  82.     SharedLogInfo(const void_alloc_t &alloc)
  83.         : _queue_mutex(), _file_mutex()
  84.         , _next_lsn(1)
  85.         , _last_write_txn_id(0), _last_sync_txn_id(0)
  86.         , waiting_txns(alloc)
  87.         , log_dir(alloc), log_filename(alloc)
  88.         , log_len(0), log_max_len(256*1024*1024)
  89.         , log_sync(true), stats()
  90.         { }
  91.  
  92.     // provide for serialization of an XML-based form of DatabaseInfo contents
  93.     template <class Archive>
  94.     void serialize(Archive &ar, const unsigned int version)
  95.     {
  96.         std::size_t num_waiting_txns = waiting_txns.size();
  97.         ar & BOOST_SERIALIZATION_NVP(_next_lsn)
  98.            & BOOST_SERIALIZATION_NVP(_last_write_txn_id)
  99.            & BOOST_SERIALIZATION_NVP(_last_sync_txn_id)
  100.            & BOOST_SERIALIZATION_NVP( num_waiting_txns)
  101.            & BOOST_SERIALIZATION_NVP(log_dir)
  102.            & BOOST_SERIALIZATION_NVP(log_filename)
  103.            & BOOST_SERIALIZATION_NVP(log_len)
  104.            & BOOST_SERIALIZATION_NVP(log_sync)
  105.            & BOOST_SERIALIZATION_NVP(stats);
  106.     }
  107. };
  108.  
  109. /**
  110.  * The logger is invoked when transactions are committing.
  111.  */
  112. template <class void_alloc_t, class mutex_type>
  113. class Logger
  114. {
  115. public:
  116.     Logger()
  117.         : padding_buffer()
  118.         , _shm_info(NULL)
  119.         , _logfd(0) /* TODO - Too OS specific an initializer? */
  120.         , _my_log_filename()
  121.         , _my_fp_offset(0)
  122.     { }
  123.  
  124.     /**
  125.      * Set the shared log info.
  126.      */
  127.     void set_shared_info( SharedLogInfo<void_alloc_t,mutex_type> *log_info) {
  128.         _shm_info = log_info;
  129.     }
  130.  
  131.     /**
  132.      * Destructor closes this processes file handle to any open log file.
  133.      */
  134.     ~Logger()
  135.     {
  136.         if (!_my_log_filename.empty()) {
  137.             boost::interprocess::detail::close_file(_logfd);
  138.         }
  139.     }
  140.  
  141.     // Called by higher levels of the stack to record that a diskless commit
  142.     // has occurred.  This is for cases of
  143.     void record_diskless_commit() {
  144.         boost::interprocess::scoped_lock<mutex_type> file_lock_holder(_shm_info->_file_mutex);
  145.         _shm_info->stats.total_commits++;
  146.         _shm_info->stats.total_free_commits++;
  147.     }
  148.  
  149.     // returns a copy of current logging statistics
  150.     log_stats get_stats(bool reset=false) {
  151.         boost::interprocess::scoped_lock<mutex_type> file_lock_holder(_shm_info->_file_mutex);
  152.         log_stats result = _shm_info->stats;
  153.         if (reset) {
  154.             _shm_info->stats = log_stats(); // reset via re-construction
  155.         }
  156.         return result;
  157.     }
  158.  
  159.     /**
  160.      * If there is no txn currently waiting for a shot at the log, then immediately
  161.      * grant the process access to the log.  Otherwise, it joins the waiting list.
  162.      */
  163.     transaction_id_t queue_for_commit( stldb::commit_buffer_t<void_alloc_t> *buff )
  164.     {
  165.         stldb::timer t1("Logger::queue_for_commit");
  166.  
  167.         // sanity test (debugging)
  168.         if (buff->size()==0 || buff->op_count==0) {
  169.             std::ostringstream error;
  170.             error << "Assertion: detected commit buffer of size==0 or ops==0 in log() method";
  171.             STLDB_TRACE(severe_e, error.str() );
  172.             throw std::ios_base::failure( error.str() );
  173.         }
  174.  
  175.         buff->prepare_header();
  176.  
  177.         boost::interprocess::scoped_lock<mutex_type> lock_holder(_shm_info->_queue_mutex);
  178.  
  179.         transaction_id_t lsn = _shm_info->_next_lsn++;
  180.         buff->finalize_header(lsn);
  181.         _shm_info->waiting_txns.push_back( buff );
  182.         return lsn;
  183.     }
  184.  
  185.     /**
  186.      * Acquire the right to write to the log file, and then write data from the commit
  187.      * queue as necessary until out buffer has been written.  Once the data has been
  188.      * written, optionally call sync() to wait for that data to make it out of OS
  189.      * memory and to the disk.
  190.      */
  191.     void log( transaction_id_t my_commit_seq )
  192.     {
  193.         stldb::timer t1("Logger::log");
  194.         boost::interprocess::scoped_lock<mutex_type> file_lock_holder(_shm_info->_file_mutex);
  195.  
  196.         uint64_t writes=0, txn_written=0, bytes=0, syncs=0;
  197.  
  198.         // we need to write commit buffers, until ours is written.
  199.         while (_shm_info->_last_write_txn_id < my_commit_seq) {
  200.  
  201.             // ensure we have an open logfile, and if the current one has
  202.             // reached its maximum size, open a new one.
  203.             ensure_open_logfile();
  204.  
  205.             // get a set of commit buffers up the the limits allowed,
  206.             // and write them all to disk.
  207.             size_t txn_count = 0;
  208.             transaction_id_t max_lsn = _shm_info->_last_write_txn_id;
  209.             boost::interprocess::offset_t new_file_len = _shm_info->log_len;
  210.  
  211.             boost::interprocess::scoped_lock<mutex_type> queue_lock_holder(_shm_info->_queue_mutex);
  212.  
  213.             stldb::timer t4("fetch commit_buffers, do checksums");
  214.             while (txn_count < max_txn_per_write
  215.                    && !_shm_info->waiting_txns.empty()
  216.                    && new_file_len < _shm_info->log_max_len ) {
  217.  
  218.                 // get one txn buffer, create a header record for it.
  219.                 stldb::commit_buffer_t<void_alloc_t> *buff = _shm_info->waiting_txns[0];
  220.  
  221.                 // add the header and the buffer to the io_vec structures.
  222.                 iov[2*txn_count].iov_base = &(buff->header);
  223.                 iov[2*txn_count].iov_len = sizeof(struct log_header);
  224.                 iov[2*txn_count+1].iov_base = const_cast<char*>(&*(buff->begin()));
  225.                 iov[2*txn_count+1].iov_len = buff->size()*sizeof(char);
  226.  
  227.                 // stats keeping
  228.                 bytes += iov[2*txn_count].iov_len + iov[2*txn_count+1].iov_len;
  229.                 txn_written++;
  230.  
  231.                 max_lsn = buff->header.lsn;
  232.                 new_file_len += iov[2*txn_count].iov_len + iov[2*txn_count+1].iov_len;
  233.                 txn_count++;
  234.  
  235.                 _shm_info->waiting_txns.pop_front();
  236.             }
  237.             t4.end();
  238.  
  239.             // we can release the queue lock at this point...
  240.             queue_lock_holder.unlock();
  241.  
  242.             // pad the writes, as necessary, in order to ensure that each
  243.             // write can begin on sector/page boundaries, per optimum_write_alignment.
  244.             int buffercount = txn_count*2;
  245.             uint64_t padding = (new_file_len % optimum_write_alignment != 0)
  246.                 ? optimum_write_alignment - (new_file_len % optimum_write_alignment) : 0;
  247.             if (padding != 0 && padding < sizeof(struct log_header))
  248.                 padding += optimum_write_alignment;
  249.             if (padding != 0) {
  250.                 // we can add a padding transaction record in order to
  251.                 // promote disk writes of 'optimum_write_alignment' increments.
  252.                 padding_header.segment_size = padding - sizeof(struct log_header);
  253.                 padding_header.finalize();
  254.                 iov[2*txn_count].iov_base = &padding_header;
  255.                 iov[2*txn_count].iov_len = sizeof(struct log_header);
  256.                 buffercount++;
  257.                 if (padding - sizeof(struct log_header) > 0) {
  258.                     iov[2*txn_count+1].iov_base = padding_buffer;
  259.                     iov[2*txn_count+1].iov_len = padding - sizeof(struct log_header);
  260.                     buffercount++;
  261.                 }
  262.                 new_file_len += padding;
  263.                 bytes += padding;
  264.             }
  265.  
  266.             // make sure our fd is pointing to the correct offset in the file.
  267.             if (_my_fp_offset != _shm_info->log_len) {
  268.                 boost::interprocess::detail::set_file_pointer(_logfd, _shm_info->log_len,
  269.                     boost::interprocess::file_begin);
  270.             }
  271.  
  272.             // Write the data we have gathered to the file
  273.             stldb::timer t5("write()");
  274.             bool written = stldb::io::gathered_write_file(_logfd, &iov[0], buffercount);
  275.             if (!written) {
  276.                 std::ostringstream error;
  277.                 error << "stldb::io::gathered_write_file() to log file failed.  errno: " << errno;
  278.                 throw std::ios_base::failure( error.str() );
  279.             }
  280.             _my_fp_offset = new_file_len;
  281.             writes++;
  282.             t5.end();
  283.  
  284.             // Update the info about file len, and last written log_seq
  285.             _shm_info->_last_write_txn_id = max_lsn;
  286.             _shm_info->log_len = new_file_len;
  287.  
  288.         } // while max log_seq written is < my log_seq
  289.  
  290.         // Ok, once here, our commit buffer has been written, but we still might
  291.         // need to wait for it to go to disk (via stldb::io::sync())
  292.         if (_shm_info->_last_sync_txn_id < my_commit_seq && _shm_info->log_sync ) {
  293.             this->sync_log(file_lock_holder);
  294.             syncs++;
  295.         }
  296.  
  297.         // record stats:
  298.         _shm_info->stats.total_commits++;
  299.         _shm_info->stats.total_write_commits += (writes>0) ? 1 : 0;
  300.         _shm_info->stats.total_sync_only_commits += (writes==0 && syncs>0) ? 1 : 0;
  301.         _shm_info->stats.total_free_commits += (writes==0 && syncs==0) ? 1 : 0;
  302.         _shm_info->stats.total_writes += writes;
  303.         if ( _shm_info->stats.max_writes_per_commit < writes )
  304.             _shm_info->stats.max_writes_per_commit = writes;
  305.         if ( _shm_info->stats.max_buffers_per_commit < txn_written )
  306.             _shm_info->stats.max_buffers_per_commit = txn_written;
  307.         _shm_info->stats.total_bytes_written += bytes;
  308.         _shm_info->stats.total_syncs += syncs;
  309.  
  310.         // file_lock_holder releases as it goes out of scope.
  311.     };
  312.  
  313.     // Sync the log to disk.   CALLER MUST HOLD FILE LOCK.
  314.     void sync_log(boost::interprocess::scoped_lock<mutex_type> &held_file_lock)
  315.     {
  316.  
  317.         transaction_id_t new_sync_txn_id = _shm_info->_last_write_txn_id;
  318.  
  319.         // TODO - it might be possible to allow a thread to do additional
  320.         // writes to this file while the current thread oes an fsync to
  321.         // sync the last write to disk.  This behavior is enabled by
  322.         // uncommenting the unlock() and lock() pair below.
  323.         // held_file_lock.unlock();
  324.  
  325.         // may block for some time....
  326.         stldb::timer t6("sync()");
  327.         stldb::io::sync(_logfd);
  328.         t6.end();
  329.  
  330.         // now re-acquire the file mutex..
  331.         // held_file_lock.lock();
  332.  
  333.         if (_shm_info->_last_sync_txn_id < new_sync_txn_id) {
  334.             _shm_info->_last_sync_txn_id = new_sync_txn_id;
  335.         }
  336.     }
  337.  
  338.     // Called by checkpoint logic to deliberately advance the current logfile so that the
  339.     // the first transaction to commit after the start of a checkpoint causes all preceding
  340.     // logs to become archivable.  It minimizes the log reads needed during recovery.
  341.     // CALLER must hold file lock.
  342.     void advance_logfile() {
  343.         // In this case it is time to advance to a new log file.
  344.         _shm_info->log_filename = stldb::detail::log_filename(_shm_info->log_dir.c_str(), _shm_info->_last_write_txn_id).c_str();
  345.         _shm_info->log_len = 0;
  346.         STLDB_TRACE(fine_e, "Logger: starting new log file: " << _shm_info->log_filename.c_str());
  347.     }
  348.  
  349. private:
  350.  
  351.     /**
  352.      * ensure that _logfd is referring to the correct (current) logfile,
  353.      * and that the logfile in question has not exceeded its maximum size.
  354.      * Caller holds _shm_info->file_mutex.
  355.      */
  356.     void ensure_open_logfile() {
  357.         // write the buff to the log file.
  358.         if ( _shm_info->log_filename.empty()
  359.              || (_shm_info->log_len >= _shm_info->log_max_len)
  360.              || (_shm_info->log_filename.compare(_my_log_filename.c_str()) != 0) )
  361.         {
  362.             stldb::timer t1("Logger::open_logfile");
  363.             // need to reopen our _logfd at a minimum.
  364.  
  365.             if (_shm_info->log_filename.empty() || _shm_info->log_len >= _shm_info->log_max_len)
  366.             {
  367.                 // In this case it is time to advance to a new log file.
  368.                 this->advance_logfile();
  369.             }
  370.  
  371.             if ( _shm_info->log_filename.compare( _my_log_filename.c_str() ) != 0
  372.                  && !_my_log_filename.empty() ) {
  373.                 // close whatever file we used to have open.
  374.                 stldb::timer t2("Logger::close_logfile");
  375.                 boost::interprocess::detail::close_file(_logfd);
  376.             }
  377.  
  378.             // now open _shm_info->log_filename
  379.             STLDB_TRACE(fine_e, "Logger: create_or_open log file: " << _shm_info->log_filename.c_str());
  380.             _logfd = boost::interprocess::detail::create_or_open_file( _shm_info->log_filename.c_str(),
  381.                     boost::interprocess::read_write );
  382.             if (_logfd < 0) {  /* TODO - Too OS specific */
  383.                 std::ostringstream error;
  384.                 error << "STLdb Logger: create_or_open_file() of log file '" << _shm_info->log_filename.c_str() << "' failed.  errno=" << errno << ": " << strerror(errno);
  385.                 STLDB_TRACE(severe_e, error.str());
  386.                 throw std::ios_base::failure( error.str() );
  387.             }
  388.             // most filesystems will benefit from this:
  389.             // we seek to maxsize and write a byte, so that during subsequent I/O
  390.             // into the file, we aren't changing the inode's length constantly.
  391.             boost::interprocess::detail::set_file_pointer(_logfd, _shm_info->log_max_len,
  392.                     boost::interprocess::file_begin);
  393.  
  394.             char byte = 0;
  395.             bool written = stldb::io::write_file(_logfd, &byte, 1);
  396.             if (!written) {
  397.                 std::ostringstream error;
  398.                 error << "stldb::io::_write_file(1) at offset log_max_len failed.  errno: " << errno;
  399.                 throw std::ios_base::failure( error.str() );
  400.             }
  401.             _my_fp_offset = _shm_info->log_max_len +1;
  402.  
  403.             // Note that we now have this file open.
  404.             _my_log_filename = _shm_info->log_filename.c_str();
  405.         }
  406.     }
  407.  
  408.     // hard-coded approach needed due to gcc-3.4.3 bug.  Corresponds
  409.     // to IOV_MAX=16 (Solaris)
  410.     //static const size_t max_txn_per_write = (io::max_write_region_per_call-1)/2;
  411.     static const size_t max_txn_per_write = 7;
  412.     static const size_t optimum_write_alignment = 512;
  413.  
  414.     // Used to hold structures used with writev()
  415.     io::write_region_t iov[2*(max_txn_per_write+1)];
  416.  
  417.     // used to provide padding of writes.
  418.     log_header padding_header;
  419.     char padding_buffer[optimum_write_alignment + sizeof(struct log_header)];
  420.  
  421.     // details about logging kept in shared memory.
  422.     SharedLogInfo<void_alloc_t,mutex_type> *_shm_info;
  423.  
  424.     // The details of the open file as this process currently knows it.
  425.     boost::interprocess::file_handle_t  _logfd;
  426.     std::string   _my_log_filename; // what _logfd refers to
  427.     boost::interprocess::offset_t  _my_fp_offset;  // offset into _logfd which a write() would currently go to.
  428. };
  429.  
  430. } // stldb namespace
  431.  
  432. #endif
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement