Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #ifndef LISTENER_H
- #define LISTENER_H
- #include <atomic>
- #include <iostream>
- #include <map>
- #include <memory>
- #include <mutex>
- #include <set>
- #include <stdexcept>
- #include <string>
- #include <czmq.h>
- #include "client.h"
- class Listener
- {
- public:
- //Constructor
- Listener(const std::string ip_addr, const std::string curve_folder_path, const std::set<std::string> &ds) :
- ip(ip_addr),
- curve_folder(curve_folder_path),
- desired_sources(ds),
- cease(false),
- finished(false),
- uuidMap(),
- map_protector()
- {
- try
- {
- // Now start the detached threads; has its own ZeroMQ context.
- listener_actor = zactor_new(static_listener_task, (void*) this);
- //if (listener_actor != 0) { throw std::runtime_error("Error creating listener actor"); }
- }
- catch (std::runtime_error &rte)
- {
- std::string err = rte.what();
- std::string errConc = "Listener: Runtime Exception: " + err;
- TextOutput(errConc);
- }
- catch (std::exception &ex)
- {
- std::string err = ex.what();
- std::string errConc = "Listener: Exception: " + err;
- TextOutput(errConc);
- }
- catch (...)
- {
- std::string errConc = "Listener: Unknown Exception: ";
- TextOutput(errConc);
- }
- }
- //Destructor
- ~Listener()
- {
- // Free the memory we used
- // and coordinate process termination with actor termination
- TextOutput("Listener destructor invoked...");
- zactor_destroy(&listener_actor);
- TextOutput("Listener destructor completed.");
- }
- private:
- std::string ip;
- std::string curve_folder;
- zactor_t *listener_actor;
- std::set<std::string> desired_sources;
- std::atomic<bool> cease;
- std::atomic<bool> finished;
- std::map<std::string, std::shared_ptr<Client>> uuidMap;
- std::mutex map_protector;
- //Static keyword : if itβs a static member function of a class, its type is the same as if it were an ordinary function: βint (*)(char,float)β. It does not take on type of Class::Function.
- //https://isocpp.org/wiki/faq/pointers-to-members
- //https://stackoverflow.com/questions/19808054/convert-c-function-pointer-to-c-function-pointer/19809787
- static void static_listener_task(zsock_t *pipe, void* ptr)
- {
- Listener *cPtr = static_cast<Listener*>(ptr);
- cPtr->listener_task(pipe, cPtr);//Call the class-member-function via the pointer
- }
- static int static_inbound_event(zloop_t *loop, zsock_t *reader, void *ptr)
- {
- Listener *cPtr = static_cast<Listener*>(ptr);
- return cPtr->inbound_event(reader, cPtr);//Call the class-member-function via the pointer
- }
- static int static_check_client_status_event(zloop_t *loop, int timer_id, void *ptr)
- {
- Listener *cPtr = static_cast<Listener*>(ptr);
- return cPtr->check_client_status_event(cPtr);//Call the class-member-function via the pointer
- }
- // The client task runs in its own context, and receives the
- // server public key as an argument.
- static void
- listener_task(zsock_t *pipe, Listener* cPtr)
- {
- try
- {// Load our persistent certificate from disk
- zcert_t *client_cert = zcert_load("client_cert.txt");
- assert(client_cert);
- zcert_t *server_cert = zcert_load("server_cert.txt");
- assert(server_cert);
- // Create client socket and configure it to use full encryption
- zsock_t *listener = zsock_new(ZMQ_SUB);
- zsock_set_subscribe(listener, "");//Subscribe to all messages
- assert(listener);
- zcert_apply(client_cert, listener);
- zsock_set_curve_serverkey(listener, zcert_public_txt(server_cert));
- //Set heartbeat interval
- //zsock_set_heartbeat_ttl(client, 250);
- //zsock_set_heartbeat_ivl(client, 2500);
- //Setup socket monitoring
- zactor_t *listenermon = zactor_new(zmonitor, listener);
- assert(listenermon);
- zstr_sendx(listenermon, "VERBOSE", NULL);
- zstr_sendx(listenermon, "LISTEN", "LISTENING", "ACCEPTED", "CONNECTED", "DISCONNECTED", NULL);
- #if defined (ZMQ_EVENT_HANDSHAKE_SUCCEED)
- zstr_sendx(clientmon, "LISTEN", "HANDSHAKE_SUCCEED", NULL);
- #endif
- zstr_sendx(listenermon, "START", NULL);
- zsock_wait(listenermon);
- //Connect
- int rc1 = zsock_connect(listener, (cPtr->ip).c_str());
- if (rc1 == -1)
- {
- throw std::runtime_error("Error binding listener socket; is the address valid e.g. tcp://127.0.0.1:9000");
- }
- else
- {
- cPtr->TextOutput("Listener SUB connected to port: " + std::to_string(rc1));
- }
- //Signal OK to instantiator
- zsock_signal(pipe, 0);
- //Setup reactor
- zloop_t *loop = zloop_new();
- assert(loop);
- zloop_set_verbose(loop, true);//verbose
- // Register the socket reader and its callback
- int rc = zloop_reader(loop, listener, static_inbound_event, cPtr);
- assert(rc == 0);
- //Register the timed event
- int timer_id = zloop_timer(loop, 10000, 0, static_check_client_status_event, cPtr);//zero means run indefinately
- //zloop_reader_set_tolerant(loop, listener);
- zloop_set_nonstop(loop, false);
- zloop_start(loop);
- // Monitor for shutdown signals
- while (!cPtr->cease)
- {
- zclock_sleep(250);
- cPtr->cease = cPtr->check_term(pipe);
- if (cPtr->cease == true)
- {
- cPtr->TextOutput("Ceasing on zactor termination.");
- break;
- }
- zmsg_t *msg = zmsg_recv_nowait(listenermon);
- if (msg)
- {
- std::string event_type(zmsg_popstr(msg));
- cPtr->TextOutput("Event: " + event_type);
- if (event_type == "DISCONNECTED")
- {
- cPtr->cease = true;
- cPtr->TextOutput("Ceasing on disconnection");
- //cPtr->TextOutput("DISCONNECTED");
- break;
- }
- }
- zmsg_destroy(&msg);
- }
- //cPtr->TextOutput("Freeing memory in listener task.");
- // Free all memory we used
- zloop_reader_end(loop, listener);
- zloop_timer_end(loop, timer_id);
- zloop_destroy(&loop);
- zactor_destroy(&listenermon);
- zsock_destroy(&listener);
- zcert_destroy(&client_cert);
- //Final act
- cPtr->finished = true;
- cPtr->TextOutput("Leaving listener task.");
- }
- catch (std::runtime_error &rte)
- {
- std::string err = rte.what();
- std::string errConc = "Listener: Runtime Exception: " + err;
- cPtr->TextOutput(errConc);
- }
- catch (std::exception &ex)
- {
- std::string err = ex.what();
- std::string errConc = "Listener: Exception: " + err;
- cPtr->TextOutput(errConc);
- }
- catch (...)
- {
- std::string errConc = "Listener: Unknown Exception: ";
- cPtr->TextOutput(errConc);
- }
- }
- int inbound_event(zsock_t *reader, Listener* cPtr)
- {
- zmsg_t *msgIn = zmsg_recv_nowait(reader);
- if (msgIn)
- {
- if (zmsg_size(msgIn) == 4)//i.e. number of frames (0 or more).
- {
- std::string uuid(zmsg_popstr(msgIn));
- std::string type(zmsg_popstr(msgIn));
- std::string source(zmsg_popstr(msgIn));
- std::string port(zmsg_popstr(msgIn));
- //cPtr->TextOutput("Received: " + uuid + " " + type + " router port: " + port);
- //If the uuid is not present in the map and this is an ADVERT message then instantiate a CloneConsumer to handle the Source.
- if (type == "ADVERT" && (cPtr->uuidMap).end() == (cPtr->uuidMap).find(uuid))
- {
- std::lock_guard<std::mutex> guard(cPtr->map_protector);
- auto found = (cPtr->desired_sources).find(source);
- if (found != (cPtr->desired_sources).end())//if the advert is from a desired source.
- {
- (cPtr->uuidMap).insert(std::make_pair(uuid, std::shared_ptr<Client>(new Client(cPtr->ip, port, cPtr->curve_folder))));
- cPtr->TextOutput("Listener Created: " + uuid);
- }
- }
- }//size of message
- //zmsg_print(msgIn);
- }//if msg
- else
- return 1;//no message
- zmsg_destroy(&msgIn);
- return 0;
- }
- int check_client_status_event(Listener* cPtr)
- {
- std::lock_guard<std::mutex> guard(cPtr->map_protector);
- //TextOutput("Scheduled Check");
- auto iter = uuidMap.begin();
- while (iter != uuidMap.end())
- {
- //TextOutput("Scheduled Check: Iteration");
- if ((iter->second)->isFinished() == true)
- {
- cPtr->TextOutput("Removed client: UUID: " + iter->first);
- uuidMap.erase(iter++);//post increment; use the iterator value in the function, then increment it.
- }
- else
- {
- ++iter;//advance
- }
- }
- return 0;
- }
- bool check_term(zsock_t *pipe)
- {
- bool terminated(false);
- zmsg_t *msg = zmsg_recv_nowait(pipe);
- if (msg)
- {
- std::string command(zmsg_popstr(msg));
- // All actors must handle $TERM in this way
- TextOutput("Command received: " + command);
- if (command == "$TERM")
- terminated = true;
- else
- {
- TextOutput("E: invalid message to actor");
- }
- }
- zmsg_destroy(&msg);
- return terminated;
- }
- //Helper function
- inline
- void TextOutput(const std::string &out)
- {
- #if defined OUTPUTDEBUGVIEW
- OutputDebugStringA(("CloneClient: Listener: " + out).c_str());
- #else
- std::cout << "CloneClient: Listener: " << out.c_str() << std::endl;
- #endif
- }
- };
- #endif
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement