Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #include <iostream>
- #include <queue>
- #include <map>
- #include <pthread.h>
- using namespace std;
- #define COUNT_WORKERS 10
- #define MAX_QUEUE_SIZE 100
- class Server
- {
- private:
- std::queue<string> _queryQueue;
- pthread_mutex_t _queryQueueMutex;
- pthread_t _workerThreads[COUNT_WORKERS];
- pthread_mutex_t _workerSuspendMutex;
- pthread_cond_t _workerResumeCondition;
- pthread_t _thread;
- Server(Server const&);
- void operator=(Server const&);
- public:
- Server();
- bool start();
- static void* threadFunc(void* ptr);
- static void* threadWorkerFunc(void* ptr);
- };
- //-----------------------------------------------------------------------------
- Server::Server()
- {
- if (pthread_mutex_init(&_queryQueueMutex, NULL))
- throw "err";
- if (pthread_mutex_init(&_workerSuspendMutex, NULL))
- throw "err";
- } // End
- //-----------------------------------------------------------------------------
- bool Server::start()
- {
- if (pthread_create(&_thread, NULL, threadFunc, this))
- return false;
- // Worker threads.
- for (int i = 0; i < COUNT_WORKERS; i++)
- if (pthread_create(&_workerThreads[i], NULL, threadWorkerFunc, this))
- return false;
- pthread_join(_thread, NULL);
- return true;
- } // End
- //-----------------------------------------------------------------------------
- void* Server::threadFunc(void* ptr)
- {
- cout << "Start thread" << endl;
- Server* server = (Server*)ptr;
- while (1)
- {
- cout << "Request" << endl;
- sleep(1);
- pthread_mutex_lock(&server->_queryQueueMutex);
- if (server->_queryQueue.size() < MAX_QUEUE_SIZE)
- {
- server->_queryQueue.push("text");
- pthread_mutex_unlock(&server->_queryQueueMutex);
- // Wake up all worker threads.
- pthread_cond_broadcast(&server->_workerResumeCondition);
- continue;
- }
- pthread_mutex_unlock(&server->_queryQueueMutex);
- }
- return NULL;
- } // End
- //-----------------------------------------------------------------------------
- void* Server::threadWorkerFunc(void* ptr)
- {
- Server* server = (Server*)ptr;
- while (true)
- {
- // Lock queue mutex.
- pthread_mutex_lock(&server->_queryQueueMutex);
- // If queue is empty, waiting new requests.
- if (server->_queryQueue.empty())
- {
- // Unlock queue mutex.
- pthread_mutex_unlock(&server->_queryQueueMutex);
- // Suspend current worker thread.
- pthread_mutex_lock(&server->_workerSuspendMutex);
- pthread_cond_wait(&server->_workerResumeCondition, &server->_workerSuspendMutex);
- cout << "Wake up" << endl;
- pthread_mutex_unlock(&server->_workerSuspendMutex);
- continue;
- }
- // Load request from queue.
- string request = server->_queryQueue.front();
- server->_queryQueue.pop();
- pthread_mutex_unlock(&server->_queryQueueMutex);
- } // End while
- return NULL;
- } // End
- //-----------------------------------------------------------------------------
- int main()
- {
- try
- {
- // Server srv1;
- // if (!srv1.start())
- // return 1;
- Server* srv2 = new Server();
- if (!srv2->start())
- return 1;
- }
- catch (...)
- {
- return 1;
- }
- return 0;
- } // End
- //-----------------------------------------------------------------------------
Add Comment
Please, Sign In to add comment