Guest User

Untitled

a guest
May 31st, 2017
109
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. #include <string>
  2. #include <iostream>
  3. #include <exception>
  4. #include <thread>
  5. #include <vector>
  6. #include <zmq.hpp>
  7. #include <stdio.h>
  8. #include <assert.h>
  9. #include <cstring>
  10. #include <algorithm>
  11. #include <cerrno>
  12. #include <chrono>
  13.  
  // Payload the main thread pushes to the workers to start the sync handshake.
  constexpr char SYNC_MSG[] = "sync";
  // Number of payload bytes in SYNC_MSG, excluding the terminating NUL.
  // Computed at compile time instead of calling strlen() at every use.
  constexpr std::size_t SYNC_MSGLEN = sizeof(SYNC_MSG) - 1;
  16.  
  17. using namespace std;
  18.  
  // Returns true when `s` is non-empty and every character is one of '0'..'9'.
  // Signs, whitespace, any other character, and the empty string all yield
  // false, so a caller can safely hand an accepted string to a decimal parser.
  bool has_only_digits(const std::string & s)
  {
      // find_first_not_of() reports npos for an empty string as well, so the
      // emptiness check is needed to reject "".
      return !s.empty() && s.find_first_not_of("0123456789") == std::string::npos;
  }
  23.  
  24. // The function we want to execute on the new thread.
  25. void task1(void * arg, int id)
  26. {
  27. bool sync_done = false;
  28.  
  29. zmq::context_t * context = (zmq::context_t *)arg;
  30.  
  31. zmq::socket_t gettask_socket(*context, ZMQ_PULL);
  32. gettask_socket.connect("inproc://task_publisher");
  33.  
  34. zmq::socket_t sendres_socket(*context, ZMQ_PUSH);
  35. sendres_socket.connect("inproc://collect_res");
  36.  
  37. zmq::message_t sync_msg;
  38. gettask_socket.recv(&sync_msg); // block waiting for sync msg
  39. cout << "thread " << id << " receives: " << (char *)sync_msg.data() << endl;
  40.  
  41. if(strlen((char *)sync_msg.data()) == SYNC_MSGLEN)
  42. {
  43. const char * stringID = to_string(id).c_str();
  44. cout << "to_string " << to_string(id) << endl;
  45. zmq::message_t msg_back((void *)stringID, to_string(id).size(), NULL);
  46. cout << "thread " << id << " sends: " << stringID << ", with size: " << to_string(id).size() << endl;
  47.  
  48. sendres_socket.send(msg_back);
  49. sync_done = true;
  50. }
  51.  
  52. cout << "thread " << id << " sync done" << endl;
  53.  
  54. // zmq::message_t msg(6);
  55. // memcpy((void *)msg.data(), "Hello", 6);
  56. // sendres_socket.send(msg);
  57.  
  58. // zmq::message_t job;
  59. // gettask_socket.recv(&job);
  60.  
  61. // cout << "in worker: " << (char *)job.data() << endl;
  62.  
  63. // cout << "task1 says: Hello " << endl;
  64. }
  65.  
  // Records that worker `threadID` has acknowledged the sync message and
  // reports whether every worker has now done so.
  //
  // threadID   - id reported by a worker; ids outside [0, worker_num) are
  //              ignored (with a diagnostic on stderr) instead of being used
  //              as an array index.
  // sync_array - array of `worker_num` flags, one per worker; the flag for
  //              `threadID` is set to true.
  // worker_num - number of entries in `sync_array`.
  //
  // Returns true when all `worker_num` flags are set (trivially true when
  // worker_num <= 0), false otherwise.
  bool if_sync_done(int threadID, bool * sync_array, int worker_num)
  {
      std::cout << "main thread receives sync msg from thread " << threadID << std::endl;

      if (threadID >= 0 && threadID < worker_num)
          sync_array[threadID] = true;
      else
          std::cerr << "ignoring sync msg with out-of-range thread id " << threadID << std::endl;

      // Nothing to wait for; also keeps the range below well-formed.
      if (worker_num <= 0)
          return true;

      return std::all_of(sync_array, sync_array + worker_num,
                         [](bool acknowledged) { return acknowledged; });
  }
  85.  
  86. int main(int argc, char* argv[])
  87. {
  88. int worker_num;
  89.  
  90. if(argc != 2)
  91. {
  92. cout << "1 parameter pls" << endl;
  93. exit(1);
  94. }
  95.  
  96. try
  97. {
  98. if(!has_only_digits(string(argv[1])))
  99. {
  100. cout << "digit pls" << endl;
  101. exit(1);
  102. }
  103. worker_num = stoi(string(argv[1]));
  104. cout << "input parameter is: " << worker_num << endl;
  105. }
  106. catch(exception e)
  107. {
  108. cout << "exception while processing parameters" << endl;
  109. exit(1);
  110. }
  111.  
  112. zmq::context_t context(0);
  113. zmq::socket_t distask_socket(context, ZMQ_PUSH);
  114. distask_socket.bind("inproc://task_publisher");
  115.  
  116. zmq::socket_t getres_socket(context, ZMQ_PULL);
  117. getres_socket.bind("inproc://collect_res");
  118.  
  119. vector<thread> pool;
  120. for(int i = 0; i < worker_num; i++)
  121. {
  122. cout << "main() : creating thread, " << i << endl;
  123. pool.push_back(thread(task1, (void *)&context, i));
  124. }
  125.  
  126. bool sync_done = false;
  127. bool * sync_array = new bool[worker_num];
  128. fill_n(sync_array, worker_num, false);
  129.  
  130. zmq::pollitem_t items[] =
  131. {
  132. { (void *)distask_socket, 0, ZMQ_POLLOUT, 0 },
  133. { (void *)getres_socket, 0, ZMQ_POLLIN, 0 }
  134. };
  135.  
  136. while(true)
  137. {
  138. if(sync_done)
  139. {
  140. cout << "sync done in main thread" << endl;
  141. break;
  142. }
  143.  
  144. zmq::message_t sync_msg(4);
  145. zmq::message_t res_msg;
  146. memcpy((void *)sync_msg.data(), SYNC_MSG, SYNC_MSGLEN);
  147. zmq::poll(&items[0], 2, -1);
  148.  
  149. if (items[0].revents & ZMQ_POLLOUT)
  150. {
  151. distask_socket.send(sync_msg, ZMQ_DONTWAIT);
  152. }
  153. if (items[1].revents & ZMQ_POLLIN)
  154. {
  155. getres_socket.recv(&res_msg);
  156. cout << "pass " << (char *)res_msg.data() << " to if_sync_done" << endl;
  157. int threadID = stoi(string((char *)res_msg.data()));
  158. sync_done = if_sync_done(threadID, sync_array, worker_num);
  159. }
  160. }
  161.  
  162. // while(true)
  163. // {
  164. // if(sync_done)
  165. // {
  166. // cout << "sync done in main thread" << endl;
  167. // break;
  168. // }
  169.  
  170. // zmq::message_t sync_msg(4);
  171. // memcpy((void *)sync_msg.data(), SYNC_MSG, SYNC_MSGLEN);
  172. // for(int i = 0; i < worker_num; i++)
  173. // distask_socket.send(sync_msg);
  174.  
  175. // for(int i = 0; i < worker_num; i++)
  176. // {
  177. // if(sync_done)
  178. // break;
  179. // if(i != 0)
  180. // this_thread::sleep_for(chrono::milliseconds(500));
  181.  
  182. // zmq::message_t res_msg;
  183. // int ret = getres_socket.recv(&res_msg);
  184.  
  185. // // if(ret == -1 && errno == EAGAIN)
  186. // // continue;
  187.  
  188. // int threadID = stoi(string((char *)res_msg.data()));
  189. // cout << "pass " << (char *)res_msg.data() << " to if_sync_done" << endl;
  190. // sync_done = if_sync_done(threadID, sync_array, worker_num);
  191. // }
  192. // }
  193.  
  194.  
  195. assert(sync_done);
  196.  
  197. // for(int i = 0; i < worker_num; i++)
  198. // {
  199. // zmq::message_t msg(6);
  200. // memcpy((void *) msg.data (), "World", 6);
  201. // distask_socket.send(msg);
  202. // }
  203.  
  204.  
  205.  
  206. for(auto &t : pool)
  207. t.join();
  208.  
  209. delete[] sync_array;
  210.  
  211. exit(0);
  212. }
RAW Paste Data