Advertisement
Riskybiz

paranoidPirateQueue

Aug 25th, 2014
74
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 6.04 KB | None | 0 0
  1. //
  2. //  Paranoid Pirate queue
  3. //
  4. //     Andreas Hoelzlwimmer <andreas.hoelzlwimmer@fh-hagenberg.at>
  5. //
  6. #include "zmsg.hpp"
  7.  
  8. #include <stdint.h>
  9. #include <vector>
  10.  
//  Heartbeat tuning. A worker's expiry is set to the current clock plus
//  HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS; HEARTBEAT_INTERVAL is also used
//  as the poll timeout and as the period between heartbeats sent to workers.
#define HEARTBEAT_LIVENESS  3       //  3-5 is reasonable
#define HEARTBEAT_INTERVAL  1000    //  msecs
  13.  
  14. //  This defines one active worker in our worker queue
  15.  
  16. typedef struct {
  17.     std::string identity;           //  Address of worker
  18.     int64_t     expiry;             //  Expires at this time
  19. } worker_t;
  20.  
  21. //  Insert worker at end of queue, reset expiry
  22. //  Worker must not already be in queue
  23. static void
  24. s_worker_append (std::vector<worker_t> &queue, std::string &identity)
  25. {
  26.     bool found = false;
  27.     for (std::vector<worker_t>::iterator it = queue.begin(); it < queue.end(); it++) {
  28.         if (it->identity.compare(identity) == 0) {
  29.             std::cout << "E: queue has duplicate worker identity " << identity.c_str() << std::endl;
  30.             found = true;
  31.             break;
  32.         }
  33.     }
  34.     if (!found) {
  35.         worker_t worker;
  36.         worker.identity = identity;
  37.         worker.expiry = s_clock() + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS;
  38.         queue.push_back(worker);
  39.         std::cout << "I: queue appends worker (" << identity << ")" << std::endl;
  40.     }
  41. }
  42.  
  43. //  Remove worker from queue, if present
  44. static void
  45. s_worker_delete (std::vector<worker_t> &queue, std::string &identity)
  46. {
  47.     for (std::vector<worker_t>::iterator it = queue.begin(); it < queue.end(); it++) {
  48.         if (it->identity.compare(identity) == 0) {
  49.             it = queue.erase(it);
  50.             std::cout << "I: queue deletes worker (" << identity << ")" << std::endl;
  51.             break;
  52.          }
  53.     }
  54. }
  55.  
  56. //  Reset worker expiry, worker must be present
  57. static void
  58. s_worker_refresh (std::vector<worker_t> &queue, std::string &identity)
  59. {
  60.     bool found = false;
  61.     for (std::vector<worker_t>::iterator it = queue.begin(); it < queue.end(); it++) {
  62.         if (it->identity.compare(identity) == 0) {
  63.            it->expiry = s_clock ()
  64.                  + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS;
  65.            found = true;
  66.            std::cout << "I: queue refreshes worker (" << identity << ")" << std::endl;
  67.            break;
  68.         }
  69.     }
  70.     if (!found) {
  71.        std::cout << "E: queue reports worker " << identity << " not ready" << std::endl;
  72.     }
  73. }
  74.  
  75. //  Pop next available worker off queue, return identity
  76. static std::string
  77. s_worker_dequeue (std::vector<worker_t> &queue)
  78. {
  79.     assert (queue.size());
  80.     std::string identity = queue[0].identity;
  81.     queue.erase(queue.begin());
  82.     std::cout << "I: queue accesses next available worker (" << identity << ")" << std::endl;
  83.     return identity;
  84. }
  85.  
  86. //  Look for & kill expired workers
  87. static void
  88. s_queue_purge (std::vector<worker_t> &queue)
  89. {
  90.     int64_t clock = s_clock();
  91.     for (std::vector<worker_t>::iterator it = queue.begin(); it < queue.end(); it++) {
  92.         if (clock > it->expiry) {
  93.            it = queue.erase(it)-1;
  94.            std::cout << "I: queue purges worker (" << it->identity << ")" << std::endl;
  95.         }
  96.     }
  97. }
  98.  
  99. int main (void)
  100. {
  101.     s_version_assert (2, 1);
  102.  
  103.     //  Prepare our context and sockets
  104.     zmq::context_t context(1);
  105.     zmq::socket_t frontend(context, ZMQ_ROUTER);
  106.     zmq::socket_t backend (context, ZMQ_ROUTER);
  107.     frontend.bind("tcp://*:5555");    //  For clients
  108.     backend.bind ("tcp://*:5556");    //  For workers
  109.  
  110.     //  Queue of available workers
  111.     std::vector<worker_t> queue;
  112.  
  113.     //  Send out heartbeats at regular intervals
  114.     int64_t heartbeat_at = s_clock () + HEARTBEAT_INTERVAL;
  115.  
  116.     while (1) {
  117.         zmq::pollitem_t items [] = {
  118.             { backend,  0, ZMQ_POLLIN, 0 },
  119.             { frontend, 0, ZMQ_POLLIN, 0 }
  120.         };
  121.         //  Poll frontend only if we have available workers
  122.         if (queue.size()) {
  123.             zmq::poll (items, 2, HEARTBEAT_INTERVAL);//HEARTBEAT_INTERVAL * 1000
  124.         } else {
  125.             zmq::poll (items, 1, HEARTBEAT_INTERVAL);//HEARTBEAT_INTERVAL * 1000
  126.         }
  127.  
  128.         //  Handle worker activity on backend
  129.         if (items [0].revents & ZMQ_POLLIN) {
  130.             zmsg msg (backend);
  131.             std::string identity(msg.unwrap ());
  132.  
  133.             //  Return reply to client if it's not a control message
  134.             if (msg.parts () == 1) {
  135.                 if (strcmp (msg.address (), "READY") == 0) {
  136.                     s_worker_delete (queue, identity);
  137.                     s_worker_append (queue, identity);
  138.                 }
  139.                 else {
  140.                    if (strcmp (msg.address (), "HEARTBEAT") == 0) {
  141.                        s_worker_refresh (queue, identity);
  142.                    } else {
  143.                        std::cout << "E: queue recieved invalid message from " << identity << std::endl;
  144.                        msg.dump ();
  145.                    }
  146.                 }
  147.             }
  148.             else {
  149.                 msg.send (frontend);
  150.                 std::cout << "I: queue sends message via frontend (" << identity << ")" << std::endl;
  151.                 s_worker_append (queue, identity);
  152.             }
  153.         }
  154.         if (items [1].revents & ZMQ_POLLIN) {
  155.             //  Now get next client request, route to next worker
  156.             zmsg msg (frontend);
  157.             std::string identity = std::string(s_worker_dequeue (queue));
  158.             msg.push_front((char*)identity.c_str());
  159.             msg.send (backend);
  160.         }
  161.  
  162.         //  Send heartbeats to idle workers if it's time
  163.         if (s_clock () > heartbeat_at) {
  164.             for (std::vector<worker_t>::iterator it = queue.begin(); it < queue.end(); it++) {
  165.                 zmsg msg ("HEARTBEAT");
  166.                 msg.wrap (it->identity.c_str(), NULL);
  167.                 msg.send (backend);
  168.                 std::cout << "I: queue sends heartbeat to worker (" << it->identity << ") via backend" << std::endl;
  169.             }
  170.             heartbeat_at = s_clock () + HEARTBEAT_INTERVAL;
  171.         }
  172.         s_queue_purge(queue);
  173.     }
  174.     //  We never exit the main loop
  175.     //  But pretend to do the right shutdown anyhow
  176.     queue.clear();
  177.     return 0;
  178. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement