Advertisement
Riskybiz

paranoidPirateWorker

Aug 25th, 2014
61
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 4.08 KB | None | 0 0
  1. //
  2. //  Paranoid Pirate worker
  3. //
  4. //
  5. //     Andreas Hoelzlwimmer <andreas.hoelzlwimmer@fh-hagenberg.at>
  6. //
  7. #include "zmsg.hpp"
  8.  
  9. #include <iomanip>
  10.  
  11. #define HEARTBEAT_LIVENESS  3       //  3-5 is reasonable
  12. #define HEARTBEAT_INTERVAL  1000    //  msecs
  13. #define INTERVAL_INIT       1000    //  Initial reconnect
  14. #define INTERVAL_MAX       32000    //  After exponential backoff
  15.  
  16. //  Helper function that returns a new configured socket
  17. //  connected to the Hello World server
  18. //
  19. std::string identity;
  20.  
  21. static zmq::socket_t *
  22. s_worker_socket (zmq::context_t &context) {
  23.     zmq::socket_t * worker = new zmq::socket_t(context, ZMQ_DEALER);
  24.  
  25.     //  Set random identity to make tracing easier
  26.     identity = s_set_id(*worker);
  27.     worker->connect ("tcp://localhost:5556");
  28.  
  29.     //  Configure socket to not wait at close time
  30.     int linger = 0;
  31.     worker->setsockopt (ZMQ_LINGER, &linger, sizeof (linger));
  32.  
  33.     //  Tell queue we're ready for work
  34.     std::cout << "I: (" << identity << ") worker ready" << std::endl;
  35.     s_send (*worker, "READY");
  36.  
  37.     return worker;
  38. }
  39.  
  40. int main (void)
  41. {
  42.     s_version_assert (2, 1);
  43.     srandom ((unsigned) time (NULL));
  44.  
  45.     zmq::context_t context (1);
  46.     zmq::socket_t * worker = s_worker_socket (context);
  47.  
  48.     //  If liveness hits zero, queue is considered disconnected
  49.     size_t liveness = HEARTBEAT_LIVENESS;
  50.     size_t interval = INTERVAL_INIT;
  51.  
  52.     //  Send out heartbeats at regular intervals
  53.     int64_t heartbeat_at = s_clock () + HEARTBEAT_INTERVAL;
  54.  
  55.     int cycles = 0;
  56.     while (1) {
  57.         zmq::pollitem_t items [] = { { *worker,  0, ZMQ_POLLIN, 0 } };
  58.         zmq::poll (items, 1, HEARTBEAT_INTERVAL);//HEARTBEAT_INTERVAL * 1000
  59.  
  60.         if (items [0].revents & ZMQ_POLLIN) {
  61.             //  Get message
  62.             //  - 3-part envelope + content -> request
  63.             //  - 1-part "HEARTBEAT" -> heartbeat
  64.             zmsg msg (*worker);
  65.  
  66.             if (msg.parts () == 3) {
  67.                 //  Simulate various problems, after a few cycles
  68.                 cycles++;
  69.                 if (cycles > 3 && within (5) == 0) {
  70.                     std::cout << "I: worker (" << identity << ") simulating a crash" << std::endl;
  71.                     msg.clear ();
  72.                     break;
  73.                 }
  74.                 else {
  75.                    if (cycles > 3 && within (5) == 0) {
  76.                       std::cout << "I: worker (" << identity << ") simulating CPU overload" << std::endl;
  77.                        sleep (5);
  78.                    }
  79.                 }
  80.                 std::cout << "I: worker (" << identity << ") received normal reply - " << msg.body() << std::endl;
  81.                 msg.send (*worker);
  82.                 liveness = HEARTBEAT_LIVENESS;
  83.                 sleep (1);              //  Do some heavy work
  84.             }
  85.             else {
  86.                if (msg.parts () == 1
  87.                && strcmp (msg.body (), "HEARTBEAT") == 0) {
  88.                    liveness = HEARTBEAT_LIVENESS;
  89.                }
  90.                else {
  91.                    std::cout << "E: worker (" << identity << ") recieved invalid message" << std::endl;
  92.                    msg.dump ();
  93.                }
  94.             }
  95.             interval = INTERVAL_INIT;
  96.         }
  97.         else
  98.         if (--liveness == 0) {
  99.             std::cout << "W: worker (" << identity << ") heartbeat failure, can't reach queue" << std::endl;
  100.             std::cout << "W: worker (" << identity << ") reconnecting in " << interval << " msec..." << std::endl;
  101.             s_sleep (interval);
  102.  
  103.             if (interval < INTERVAL_MAX) {
  104.                 interval *= 2;
  105.             }
  106.             delete worker;
  107.             worker = s_worker_socket (context);
  108.             liveness = HEARTBEAT_LIVENESS;
  109.         }
  110.  
  111.         //  Send heartbeat to queue if it's time
  112.         if (s_clock () > heartbeat_at) {
  113.             heartbeat_at = s_clock () + HEARTBEAT_INTERVAL;
  114.             std::cout << "I: worker (" << identity << ") sends heartbeat" << std::endl;
  115.             s_send (*worker, "HEARTBEAT");
  116.         }
  117.     }
  118.     delete worker;
  119.     return 0;
  120. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement