Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #include <iostream>
- #include <thread>
- #include <vector>
- #include <chrono>
- #include <future>
- #include <mutex>
- #include <condition_variable>
- class Task
- {
- public:
- virtual void execute() = 0;
- };
- class ThreadPool
- {
- public:
- ThreadPool( int nMaxThreads );
- ~ThreadPool();
- void execute();
- void addTask( Task* pTask );
- protected:
- int m_nMaxThreads;
- std::vector< std::thread > m_threads;
- std::vector< Task* > m_tasks;
- void runTask( Task* pTask );
- static void exec( ThreadPool* pPool, Task* pTask );
- bool finalizeThread( const std::thread::id& threadId );
- void finalize();
- std::mutex m_accessMutex;
- std::mutex m_consoleMutex;
- bool m_bThreadDone;
- std::thread::id m_threadId;
- std::condition_variable m_notifier;
- };
- ThreadPool::ThreadPool( int nMaxThreads ) : m_nMaxThreads( nMaxThreads ),
- m_bThreadDone( false )
- {
- }
- ThreadPool::~ThreadPool()
- {
- finalize();
- }
- void ThreadPool::finalize()
- {
- m_tasks.clear();
- for( auto& i : m_threads )
- {
- i.join();
- }
- m_threads.clear();
- }
- void ThreadPool::execute()
- {
- bool bWait = !m_threads.empty() || !m_tasks.empty();
- while( bWait )
- {
- if( !m_tasks.empty() )
- {
- int nMin = m_nMaxThreads - m_threads.size() < m_tasks.size() ? m_nMaxThreads - m_threads.size() : (int)( m_tasks.size() );
- for( int i = 0; i < nMin; ++i )
- {
- runTask( m_tasks[ 0 ] );
- m_tasks.erase( m_tasks.begin() );
- }
- }
- if( !m_threads.empty() )
- {
- std::unique_lock<std::mutex> lk( m_accessMutex );
- m_notifier.wait( lk, [this]{return m_bThreadDone == true; } );
- m_bThreadDone = false;
- if( !finalizeThread( m_threadId ) )
- {
- throw "Can not finalize thread";
- }
- lk.unlock();
- }
- bWait = !m_threads.empty() || !m_tasks.empty();
- }
- finalize();
- }
- void ThreadPool::addTask( Task* pTask )
- {
- m_tasks.push_back( pTask );
- }
- void ThreadPool::runTask( Task* pTask )
- {
- m_threads.push_back( std::thread( ThreadPool::exec, this, pTask ) );
- }
- void ThreadPool::exec( ThreadPool* pPool, Task* pTask )
- {
- {
- std::lock_guard< std::mutex > g( pPool->m_consoleMutex );
- std::cout << "Execute task in thread " << std::this_thread::get_id() << std::endl;
- }
- clock_t t1 = clock();
- pTask->execute();
- clock_t t2 = clock();
- {
- std::lock_guard< std::mutex > g( pPool->m_consoleMutex );
- std::cout << "Execute task in thread " << std::this_thread::get_id() << " DONE is " << t2 - t1 << " msecs\n";
- }
- pPool->m_accessMutex.lock();
- pPool->m_bThreadDone = true;
- pPool->m_threadId = std::this_thread::get_id();
- pPool->m_notifier.notify_one();
- pPool->m_accessMutex.unlock();
- }
- bool ThreadPool::finalizeThread( const std::thread::id& threadId )
- {
- for( auto tr = m_threads.begin(); tr != m_threads.end(); ++tr )
- {
- if( tr->get_id() == threadId )
- {
- tr->join();
- m_threads.erase( tr );
- return true;
- }
- }
- return false;
- }
- class WaitTask : public Task
- {
- protected:
- int m_waitSec;
- public:
- WaitTask( int sec ) : m_waitSec( sec ) {};
- virtual void execute()
- {
- std::this_thread::sleep_for( std::chrono::seconds( m_waitSec ) );
- }
- };
- int main()
- {
- ThreadPool tp( 3 );
- WaitTask tasks[] = { WaitTask( 5 ), WaitTask( 1 ), WaitTask( 2 ), WaitTask( 3 ), WaitTask( 1 ) };
- clock_t t1 = clock();
- for( int i = 0; i < 5; ++i )
- {
- tp.addTask( tasks + i );
- }
- tp.execute();
- clock_t t2 = clock();
- std::cout << "Done by " << t2 - t1 << " msecs\n";
- return 0;
- }
Advertisement
Add Comment
Please, Sign In to add comment