Advertisement
Guest User

Untitled

a guest
May 30th, 2017
225
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 4.14 KB | None | 0 0
  1. #include <string>
  2. #include <iostream>
  3. #include <exception>
  4. #include <thread>
  5. #include <vector>
  6. #include <zmq.hpp>
  7. #include <stdio.h>
  8. #include <assert.h>
  9. #include <cstring>
  10. #include <algorithm>
  11. #include <cerrno>
  12. #include <chrono>
  13.  
  14. #define SYNC_MSG "sync"
  15. #define SYNC_MSGLEN strlen((SYNC_MSG))
  16.  
  17. using namespace std;
  18.  
  19. bool has_only_digits(const string s)
  20. {
  21. return s.find_first_not_of( "0123456789" ) == string::npos;
  22. }
  23.  
  24. // The function we want to execute on the new thread.
  25. void task1(void * arg, int id)
  26. {
  27. bool sync_done = false;
  28.  
  29. zmq::context_t * context = (zmq::context_t *)arg;
  30.  
  31. zmq::socket_t gettask_socket(*context, ZMQ_PULL);
  32. gettask_socket.connect("inproc://task_publisher");
  33.  
  34. zmq::socket_t sendres_socket(*context, ZMQ_PUSH);
  35. sendres_socket.connect("inproc://collect_res");
  36.  
  37. zmq::message_t sync_msg;
  38. gettask_socket.recv(&sync_msg); // block waiting for sync msg
  39. cout << "thread " << id << " receives: " << (char *)sync_msg.data() << endl;
  40.  
  41. if(strlen((char *)sync_msg.data()) == SYNC_MSGLEN)
  42. {
  43. zmq::message_t msg_back((void *)to_string(id).c_str(), to_string(id).size() + 1, NULL);
  44. cout << "thread " << id << " sends: " << (char *)to_string(id).c_str() << endl;
  45.  
  46. sendres_socket.send(msg_back);
  47. sync_done = true;
  48. }
  49.  
  50. cout << "thread " << id << " sync done" << endl;
  51.  
  52. // zmq::message_t msg(6);
  53. // memcpy((void *)msg.data(), "Hello", 6);
  54. // sendres_socket.send(msg);
  55.  
  56. // zmq::message_t job;
  57. // gettask_socket.recv(&job);
  58.  
  59. // cout << "in worker: " << (char *)job.data() << endl;
  60.  
  61. // cout << "task1 says: Hello " << endl;
  62. }
  63.  
  64. bool if_sync_done(int threadID, bool * sync_array, int worker_num)
  65. {
  66. cout << "main thread receives sync msg from thread " << threadID << endl;
  67.  
  68. // for(int i = 0; i < worker_num; i++)
  69. // cout << "before " << boolalpha << sync_array[i] << " " << endl;
  70.  
  71. if(!sync_array[threadID])
  72. sync_array[threadID] = true;
  73.  
  74. // for(int i = 0; i < worker_num; i++)
  75. // cout << "after " << boolalpha << sync_array[i] << " " << endl;
  76.  
  77. bool sync_done = true;
  78. for(int i = 0; i < worker_num; i++)
  79. sync_done = sync_array[i] && sync_done;
  80.  
  81. return sync_done;
  82. }
  83.  
  84. int main(int argc, char* argv[])
  85. {
  86. int worker_num;
  87.  
  88. if(argc != 2)
  89. {
  90. cout << "1 parameter pls" << endl;
  91. exit(1);
  92. }
  93.  
  94. try
  95. {
  96. if(!has_only_digits(string(argv[1])))
  97. {
  98. cout << "digit pls" << endl;
  99. exit(1);
  100. }
  101. worker_num = stoi(string(argv[1]));
  102. }
  103. catch(exception e)
  104. {
  105. cout << "exception while processing parameters" << endl;
  106. exit(1);
  107. }
  108.  
  109. zmq::context_t context(0);
  110. zmq::socket_t distask_socket(context, ZMQ_PUSH);
  111. distask_socket.bind("inproc://task_publisher");
  112.  
  113. zmq::socket_t getres_socket(context, ZMQ_PULL);
  114. getres_socket.bind("inproc://collect_res");
  115.  
  116. vector<thread> pool;
  117. for(int i = 0; i < worker_num; i++)
  118. {
  119. cout << "main() : creating thread, " << i << endl;
  120. pool.push_back(thread(task1, (void *)&context, i));
  121. }
  122.  
  123. bool sync_done = false;
  124. bool * sync_array = new bool[worker_num];
  125. fill_n(sync_array, worker_num, false);
  126.  
  127. while(true)
  128. {
  129. if(sync_done)
  130. {
  131. cout << "sync done in main thread" << endl;
  132. break;
  133. }
  134.  
  135. zmq::message_t sync_msg(4);
  136. memcpy((void *)sync_msg.data(), SYNC_MSG, SYNC_MSGLEN);
  137. for(int i = 0; i < worker_num; i++)
  138. distask_socket.send(sync_msg);
  139.  
  140. for(int i = 0; i < worker_num; i++)
  141. {
  142. if(sync_done)
  143. break;
  144. if(i != 0)
  145. this_thread::sleep_for(chrono::milliseconds(500));
  146.  
  147. zmq::message_t res_msg;
  148. int ret = getres_socket.recv(&res_msg, ZMQ_DONTWAIT);
  149.  
  150. if(ret == -1 && errno == EAGAIN)
  151. continue;
  152.  
  153. int threadID = stoi(string((char *)res_msg.data()));
  154. sync_done = if_sync_done(threadID, sync_array, worker_num);
  155. }
  156. }
  157.  
  158.  
  159. assert(sync_done);
  160.  
  161. // for(int i = 0; i < worker_num; i++)
  162. // {
  163. // zmq::message_t msg(6);
  164. // memcpy((void *) msg.data (), "World", 6);
  165. // distask_socket.send(msg);
  166. // }
  167.  
  168.  
  169.  
  170. for(auto &t : pool)
  171. t.join();
  172.  
  173. delete[] sync_array;
  174.  
  175. exit(0);
  176. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement