Advertisement
Guest User

Untitled

a guest
Nov 28th, 2014
169
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 5.65 KB | None | 0 0
  1. #include <vector>
  2. #include <thread>
  3. #include <mutex>
  4. #include <condition_variable>
  5. #include <fstream>
  6. #include <sstream>
  7. #include <queue>
  8. #include <atomic>
  9. #include <chrono>
  10. #include <iostream>
  11.  
  // ---- Benchmark parameters -------------------------------------------------
  // Number of messages each worker thread emits.
  #define MESSAGES_PER_THREAD 100000U
  // Number of worker threads started for every logging strategy.
  #define THREADS_AMOUNT 30U
  // Output file for each of the three strategies.
  #define DIRECT_LOG_FILENAME "direct.log"
  #define MUTEXED_LOG_FILENAME "mutexed.log"
  #define DEFERRED_LOG_FILENAME "deferred.log"

  // ---- Shared types ---------------------------------------------------------
  using ThreadPool = std::vector<std::thread*>;
  using LogQueue = std::queue<std::string>;

  // ---- "Direct" strategy: stream written by workers without any locking -----
  std::ofstream directLogFileStream;

  // ---- "Mutexed" strategy: stream plus the mutex serialising writes to it ---
  std::ofstream mutexedLogFileStream;
  std::mutex mutexedLogMutex;

  // ---- "Deferred" strategy --------------------------------------------------
  // Stream written by the dedicated logging thread.
  std::ofstream deferredLogFileStream;
  // Cleared by main() to ask the logging thread to stop once the queue is empty.
  std::atomic_bool isRunning{true};
  // Pending messages; accessed under logQueueMutex, signalled via logQueueCond.
  LogQueue logQueue;
  std::mutex logQueueMutex;
  std::condition_variable logQueueCond;
  // Largest queue length observed by the producers (updated under logQueueMutex).
  std::size_t maxLogQueueSize{0U};
  30.  
  31. void direct_logging_worker()
  32. {
  33.     for (std::size_t i = 0U; i < MESSAGES_PER_THREAD; ++i) {
  34.         std::ostringstream msg;
  35.         msg << "> This is a very-very-very important log message from thread-pool" << std::endl <<
  36.             ": And this message is multiline! :)";
  37.         directLogFileStream << msg.str() << std::endl;
  38.     }
  39. }
  40.  
  41. void mutexed_logging_worker()
  42. {
  43.     for (std::size_t i = 0U; i < MESSAGES_PER_THREAD; ++i) {
  44.         std::ostringstream msg;
  45.         msg << "> This is a very-very-very important log message from thread-pool" << std::endl <<
  46.             ": And this message is multiline! :)";
  47.         std::unique_lock<std::mutex> lock(mutexedLogMutex);
  48.         mutexedLogFileStream << msg.str() << std::endl;
  49.     }
  50. }
  51.  
  52. void deferred_logging_worker()
  53. {
  54.     for (std::size_t i = 0U; i < MESSAGES_PER_THREAD; ++i) {
  55.         std::ostringstream msg;
  56.         msg << "> This is a very-very-very important log message from thread-pool" << std::endl <<
  57.             ": And this message is multiline! :)";
  58.         {
  59.             std::unique_lock<std::mutex> lock(logQueueMutex);
  60.             try {
  61.                 logQueue.push(std::move(msg.str()));
  62.             } catch (std::exception& e) {
  63.                 std::cerr << "Log queue overflow has been detected" << std::endl;
  64.             }
  65.             if (maxLogQueueSize < logQueue.size()) {
  66.                 maxLogQueueSize = logQueue.size();
  67.             }
  68.             logQueueCond.notify_one();
  69.         }
  70.     }
  71. }
  72.  
  73. void deferred_logging_thread()
  74. {
  75.     bool hasPenfdingMessages = false;
  76.     while (hasPenfdingMessages || isRunning.load()) {
  77.         std::string pendingMessage;
  78.         {
  79.             std::unique_lock<std::mutex> lock(logQueueMutex);
  80.             if (!logQueue.empty()) {
  81.                 pendingMessage = std::move(logQueue.front());
  82.                 logQueue.pop();
  83.             } else {
  84.                 logQueueCond.wait(lock);
  85.                 if (!logQueue.empty()) {
  86.                     pendingMessage = std::move(logQueue.front());
  87.                     logQueue.pop();
  88.                 }
  89.             }
  90.             hasPenfdingMessages = !logQueue.empty();
  91.         }
  92.         if (!pendingMessage.empty()) {
  93.             deferredLogFileStream << pendingMessage << std::endl;
  94.             pendingMessage.clear();
  95.         }
  96.     }
  97. }
  98.  
  99. int main(int argc, char* argv[])
  100. {
  101.     ThreadPool threadPool(THREADS_AMOUNT);
  102.  
  103.     // Logging directly
  104.     directLogFileStream.open(DIRECT_LOG_FILENAME, std::ios_base::out);
  105.     std::chrono::system_clock::time_point directLoggingStarted = std::chrono::system_clock::now();
  106.     for (std::size_t i = 0; i < threadPool.size(); ++i) {
  107.         threadPool[i] = new std::thread(direct_logging_worker);
  108.     }
  109.     std::this_thread::sleep_for(std::chrono::milliseconds(100));
  110.     for (std::size_t i = 0; i < threadPool.size(); ++i) {
  111.         threadPool[i]->join();
  112.         delete threadPool[i];
  113.     }
  114.     std::chrono::system_clock::time_point directLoggingFinished = std::chrono::system_clock::now();
  115.     directLogFileStream.close();
  116.     std::cout << "Direct logging took " << std::chrono::duration_cast<std::chrono::microseconds>(directLoggingFinished - directLoggingStarted).count() << " microseconds" << std::endl;
  117.  
  118.     // Mutexed logging
  119.     mutexedLogFileStream.open(MUTEXED_LOG_FILENAME, std::ios_base::out);
  120.     std::chrono::system_clock::time_point mutexedLoggingStarted = std::chrono::system_clock::now();
  121.     for (std::size_t i = 0; i < threadPool.size(); ++i) {
  122.         threadPool[i] = new std::thread(mutexed_logging_worker);
  123.     }
  124.     std::this_thread::sleep_for(std::chrono::milliseconds(100));
  125.     for (std::size_t i = 0; i < threadPool.size(); ++i) {
  126.         threadPool[i]->join();
  127.         delete threadPool[i];
  128.     }
  129.     std::chrono::system_clock::time_point mutexedLoggingFinished = std::chrono::system_clock::now();
  130.     directLogFileStream.close();
  131.     std::cout << "Mutexed logging took " << std::chrono::duration_cast<std::chrono::microseconds>(mutexedLoggingFinished - mutexedLoggingStarted).count() << " microseconds" << std::endl;
  132.  
  133.     // Deferred logging
  134.     deferredLogFileStream.open(DEFERRED_LOG_FILENAME, std::ios_base::out);
  135.     isRunning = true;
  136.     std::thread deferredLoggingThread(deferred_logging_thread);
  137.     std::chrono::system_clock::time_point deferredLoggingStarted = std::chrono::system_clock::now();
  138.     for (std::size_t i = 0; i < threadPool.size(); ++i) {
  139.         threadPool[i] = new std::thread(deferred_logging_worker);
  140.     }
  141.     std::this_thread::sleep_for(std::chrono::milliseconds(100));
  142.     for (std::size_t i = 0; i < threadPool.size(); ++i) {
  143.         threadPool[i]->join();
  144.         delete threadPool[i];
  145.     }
  146.     std::chrono::system_clock::time_point deferredLoggingFinished = std::chrono::system_clock::now();
  147.     isRunning = false;
  148.     {
  149.         std::unique_lock<std::mutex> lock(logQueueMutex);
  150.         logQueueCond.notify_one();
  151.     }
  152.     deferredLoggingThread.join();
  153.     std::chrono::system_clock::time_point deferredPostprocessingFinished = std::chrono::system_clock::now();
  154.     deferredLogFileStream.close();
  155.     std::cout << "Deferred logging took " << std::chrono::duration_cast<std::chrono::microseconds>(deferredLoggingFinished - deferredLoggingStarted).count() <<
  156.         " microseconds, postprocessing took " << std::chrono::duration_cast<std::chrono::microseconds>(deferredPostprocessingFinished - deferredLoggingFinished).count() <<
  157.         " microseconds, max queue size is " << maxLogQueueSize << std::endl;
  158. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement