wAngel

pthread mutex problem

Nov 3rd, 2011
243
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 3.54 KB | None | 0 0
  1. #include <iostream>
  2. #include <queue>
  3. #include <map>
  4. #include <pthread.h>
  5.  
  6. using namespace std;
  7.  
  8. #define COUNT_WORKERS 10
  9. #define MAX_QUEUE_SIZE 100
  10.  
  11. class Server
  12. {
  13. private:
  14.     std::queue<string> _queryQueue;
  15.     pthread_mutex_t _queryQueueMutex;
  16.  
  17.     pthread_t _workerThreads[COUNT_WORKERS];
  18.     pthread_mutex_t _workerSuspendMutex;
  19.     pthread_cond_t _workerResumeCondition;
  20.  
  21.     pthread_t _thread;
  22.  
  23.     Server(Server const&);
  24.     void operator=(Server const&);
  25.  
  26. public:
  27.     Server();
  28.  
  29.     bool start();
  30.  
  31.     static void* threadFunc(void* ptr);
  32.     static void* threadWorkerFunc(void* ptr);
  33. };
  34. //-----------------------------------------------------------------------------
  35.  
  36. Server::Server()
  37. {
  38.     if (pthread_mutex_init(&_queryQueueMutex, NULL))
  39.         throw "err";
  40.  
  41.     if (pthread_mutex_init(&_workerSuspendMutex, NULL))
  42.         throw "err";
  43.  
  44. } // End
  45. //-----------------------------------------------------------------------------
  46.  
  47. bool Server::start()
  48. {
  49.     if (pthread_create(&_thread, NULL, threadFunc, this))
  50.         return false;
  51.  
  52.     // Worker threads.
  53.     for (int i = 0; i < COUNT_WORKERS; i++)
  54.         if (pthread_create(&_workerThreads[i], NULL, threadWorkerFunc, this))
  55.             return false;
  56.  
  57.     pthread_join(_thread, NULL);
  58.     return true;
  59.  
  60. } // End
  61. //-----------------------------------------------------------------------------
  62.  
  63. void* Server::threadFunc(void* ptr)
  64. {
  65.     cout << "Start thread" << endl;
  66.  
  67.     Server* server = (Server*)ptr;
  68.  
  69.     while (1)
  70.     {
  71.         cout << "Request" << endl;
  72.  
  73.         sleep(1);
  74.  
  75.         pthread_mutex_lock(&server->_queryQueueMutex);
  76.         if (server->_queryQueue.size() < MAX_QUEUE_SIZE)
  77.         {
  78.             server->_queryQueue.push("text");
  79.             pthread_mutex_unlock(&server->_queryQueueMutex);
  80.  
  81.             // Wake up all worker threads.
  82.             pthread_cond_broadcast(&server->_workerResumeCondition);
  83.             continue;
  84.         }
  85.  
  86.         pthread_mutex_unlock(&server->_queryQueueMutex);
  87.     }
  88.  
  89.     return NULL;
  90.  
  91. } // End
  92. //-----------------------------------------------------------------------------
  93.  
  94. void* Server::threadWorkerFunc(void* ptr)
  95. {
  96.     Server* server = (Server*)ptr;
  97.  
  98.     while (true)
  99.     {
  100.         // Lock queue mutex.
  101.         pthread_mutex_lock(&server->_queryQueueMutex);
  102.  
  103.         // If queue is empty, waiting new requests.
  104.         if (server->_queryQueue.empty())
  105.         {
  106.             // Unlock queue mutex.
  107.             pthread_mutex_unlock(&server->_queryQueueMutex);
  108.  
  109.             // Suspend current worker thread.
  110.             pthread_mutex_lock(&server->_workerSuspendMutex);
  111.             pthread_cond_wait(&server->_workerResumeCondition, &server->_workerSuspendMutex);
  112.  
  113.             cout << "Wake up" << endl;
  114.  
  115.             pthread_mutex_unlock(&server->_workerSuspendMutex);
  116.             continue;
  117.         }
  118.  
  119.         // Load request from queue.
  120.         string request = server->_queryQueue.front();
  121.         server->_queryQueue.pop();
  122.         pthread_mutex_unlock(&server->_queryQueueMutex);
  123.  
  124.     } // End while
  125.  
  126.     return NULL;
  127.  
  128. } // End
  129. //-----------------------------------------------------------------------------
  130.  
  131. int main()
  132. {
  133.     try
  134.     {
  135. //        Server srv1;
  136. //        if (!srv1.start())
  137. //            return 1;
  138.  
  139.         Server* srv2 = new Server();
  140.         if (!srv2->start())
  141.             return 1;
  142.     }
  143.     catch (...)
  144.     {
  145.         return 1;
  146.     }
  147.  
  148.     return 0;
  149.  
  150. } // End
  151. //-----------------------------------------------------------------------------
  152.  
  153.  
Add Comment
Please, Sign In to add comment