Advertisement
Guest User

Untitled

a guest
May 4th, 2016
53
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.46 KB | None | 0 0
  1. (0 0 1 0) // (elements in queue 1, in queue 2, in queue 3, in queue 4)
  2. (1 0 1 0)
  3. (1 1 1 0)
  4. (0 0 0 0)
  5. (0 0 0 0)
  6. (0 0 0 0)
  7. (1 0 0 0)
  8. (2 0 0 0)
  9. (2 1 0 0)
  10. (1 1 0 1)
  11. (1 0 0 1)
  12. (1 0 0 0)
  13. (1 0 0 0)
  14. (0 0 0 0)
  15. (1 0 0 0)
  16. ...CTRL+c
  17.  
  18. #include <iostream>
  19. #include <queue>
  20. #include <mutex>
  21. #include <condition_variable>
  22. #include <thread>
  23. #include <random>
  24.  
  25. using namespace std;
  26.  
  27. // modify this to modify the number of consumer threads
  28. #define WORKERS_THREADS 4
  29. // max size of each of four queues
  30. #define MAX_QUEUE_SIZE 100
  31. // debug
  32. #define DEFAULTCOLOR "33[0m"
  33. #define RED "33[22;31m"
  34. #define YELLOW "33[1;33m"
  35. #define GREEN "33[0;0;32m"
  36.  
  37. class MultiQueue {
  38. public:
  39. void initThreadPool(void);
  40. void insert(int num);
  41. void remove(void);
  42. void insertPriorityQueue(int num);
  43. int removePriorityQueue(void);
  44. void printQueues(string what);
  45. int getQueue1Size(void);
  46. int getQueue2Size(void);
  47. int getQueue3Size(void);
  48. int getQueue4Size(void);
  49. int getPrioQueueSize(void);
  50. private:
  51. vector<thread> workers;
  52.  
  53. queue<int>q1;
  54. queue<int>q2;
  55. queue<int>q3;
  56. queue<int>q4;
  57.  
  58. priority_queue<int, vector<int>, greater<int>> prioq;
  59.  
  60. // mutex for push/pop in priority queue
  61. mutex priority_queue_mutex;
  62. // 4 mutexes for each queue
  63. mutex m1, m2, m3, m4;
  64. // mutex for printing 4 queues size
  65. mutex print;
  66.  
  67. // mutex for push/pop to priority_queue
  68. condition_variable prioq_cond;
  69. // 4 conds for consumer threads
  70. condition_variable w1, w2, w3, w4;
  71. };
  72.  
  73. int MultiQueue::getQueue1Size() { return q1.size(); }
  74.  
  75. int MultiQueue::getQueue2Size() { return q2.size(); }
  76.  
  77. int MultiQueue::getQueue3Size() { return q3.size(); }
  78.  
  79. int MultiQueue::getQueue4Size() { return q4.size(); }
  80.  
  81. int MultiQueue::getPrioQueueSize() { return prioq.size(); }
  82.  
  83. void MultiQueue::initThreadPool(void) {
  84. for (int i=0; i<WORKERS_THREADS; i++) {
  85. workers.push_back(thread(&MultiQueue::remove, this));
  86. workers[i].detach();
  87. }
  88. }
  89.  
  90. void MultiQueue::printQueues(string what) {
  91. lock_guard<mutex> l(print);
  92. if (what == "insert")
  93. cout << GREEN << '(' << getQueue1Size() << ' ' << getQueue2Size() << ' ' << getQueue3Size() << ' ' << getQueue4Size() << ')' << DEFAULTCOLOR << 'n' << flush;
  94. else
  95. cout << YELLOW << '(' << getQueue1Size() << ' ' << getQueue2Size() << ' ' << getQueue3Size() << ' ' << getQueue4Size() << ')' << DEFAULTCOLOR << 'n' << flush;
  96. }
  97.  
  98. // called from producer thread to tell consumer threads
  99. // what queues to pop() from
  100. void MultiQueue::insertPriorityQueue(int num) {
  101. lock_guard<mutex> prio(priority_queue_mutex);
  102. prioq.push(num);
  103. prioq_cond.notify_one();
  104. }
  105.  
  106. // called from consumer threads to see what queues
  107. // have elements to pop() from
  108. int MultiQueue::removePriorityQueue(void) {
  109. int ret = 0;
  110. unique_lock<mutex> prio(priority_queue_mutex);
  111. prioq_cond.wait(prio, [this] () { return getPrioQueueSize() > 0; });
  112. ret = prioq.top();
  113. prioq.pop();
  114. return ret;
  115. }
  116.  
  117. // producer thread
  118. void MultiQueue::insert(int num) {
  119. switch(num) {
  120. case 1: {
  121. unique_lock<mutex> locker(m1);
  122. w1.wait(locker, [this] () { return getQueue1Size() < MAX_QUEUE_SIZE; });
  123. q1.push(num);
  124. break;
  125. }
  126. case 2: {
  127. unique_lock<mutex> locker(m2);
  128. w2.wait(locker, [this] () { return getQueue2Size() < MAX_QUEUE_SIZE; });
  129. q2.push(num);
  130. break;
  131. }
  132. case 3: {
  133. unique_lock<mutex> locker(m3);
  134. w3.wait(locker, [this] () { return getQueue3Size() < MAX_QUEUE_SIZE; });
  135. q3.push(num);
  136. break;
  137. }
  138. case 4: {
  139. unique_lock<mutex> locker(m4);
  140. w4.wait(locker, [this] () { return getQueue4Size() < MAX_QUEUE_SIZE; });
  141. q4.push(num);
  142. break;
  143. }
  144. default: {
  145. cout << "number not 1, 2, 3 nor 4: " << num << 'n' << flush;
  146. break;
  147. }
  148. }
  149. printQueues("insert");
  150. insertPriorityQueue(num);
  151. }
  152.  
  153. void MultiQueue::remove(void) {
  154. int which_queue = 0;
  155. while (true) {
  156. which_queue = removePriorityQueue();
  157. switch (which_queue) {
  158. case 1: {
  159. lock_guard<mutex> lock(m1);
  160. int ret = q1.front();
  161. q1.pop();
  162. printQueues("remove");
  163. break;
  164. }
  165. case 2: {
  166. lock_guard<mutex> lock(m2);
  167. int ret = q2.front();
  168. q2.pop();
  169. printQueues("remove");
  170. break;
  171. }
  172. case 3: {
  173. lock_guard<mutex> lock(m3);
  174. int ret = q3.front();
  175. q3.pop();
  176. printQueues("remove");
  177. break;
  178. }
  179. case 4: {
  180. lock_guard<mutex> lock(m4);
  181. int ret = q4.front();
  182. q4.pop();
  183. printQueues("remove");
  184. break;
  185. }
  186. default: {
  187. break;
  188. }
  189. }
  190. }
  191. }
  192.  
  193. int main(void) {
  194. int random_num = 0;
  195.  
  196. MultiQueue mq;
  197. mq.initThreadPool();
  198.  
  199. default_random_engine eng((random_device())());
  200. uniform_int_distribution<int> idis(1, 4);
  201. while (true) {
  202. random_num = idis(eng);
  203. mq.insert(random_num);
  204. }
  205.  
  206. return 0;
  207. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement