Advertisement
Riskybiz

zmsg.hpp

Aug 25th, 2014
57
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 11.74 KB | None | 0 0
  1. /*  =========================================================================
  2.     zmsg.hpp
  3.  
  4.     Multipart message class for example applications.
  5.  
  6.     Follows the ZFL class conventions and is further developed as the ZFL
  7.     zfl_msg class.  See http://zfl.zeromq.org for more details.
  8.  
  9.     -------------------------------------------------------------------------
  10.     Copyright (c) 1991-2010 iMatix Corporation <www.imatix.com>
  11.     Copyright other contributors as noted in the AUTHORS file.
  12.  
  13.     This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
  14.  
  15.     This is free software; you can redistribute it and/or modify it under the
  16.     terms of the GNU Lesser General Public License as published by the Free
  17.     Software Foundation; either version 3 of the License, or (at your option)
  18.     any later version.
  19.  
  20.     This software is distributed in the hope that it will be useful, but
  21.     WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABIL-
  22.     ITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General
  23.     Public License for more details.
  24.  
  25.     You should have received a copy of the GNU Lesser General Public License
  26.     along with this program. If not, see <http://www.gnu.org/licenses/>.
  27.     =========================================================================
  28.  
  29.     Andreas Hoelzlwimmer <andreas.hoelzlwimmer@fh-hagenberg.at>
  30. */
  31.  
  32. #ifndef __ZMSG_H_INCLUDED__
  33. #define __ZMSG_H_INCLUDED__
  34.  
  35. #include "zhelpers.hpp"
  36.  
  37. #include <vector>
  38. #include <string>
  39. #include <stdarg.h>
  40.  
  41. class zmsg {
  42. public:
  43.     typedef std::basic_string<unsigned char> ustring;
  44.  
  45.     zmsg() {
  46.     }
  47.  
  48.    //  --------------------------------------------------------------------------
  49.    //  Constructor, sets initial body
  50.    zmsg(char const *body) {
  51.        body_set(body);
  52.    }
  53.  
  54.    //  -------------------------------------------------------------------------
  55.    //  Constructor, sets initial body and sends message to socket
  56.    zmsg(char const *body, zmq::socket_t &socket) {
  57.        body_set(body);
  58.        send(socket);
  59.    }
  60.  
  61.    //  --------------------------------------------------------------------------
  62.    //  Constructor, calls first receive automatically
  63.    zmsg(zmq::socket_t &socket) {
  64.        recv(socket);
  65.    }
  66.  
  67.    //  --------------------------------------------------------------------------
  68.    //  Copy Constructor, equivalent to zmsg_dup
  69.    zmsg(zmsg &msg) {
  70.        m_part_data.resize(msg.m_part_data.size());
  71.        std::copy(msg.m_part_data.begin(), msg.m_part_data.end(), m_part_data.begin());
  72.    }
  73.  
  74.    virtual ~zmsg() {
  75.       clear();
  76.    }
  77.  
  78.    //  --------------------------------------------------------------------------
  79.    //  Erases all messages
  80.    void clear() {
  81.        m_part_data.clear();
  82.    }
  83.  
  84.    void set_part(size_t part_nbr, unsigned char *data) {
  85.        if (part_nbr < m_part_data.size() && part_nbr >= 0) {
  86.            m_part_data[part_nbr] = data;
  87.        }
  88.    }
  89.  
  90.    bool recv(zmq::socket_t & socket) {
  91.       clear();
  92.       while(1) {
  93.          zmq::message_t message(0);
  94.          try {
  95.             if (!socket.recv(&message, 0)) {
  96.                return false;
  97.             }
  98.          } catch (zmq::error_t error) {
  99.             std::cout << "E: " << error.what() << std::endl;
  100.             return false;
  101.          }
  102.          ustring data = (unsigned char*) message.data();
  103.          //std::cerr << "recv: \"" << (unsigned char*) message.data() << "\", size " << message.size() << std::endl;
  104.          if (message.size() == 17 && ((unsigned char *)message.data())[0] == 0) {
  105.             char *uuidstr = encode_uuid((unsigned char*) message.data());
  106.             push_back(uuidstr);
  107.             delete[] uuidstr;
  108.          }
  109.          else {
  110.             data[message.size()] = 0;
  111.             push_back((char *)data.c_str());
  112.          }
  113.          //int64_t more = 0;
  114.          int more = 0;
  115.          size_t more_size = sizeof(more);
  116.          socket.getsockopt(ZMQ_RCVMORE, &more, &more_size);
  117.          if (!more) {
  118.             break;
  119.          }
  120.       }
  121.       return true;
  122.    }
  123.  
  124.    void send(zmq::socket_t & socket) {
  125.        for (size_t part_nbr = 0; part_nbr < m_part_data.size(); part_nbr++) {
  126.           zmq::message_t message;
  127.           ustring data = m_part_data[part_nbr];
  128.           if (data.size() == 33 && data [0] == '@') {
  129.              unsigned char * uuidbin = decode_uuid ((char *) data.c_str());
  130.              message.rebuild(17);
  131.              memcpy(message.data(), uuidbin, 17);
  132.              delete uuidbin;
  133.           }
  134.           else {
  135.              message.rebuild(data.size());
  136.              memcpy(message.data(), data.c_str(), data.size());
  137.           }
  138.           try {
  139.              socket.send(message, part_nbr < m_part_data.size() - 1 ? ZMQ_SNDMORE : 0);
  140.           } catch (zmq::error_t error) {
  141.              assert(error.num()!=0);
  142.           }
  143.        }
  144.        clear();
  145.    }
  146.  
  147.    size_t parts() {
  148.       return m_part_data.size();
  149.    }
  150.  
  151.    void body_set(const char *body) {
  152.       if (m_part_data.size() > 0) {
  153.          m_part_data.erase(m_part_data.end()-1);
  154.       }
  155.       push_back((char*)body);
  156.    }
  157.  
  158.    void
  159.    body_fmt (const char *format, ...)
  160.    {
  161.        char value [255 + 1];
  162.        va_list args;
  163.  
  164.        va_start (args, format);
  165.        vsnprintf (value, 255, format, args);
  166.        va_end (args);
  167.  
  168.        body_set (value);
  169.    }
  170.  
  171.    char * body ()
  172.    {
  173.        if (m_part_data.size())
  174.            return ((char *) m_part_data [m_part_data.size() - 1].c_str());
  175.        else
  176.            return 0;
  177.    }
  178.  
  179.    // zmsg_push
  180.    void push_front(char *part) {
  181.       m_part_data.insert(m_part_data.begin(), (unsigned char*)part);
  182.    }
  183.  
  184.    // zmsg_append
  185.    void push_back(char *part) {
  186.       m_part_data.push_back((unsigned char*)part);
  187.    }
  188.  
  189.    //  --------------------------------------------------------------------------
  190.    //  Formats 17-byte UUID as 33-char string starting with '@'
  191.    //  Lets us print UUIDs as C strings and use them as addresses
  192.    //
  193.    static char *
  194.    encode_uuid (unsigned char *data)
  195.    {
  196.        static char
  197.            hex_char [] = "0123456789ABCDEF";
  198.  
  199.        assert (data [0] == 0);
  200.        char *uuidstr = new char[34];
  201.        uuidstr [0] = '@';
  202.        int byte_nbr;
  203.        for (byte_nbr = 0; byte_nbr < 16; byte_nbr++) {
  204.            uuidstr [byte_nbr * 2 + 1] = hex_char [data [byte_nbr + 1] >> 4];
  205.            uuidstr [byte_nbr * 2 + 2] = hex_char [data [byte_nbr + 1] & 15];
  206.        }
  207.        uuidstr [33] = 0;
  208.        return (uuidstr);
  209.    }
  210.  
  211.  
  212.    // --------------------------------------------------------------------------
  213.    // Formats 17-byte UUID as 33-char string starting with '@'
  214.    // Lets us print UUIDs as C strings and use them as addresses
  215.    //
  216.    static unsigned char *
  217.    decode_uuid (char *uuidstr)
  218.    {
  219.        static char
  220.            hex_to_bin [128] = {
  221.               -1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1, /* */
  222.               -1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1, /* */
  223.               -1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1, /* */
  224.                0, 1, 2, 3, 4, 5, 6, 7, 8, 9,-1,-1,-1,-1,-1,-1, /* 0..9 */
  225.               -1,10,11,12,13,14,15,-1,-1,-1,-1,-1,-1,-1,-1,-1, /* A..F */
  226.               -1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1, /* */
  227.               -1,10,11,12,13,14,15,-1,-1,-1,-1,-1,-1,-1,-1,-1, /* a..f */
  228.               -1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1,-1 }; /* */
  229.  
  230.        assert (strlen (uuidstr) == 33);
  231.        assert (uuidstr [0] == '@');
  232.        unsigned char *data = new unsigned char[17];
  233.        int byte_nbr;
  234.        data [0] = 0;
  235.        for (byte_nbr = 0; byte_nbr < 16; byte_nbr++)
  236.            data [byte_nbr + 1]
  237.                = (hex_to_bin [uuidstr [byte_nbr * 2 + 1] & 127] << 4)
  238.                + (hex_to_bin [uuidstr [byte_nbr * 2 + 2] & 127]);
  239.  
  240.        return (data);
  241.    }
  242.  
  243.    // zmsg_pop
  244.    ustring pop_front() {
  245.       if (m_part_data.size() == 0) {
  246.          return 0;
  247.       }
  248.       ustring part = m_part_data.front();
  249.       m_part_data.erase(m_part_data.begin());
  250.       return part;
  251.    }
  252.  
  253.    void append (const char *part)
  254.    {
  255.        assert (part);
  256.        push_back((char*)part);
  257.    }
  258.  
  259.    char *address() {
  260.       if (m_part_data.size()>0) {
  261.          return (char*)m_part_data[0].c_str();
  262.       } else {
  263.          return 0;
  264.       }
  265.    }
  266.  
  267.    void wrap(const char *address, const char *delim) {
  268.       if (delim) {
  269.          push_front((char*)delim);
  270.       }
  271.       push_front((char*)address);
  272.    }
  273.  
  274.    char * unwrap() {
  275.       if (m_part_data.size() == 0) {
  276.          return NULL;
  277.       }
  278.       char *addr = (char*)pop_front().c_str();
  279.       if (address() && *address() == 0) {
  280.          pop_front();
  281.       }
  282.       return addr;
  283.    }
  284.  
  285.    void dump() {
  286.       std::cerr << "--------------------------------------" << std::endl;
  287.       for (unsigned int part_nbr = 0; part_nbr < m_part_data.size(); part_nbr++) {
  288.           ustring data = m_part_data [part_nbr];
  289.  
  290.           // Dump the message as text or binary
  291.           int is_text = 1;
  292.           for (unsigned int char_nbr = 0; char_nbr < data.size(); char_nbr++)
  293.               if (data [char_nbr] < 32 || data [char_nbr] > 127)
  294.                   is_text = 0;
  295.  
  296.           std::cerr << "[" << std::setw(3) << std::setfill('0') << (int) data.size() << "] ";
  297.           for (unsigned int char_nbr = 0; char_nbr < data.size(); char_nbr++) {
  298.               if (is_text) {
  299.                   std::cerr << (char) data [char_nbr];
  300.               } else {
  301.                   std::cerr << std::hex << std::setw(2) << std::setfill('0') << (short int) data [char_nbr];
  302.               }
  303.           }
  304.           std::cerr << std::endl;
  305.       }
  306.    }
  307.  
  308.    static int
  309.    test(int verbose)
  310.    {
  311.       zmq::context_t context(1);
  312.       zmq::socket_t output(context, ZMQ_DEALER);
  313.       try {
  314.          output.bind("ipc://zmsg_selftest.ipc");
  315.       } catch (zmq::error_t error) {
  316.          assert(error.num()!=0);
  317.       }
  318.       zmq::socket_t input(context, ZMQ_ROUTER);
  319.       try {
  320.          input.connect("ipc://zmsg_selftest.ipc");
  321.       } catch (zmq::error_t error) {
  322.          assert(error.num()!=0);
  323.       }
  324.  
  325.       zmsg zm;
  326.       zm.body_set((char *)"Hello");
  327.       assert (strcmp(zm.body(), "Hello") == 0);
  328.  
  329.       zm.send(output);
  330.       assert(zm.parts()==0);
  331.  
  332.       zm.recv(input);
  333.       assert (zm.parts() == 2);
  334.       if(verbose) {
  335.          zm.dump();
  336.       }
  337.  
  338.       assert (strcmp(zm.body(), "Hello") == 0);
  339.  
  340.       zm.clear();
  341.       zm.body_set("Hello");
  342.       zm.wrap("address1", "");
  343.       zm.wrap("address2", 0);
  344.       assert (zm.parts() == 4);
  345.       zm.send(output);
  346.  
  347.       zm.recv(input);
  348.       if (verbose) {
  349.          zm.dump();
  350.       }
  351.       assert (zm.parts() == 5);
  352.       assert (strlen(zm.address()) == 33);
  353.       zm.unwrap();
  354.       assert (strcmp(zm.address(), "address2") == 0);
  355.       zm.body_fmt ("%c%s", 'W', "orld");
  356.       zm.send(output);
  357.  
  358.       zm.recv (input);
  359.       zm.unwrap ();
  360.       assert (zm.parts () == 4);
  361.       assert (strcmp (zm.body (), "World") == 0);
  362.       std::string part = zm.unwrap ();
  363.       assert (part.compare("address2") == 0);
  364.  
  365.       // Pull off address 1, check that empty part was dropped
  366.       part = zm.unwrap ();
  367.       assert (part.compare("address1") == 0);
  368.       assert (zm.parts () == 1);
  369.  
  370.       // Check that message body was correctly modified
  371.       part = (char*)zm.pop_front ().c_str();
  372.       assert (part.compare("World") == 0);
  373.       assert (zm.parts () == 0);
  374.  
  375.       // Check append method
  376.       zm.append ("Hello");
  377.       zm.append ("World!");
  378.       assert (zm.parts () == 2);
  379.       assert (strcmp (zm.body (), "World!") == 0);
  380.  
  381.       zm.clear();
  382.       assert (zm.parts() == 0);
  383.  
  384.       std::cout << "OK" << std::endl;
  385.       return 0;
  386.    }
  387.  
  388. private:
  389.    std::vector<ustring> m_part_data;
  390. };
  391.  
  392. #endif /* ZMSG_H_ */
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement