Advertisement
Guest User

Untitled

a guest
Sep 19th, 2017
53
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 2.56 KB | None | 0 0
  1.  
  2. #include "JobMgr.h"
  3.  
  4. #include "Job.h"
  5.  
  6.  
  7. CJobMgr *g_pJobMgr = NULL;
  8.  
  9. typedef CUtlMap<JobType_t, CCouchJob *> CUtlJobMap;
  10.  
  11.  
  12. // stupid hack to ensure the static job map is initialized first
  13. CUtlJobMap &JobMap()
  14. {
  15.     static CUtlJobMap jobMap( DefLessFunc( JobType_t ) );
  16.     return jobMap;
  17. }
  18.  
  19.  
  20. CJobMgr::CJobMgr()
  21. {
  22. }
  23.  
  24. CJobMgr::~CJobMgr()
  25. {
  26. }
  27.  
  28.  
  29. void CJobMgr::Register( JobType_t eType, CCouchJob *pJob )
  30. {
  31.     CUtlJobMap::IndexType_t idx = JobMap().Find( eType );
  32.  
  33.     if ( JobMap().IsValidIndex( idx ) )
  34.     {
  35.         Msg( "[CouchDB] Attempting to register job type that already exists: %u\n", eType );
  36.         return;
  37.     }
  38.  
  39.     JobMap().Insert( eType, pJob );
  40. }
  41.  
  42. bool CJobMgr::StartWorkers( uint32 numWorkers )
  43. {
  44.  
  45.     if ( m_WorkerThreads.Count() > 0 )
  46.     {
  47.         // don't start workers more than once
  48.         return false;
  49.     }
  50.  
  51.     Msg( "[CouchDB] Starting %u worker threads.\n", numWorkers );
  52.  
  53.     for ( uint32 x = 0; x < numWorkers ; ++x )
  54.     {
  55.         CCouchWorker *pWorker = new CCouchWorker;
  56.  
  57.         m_WorkerThreads.AddToTail( pWorker );
  58.  
  59.         pWorker->Start();
  60.     }
  61.  
  62.     return true;
  63. }
  64.  
  65. void CJobMgr::StopWorkers()
  66. {
  67.  
  68.     uint32 numWorkers = m_WorkerThreads.Count();
  69.  
  70.     // don't need to do anything if we never started our workers
  71.     if ( numWorkers == 0 )
  72.     {
  73.         return;
  74.     }
  75.  
  76.     // start a shutdown job for every worker we have
  77.     FOR_EACH_VEC( m_WorkerThreads, x )
  78.     {
  79.         this->StartJob( NULL );
  80.     }
  81.  
  82.     // join our worker threads and cleanup
  83.     FOR_EACH_VEC( m_WorkerThreads, x )
  84.     {
  85.         CCouchWorker *pWorker = m_WorkerThreads[ x ];
  86.  
  87.         pWorker->Join();
  88.  
  89.         delete pWorker;
  90.     }
  91.  
  92.     m_WorkerThreads.RemoveAll();
  93.  
  94.     Msg( "[CouchDB] Stopped %u workers.\n", numWorkers );
  95. }
  96.  
  97. void CJobMgr::StartJob( CouchJobInfo_t *pJob )
  98. {
  99.     Assert( m_WorkerThreads.Count() > 0 );
  100.  
  101.     m_PendingJobs.QueueMessage( pJob );
  102. }
  103.  
  104. void CJobMgr::FinishJob( CouchJobInfo_t *pJob )
  105. {
  106.     m_CompletedJobs.QueueMessage( pJob );
  107. }
  108.  
  109. bool CJobMgr::RunJob( CouchJobInfo_t *pJob )
  110. {
  111.     CUtlJobMap::IndexType_t idx = JobMap().Find( pJob->m_Type );
  112.  
  113.     if ( !JobMap().IsValidIndex( idx ) )
  114.     {
  115.         Msg( "[CouchDB] CJobMgr was told to run a job type it doesn't know about: %u\n", pJob->m_Type );
  116.         return false;
  117.     }
  118.  
  119.     return JobMap()[ idx ]->Run( pJob );
  120. }
  121.  
  122.  
  123. int CCouchWorker::Run()
  124. {
  125.     for ( ; ; )
  126.     {
  127.         CouchJobInfo_t *pJob = NULL;
  128.  
  129.         // wait for a job
  130.         GJobMgr().GetPendingJob( &pJob );
  131.  
  132.         if ( pJob == NULL )
  133.         {
  134.             // shutdown signal
  135.             break;
  136.         }
  137.  
  138.         if ( !GJobMgr().RunJob( pJob ) )
  139.         {
  140.             continue;
  141.         }
  142.  
  143.         // notify completion
  144.         // this adds the job to the completed queue
  145.         GJobMgr().FinishJob( pJob );
  146.     }
  147.  
  148.     return 0;
  149. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement