Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #include <vector>
- #include <thread>
- #include <mutex>
- #include <condition_variable>
- #include <fstream>
- #include <sstream>
- #include <queue>
- #include <atomic>
- #include <chrono>
- #include <iostream>
// ---------------------------------------------------------------------------
// Benchmark configuration
// ---------------------------------------------------------------------------

// Number of log records each worker thread emits.
#define MESSAGES_PER_THREAD 100000U
// Number of worker threads started for every benchmark phase.
#define THREADS_AMOUNT 30U

// Output file of each logging strategy.
#define DIRECT_LOG_FILENAME "direct.log"
#define MUTEXED_LOG_FILENAME "mutexed.log"
#define DEFERRED_LOG_FILENAME "deferred.log"

// ---------------------------------------------------------------------------
// Shared types
// ---------------------------------------------------------------------------

// Worker threads are held as raw pointers; main() allocates and frees them.
using ThreadPool = std::vector<std::thread *>;
// FIFO of fully formatted log records awaiting the writer thread.
using LogQueue = std::queue<std::string>;

// ---------------------------------------------------------------------------
// Shared state
// ---------------------------------------------------------------------------

// "Direct" strategy: stream written by direct_logging_worker() without a lock.
std::ofstream directLogFileStream;

// "Mutexed" strategy: stream plus the mutex held around each write.
std::ofstream mutexedLogFileStream;
std::mutex mutexedLogMutex;

// "Deferred" strategy: workers push records into logQueue and the writer
// thread pops them and writes them to deferredLogFileStream.
std::ofstream deferredLogFileStream;
// Stays true while the writer thread should keep waiting for more records.
std::atomic<bool> isRunning{true};
LogQueue logQueue;
// Guards logQueue; logQueueCond is notified after every push.
std::mutex logQueueMutex;
std::condition_variable logQueueCond;
// Largest queue depth observed by the producers (updated under logQueueMutex).
std::size_t maxLogQueueSize{0U};
- void direct_logging_worker()
- {
- for (std::size_t i = 0U; i < MESSAGES_PER_THREAD; ++i) {
- std::ostringstream msg;
- msg << "> This is a very-very-very important log message from thread-pool" << std::endl <<
- ": And this message is multiline! :)";
- directLogFileStream << msg.str() << std::endl;
- }
- }
- void mutexed_logging_worker()
- {
- for (std::size_t i = 0U; i < MESSAGES_PER_THREAD; ++i) {
- std::ostringstream msg;
- msg << "> This is a very-very-very important log message from thread-pool" << std::endl <<
- ": And this message is multiline! :)";
- std::unique_lock<std::mutex> lock(mutexedLogMutex);
- mutexedLogFileStream << msg.str() << std::endl;
- }
- }
- void deferred_logging_worker()
- {
- for (std::size_t i = 0U; i < MESSAGES_PER_THREAD; ++i) {
- std::ostringstream msg;
- msg << "> This is a very-very-very important log message from thread-pool" << std::endl <<
- ": And this message is multiline! :)";
- {
- std::unique_lock<std::mutex> lock(logQueueMutex);
- try {
- logQueue.push(std::move(msg.str()));
- } catch (std::exception& e) {
- std::cerr << "Log queue overflow has been detected" << std::endl;
- }
- if (maxLogQueueSize < logQueue.size()) {
- maxLogQueueSize = logQueue.size();
- }
- logQueueCond.notify_one();
- }
- }
- }
- void deferred_logging_thread()
- {
- bool hasPenfdingMessages = false;
- while (hasPenfdingMessages || isRunning.load()) {
- std::string pendingMessage;
- {
- std::unique_lock<std::mutex> lock(logQueueMutex);
- if (!logQueue.empty()) {
- pendingMessage = std::move(logQueue.front());
- logQueue.pop();
- } else {
- logQueueCond.wait(lock);
- if (!logQueue.empty()) {
- pendingMessage = std::move(logQueue.front());
- logQueue.pop();
- }
- }
- hasPenfdingMessages = !logQueue.empty();
- }
- if (!pendingMessage.empty()) {
- deferredLogFileStream << pendingMessage << std::endl;
- pendingMessage.clear();
- }
- }
- }
- int main(int argc, char* argv[])
- {
- ThreadPool threadPool(THREADS_AMOUNT);
- // Logging directly
- directLogFileStream.open(DIRECT_LOG_FILENAME, std::ios_base::out);
- std::chrono::system_clock::time_point directLoggingStarted = std::chrono::system_clock::now();
- for (std::size_t i = 0; i < threadPool.size(); ++i) {
- threadPool[i] = new std::thread(direct_logging_worker);
- }
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
- for (std::size_t i = 0; i < threadPool.size(); ++i) {
- threadPool[i]->join();
- delete threadPool[i];
- }
- std::chrono::system_clock::time_point directLoggingFinished = std::chrono::system_clock::now();
- directLogFileStream.close();
- std::cout << "Direct logging took " << std::chrono::duration_cast<std::chrono::microseconds>(directLoggingFinished - directLoggingStarted).count() << " microseconds" << std::endl;
- // Mutexed logging
- mutexedLogFileStream.open(MUTEXED_LOG_FILENAME, std::ios_base::out);
- std::chrono::system_clock::time_point mutexedLoggingStarted = std::chrono::system_clock::now();
- for (std::size_t i = 0; i < threadPool.size(); ++i) {
- threadPool[i] = new std::thread(mutexed_logging_worker);
- }
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
- for (std::size_t i = 0; i < threadPool.size(); ++i) {
- threadPool[i]->join();
- delete threadPool[i];
- }
- std::chrono::system_clock::time_point mutexedLoggingFinished = std::chrono::system_clock::now();
- directLogFileStream.close();
- std::cout << "Mutexed logging took " << std::chrono::duration_cast<std::chrono::microseconds>(mutexedLoggingFinished - mutexedLoggingStarted).count() << " microseconds" << std::endl;
- // Deferred logging
- deferredLogFileStream.open(DEFERRED_LOG_FILENAME, std::ios_base::out);
- isRunning = true;
- std::thread deferredLoggingThread(deferred_logging_thread);
- std::chrono::system_clock::time_point deferredLoggingStarted = std::chrono::system_clock::now();
- for (std::size_t i = 0; i < threadPool.size(); ++i) {
- threadPool[i] = new std::thread(deferred_logging_worker);
- }
- std::this_thread::sleep_for(std::chrono::milliseconds(100));
- for (std::size_t i = 0; i < threadPool.size(); ++i) {
- threadPool[i]->join();
- delete threadPool[i];
- }
- std::chrono::system_clock::time_point deferredLoggingFinished = std::chrono::system_clock::now();
- isRunning = false;
- {
- std::unique_lock<std::mutex> lock(logQueueMutex);
- logQueueCond.notify_one();
- }
- deferredLoggingThread.join();
- std::chrono::system_clock::time_point deferredPostprocessingFinished = std::chrono::system_clock::now();
- deferredLogFileStream.close();
- std::cout << "Deferred logging took " << std::chrono::duration_cast<std::chrono::microseconds>(deferredLoggingFinished - deferredLoggingStarted).count() <<
- " microseconds, postprocessing took " << std::chrono::duration_cast<std::chrono::microseconds>(deferredPostprocessingFinished - deferredLoggingFinished).count() <<
- " microseconds, max queue size is " << maxLogQueueSize << std::endl;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement