Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #include "JobMgr.h"
- #include "Job.h"
- CJobMgr *g_pJobMgr = NULL;
- typedef CUtlMap<JobType_t, CCouchJob *> CUtlJobMap;
- // stupid hack to ensure the static job map is initialized first
- CUtlJobMap &JobMap()
- {
- static CUtlJobMap jobMap( DefLessFunc( JobType_t ) );
- return jobMap;
- }
- CJobMgr::CJobMgr()
- {
- }
- CJobMgr::~CJobMgr()
- {
- }
- void CJobMgr::Register( JobType_t eType, CCouchJob *pJob )
- {
- CUtlJobMap::IndexType_t idx = JobMap().Find( eType );
- if ( JobMap().IsValidIndex( idx ) )
- {
- Msg( "[CouchDB] Attempting to register job type that already exists: %u\n", eType );
- return;
- }
- JobMap().Insert( eType, pJob );
- }
- bool CJobMgr::StartWorkers( uint32 numWorkers )
- {
- if ( m_WorkerThreads.Count() > 0 )
- {
- // don't start workers more than once
- return false;
- }
- Msg( "[CouchDB] Starting %u worker threads.\n", numWorkers );
- for ( uint32 x = 0; x < numWorkers ; ++x )
- {
- CCouchWorker *pWorker = new CCouchWorker;
- m_WorkerThreads.AddToTail( pWorker );
- pWorker->Start();
- }
- return true;
- }
- void CJobMgr::StopWorkers()
- {
- uint32 numWorkers = m_WorkerThreads.Count();
- // don't need to do anything if we never started our workers
- if ( numWorkers == 0 )
- {
- return;
- }
- // start a shutdown job for every worker we have
- FOR_EACH_VEC( m_WorkerThreads, x )
- {
- this->StartJob( NULL );
- }
- // join our worker threads and cleanup
- FOR_EACH_VEC( m_WorkerThreads, x )
- {
- CCouchWorker *pWorker = m_WorkerThreads[ x ];
- pWorker->Join();
- delete pWorker;
- }
- m_WorkerThreads.RemoveAll();
- Msg( "[CouchDB] Stopped %u workers.\n", numWorkers );
- }
- void CJobMgr::StartJob( CouchJobInfo_t *pJob )
- {
- Assert( m_WorkerThreads.Count() > 0 );
- m_PendingJobs.QueueMessage( pJob );
- }
- void CJobMgr::FinishJob( CouchJobInfo_t *pJob )
- {
- m_CompletedJobs.QueueMessage( pJob );
- }
- bool CJobMgr::RunJob( CouchJobInfo_t *pJob )
- {
- CUtlJobMap::IndexType_t idx = JobMap().Find( pJob->m_Type );
- if ( !JobMap().IsValidIndex( idx ) )
- {
- Msg( "[CouchDB] CJobMgr was told to run a job type it doesn't know about: %u\n", pJob->m_Type );
- return false;
- }
- return JobMap()[ idx ]->Run( pJob );
- }
- int CCouchWorker::Run()
- {
- for ( ; ; )
- {
- CouchJobInfo_t *pJob = NULL;
- // wait for a job
- GJobMgr().GetPendingJob( &pJob );
- if ( pJob == NULL )
- {
- // shutdown signal
- break;
- }
- if ( !GJobMgr().RunJob( pJob ) )
- {
- continue;
- }
- // notify completion
- // this adds the job to the completed queue
- GJobMgr().FinishJob( pJob );
- }
- return 0;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement