Advertisement
schreiberstein

Boost asio unix domain socket async server 01

Jun 5th, 2019
132
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 5.08 KB | None | 0 0
  1. #include <boost/asio.hpp>
  2.  
  3. #include <thread>
  4. #include <atomic>
  5. #include <memory>
  6. #include <iostream>
  7.  
  8. using namespace boost;
  9.  
  10. class Service {
  11. public:
  12.     Service(std::shared_ptr<boost::asio::local::stream_protocol::socket> sock) :
  13.         m_sock(sock)
  14.     {}
  15.  
  16.     void StartHandling() {
  17.  
  18.         asio::async_read_until(*m_sock.get(),
  19.             m_request,
  20.             '\n',
  21.             [this](
  22.             const boost::system::error_code& ec,
  23.             std::size_t bytes_transferred)
  24.         {
  25.             onRequestReceived(ec,
  26.                 bytes_transferred);
  27.         });
  28.     }
  29.  
  30. private:
  31.     void onRequestReceived(const boost::system::error_code& ec,
  32.         std::size_t bytes_transferred) {
  33.         if (ec.value() != 0) {
  34.             std::cout << "Error occured! Error code = "
  35.                 << ec.value()
  36.                 << ". Message: " << ec.message();
  37.  
  38.             onFinish();
  39.             return;
  40.         }
  41.  
  42.         // Process the request.
  43.         m_response = ProcessRequest(m_request);
  44.  
  45.         // Initiate asynchronous write operation.
  46.         asio::async_write(*m_sock.get(),
  47.             asio::buffer(m_response),
  48.             [this](
  49.             const boost::system::error_code& ec,
  50.             std::size_t bytes_transferred)
  51.         {
  52.             onResponseSent(ec,
  53.                 bytes_transferred);
  54.         });
  55.     }
  56.  
  57.     void onResponseSent(const boost::system::error_code& ec,
  58.         std::size_t bytes_transferred) {
  59.         if (ec.value() != 0) {
  60.             std::cout << "Error occured! Error code = "
  61.                 << ec.value()
  62.                 << ". Message: " << ec.message();
  63.         }
  64.  
  65.         onFinish();
  66.     }
  67.  
  68.     // Here we perform the cleanup.
  69.     void onFinish() {
  70.         delete this;
  71.     }
  72.  
  73.     std::string ProcessRequest(asio::streambuf& request) {
  74.  
  75.         // In this method we parse the request, process it
  76.         // and prepare the request.
  77.  
  78.         // Emulate CPU-consuming operations.
  79.         // int i = 0;
  80.         // while (i != 1000000)
  81.         //  i++;
  82.  
  83.         // Emulate operations that block the thread
  84.         // (e.g. synch I/O operations).
  85.         std::this_thread::sleep_for(
  86.             std::chrono::milliseconds(100));
  87.  
  88.         // Prepare and return the response message.
  89.         std::string response = "Response\n";
  90.         return response;
  91.     }
  92.  
  93. private:
  94.     std::shared_ptr<boost::asio::local::stream_protocol::socket> m_sock;
  95.     std::string m_response;
  96.     asio::streambuf m_request;
  97. };
  98.  
  99. class Acceptor {
  100. public:
  101.     Acceptor(asio::io_service& ios, std::string sock_path) :
  102.         m_ios(ios),
  103.         m_acceptor(m_ios,
  104.                 boost::asio::local::stream_protocol::endpoint(sock_path)),
  105.         m_isStopped(false)
  106.     {}
  107.  
  108.     // Start accepting incoming connection requests.
  109.     void Start() {
  110.         m_acceptor.listen();
  111.         InitAccept();
  112.     }
  113.  
  114.     // Stop accepting incoming connection requests.
  115.     void Stop() {
  116.         m_isStopped.store(true);
  117.     }
  118.  
  119. private:
  120.     void InitAccept() {
  121.         std::shared_ptr<boost::asio::local::stream_protocol::socket>
  122.             sock(new boost::asio::local::stream_protocol::socket(m_ios));
  123.  
  124.         m_acceptor.async_accept(*sock.get(),
  125.             [this, sock](
  126.             const boost::system::error_code& error)
  127.         {
  128.             onAccept(error, sock);
  129.         });
  130.     }
  131.  
  132.     void onAccept(const boost::system::error_code& ec,
  133.         std::shared_ptr<boost::asio::local::stream_protocol::socket> sock)
  134.     {
  135.         if (ec.value() == 0) {
  136.             (new Service(sock))->StartHandling();
  137.         }
  138.         else {
  139.             std::cout << "Error occured! Error code = "
  140.                 << ec.value()
  141.                 << ". Message: " << ec.message();
  142.         }
  143.  
  144.         // Init next async accept operation if
  145.         // acceptor has not been stopped yet.
  146.         if (!m_isStopped.load()) {
  147.             InitAccept();
  148.         }
  149.         else {
  150.             // Stop accepting incoming connections
  151.             // and free allocated resources.
  152.             m_acceptor.close();
  153.         }
  154.     }
  155.  
  156. private:
  157.     asio::io_service& m_ios;
  158.     boost::asio::local::stream_protocol::acceptor m_acceptor;
  159.     std::atomic<bool> m_isStopped;
  160. };
  161.  
  162. class Server {
  163. public:
  164.     Server() {
  165.         m_work.reset(new asio::io_service::work(m_ios));
  166.     }
  167.  
  168.     // Start the server.
  169.     void Start(std::string socket_name,
  170.         unsigned int thread_pool_size) {
  171.  
  172.         assert(thread_pool_size > 0);
  173.  
  174.         // Create and start Acceptor.
  175.         acc.reset(new Acceptor(m_ios, socket_name));
  176.         acc->Start();
  177.  
  178.         // Create specified number of threads and
  179.         // add them to the pool.
  180.         for (unsigned int i = 0; i < thread_pool_size; i++) {
  181.             std::unique_ptr<std::thread> th(
  182.                 new std::thread([this]()
  183.                 {
  184.                     m_ios.run();
  185.                 }));
  186.  
  187.             m_thread_pool.push_back(std::move(th));
  188.         }
  189.     }
  190.  
  191.     // Stop the server.
  192.     void Stop() {
  193.         acc->Stop();
  194.         m_ios.stop();
  195.  
  196.         for (auto& th : m_thread_pool) {
  197.             th->join();
  198.         }
  199.     }
  200.  
  201. private:
  202.     asio::io_service m_ios;
  203.     std::unique_ptr<asio::io_service::work> m_work;
  204.     std::unique_ptr<Acceptor> acc;
  205.     std::vector<std::unique_ptr<std::thread>> m_thread_pool;
  206. };
  207.  
  208.  
  209. const unsigned int DEFAULT_THREAD_POOL_SIZE = 2;
  210.  
  211. int main()
  212. {
  213.     std::string socket_name = "/tmp/soccket";
  214.     ::unlink(socket_name.c_str());
  215.  
  216.     try {
  217.         Server srv;
  218.  
  219.         unsigned int thread_pool_size =
  220.             std::thread::hardware_concurrency() * 2;
  221.  
  222.         if (thread_pool_size == 0)
  223.             thread_pool_size = DEFAULT_THREAD_POOL_SIZE;
  224.  
  225.         srv.Start(socket_name, thread_pool_size);
  226.  
  227.         //std::this_thread::sleep_for(std::chrono::seconds(60));
  228.  
  229.         //srv.Stop();
  230.     }
  231.     catch (system::system_error &e) {
  232.         std::cout << "Error occured! Error code = "
  233.             << e.code() << ". Message: "
  234.             << e.what();
  235.     }
  236.  
  237.     return 0;
  238. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement