Advertisement
Riskybiz

zactor termination

Dec 11th, 2017
137
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 8.72 KB | None | 0 0
  1. #ifndef LISTENER_H
  2. #define LISTENER_H
  3.  
  4. #include <atomic>
  5. #include <iostream>
  6. #include <map>
  7. #include <memory>
  8. #include <mutex>
  9. #include <set>
  10. #include <stdexcept>
  11. #include <string>
  12. #include <czmq.h>
  13. #include "client.h"
  14.  
  15.  
  16. class Listener
  17. {
  18. public:
  19.     //Constructor
  20.     Listener(const std::string ip_addr, const std::string curve_folder_path, const std::set<std::string> &ds) :
  21.         ip(ip_addr),
  22.         curve_folder(curve_folder_path),
  23.         desired_sources(ds),
  24.         cease(false),
  25.         finished(false),
  26.         uuidMap(),
  27.         map_protector()
  28.     {
  29.         try
  30.         {
  31.             //  Now start the detached threads; has its own ZeroMQ context.
  32.             listener_actor = zactor_new(static_listener_task, (void*) this);
  33.             //if (listener_actor != 0) { throw std::runtime_error("Error creating listener actor"); }
  34.  
  35.         }
  36.         catch (std::runtime_error &rte)
  37.         {
  38.             std::string err = rte.what();
  39.             std::string errConc = "Listener: Runtime Exception: " + err;
  40.             TextOutput(errConc);
  41.         }
  42.         catch (std::exception &ex)
  43.         {
  44.             std::string err = ex.what();
  45.             std::string errConc = "Listener: Exception: " + err;
  46.             TextOutput(errConc);
  47.         }
  48.         catch (...)
  49.         {
  50.             std::string errConc = "Listener: Unknown Exception: ";
  51.             TextOutput(errConc);
  52.         }
  53.     }
  54.  
  55.     //Destructor
  56.     ~Listener()
  57.     {
  58.         //  Free the memory we used
  59.         //  and coordinate process termination with actor termination
  60.         TextOutput("Listener destructor invoked...");
  61.         zactor_destroy(&listener_actor);
  62.         TextOutput("Listener destructor completed.");
  63.     }
  64.  
  65. private:
  66.     std::string ip;
  67.     std::string curve_folder;
  68.     zactor_t *listener_actor;
  69.     std::set<std::string> desired_sources;
  70.     std::atomic<bool> cease;
  71.     std::atomic<bool> finished;
  72.     std::map<std::string, std::shared_ptr<Client>> uuidMap;
  73.     std::mutex map_protector;
  74.  
  75.     //Static keyword : if it’s a static member function of a class, its type is the same as if it were an ordinary function: β€œint (*)(char,float)”.  It does not take on type of Class::Function.
  76.     //https://isocpp.org/wiki/faq/pointers-to-members
  77.     //https://stackoverflow.com/questions/19808054/convert-c-function-pointer-to-c-function-pointer/19809787
  78.     static void static_listener_task(zsock_t *pipe, void* ptr)
  79.     {
  80.         Listener *cPtr = static_cast<Listener*>(ptr);
  81.         cPtr->listener_task(pipe, cPtr);//Call the class-member-function via the pointer
  82.     }
  83.  
  84.     static int static_inbound_event(zloop_t *loop, zsock_t *reader, void *ptr)
  85.     {
  86.         Listener *cPtr = static_cast<Listener*>(ptr);
  87.         return cPtr->inbound_event(reader, cPtr);//Call the class-member-function via the pointer
  88.     }
  89.  
  90.     static int static_check_client_status_event(zloop_t *loop, int timer_id, void *ptr)
  91.     {
  92.         Listener *cPtr = static_cast<Listener*>(ptr);
  93.         return cPtr->check_client_status_event(cPtr);//Call the class-member-function via the pointer
  94.     }
  95.  
  96.     //  The client task runs in its own context, and receives the
  97.     //  server public key as an argument.
  98.  
  99.     static void
  100.         listener_task(zsock_t *pipe, Listener* cPtr)
  101.     {
  102.  
  103.         try
  104.         {//  Load our persistent certificate from disk
  105.         zcert_t *client_cert = zcert_load("client_cert.txt");
  106.         assert(client_cert);
  107.  
  108.         zcert_t *server_cert = zcert_load("server_cert.txt");
  109.         assert(server_cert);
  110.  
  111.         //  Create client socket and configure it to use full encryption
  112.         zsock_t *listener = zsock_new(ZMQ_SUB);
  113.         zsock_set_subscribe(listener, "");//Subscribe to all messages
  114.         assert(listener);
  115.         zcert_apply(client_cert, listener);
  116.         zsock_set_curve_serverkey(listener, zcert_public_txt(server_cert));
  117.  
  118.         //Set heartbeat interval
  119.         //zsock_set_heartbeat_ttl(client, 250);
  120.         //zsock_set_heartbeat_ivl(client, 2500);
  121.  
  122.  
  123.         //Setup socket monitoring
  124.         zactor_t *listenermon = zactor_new(zmonitor, listener);
  125.         assert(listenermon);
  126.  
  127.         zstr_sendx(listenermon, "VERBOSE", NULL);
  128.         zstr_sendx(listenermon, "LISTEN", "LISTENING", "ACCEPTED", "CONNECTED", "DISCONNECTED", NULL);
  129. #if defined (ZMQ_EVENT_HANDSHAKE_SUCCEED)
  130.         zstr_sendx(clientmon, "LISTEN", "HANDSHAKE_SUCCEED", NULL);
  131. #endif
  132.         zstr_sendx(listenermon, "START", NULL);
  133.         zsock_wait(listenermon);
  134.  
  135.         //Connect
  136.         int rc1 = zsock_connect(listener, (cPtr->ip).c_str());
  137.         if (rc1 == -1)
  138.         {
  139.             throw std::runtime_error("Error binding listener socket; is the address valid e.g. tcp://127.0.0.1:9000");
  140.         }
  141.         else
  142.         {
  143.             cPtr->TextOutput("Listener SUB connected to port: " + std::to_string(rc1));
  144.         }
  145.  
  146.         //Signal OK to instantiator
  147.         zsock_signal(pipe, 0);     
  148.  
  149.         //Setup reactor
  150.         zloop_t *loop = zloop_new();
  151.         assert(loop);
  152.         zloop_set_verbose(loop, true);//verbose
  153.  
  154.         // Register the socket reader and its callback
  155.         int rc = zloop_reader(loop, listener, static_inbound_event, cPtr);
  156.         assert(rc == 0);
  157.        
  158.         //Register the timed event
  159.         int timer_id = zloop_timer(loop, 10000, 0, static_check_client_status_event, cPtr);//zero means run indefinately
  160.        
  161.         //zloop_reader_set_tolerant(loop, listener);
  162.         zloop_set_nonstop(loop, false);
  163.         zloop_start(loop);
  164.  
  165.  
  166.         // Monitor for shutdown signals
  167.         while (!cPtr->cease)
  168.         {
  169.             zclock_sleep(250);
  170.             cPtr->cease = cPtr->check_term(pipe);
  171.             if (cPtr->cease == true)
  172.             {
  173.                 cPtr->TextOutput("Ceasing on zactor termination.");
  174.                 break;
  175.             }
  176.  
  177.             zmsg_t *msg = zmsg_recv_nowait(listenermon);
  178.             if (msg)
  179.             {
  180.                 std::string event_type(zmsg_popstr(msg));
  181.                 cPtr->TextOutput("Event: " + event_type);
  182.  
  183.                 if (event_type == "DISCONNECTED")
  184.                 {
  185.                     cPtr->cease = true;
  186.                     cPtr->TextOutput("Ceasing on disconnection");
  187.                     //cPtr->TextOutput("DISCONNECTED");
  188.                     break;
  189.                 }
  190.  
  191.             }
  192.             zmsg_destroy(&msg);
  193.         }
  194.  
  195.         //cPtr->TextOutput("Freeing memory in listener task.");
  196.         //  Free all memory we used
  197.         zloop_reader_end(loop, listener);
  198.         zloop_timer_end(loop, timer_id);
  199.         zloop_destroy(&loop);
  200.         zactor_destroy(&listenermon);
  201.         zsock_destroy(&listener);
  202.         zcert_destroy(&client_cert);       
  203.         //Final act
  204.         cPtr->finished = true;
  205.         cPtr->TextOutput("Leaving listener task.");
  206.         }
  207.         catch (std::runtime_error &rte)
  208.         {
  209.             std::string err = rte.what();
  210.             std::string errConc = "Listener: Runtime Exception: " + err;
  211.             cPtr->TextOutput(errConc);
  212.         }
  213.         catch (std::exception &ex)
  214.         {
  215.             std::string err = ex.what();
  216.             std::string errConc = "Listener: Exception: " + err;
  217.             cPtr->TextOutput(errConc);
  218.         }
  219.         catch (...)
  220.         {
  221.             std::string errConc = "Listener: Unknown Exception: ";
  222.             cPtr->TextOutput(errConc);
  223.         }
  224.     }
  225.  
  226.     int inbound_event(zsock_t *reader, Listener* cPtr)
  227.     {
  228.         zmsg_t *msgIn = zmsg_recv_nowait(reader);
  229.         if (msgIn)
  230.         {
  231.             if (zmsg_size(msgIn) == 4)//i.e. number of frames (0 or more).
  232.             {
  233.                 std::string uuid(zmsg_popstr(msgIn));
  234.                 std::string type(zmsg_popstr(msgIn));
  235.                 std::string source(zmsg_popstr(msgIn));
  236.                 std::string port(zmsg_popstr(msgIn));
  237.                 //cPtr->TextOutput("Received: " + uuid + " " + type + " router port: " + port);
  238.  
  239.                 //If the uuid is not present in the map and this is an ADVERT message then instantiate a CloneConsumer to handle the Source.
  240.                 if (type == "ADVERT" && (cPtr->uuidMap).end() == (cPtr->uuidMap).find(uuid))
  241.                 {
  242.                     std::lock_guard<std::mutex> guard(cPtr->map_protector);
  243.                     auto found = (cPtr->desired_sources).find(source);
  244.                     if (found != (cPtr->desired_sources).end())//if the advert is from a desired source.
  245.                     {
  246.                         (cPtr->uuidMap).insert(std::make_pair(uuid, std::shared_ptr<Client>(new Client(cPtr->ip, port, cPtr->curve_folder))));
  247.                         cPtr->TextOutput("Listener Created: " + uuid);
  248.                     }
  249.                 }
  250.             }//size of message             
  251.              //zmsg_print(msgIn);
  252.         }//if msg
  253.         else
  254.             return 1;//no message
  255.         zmsg_destroy(&msgIn);
  256.         return 0;
  257.     }
  258.    
  259.     int check_client_status_event(Listener* cPtr)
  260.     {      
  261.         std::lock_guard<std::mutex> guard(cPtr->map_protector);
  262.         //TextOutput("Scheduled Check");
  263.         auto iter = uuidMap.begin();
  264.         while (iter != uuidMap.end())
  265.         {
  266.             //TextOutput("Scheduled Check: Iteration");
  267.             if ((iter->second)->isFinished() == true)
  268.             {
  269.                 cPtr->TextOutput("Removed client: UUID: " + iter->first);
  270.                 uuidMap.erase(iter++);//post increment; use the iterator value in the function, then increment it.
  271.             }
  272.             else
  273.             {
  274.                 ++iter;//advance
  275.             }
  276.         }      
  277.         return 0;
  278.     }
  279.  
  280.    
  281.     bool check_term(zsock_t *pipe)
  282.     {
  283.         bool terminated(false);
  284.  
  285.         zmsg_t *msg = zmsg_recv_nowait(pipe);
  286.         if (msg)
  287.         {
  288.             std::string command(zmsg_popstr(msg));
  289.             //  All actors must handle $TERM in this way
  290.             TextOutput("Command received: " + command);
  291.             if (command == "$TERM")
  292.                 terminated = true;
  293.             else
  294.             {
  295.                 TextOutput("E: invalid message to actor");
  296.             }
  297.         }
  298.         zmsg_destroy(&msg);
  299.         return terminated;
  300.     }
  301.  
  302.     //Helper function
  303.     inline
  304.         void TextOutput(const std::string &out)
  305.     {
  306. #if defined OUTPUTDEBUGVIEW    
  307.         OutputDebugStringA(("CloneClient: Listener: " + out).c_str());
  308. #else      
  309.         std::cout << "CloneClient: Listener: " << out.c_str() << std::endl;
  310. #endif
  311.     }
  312.  
  313. };
  314. #endif
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement