Advertisement
Guest User

Untitled

a guest
Mar 30th, 2018
312
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 4.47 KB | None | 0 0
  1. #include <websocketpp/config/asio_no_tls.hpp>
  2.  
  3. #include <websocketpp/server.hpp>
  4.  
  5. #include <iostream>
  6. #include <set>
  7.  
  8. /*#include <boost/thread.hpp>
  9. #include <boost/thread/mutex.hpp>
  10. #include <boost/thread/condition_variable.hpp>*/
  11. #include <websocketpp/common/thread.hpp>
  12.  
  13. typedef websocketpp::server<websocketpp::config::asio> server;
  14.  
  15. using websocketpp::connection_hdl;
  16. using websocketpp::lib::placeholders::_1;
  17. using websocketpp::lib::placeholders::_2;
  18. using websocketpp::lib::bind;
  19.  
  20. using websocketpp::lib::thread;
  21. using websocketpp::lib::mutex;
  22. using websocketpp::lib::lock_guard;
  23. using websocketpp::lib::unique_lock;
  24. using websocketpp::lib::condition_variable;
  25.  
  26. /* on_open insert connection_hdl into channel
  27.  * on_close remove connection_hdl from channel
  28.  * on_message queue send to all channels
  29.  */
  30.  
  31. enum action_type {
  32.     SUBSCRIBE,
  33.     UNSUBSCRIBE,
  34.     MESSAGE
  35. };
  36.  
  37. struct action {
  38.     action(action_type t, connection_hdl h) : type(t), hdl(h) {}
  39.     action(action_type t, connection_hdl h, server::message_ptr m)
  40.       : type(t), hdl(h), msg(m) {}
  41.  
  42.     action_type type;
  43.     websocketpp::connection_hdl hdl;
  44.     server::message_ptr msg;
  45. };
  46.  
  47. class broadcast_server {
  48. public:
  49.     broadcast_server() {
  50.         // Initialize Asio Transport
  51.         m_server.init_asio();
  52.  
  53.         // Register handler callbacks
  54.         m_server.set_open_handler(bind(&broadcast_server::on_open,this,::_1));
  55.         m_server.set_close_handler(bind(&broadcast_server::on_close,this,::_1));
  56.         m_server.set_message_handler(bind(&broadcast_server::on_message,this,::_1,::_2));
  57.     }
  58.  
  59.     void run(uint16_t port) {
  60.         // listen on specified port
  61.         m_server.listen(port);
  62.  
  63.         // Start the server accept loop
  64.         m_server.start_accept();
  65.  
  66.         // Start the ASIO io_service run loop
  67.         try {
  68.             m_server.run();
  69.         } catch (const std::exception & e) {
  70.             std::cout << e.what() << std::endl;
  71.         }
  72.     }
  73.  
  74.     void on_open(connection_hdl hdl) {
  75.         {
  76.             lock_guard<mutex> guard(m_action_lock);
  77.             //std::cout << "on_open" << std::endl;
  78.             m_actions.push(action(SUBSCRIBE,hdl));
  79.         }
  80.         m_action_cond.notify_one();
  81.     }
  82.  
  83.     void on_close(connection_hdl hdl) {
  84.         {
  85.             lock_guard<mutex> guard(m_action_lock);
  86.             //std::cout << "on_close" << std::endl;
  87.             m_actions.push(action(UNSUBSCRIBE,hdl));
  88.         }
  89.         m_action_cond.notify_one();
  90.     }
  91.  
  92.     void on_message(connection_hdl hdl, server::message_ptr msg) {
  93.         // queue message up for sending by processing thread
  94.         {
  95.             lock_guard<mutex> guard(m_action_lock);
  96.             //std::cout << "on_message" << std::endl;
  97.             m_actions.push(action(MESSAGE,hdl,msg));
  98.         }
  99.         m_action_cond.notify_one();
  100.     }
  101.  
  102.     void process_messages() {
  103.         while(1) {
  104.             unique_lock<mutex> lock(m_action_lock);
  105.  
  106.             while(m_actions.empty()) {
  107.                 m_action_cond.wait(lock);
  108.             }
  109.  
  110.             action a = m_actions.front();
  111.             m_actions.pop();
  112.  
  113.             lock.unlock();
  114.  
  115.             if (a.type == SUBSCRIBE) {
  116.                 lock_guard<mutex> guard(m_connection_lock);
  117.                 m_connections.insert(a.hdl);
  118.             } else if (a.type == UNSUBSCRIBE) {
  119.                 lock_guard<mutex> guard(m_connection_lock);
  120.                 m_connections.erase(a.hdl);
  121.             } else if (a.type == MESSAGE) {
  122.                 lock_guard<mutex> guard(m_connection_lock);
  123.  
  124.                 con_list::iterator it;
  125.                 for (it = m_connections.begin(); it != m_connections.end(); ++it) {
  126.                     m_server.send(*it,a.msg);
  127.                 }
  128.             } else {
  129.                 // undefined.
  130.             }
  131.         }
  132.     }
  133. private:
  134.     typedef std::set<connection_hdl,std::owner_less<connection_hdl> > con_list;
  135.  
  136.     server m_server;
  137.     con_list m_connections;
  138.     std::queue<action> m_actions;
  139.  
  140.     mutex m_action_lock;
  141.     mutex m_connection_lock;
  142.     condition_variable m_action_cond;
  143. };
  144.  
  145. int main() {
  146.     try {
  147.     broadcast_server server_instance;
  148.  
  149.     // Start a thread to run the processing loop
  150.     thread t(bind(&broadcast_server::process_messages,&server_instance));
  151.  
  152.     // Run the asio loop with the main thread
  153.     server_instance.run(9002);
  154.  
  155.     t.join();
  156.  
  157.     } catch (websocketpp::exception const & e) {
  158.         std::cout << e.what() << std::endl;
  159.     }
  160. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement