Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #include <string>
- #include <iostream>
- #include <exception>
- #include <thread>
- #include <vector>
- #include <zmq.hpp>
- #include <stdio.h>
- #include <assert.h>
- #include <cstring>
- #include <algorithm>
- #include <cerrno>
- #include <chrono>
- #define SYNC_MSG "sync"
- #define SYNC_MSGLEN strlen((SYNC_MSG))
- using namespace std;
- bool has_only_digits(const string s)
- {
- return s.find_first_not_of( "0123456789" ) == string::npos;
- }
- // The function we want to execute on the new thread.
- void task1(void * arg, int id)
- {
- bool sync_done = false;
- zmq::context_t * context = (zmq::context_t *)arg;
- zmq::socket_t gettask_socket(*context, ZMQ_PULL);
- gettask_socket.connect("inproc://task_publisher");
- zmq::socket_t sendres_socket(*context, ZMQ_PUSH);
- sendres_socket.connect("inproc://collect_res");
- zmq::message_t sync_msg;
- gettask_socket.recv(&sync_msg); // block waiting for sync msg
- cout << "thread " << id << " receives: " << (char *)sync_msg.data() << endl;
- if(strlen((char *)sync_msg.data()) == SYNC_MSGLEN)
- {
- const char * stringID = to_string(id).c_str();
- cout << "to_string " << to_string(id) << endl;
- zmq::message_t msg_back((void *)stringID, to_string(id).size(), NULL);
- cout << "thread " << id << " sends: " << stringID << ", with size: " << to_string(id).size() << endl;
- sendres_socket.send(msg_back);
- sync_done = true;
- }
- cout << "thread " << id << " sync done" << endl;
- // zmq::message_t msg(6);
- // memcpy((void *)msg.data(), "Hello", 6);
- // sendres_socket.send(msg);
- // zmq::message_t job;
- // gettask_socket.recv(&job);
- // cout << "in worker: " << (char *)job.data() << endl;
- // cout << "task1 says: Hello " << endl;
- }
- bool if_sync_done(int threadID, bool * sync_array, int worker_num)
- {
- cout << "main thread receives sync msg from thread " << threadID << endl;
- // for(int i = 0; i < worker_num; i++)
- // cout << "before " << boolalpha << sync_array[i] << " " << endl;
- if(!sync_array[threadID])
- sync_array[threadID] = true;
- // for(int i = 0; i < worker_num; i++)
- // cout << "after " << boolalpha << sync_array[i] << " " << endl;
- bool sync_done = true;
- for(int i = 0; i < worker_num; i++)
- sync_done = sync_array[i] && sync_done;
- return sync_done;
- }
- int main(int argc, char* argv[])
- {
- int worker_num;
- if(argc != 2)
- {
- cout << "1 parameter pls" << endl;
- exit(1);
- }
- try
- {
- if(!has_only_digits(string(argv[1])))
- {
- cout << "digit pls" << endl;
- exit(1);
- }
- worker_num = stoi(string(argv[1]));
- cout << "input parameter is: " << worker_num << endl;
- }
- catch(exception e)
- {
- cout << "exception while processing parameters" << endl;
- exit(1);
- }
- zmq::context_t context(0);
- zmq::socket_t distask_socket(context, ZMQ_PUSH);
- distask_socket.bind("inproc://task_publisher");
- zmq::socket_t getres_socket(context, ZMQ_PULL);
- getres_socket.bind("inproc://collect_res");
- vector<thread> pool;
- for(int i = 0; i < worker_num; i++)
- {
- cout << "main() : creating thread, " << i << endl;
- pool.push_back(thread(task1, (void *)&context, i));
- }
- bool sync_done = false;
- bool * sync_array = new bool[worker_num];
- fill_n(sync_array, worker_num, false);
- zmq::pollitem_t items[] =
- {
- { (void *)distask_socket, 0, ZMQ_POLLOUT, 0 },
- { (void *)getres_socket, 0, ZMQ_POLLIN, 0 }
- };
- while(true)
- {
- if(sync_done)
- {
- cout << "sync done in main thread" << endl;
- break;
- }
- zmq::message_t sync_msg(4);
- zmq::message_t res_msg;
- memcpy((void *)sync_msg.data(), SYNC_MSG, SYNC_MSGLEN);
- zmq::poll(&items[0], 2, -1);
- if (items[0].revents & ZMQ_POLLOUT)
- {
- distask_socket.send(sync_msg, ZMQ_DONTWAIT);
- }
- if (items[1].revents & ZMQ_POLLIN)
- {
- getres_socket.recv(&res_msg);
- cout << "pass " << (char *)res_msg.data() << " to if_sync_done" << endl;
- int threadID = stoi(string((char *)res_msg.data()));
- sync_done = if_sync_done(threadID, sync_array, worker_num);
- }
- }
- // while(true)
- // {
- // if(sync_done)
- // {
- // cout << "sync done in main thread" << endl;
- // break;
- // }
- // zmq::message_t sync_msg(4);
- // memcpy((void *)sync_msg.data(), SYNC_MSG, SYNC_MSGLEN);
- // for(int i = 0; i < worker_num; i++)
- // distask_socket.send(sync_msg);
- // for(int i = 0; i < worker_num; i++)
- // {
- // if(sync_done)
- // break;
- // if(i != 0)
- // this_thread::sleep_for(chrono::milliseconds(500));
- // zmq::message_t res_msg;
- // int ret = getres_socket.recv(&res_msg);
- // // if(ret == -1 && errno == EAGAIN)
- // // continue;
- // int threadID = stoi(string((char *)res_msg.data()));
- // cout << "pass " << (char *)res_msg.data() << " to if_sync_done" << endl;
- // sync_done = if_sync_done(threadID, sync_array, worker_num);
- // }
- // }
- assert(sync_done);
- // for(int i = 0; i < worker_num; i++)
- // {
- // zmq::message_t msg(6);
- // memcpy((void *) msg.data (), "World", 6);
- // distask_socket.send(msg);
- // }
- for(auto &t : pool)
- t.join();
- delete[] sync_array;
- exit(0);
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement