SergGulko

ZMQ/Win OUT OF MEMORY

Jan 18th, 2012
180
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 1.92 KB | None | 0 0
  // zmq.h is kept first, as in the original paste.
  // NOTE(review): on Windows zmq.h may pull in winsock2.h, which should precede
  // windows.h — confirm before reordering these includes.
  #include <zmq.h>
  #include <memory.h>
  #include <stdint.h>
  #include <cstring>
  #include <iostream>
  #include <string>
  #include <boost/thread/thread.hpp>
  #include <boost/thread/xtime.hpp>
  6.  
  7.  
  // inproc endpoint: _tmain connects a PUB socket to it, outputWorker binds a
  // SUB socket to it.
  #define INTERNAL_DATA_SOCKET "inproc://data"

  // Endpoint outputWorker binds its outbound PUB socket to. The original paste
  // used this name without defining it, so the program did not compile.
  // NOTE(review): the address below is a placeholder, not the author's value —
  // override it (e.g. with -DOUTPUT_SOCKET=...) or edit it for your deployment.
  #ifndef OUTPUT_SOCKET
  #define OUTPUT_SOCKET "tcp://*:5555"
  #endif

  // Payload published on every iteration of the loop in _tmain.
  #define MESSAGE  "{\"m_t\": 0, \"data\": {}}"

  // ZeroMQ context shared by both threads; assigned in _tmain before the worker
  // thread is started.
  void *context;

  // Entry point of the worker thread (defined below _tmain).
  void outputWorker();

  // Kept because outputWorker uses unqualified cout/endl.
  using namespace std;
  15.  
  16. int _tmain(int argc, _TCHAR* argv[]) {
  17.     context = zmq_init(1);
  18.     boost::thread outputWorkerThread(&outputWorker);
  19.     boost::xtime xtime;
  20.     boost::xtime_get(&xtime, boost::TIME_UTC);
  21.     xtime.sec+=1;
  22.     boost::thread::sleep(xtime);
  23.  
  24.  
  25.     void *intDataSocket = zmq_socket(context, ZMQ_PUB);
  26.     zmq_connect(intDataSocket, INTERNAL_DATA_SOCKET);
  27.     int messagesToStore = 1;
  28.     zmq_setsockopt(intDataSocket, ZMQ_HWM, &messagesToStore, strlen(MESSAGE));
  29.  
  30.  
  31.     int size = strlen(MESSAGE);
  32.     int rc;
  33.     while(1) {
  34.         zmq_msg_t request;
  35.         zmq_msg_init_size(&request, size);
  36.         memcpy(zmq_msg_data(&request), MESSAGE, size);
  37.         rc = zmq_send(intDataSocket, &request, 0);
  38.         assert(rc == 0);
  39.         zmq_msg_close(&request);
  40.        
  41.         //Remove comments to remove OUT OF MEMORY problem
  42.  
  43.         //boost::xtime_get(&xtime, boost::TIME_UTC);
  44.         //xtime.nsec+=1000;
  45.         //boost::thread::sleep(xtime);
  46.     }
  47.     return 0;
  48. }
  49.  
  50. void outputWorker() {
  51.     cout << "Starting worker thread " << endl;
  52.     void *outputSocket = zmq_socket(context, ZMQ_PUB);
  53.  
  54.     int bindResult = zmq_bind(outputSocket, OUTPUT_SOCKET);
  55.     int messagesToStore = 1;
  56.     zmq_setsockopt(outputSocket, ZMQ_HWM, &messagesToStore, strlen(MESSAGE));
  57.    
  58.     void *intDataSocket = zmq_socket(context, ZMQ_SUB);
  59.     bindResult = zmq_bind(intDataSocket, INTERNAL_DATA_SOCKET);
  60.     zmq_setsockopt(intDataSocket, ZMQ_SUBSCRIBE, "", 0);
  61.  
  62.     while(1) {
  63.         zmq_msg_t request;
  64.         zmq_msg_init(&request);
  65.         zmq_recv(intDataSocket, &request, 0);
  66.  
  67.         size_t size = zmq_msg_size(&request);
  68.         char *buffer = (char*) malloc(sizeof(char) * (size + 1));
  69.         memcpy(buffer, zmq_msg_data(&request), size);
  70.         buffer[size] = 0;
  71.         zmq_msg_close(&request);
  72.         free(buffer);
  73.     }
  74. }
Advertisement
Add Comment
Please, Sign In to add comment