SHARE
TWEET

Untitled

a guest Apr 21st, 2018 79 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. #ifndef ZMQ_HPP_
  2. #define ZMQ_HPP_
  3.  
  4. #include <zmq.h>
  5.  
  6. #include <algorithm>
  7. #include <cstring>
  8. #include <exception>
  9. #include <iterator>
  10. #include <memory>
  11. #include <tuple>
  12. #include <vector>
  13.  
  14. namespace zmq {
  15. /* Version information */
  16. void version(int &major, int &minor, int &patch) {
  17.   zmq_version(&major, &minor, &patch);
  18. }
  19.  
  20. std::tuple<int, int, int> version() {
  21.   int major, minor, patch;
  22.   version(major, minor, patch);
  23.   return std::make_tuple(major, minor, patch);
  24. }
  25.  
  26. /* Error handling through exceptions */
  27. struct error : public std::exception {
  28.   error() noexcept : errno_{zmq_errno()} {}
  29.   const char *what() const noexcept override { return zmq_strerror(errno_); }
  30.   operator int() const noexcept { return errno_; }
  31.  
  32. private:
  33.   int errno_;
  34. };
  35.  
  36. /* Forward declarations and definitions */
  37. struct context;
  38. struct socket;
  39. struct message;
  40. struct poller;
  41.  
  42. enum struct context_opt : int;
  43. enum struct socket_type : int;
  44. enum struct socket_opt : int;
  45. enum struct monitor_event : int;
  46. enum struct poll_event : short;
  47.  
  48. struct context {
  49.   context();
  50.  
  51.   void set(context_opt, int) const;
  52.   int get(context_opt) const;
  53.  
  54.   explicit operator void *() const noexcept;
  55.   explicit operator bool() const noexcept;
  56.  
  57. private:
  58.   std::shared_ptr<void> ptr_;
  59. };
  60.  
  61. struct socket {
  62.   socket(const context &, socket_type);
  63.  
  64.   template <class T> void set(socket_opt, const T &) const;
  65.   template <class T> void get(socket_opt, T &) const;
  66.  
  67.   void bind(const char *) const;
  68.   void connect(const char *) const;
  69.   void unbind(const char *) const;
  70.   void disconnect(const char *) const;
  71.  
  72.   int send(message, bool = true) const;
  73.   template <class T> int send(std::size_t, const T *, int) const;
  74.  
  75.   message receive(bool = true) const;
  76.   template <class T> int receive(std::size_t, T *, int) const;
  77.  
  78.   socket monitor(const context &, const char *, monitor_event) const;
  79.   socket monitor(const context &, const char *, int) const;
  80.  
  81.   explicit operator void *() const noexcept;
  82.  
  83. private:
  84.   std::unique_ptr<void, void (*)(void *)> ptr_;
  85. };
  86.  
  87. struct message {
  88. private:
  89.   struct part {
  90.     /* Constructors */
  91.     part() noexcept;
  92.     explicit part(std::size_t);
  93.     template <class T> part(const T &);
  94.     template <class T> part(std::size_t, const T *);
  95.     template <class T> part(std::size_t, T *, zmq_free_fn *, void * = nullptr);
  96.     template <class Iter> part(Iter, Iter);
  97.  
  98.     /* Copying and moving */
  99.     part(const part &) noexcept;
  100.     part &operator=(const part &) noexcept;
  101.     part(part &&) noexcept;
  102.     part &operator=(part &&) noexcept;
  103.  
  104.     /* Destruction */
  105.     ~part();
  106.  
  107.     /* Convenience */
  108.     std::size_t size() const noexcept;
  109.     explicit operator void *() noexcept;
  110.     void *data() noexcept;
  111.     bool more() const noexcept;
  112.  
  113.     int send(const socket &, int);
  114.     int receive(const socket &, int);
  115.     template <class OutputIt> OutputIt read(OutputIt, OutputIt) noexcept;
  116.     void clear() noexcept;
  117.  
  118.   private:
  119.     zmq_msg_t msg_;
  120.   };
  121.  
  122.   std::vector<part> parts_;
  123.  
  124. public:
  125.   message() noexcept(noexcept(std::vector<part>()));
  126.   message(std::vector<part>) noexcept;
  127.  
  128.   std::size_t addpart();
  129.   std::size_t addpart(std::size_t);
  130.   template <class T> std::size_t addpart(const T &);
  131.   template <class T> std::size_t addpart(std::size_t, const T *);
  132.   template <class T>
  133.   std::size_t addpart(std::size_t, T *, zmq_free_fn *, void * = nullptr);
  134.   template <class Iter> std::size_t addpart(Iter, Iter);
  135.  
  136.   void *data(std::size_t) noexcept;
  137.  
  138.   std::size_t numparts() const noexcept;
  139.   std::size_t size() const noexcept;
  140.   std::size_t size(std::size_t) const noexcept;
  141.  
  142.   part &operator[](std::size_t);
  143.   const part &operator[](std::size_t) const;
  144.  
  145.   void pop_back();
  146.  
  147.   int send(const socket &, bool = true);
  148.   int receive(const socket &, bool = true);
  149.   template <class OutputIt>
  150.   OutputIt read(std::size_t, OutputIt, OutputIt) noexcept;
  151.   void clear() noexcept;
  152. };
  153.  
  154. struct poller {
  155. private:
  156.   struct item {
  157.     explicit item(const zmq_pollitem_t) noexcept;
  158.  
  159.     bool belongsto(const socket &) const noexcept;
  160.     bool isready() const noexcept;
  161.  
  162.   private:
  163.     const zmq_pollitem_t item_;
  164.   };
  165.  
  166.   std::vector<zmq_pollitem_t> items_;
  167.  
  168. public:
  169.   poller() noexcept(noexcept(std::vector<zmq_pollitem_t>()));
  170.  
  171.   void additem(const socket &, poll_event);
  172.   void additem(const socket &, short);
  173.  
  174.   std::size_t size() const noexcept;
  175.   item operator[](std::size_t) const;
  176.  
  177.   int poll(long);
  178.  
  179.   void clear() noexcept;
  180. };
  181.  
  182. /* Proxy functionality */
  183. void proxy(const socket *frontend, const socket *backend,
  184.            const socket *capture = nullptr) {
  185.   void *f = static_cast<void *>(*frontend);
  186.   void *b = static_cast<void *>(*backend);
  187.   void *c = (capture == nullptr) ? nullptr : static_cast<void *>(*capture);
  188.   zmq_proxy(f, b, c);
  189. }
  190. void proxy(const socket *frontend, const socket *backend, const socket *capture,
  191.            const socket *control) {
  192.   void *f = static_cast<void *>(*frontend);
  193.   void *b = static_cast<void *>(*backend);
  194.   void *cap = (capture == nullptr) ? nullptr : static_cast<void *>(*capture);
  195.   void *con = static_cast<void *>(*control);
  196.   zmq_proxy_steerable(f, b, cap, con);
  197. }
  198.  
  199. /* Implementations */
  200. enum struct context_opt : int {
  201. #ifdef ZMQ_IO_THREADS
  202.   io_threads = ZMQ_IO_THREADS,
  203. #endif
  204. #ifdef ZMQ_MAX_SOCKETS
  205.   max_sockets = ZMQ_MAX_SOCKETS,
  206. #endif
  207. #ifdef ZMQ_SOCKET_LIMIT
  208.   socket_limit = ZMQ_SOCKET_LIMIT,
  209. #endif
  210. #ifdef ZMQ_THREAD_PRIORITY
  211.   thread_priority = ZMQ_THREAD_PRIORITY,
  212. #endif
  213. #ifdef ZMQ_THREAD_SCHED_POLICY
  214.   thread_sched_policy = ZMQ_THREAD_SCHED_POLICY,
  215. #endif
  216. #ifdef ZMQ_MAX_MSGSZ
  217.   max_msgsz = ZMQ_MAX_MSGSZ,
  218. #endif
  219. #ifdef ZMQ_MSG_T_SIZE
  220.   msg_t_size = ZMQ_MSG_T_SIZE,
  221. #endif
  222. #ifdef ZMQ_THREAD_AFFINITY_CPU_ADD
  223.   thread_affinity_cpu_add = ZMQ_THREAD_AFFINITY_CPU_ADD,
  224. #endif
  225. #ifdef ZMQ_THREAD_AFFINITY_CPU_REMOVE
  226.   thread_affinity_cpu_remove = ZMQ_THREAD_AFFINITY_CPU_REMOVE,
  227. #endif
  228. #ifdef ZMQ_THREAD_NAME_PREFIX
  229.   thread_name_prefix = ZMQ_THREAD_NAME_PREFIX,
  230. #endif
  231. #ifdef ZMQ_ZERO_COPY_RCV
  232.   zero_copy_rcv = ZMQ_ZERO_COPY_RCV,
  233. #endif
  234. };
  235.  
  236. context::context()
  237.     : ptr_{zmq_ctx_new(), [](void *ptr) noexcept {
  238.              if (ptr != nullptr)
  239.                zmq_ctx_term(ptr);
  240.            }} {
  241.   if (!ptr_)
  242.     throw error();
  243. }
  244.  
  245. void context::set(context_opt name, int value) const {
  246.   int rc = zmq_ctx_set(ptr_.get(), static_cast<int>(name), value);
  247.   if (rc != 0)
  248.     throw error();
  249. }
  250. int context::get(context_opt name) const {
  251.   int val = zmq_ctx_get(ptr_.get(), static_cast<int>(name));
  252.   if (val < 0)
  253.     throw error();
  254.   return val;
  255. }
  256.  
  257. context::operator void *() const noexcept { return ptr_.get(); }
  258. context::operator bool() const noexcept { return static_cast<bool>(ptr_); }
  259.  
  260. enum struct socket_type : int {
  261. #ifdef ZMQ_PAIR
  262.   pair = ZMQ_PAIR,
  263. #endif
  264. #ifdef ZMQ_PUB
  265.   pub = ZMQ_PUB,
  266. #endif
  267. #ifdef ZMQ_SUB
  268.   sub = ZMQ_SUB,
  269. #endif
  270. #ifdef ZMQ_REQ
  271.   req = ZMQ_REQ,
  272. #endif
  273. #ifdef ZMQ_REP
  274.   rep = ZMQ_REP,
  275. #endif
  276. #ifdef ZMQ_DEALER
  277.   dealer = ZMQ_DEALER,
  278. #endif
  279. #ifdef ZMQ_ROUTER
  280.   router = ZMQ_ROUTER,
  281. #endif
  282. #ifdef ZMQ_PULL
  283.   pull = ZMQ_PULL,
  284. #endif
  285. #ifdef ZMQ_PUSH
  286.   push = ZMQ_PUSH,
  287. #endif
  288. #ifdef ZMQ_XPUB
  289.   xpub = ZMQ_XPUB,
  290. #endif
  291. #ifdef ZMQ_XSUB
  292.   xsub = ZMQ_XSUB,
  293. #endif
  294. #ifdef ZMQ_STREAM
  295.   stream = ZMQ_STREAM,
  296. #endif
  297. #ifdef ZMQ_SERVER
  298.   server = ZMQ_SERVER,
  299. #endif
  300. #ifdef ZMQ_CLIENT
  301.   client = ZMQ_CLIENT,
  302. #endif
  303. #ifdef ZMQ_RADIO
  304.   radio = ZMQ_RADIO,
  305. #endif
  306. #ifdef ZMQ_DISH
  307.   dish = ZMQ_DISH,
  308. #endif
  309. #ifdef ZMQ_GATHER
  310.   gather = ZMQ_GATHER,
  311. #endif
  312. #ifdef ZMQ_SCATTER
  313.   scatter = ZMQ_SCATTER,
  314. #endif
  315. #ifdef ZMQ_DGRAM
  316.   dgram = ZMQ_DGRAM,
  317. #endif
  318. };
  319.  
  320. enum struct socket_opt : int {
  321. #ifdef ZMQ_AFFINITY
  322.   affinity = ZMQ_AFFINITY,
  323. #endif
  324. #ifdef ZMQ_ROUTING_ID
  325.   routing_id = ZMQ_ROUTING_ID,
  326. #endif
  327. #ifdef ZMQ_SUBSCRIBE
  328.   subscribe = ZMQ_SUBSCRIBE,
  329. #endif
  330. #ifdef ZMQ_UNSUBSCRIBE
  331.   unsubscribe = ZMQ_UNSUBSCRIBE,
  332. #endif
  333. #ifdef ZMQ_RATE
  334.   rate = ZMQ_RATE,
  335. #endif
  336. #ifdef ZMQ_RECOVERY_IVL
  337.   recovery_ivl = ZMQ_RECOVERY_IVL,
  338. #endif
  339. #ifdef ZMQ_SNDBUF
  340.   sndbuf = ZMQ_SNDBUF,
  341. #endif
  342. #ifdef ZMQ_RCVBUF
  343.   rcvbuf = ZMQ_RCVBUF,
  344. #endif
  345. #ifdef ZMQ_RCVMORE
  346.   rcvmore = ZMQ_RCVMORE,
  347. #endif
  348. #ifdef ZMQ_FD
  349.   fd = ZMQ_FD,
  350. #endif
  351. #ifdef ZMQ_EVENTS
  352.   events = ZMQ_EVENTS,
  353. #endif
  354. #ifdef ZMQ_TYPE
  355.   type = ZMQ_TYPE,
  356. #endif
  357. #ifdef ZMQ_LINGER
  358.   linger = ZMQ_LINGER,
  359. #endif
  360. #ifdef ZMQ_RECONNECT_IVL
  361.   reconnect_ivl = ZMQ_RECONNECT_IVL,
  362. #endif
  363. #ifdef ZMQ_BACKLOG
  364.   backlog = ZMQ_BACKLOG,
  365. #endif
  366. #ifdef ZMQ_RECONNECT_IVL_MAX
  367.   reconnect_ivl_max = ZMQ_RECONNECT_IVL_MAX,
  368. #endif
  369. #ifdef ZMQ_MAXMSGSIZE
  370.   maxmsgsize = ZMQ_MAXMSGSIZE,
  371. #endif
  372. #ifdef ZMQ_SNDHWM
  373.   sndhwm = ZMQ_SNDHWM,
  374. #endif
  375. #ifdef ZMQ_RCVHWM
  376.   rcvhwm = ZMQ_RCVHWM,
  377. #endif
  378. #ifdef ZMQ_MULTICAST_HOPS
  379.   multicast_hops = ZMQ_MULTICAST_HOPS,
  380. #endif
  381. #ifdef ZMQ_RCVTIMEO
  382.   rcvtimeo = ZMQ_RCVTIMEO,
  383. #endif
  384. #ifdef ZMQ_SNDTIMEO
  385.   sndtimeo = ZMQ_SNDTIMEO,
  386. #endif
  387. #ifdef ZMQ_LAST_ENDPOINT
  388.   last_endpoint = ZMQ_LAST_ENDPOINT,
  389. #endif
  390. #ifdef ZMQ_ROUTER_MANDATORY
  391.   router_mandatory = ZMQ_ROUTER_MANDATORY,
  392. #endif
  393. #ifdef ZMQ_TCP_KEEPALIVE
  394.   tcp_keepalive = ZMQ_TCP_KEEPALIVE,
  395. #endif
  396. #ifdef ZMQ_TCP_KEEPALIVE_CNT
  397.   tcp_keepalive_cnt = ZMQ_TCP_KEEPALIVE_CNT,
  398. #endif
  399. #ifdef ZMQ_TCP_KEEPALIVE_IDLE
  400.   tcp_keepalive_idle = ZMQ_TCP_KEEPALIVE_IDLE,
  401. #endif
  402. #ifdef ZMQ_TCP_KEEPALIVE_INTVL
  403.   tcp_keepalive_intvl = ZMQ_TCP_KEEPALIVE_INTVL,
  404. #endif
  405. #ifdef ZMQ_IMMEDIATE
  406.   immediate = ZMQ_IMMEDIATE,
  407. #endif
  408. #ifdef ZMQ_XPUB_VERBOSE
  409.   xpub_verbose = ZMQ_XPUB_VERBOSE,
  410. #endif
  411. #ifdef ZMQ_ROUTER_RAW
  412.   router_raw = ZMQ_ROUTER_RAW,
  413. #endif
  414. #ifdef ZMQ_IPV6
  415.   ipv6 = ZMQ_IPV6,
  416. #endif
  417. #ifdef ZMQ_MECHANISM
  418.   mechanism = ZMQ_MECHANISM,
  419. #endif
  420. #ifdef ZMQ_PLAIN_SERVER
  421.   plain_server = ZMQ_PLAIN_SERVER,
  422. #endif
  423. #ifdef ZMQ_PLAIN_USERNAME
  424.   plain_username = ZMQ_PLAIN_USERNAME,
  425. #endif
  426. #ifdef ZMQ_PLAIN_PASSWORD
  427.   plain_password = ZMQ_PLAIN_PASSWORD,
  428. #endif
  429. #ifdef ZMQ_CURVE_SERVER
  430.   curve_server = ZMQ_CURVE_SERVER,
  431. #endif
  432. #ifdef ZMQ_CURVE_PUBLICKEY
  433.   curve_publickey = ZMQ_CURVE_PUBLICKEY,
  434. #endif
  435. #ifdef ZMQ_CURVE_SECRETKEY
  436.   curve_secretkey = ZMQ_CURVE_SECRETKEY,
  437. #endif
  438. #ifdef ZMQ_CURVE_SERVERKEY
  439.   curve_serverkey = ZMQ_CURVE_SERVERKEY,
  440. #endif
  441. #ifdef ZMQ_PROBE_ROUTER
  442.   probe_router = ZMQ_PROBE_ROUTER,
  443. #endif
  444. #ifdef ZMQ_REQ_CORRELATE
  445.   req_correlate = ZMQ_REQ_CORRELATE,
  446. #endif
  447. #ifdef ZMQ_REQ_RELAXED
  448.   req_relaxed = ZMQ_REQ_RELAXED,
  449. #endif
  450. #ifdef ZMQ_CONFLATE
  451.   conflate = ZMQ_CONFLATE,
  452. #endif
  453. #ifdef ZMQ_ZAP_DOMAIN
  454.   zap_domain = ZMQ_ZAP_DOMAIN,
  455. #endif
  456. #ifdef ZMQ_ROUTER_HANDOVER
  457.   router_handover = ZMQ_ROUTER_HANDOVER,
  458. #endif
  459. #ifdef ZMQ_TOS
  460.   tos = ZMQ_TOS,
  461. #endif
  462. #ifdef ZMQ_CONNECT_ROUTING_ID
  463.   connect_routing_id = ZMQ_CONNECT_ROUTING_ID,
  464. #endif
  465. #ifdef ZMQ_GSSAPI_SERVER
  466.   gssapi_server = ZMQ_GSSAPI_SERVER,
  467. #endif
  468. #ifdef ZMQ_GSSAPI_PRINCIPAL
  469.   gssapi_principal = ZMQ_GSSAPI_PRINCIPAL,
  470. #endif
  471. #ifdef ZMQ_GSSAPI_SERVICE_PRINCIPAL
  472.   gssapi_service_principal = ZMQ_GSSAPI_SERVICE_PRINCIPAL,
  473. #endif
  474. #ifdef ZMQ_GSSAPI_PLAINTEXT
  475.   gssapi_plaintext = ZMQ_GSSAPI_PLAINTEXT,
  476. #endif
  477. #ifdef ZMQ_HANDSHAKE_IVL
  478.   handshake_ivl = ZMQ_HANDSHAKE_IVL,
  479. #endif
  480. #ifdef ZMQ_SOCKS_PROXY
  481.   socks_proxy = ZMQ_SOCKS_PROXY,
  482. #endif
  483. #ifdef ZMQ_XPUB_NODROP
  484.   xpub_nodrop = ZMQ_XPUB_NODROP,
  485. #endif
  486. #ifdef ZMQ_BLOCKY
  487.   blocky = ZMQ_BLOCKY,
  488. #endif
  489. #ifdef ZMQ_XPUB_MANUAL
  490.   xpub_manual = ZMQ_XPUB_MANUAL,
  491. #endif
  492. #ifdef ZMQ_XPUB_WELCOME_MSG
  493.   xpub_welcome_msg = ZMQ_XPUB_WELCOME_MSG,
  494. #endif
  495. #ifdef ZMQ_STREAM_NOTIFY
  496.   stream_notify = ZMQ_STREAM_NOTIFY,
  497. #endif
  498. #ifdef ZMQ_INVERT_MATCHING
  499.   invert_matching = ZMQ_INVERT_MATCHING,
  500. #endif
  501. #ifdef ZMQ_HEARTBEAT_IVL
  502.   heartbeat_ivl = ZMQ_HEARTBEAT_IVL,
  503. #endif
  504. #ifdef ZMQ_HEARTBEAT_TTL
  505.   heartbeat_ttl = ZMQ_HEARTBEAT_TTL,
  506. #endif
  507. #ifdef ZMQ_HEARTBEAT_TIMEOUT
  508.   heartbeat_timeout = ZMQ_HEARTBEAT_TIMEOUT,
  509. #endif
  510. #ifdef ZMQ_XPUB_VERBOSER
  511.   xpub_verboser = ZMQ_XPUB_VERBOSER,
  512. #endif
  513. #ifdef ZMQ_CONNECT_TIMEOUT
  514.   connect_timeout = ZMQ_CONNECT_TIMEOUT,
  515. #endif
  516. #ifdef ZMQ_TCP_MAXRT
  517.   tcp_maxrt = ZMQ_TCP_MAXRT,
  518. #endif
  519. #ifdef ZMQ_THREAD_SAFE
  520.   thread_safe = ZMQ_THREAD_SAFE,
  521. #endif
  522. #ifdef ZMQ_MULTICAST_MAXTPDU
  523.   multicast_maxtpdu = ZMQ_MULTICAST_MAXTPDU,
  524. #endif
  525. #ifdef ZMQ_VMCI_BUFFER_SIZE
  526.   vmci_buffer_size = ZMQ_VMCI_BUFFER_SIZE,
  527. #endif
  528. #ifdef ZMQ_VMCI_BUFFER_MIN_SIZE
  529.   vmci_buffer_min_size = ZMQ_VMCI_BUFFER_MIN_SIZE,
  530. #endif
  531. #ifdef ZMQ_VMCI_BUFFER_MAX_SIZE
  532.   vmci_buffer_max_size = ZMQ_VMCI_BUFFER_MAX_SIZE,
  533. #endif
  534. #ifdef ZMQ_VMCI_CONNECT_TIMEOUT
  535.   vmci_connect_timeout = ZMQ_VMCI_CONNECT_TIMEOUT,
  536. #endif
  537. #ifdef ZMQ_USE_FD
  538.   use_fd = ZMQ_USE_FD,
  539. #endif
  540. #ifdef ZMQ_GSSAPI_PRINCIPAL_NAMETYPE
  541.   gssapi_principal_nametype = ZMQ_GSSAPI_PRINCIPAL_NAMETYPE,
  542. #endif
  543. #ifdef ZMQ_GSSAPI_SERVICE_PRINCIPAL_NAMETYPE
  544.   gssapi_service_principal_nametype = ZMQ_GSSAPI_SERVICE_PRINCIPAL_NAMETYPE,
  545. #endif
  546. #ifdef ZMQ_BINDTODEVICE
  547.   bindtodevice = ZMQ_BINDTODEVICE,
  548. #endif
  549. #ifdef ZMQ_ZAP_ENFORCE_DOMAIN
  550.   zap_enforce_domain = ZMQ_ZAP_ENFORCE_DOMAIN,
  551. #endif
  552. #ifdef ZMQ_LOOPBACK_FASTPATH
  553.   loopback_fastpath = ZMQ_LOOPBACK_FASTPATH,
  554. #endif
  555. #ifdef ZMQ_METADATA
  556.   metadata = ZMQ_METADATA,
  557. #endif
  558. };
  559.  
  560. enum struct monitor_event : int {
  561. #ifdef ZMQ_EVENT_CONNECTED
  562.   event_connected = ZMQ_EVENT_CONNECTED,
  563. #endif
  564. #ifdef ZMQ_EVENT_CONNECT_DELAYED
  565.   event_connect_delayed = ZMQ_EVENT_CONNECT_DELAYED,
  566. #endif
  567. #ifdef ZMQ_EVENT_CONNECT_RETRIED
  568.   event_connect_retried = ZMQ_EVENT_CONNECT_RETRIED,
  569. #endif
  570. #ifdef ZMQ_EVENT_LISTENING
  571.   event_listening = ZMQ_EVENT_LISTENING,
  572. #endif
  573. #ifdef ZMQ_EVENT_BIND_FAILED
  574.   event_bind_failed = ZMQ_EVENT_BIND_FAILED,
  575. #endif
  576. #ifdef ZMQ_EVENT_ACCEPTED
  577.   event_accepted = ZMQ_EVENT_ACCEPTED,
  578. #endif
  579. #ifdef ZMQ_EVENT_ACCEPT_FAILED
  580.   event_accept_failed = ZMQ_EVENT_ACCEPT_FAILED,
  581. #endif
  582. #ifdef ZMQ_EVENT_CLOSED
  583.   event_closed = ZMQ_EVENT_CLOSED,
  584. #endif
  585. #ifdef ZMQ_EVENT_CLOSE_FAILED
  586.   event_close_failed = ZMQ_EVENT_CLOSE_FAILED,
  587. #endif
  588. #ifdef ZMQ_EVENT_DISCONNECTED
  589.   event_disconnected = ZMQ_EVENT_DISCONNECTED,
  590. #endif
  591. #ifdef ZMQ_EVENT_MONITOR_STOPPED
  592.   event_monitor_stopped = ZMQ_EVENT_MONITOR_STOPPED,
  593. #endif
  594. #ifdef ZMQ_EVENT_ALL
  595.   event_all = ZMQ_EVENT_ALL,
  596. #endif
  597. };
  598.  
  599. socket::socket(const context &ctx, socket_type type)
  600.     : ptr_{zmq_socket(static_cast<void *>(ctx), static_cast<int>(type)),
  601.            [](void *ptr) noexcept { zmq_close(ptr); }} {
  602.   if (!ptr_)
  603.     throw error();
  604. }
  605.  
  606. template <class T> void socket::set(socket_opt name, const T &value) const {
  607.   int rc =
  608.       zmq_setsockopt(ptr_.get(), static_cast<int>(name), &value, sizeof(T));
  609.   if (rc != 0)
  610.     throw error();
  611. }
  612. template <class T> void socket::get(socket_opt name, T &value) const {
  613.   std::size_t sz = sizeof(T);
  614.   int rc = zmq_getsockopt(ptr_.get(), static_cast<int>(name), &value, &sz);
  615.   if (rc != 0)
  616.     throw error();
  617. }
  618.  
  619. void socket::bind(const char *endpoint) const {
  620.   int rc = zmq_bind(ptr_.get(), endpoint);
  621.   if (rc != 0)
  622.     throw error();
  623. }
  624. void socket::connect(const char *endpoint) const {
  625.   int rc = zmq_connect(ptr_.get(), endpoint);
  626.   if (rc != 0)
  627.     throw error();
  628. }
  629. void socket::unbind(const char *endpoint) const {
  630.   int rc = zmq_unbind(ptr_.get(), endpoint);
  631.   if (rc != 0)
  632.     throw error();
  633. }
  634. void socket::disconnect(const char *endpoint) const {
  635.   int rc = zmq_disconnect(ptr_.get(), endpoint);
  636.   if (rc != 0)
  637.     throw error();
  638. }
  639.  
  640. int socket::send(message msg, bool wait) const {
  641.   int bytes = msg.send(*this, wait);
  642.   return bytes;
  643. }
  644. template <class T>
  645. int socket::send(std::size_t len, const T *data, int flags) const {
  646.   int bytes =
  647.       zmq_send_const(static_cast<void *>(*this), data, len * sizeof(T), flags);
  648.   if (bytes < 0)
  649.     throw error();
  650.   return bytes;
  651. }
  652.  
  653. message socket::receive(bool wait) const {
  654.   message msg;
  655.   msg.receive(*this, wait);
  656.   return msg;
  657. }
  658. template <class T>
  659. int socket::receive(std::size_t len, T *data, int flags) const {
  660.   int bytes =
  661.       zmq_recv(static_cast<void *>(*this), data, len * sizeof(T), flags);
  662.   if (bytes < 0)
  663.     throw error();
  664.   return bytes;
  665. }
  666.  
  667. socket socket::monitor(const context &ctx, const char *endpoint,
  668.                        monitor_event event) const {
  669.   return monitor(ctx, endpoint, static_cast<int>(event));
  670. }
  671. socket socket::monitor(const context &ctx, const char *endpoint,
  672.                        int events) const {
  673.   int rc = zmq_socket_monitor(static_cast<void *>(*this), endpoint, events);
  674.   if (rc < 0)
  675.     throw error();
  676.   socket listener(ctx, socket_type::pair);
  677.   listener.connect(endpoint);
  678.   return listener;
  679. }
  680.  
  681. socket::operator void *() const noexcept { return ptr_.get(); }
  682.  
  683. message::part::part() noexcept { zmq_msg_init(&msg_); }
  684. message::part::part(std::size_t size) {
  685.   int rc = zmq_msg_init_size(&msg_, size);
  686.   if (rc != 0)
  687.     throw error();
  688. }
  689. template <class T> message::part::part(const T &value) : part(sizeof(T)) {
  690.   std::memcpy(zmq_msg_data(&msg_), &value, sizeof(T));
  691. }
  692. template <class T>
  693. message::part::part(std::size_t len, const T *data) : part(len * sizeof(T)) {
  694.   std::memcpy(zmq_msg_data(&msg_), data, len * sizeof(T));
  695. }
  696. template <class T>
  697. message::part::part(std::size_t len, T *data, zmq_free_fn *ffn, void *hint) {
  698.   int rc = zmq_msg_init_data(&msg_, data, len * sizeof(T), ffn, hint);
  699.   if (rc != 0)
  700.     throw error();
  701. }
  702. template <class Iter> message::part::part(Iter begin, Iter end) {
  703.   using size_type = typename std::iterator_traits<Iter>::difference_type;
  704.   using value_type = typename std::iterator_traits<Iter>::value_type;
  705.   size_type sz = std::distance(begin, end) * sizeof(value_type);
  706.   int rc = zmq_msg_init_size(&msg_, sz);
  707.   if (rc != 0)
  708.     throw error();
  709.   value_type *dest = static_cast<value_type *>(zmq_msg_data(&msg_));
  710.   std::copy(begin, end, dest);
  711. }
  712.  
  713. message::part::part(const part &rhs) noexcept : part() {
  714.   zmq_msg_copy(&msg_, const_cast<zmq_msg_t *>(&rhs.msg_));
  715. }
  716. message::part &message::part::operator=(const part &rhs) noexcept {
  717.   clear();
  718.   zmq_msg_copy(&msg_, const_cast<zmq_msg_t *>(&rhs.msg_));
  719.   return *this;
  720. }
  721. message::part::part(part &&rhs) noexcept : part() {
  722.   zmq_msg_move(&msg_, &rhs.msg_);
  723. }
  724. message::part &message::part::operator=(part &&rhs) noexcept {
  725.   clear();
  726.   zmq_msg_move(&msg_, &rhs.msg_);
  727.   return *this;
  728. }
  729.  
  730. message::part::~part() { zmq_msg_close(&msg_); }
  731.  
  732. std::size_t message::part::size() const noexcept { return zmq_msg_size(&msg_); }
  733. message::part::operator void *() noexcept { return zmq_msg_data(&msg_); }
  734. void *message::part::data() noexcept { return static_cast<void *>(*this); }
  735. bool message::part::more() const noexcept { return zmq_msg_more(&msg_) == 1; }
  736.  
  737. int message::part::send(const socket &s, int flags) {
  738.   int bytes = zmq_msg_send(&msg_, static_cast<void *>(s), flags);
  739.   if (bytes < 0)
  740.     throw error();
  741.   return bytes;
  742. }
  743. int message::part::receive(const socket &s, int flags) {
  744.   clear();
  745.   int bytes = zmq_msg_recv(&msg_, static_cast<void *>(s), flags);
  746.   if (bytes < 0)
  747.     throw error();
  748.   return bytes;
  749. }
  750. template <class OutputIt>
  751. OutputIt message::part::read(OutputIt begin, OutputIt end) noexcept {
  752.   using value_type = typename std::iterator_traits<OutputIt>::value_type;
  753.   const std::size_t iterlen = std::distance(begin, end);
  754.   const std::size_t datalen = size() / sizeof(value_type);
  755.   value_type *source = static_cast<value_type *>(static_cast<void *>(*this));
  756.   return std::copy(source, source + std::min(iterlen, datalen), begin);
  757. }
  758. void message::part::clear() noexcept {
  759.   zmq_msg_close(&msg_);
  760.   zmq_msg_init(&msg_);
  761. }
  762.  
  763. message::message() noexcept(noexcept(std::vector<part>())) = default;
  764. message::message(std::vector<part> parts) noexcept : parts_{std::move(parts)} {}
  765.  
  766. std::size_t message::addpart() {
  767.   parts_.push_back(part());
  768.   return parts_.back().size();
  769. }
  770. std::size_t message::addpart(std::size_t size) {
  771.   parts_.push_back(part(size));
  772.   return parts_.back().size();
  773. }
  774. template <class T> std::size_t message::addpart(const T &value) {
  775.   part msg = value;
  776.   parts_.push_back(std::move(msg));
  777.   return parts_.back().size();
  778. }
  779. template <class T>
  780. std::size_t message::addpart(std::size_t len, const T *data) {
  781.   parts_.emplace_back(len, data);
  782.   return parts_.back().size();
  783. }
  784. template <class T>
  785. std::size_t message::addpart(std::size_t len, T *data, zmq_free_fn *ffn,
  786.                              void *hint) {
  787.   parts_.emplace_back(len, data, ffn, hint);
  788.   return parts_.back().size();
  789. }
  790. template <class Iter> std::size_t message::addpart(Iter begin, Iter end) {
  791.   parts_.emplace_back(begin, end);
  792.   return parts_.back().size();
  793. }
  794.  
  795. void *message::data(std::size_t pid) noexcept {
  796.   return pid < numparts() ? static_cast<void *>(parts_[pid]) : nullptr;
  797. }
  798.  
  799. std::size_t message::numparts() const noexcept { return parts_.size(); }
  800. std::size_t message::size() const noexcept {
  801.   std::size_t size_ = 0;
  802.   for (const auto &part : parts_)
  803.     size_ += part.size();
  804.   return size_;
  805. }
  806. std::size_t message::size(std::size_t pid) const noexcept {
  807.   return pid < numparts() ? parts_[pid].size() : 0;
  808. }
  809.  
  810. message::part &message::operator[](std::size_t pid) { return parts_[pid]; }
  811. const message::part &message::operator[](std::size_t pid) const {
  812.   return parts_[pid];
  813. }
  814.  
  815. void message::pop_back() { parts_.pop_back(); }
  816.  
  817. int message::send(const socket &s, bool wait) {
  818.   int flags = wait ? 0 : ZMQ_DONTWAIT;
  819.   int bytes, total{0};
  820.   const std::size_t len = parts_.size();
  821.   for (std::size_t idx = 0; idx < len; idx++) {
  822.     bytes = parts_[idx].send(s, (idx < len - 1) ? flags | ZMQ_SNDMORE : flags);
  823.     total += bytes;
  824.   }
  825.   return total;
  826. }
  827. int message::receive(const socket &s, bool wait) {
  828.   clear();
  829.   int flags = wait ? 0 : ZMQ_DONTWAIT;
  830.   int bytes, total{0};
  831.   part msgpart;
  832.  
  833.   bytes = msgpart.receive(s, flags);
  834.   total = bytes;
  835.   while (msgpart.more()) {
  836.     parts_.push_back(std::move(msgpart));
  837.     bytes = msgpart.receive(s, flags);
  838.     total += bytes;
  839.   }
  840.   parts_.push_back(std::move(msgpart));
  841.   return total;
  842. }
  843. template <class OutputIt>
  844. OutputIt message::read(std::size_t pid, OutputIt begin, OutputIt end) noexcept {
  845.   return pid < numparts() ? parts_[pid].read(begin, end) : begin;
  846. }
  847. void message::clear() noexcept { parts_.clear(); }
  848.  
  849. enum struct poll_event : short {
  850. #ifdef ZMQ_POLLIN
  851.   pollin = ZMQ_POLLIN,
  852. #endif
  853. #ifdef ZMQ_POLLOUT
  854.   pollout = ZMQ_POLLOUT,
  855. #endif
  856. #ifdef ZMQ_POLLERR
  857.   poller = ZMQ_POLLERR,
  858. #endif
  859. #ifdef ZMQ_POLLPRI
  860.   pollpri = ZMQ_POLLPRI,
  861. #endif
  862. };
  863.  
  864. poller::item::item(const zmq_pollitem_t item) noexcept : item_{item} {}
  865.  
  866. bool poller::item::belongsto(const socket &s) const noexcept {
  867.   return item_.socket == static_cast<void *>(s);
  868. }
  869. bool poller::item::isready() const noexcept {
  870.   return item_.events & item_.revents;
  871. }
  872.  
  873. poller::poller() noexcept(noexcept(std::vector<zmq_pollitem_t>())) = default;
  874.  
  875. void poller::additem(const socket &s, poll_event pe) {
  876.   additem(s, static_cast<short>(pe));
  877. }
  878. void poller::additem(const socket &s, short events) {
  879.   zmq_pollitem_t item;
  880.   item.socket = static_cast<void *>(s);
  881.   item.fd = 0;
  882.   item.events = events;
  883.   item.revents = 0;
  884.   items_.push_back(item);
  885. }
  886.  
  887. std::size_t poller::size() const noexcept { return items_.size(); }
  888. poller::item poller::operator[](std::size_t idx) const {
  889.   return item{items_[idx]};
  890. }
  891.  
  892. int poller::poll(long timeout) {
  893.   int rc = zmq_poll(&items_[0], items_.size(), timeout);
  894.   if (rc < 0)
  895.     throw error();
  896.   return rc;
  897. }
  898.  
  899. void poller::clear() noexcept { items_.clear(); }
  900. } // namespace zmq
  901.  
  902. #endif
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top