leproza

Simple ThreadPool

Mar 24th, 2022
688
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 3.61 KB | None | 0 0
  1. #include <iostream>
  2.  
  3. #include <thread>
  4. #include <vector>
  5. #include <chrono>
  6. #include <future>
  7. #include <mutex>
  8. #include <condition_variable>
  9.  
  /**
   * Abstract unit of work that a ThreadPool runs on a worker thread.
   *
   * Concrete tasks derive from this class and implement execute().
   */
  class Task
  {
  public:
    /// Virtual so that destroying a concrete task through a Task* runs the
    /// derived class' destructor instead of invoking undefined behavior.
    virtual ~Task() = default;

    /// Performs the task's work; invoked by the pool on a worker thread.
    virtual void execute() = 0;
  };
  15.  
  16. class ThreadPool
  17. {
  18. public:
  19.   ThreadPool( int nMaxThreads );
  20.   ~ThreadPool();
  21.  
  22.   void execute();
  23.  
  24.   void addTask( Task* pTask );
  25.  
  26. protected:
  27.   int m_nMaxThreads;
  28.   std::vector< std::thread > m_threads;
  29.   std::vector< Task* > m_tasks;
  30.  
  31.   void runTask( Task* pTask );
  32.   static void exec( ThreadPool* pPool, Task* pTask );
  33.   bool finalizeThread( const std::thread::id& threadId );
  34.  
  35.   void finalize();
  36.  
  37.   std::mutex m_accessMutex;
  38.   std::mutex m_consoleMutex;
  39.   bool m_bThreadDone;
  40.   std::thread::id m_threadId;
  41.   std::condition_variable m_notifier;
  42. };
  43.  
  44. ThreadPool::ThreadPool( int nMaxThreads ) : m_nMaxThreads( nMaxThreads ),
  45.                                             m_bThreadDone( false )
  46. {
  47.  
  48. }
  49.  
  50. ThreadPool::~ThreadPool()
  51. {
  52.   finalize();
  53. }
  54.  
  55. void ThreadPool::finalize()
  56. {
  57.   m_tasks.clear();
  58.   for( auto& i : m_threads )
  59.   {
  60.     i.join();
  61.   }
  62.   m_threads.clear();
  63. }
  64.  
  65. void ThreadPool::execute()
  66. {
  67.   bool bWait = !m_threads.empty() || !m_tasks.empty();
  68.   while( bWait )
  69.   {
  70.     if( !m_tasks.empty() )
  71.     {
  72.       int nMin = m_nMaxThreads - m_threads.size() < m_tasks.size() ? m_nMaxThreads - m_threads.size() : (int)( m_tasks.size() );
  73.       for( int i = 0; i < nMin; ++i )
  74.       {
  75.         runTask( m_tasks[ 0 ] );
  76.         m_tasks.erase( m_tasks.begin() );
  77.       }
  78.     }
  79.     if( !m_threads.empty() )
  80.     {
  81.       std::unique_lock<std::mutex> lk( m_accessMutex );
  82.       m_notifier.wait( lk, [this]{return m_bThreadDone == true; } );
  83.       m_bThreadDone = false;
  84.       if( !finalizeThread( m_threadId ) )
  85.       {
  86.         throw "Can not finalize thread";
  87.       }
  88.       lk.unlock();
  89.     }
  90.     bWait = !m_threads.empty() || !m_tasks.empty();
  91.   }
  92.   finalize();
  93. }
  94.  
  95. void ThreadPool::addTask( Task* pTask )
  96. {
  97.   m_tasks.push_back( pTask );
  98. }
  99.  
  100. void ThreadPool::runTask( Task* pTask )
  101. {
  102.   m_threads.push_back( std::thread( ThreadPool::exec, this, pTask ) );
  103. }
  104.  
  105. void ThreadPool::exec( ThreadPool* pPool, Task* pTask )
  106. {
  107.   {
  108.     std::lock_guard< std::mutex > g( pPool->m_consoleMutex );
  109.     std::cout << "Execute task in thread " << std::this_thread::get_id() << std::endl;
  110.   }
  111.   clock_t t1 = clock();
  112.   pTask->execute();
  113.   clock_t t2 = clock();
  114.   {
  115.     std::lock_guard< std::mutex > g( pPool->m_consoleMutex );
  116.     std::cout << "Execute task in thread " << std::this_thread::get_id() << " DONE is " << t2 - t1 << " msecs\n";
  117.   }
  118.  
  119.   pPool->m_accessMutex.lock();
  120.   pPool->m_bThreadDone = true;
  121.   pPool->m_threadId = std::this_thread::get_id();
  122.   pPool->m_notifier.notify_one();
  123.   pPool->m_accessMutex.unlock();
  124. }
  125.  
  126. bool ThreadPool::finalizeThread( const std::thread::id& threadId )
  127. {
  128.   for( auto tr = m_threads.begin(); tr != m_threads.end(); ++tr )
  129.   {
  130.     if( tr->get_id() == threadId )
  131.     {
  132.       tr->join();
  133.       m_threads.erase( tr );
  134.       return true;
  135.     }
  136.   }
  137.   return false;
  138. }
  139.  
  140. class WaitTask : public Task
  141. {
  142. protected:
  143.   int m_waitSec;
  144. public:
  145.   WaitTask( int sec ) : m_waitSec( sec ) {};
  146.   virtual void execute()
  147.   {
  148.     std::this_thread::sleep_for( std::chrono::seconds( m_waitSec ) );
  149.   }
  150. };
  151.  
  152. int main()
  153. {
  154.   ThreadPool tp( 3 );
  155.   WaitTask tasks[] = { WaitTask( 5 ), WaitTask( 1 ), WaitTask( 2 ), WaitTask( 3 ), WaitTask( 1 ) };
  156.  
  157.   clock_t t1 = clock();
  158.  
  159.   for( int i = 0; i < 5; ++i )
  160.   {
  161.     tp.addTask( tasks + i );
  162.   }
  163.   tp.execute();
  164.  
  165.   clock_t t2 = clock();
  166.   std::cout << "Done by " << t2 - t1 << " msecs\n";
  167.  
  168.   return 0;
  169. }
  170.  
  171.  
Advertisement
Add Comment
Please, Sign In to add comment