Advertisement
Riskybiz

zmq2.h

Aug 23rd, 2014
154
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 4.88 KB | None | 0 0
  1. /**
  2.  * Extending zmq::socket_t to send/receive multipart messages.
  3.  * This is the first version, where Frames is a std::vector, now deprecated.
  4.  * In the second version Frames is a std::deque, see: http://thisthread.blogspot.com/2012/06/zeromq-multipart-message-as-deque.html
  5.  *
  6.  * More information here: http://thisthread.blogspot.com/2012/04/extending-zmqsockett.html
  7.  * Improved multipart send discussed here: http://thisthread.blogspot.com/2012/05/improved-sending-for-zmqsocket.html
  8.  * Int messages: http://thisthread.blogspot.com/2012/05/sendingreceiving-ints-over-zeromq.html
  9.  * Inspired by czmq, as described in the ZGuide: http://zguide.zeromq.org/page:all#A-High-Level-API-for-MQ
  10.  */
  11.  
  12. #pragma once
  13.  
  14. #include <cstdint>
  15. #include <vector>
  16. #include <zmq.hpp>
  17.  
  18. namespace zmq
  19. {
  20.     typedef std::vector<std::string> Frames;
  21.  
  22.     class Socket : public socket_t
  23.     {
  24.     public:
  25.         Socket(context_t& context, int type) : socket_t(context, type) {}
  26.  
  27.         Socket(context_t& context, int type, const std::string& id) : socket_t(context, type)
  28.         {
  29.             zmq_setsockopt(this->operator void*(), ZMQ_IDENTITY, id.c_str(), id.length());
  30.         }
  31.  
  32.         bool send(const std::string& frame, int flags =0)
  33.         {
  34.             return send(frame.c_str(), frame.length(), flags);
  35.         }
  36.  
  37.         bool send(const char* frame, int flags =0)
  38.         {
  39.             return send(frame, strlen(frame), flags);
  40.         }
  41.  
  42.         bool send(const std::string& frame1, const std::string& frame2)
  43.         {
  44.             if(!send(frame1, ZMQ_SNDMORE))
  45.                 return false;
  46.             // last frame
  47.             return send(frame2);
  48.         }
  49.  
  50.         bool send(const char* frame1, const char* frame2)
  51.         {
  52.             if(!send(frame1, ZMQ_SNDMORE))
  53.                 return false;
  54.             // last frame
  55.             return send(frame2);
  56.         }
  57.  
  58.         bool send(const std::string& frame1, const std::string& frame2, const std::string& frame3)
  59.         {
  60.             if(!send(frame1, ZMQ_SNDMORE))
  61.                 return false;
  62.             if(!send(frame2, ZMQ_SNDMORE))
  63.                 return false;
  64.             // last frame
  65.             return send(frame3);
  66.         }
  67.  
  68.         bool send(const char* frame1, const char* frame2, const char* frame3)
  69.         {
  70.             if(!send(frame1, ZMQ_SNDMORE))
  71.                 return false;
  72.             if(!send(frame2, ZMQ_SNDMORE))
  73.                 return false;
  74.             // last frame
  75.             return send(frame3);
  76.         }
  77.  
  78.         bool send(const Frames& frames)
  79.         {
  80.             if(!frames.size())
  81.                 throw error_t();
  82.  
  83.             // all frames but last one
  84.             for(unsigned int i = 0; i < frames.size() - 1; ++i)
  85.                 if(!send(frames[i], ZMQ_SNDMORE))
  86.                     return false;
  87.             // last frame
  88.             return send(frames.back());
  89.         }
  90.  
  91.         bool send(int value, int flags =0)
  92.         {
  93.             zmq::message_t msg(sizeof(int));
  94.             memcpy(msg.data(), &value, sizeof(int));
  95.             return socket_t::send(msg, flags);
  96.         }
  97.  
  98.         /*
  99.             n: expected number of frames, including separators
  100.          */
  101.         Frames blockingRecv(int n, bool checked =true)
  102.         {
  103.             Frames frames;
  104.             frames.reserve(n);
  105.  
  106.             do {
  107.                 zmq::message_t message;
  108.                 if(!socket_t::recv(&message, 0))
  109.                     throw error_t();
  110.  
  111.                 const char* base = static_cast<const char*>(message.data());
  112.                 frames.push_back(std::string(base, base + message.size()));
  113.             } while(sockopt_rcvmore());
  114.  
  115.             if(checked && frames.size() != n)
  116.                 throw error_t();
  117.  
  118.             return frames;
  119.         }
  120.  
  121.         std::string recvAsString(int flags =0)
  122.         {
  123.             zmq::message_t message;
  124.             if(!socket_t::recv(&message, flags))
  125.                 throw error_t();
  126.  
  127.             const char* base = static_cast<const char*>(message.data());
  128.             return std::string(base, base + message.size());
  129.         }
  130.  
  131.         int recvAsInt(int flags =0)
  132.         {
  133.             zmq::message_t message;
  134.             if(!socket_t::recv(&message, flags))
  135.                 throw error_t();
  136.  
  137.             return *(static_cast<int*>(message.data()));
  138.         }
  139.     private:
  140.         Socket(const Socket&);
  141.         void operator=(const Socket&);
  142.  
  143.         bool sockopt_rcvmore()
  144.         {
  145.             int64_t rcvmore;
  146.             size_t type_size = sizeof(int64_t);
  147.             getsockopt(ZMQ_RCVMORE, &rcvmore, &type_size);
  148.             return rcvmore ? true : false;
  149.         }
  150.  
  151.         bool send(const char* frame, size_t len, int flags =0)
  152.         {
  153.             zmq::message_t msg(len);
  154.             memcpy(msg.data(), frame, len);
  155.             return socket_t::send(msg, flags);
  156.         }
  157.     };
  158. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement