Advertisement
Guest User

boost::asio & nodejs HTTP parser-based C++ HTTP-proxy example

a guest
May 21st, 2019
276
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 15.60 KB | None | 0 0
  1. #include "HttpProxy.h"
  2. #include "Log.h"
  3.  
  4. #include <boost/bind.hpp>
  5. #include <boost/noncopyable.hpp>
  6. #include <boost/enable_shared_from_this.hpp>
  7.  
  8. HttpProxy::HttpProxy() :
  9.     _proxyCallback{},
  10.     _ioService{},
  11.     _resolver{},
  12.     _ioServiceRunning(false),
  13.     _acceptor{},
  14.     _acceptSocket{},
  15.     _ioThreadPool{},
  16.     _backendEndpoint{}
  17. {}
  18.  
  19. HttpProxy::~HttpProxy()
  20. {}
  21.  
  22. void HttpProxy::start(std::size_t listenPort, std::size_t threadsAmount, ProxyCallback cb)
  23. {
  24.     _proxyCallback = cb;
  25.  
  26.     _ioService.reset(new io_service());
  27.     _resolver.reset(new ip::tcp::resolver(*_ioService));
  28.  
  29.     _acceptor.reset(new ip::tcp::acceptor(*_ioService, ip::tcp::endpoint(ip::tcp::v4(), listenPort)));
  30.     _acceptor->set_option(ip::tcp::acceptor::reuse_address(true));
  31.     _acceptSocket.reset(new ip::tcp::socket(*_ioService));
  32.     _acceptor->async_accept(*_acceptSocket, boost::bind(&HttpProxy::acceptHandler, this, _1));
  33.     DEBUG(log()) << "Starting service";
  34.     for (size_t i = 0;  i < threadsAmount; ++i) {
  35.         _ioThreadPool.create_thread([this] () {
  36.             try {
  37.                 _ioService->run();
  38.             } catch (std::exception& e) {
  39.                 CRITICAL(log()) << "I/O service execution error: " << e.what();
  40.             }
  41.         });
  42.     }
  43.     _ioServiceRunning = true;
  44.     DEBUG(log()) << "Service started";
  45. }
  46.  
  47. void HttpProxy::stop()
  48. {
  49.     _ioServiceRunning = false;
  50.  
  51.     DEBUG(log()) << "Stopping accepting requests";
  52.     _acceptor->cancel();
  53.     _acceptor->close();
  54.  
  55.     // TODO: Awaiting for all current sessions to complete?
  56.  
  57.     DEBUG(log()) << "Stopping I/O-service";
  58.     _ioService->stop();
  59.     _ioThreadPool.join_all();
  60. }
  61.  
  62. void HttpProxy::acceptHandler(const error_code& error)
  63. {
  64.     if (error) {
  65.         if (_ioServiceRunning) {
  66.             CRITICAL(log()) << "Accepting connection error: " << error.message();
  67.         } else {
  68.             DEBUG(log()) << "Accepting connection stopped";
  69.         }
  70.         return;
  71.     }
  72.     DEBUG(log()) << "Connection accepted";
  73.  
  74.     boost::shared_ptr<HttpTask> task(new HttpTask(*this, _acceptSocket));
  75.     _acceptSocket.reset(new ip::tcp::socket(*_ioService));
  76.     _acceptor->async_accept(*_acceptSocket, boost::bind(&HttpProxy::acceptHandler, this, _1));
  77.     task->async_execute();
  78. }
  79.  
  80. //------------------------------------------------------------------------------
  81.  
  82. HttpProxy::HttpTask::HttpTask(HttpProxy& throttle, std::unique_ptr<ip::tcp::socket>& s) :
  83.     _proxy(throttle),
  84.     _proxyAct{false, std::string{}, 0U, std::string{}},
  85.     _clientSocket{std::move(s)},
  86.     _requestParser{},
  87.     _requestParserSettings{},
  88.     _requestMethod{},
  89.     _requestUri{},
  90.     _parsingRequestHeaderField{false},
  91.     _requestHeaderField{},
  92.     _requestHeaderValue{},
  93.     _requestHeaders{InitialHeadersContainerSize}, // Reserve elements
  94.     _requestHeadersComplete{false},
  95.     _requestComplete{false},
  96.     _messageBuffer{},
  97.     _requestSize{0U},
  98.     _bytesSentToBackend{0U},
  99.     _backendSocket{},
  100.     _responseParser{},
  101.     _responseParserSettings{},
  102.     _responseComplete{false},
  103.     _responseSize{0U},
  104.     _bytesSentToClient{0U}
  105. {
  106.     _requestHeaders.clear();
  107.  
  108.     http_parser_init(&_requestParser, HTTP_REQUEST);
  109.     _requestParser.data = this;
  110.     _requestParserSettings.on_url = HttpProxy::HttpTask::onUrlCallback;
  111.     _requestParserSettings.on_header_field = HttpProxy::HttpTask::onRequestHeaderFieldCallback;
  112.     _requestParserSettings.on_header_value = HttpProxy::HttpTask::onRequestHeaderValueCallback;
  113.     _requestParserSettings.on_headers_complete = HttpProxy::HttpTask::onRequestHeadersCompleteCallback;
  114.     _requestParserSettings.on_message_complete = HttpProxy::HttpTask::onRequestCompleteCallback;
  115.  
  116.     http_parser_init(&_responseParser, HTTP_RESPONSE);
  117.     _responseParser.data = this;
  118.     _responseParserSettings.on_message_complete = HttpProxy::HttpTask::onResponseCompleteCallback;
  119.  
  120.     DEBUG(log()) << "Serving of client has been started";
  121. }
  122.  
  123. HttpProxy::HttpTask::~HttpTask()
  124. {
  125.     _clientSocket->close();
  126.     // TODO: Close backend service socket if opened
  127.     DEBUG(log()) << "Serving of client has been completed";
  128. }
  129.  
  130. void HttpProxy::HttpTask::async_execute()
  131. {
  132.     _clientSocket->async_receive(buffer(_messageBuffer), boost::bind(&HttpTask::onReadRequest,
  133.                 shared_from_this(), _1, _2));
  134. }
  135.  
  136. void HttpProxy::HttpTask::onReadRequest(const error_code& error, std::size_t nreceived)
  137. {
  138.     if (error) {
  139.         CRITICAL(log()) << "Error reading HTTP-request from the client: " << error.message();
  140.         return;
  141.     }
  142.     DEBUG(log()) << nreceived << " bytes have been received from the client";
  143.  
  144.     int nparsed = http_parser_execute(&_requestParser, &_requestParserSettings, _messageBuffer + _requestSize, nreceived);
  145.  
  146.     _requestSize += nreceived;
  147.     DEBUG(log()) << _requestSize << " total bytes have been received/parsed from the client";
  148.  
  149.     DEBUG(log()) << "HTTP-request parser errno: \"" << http_errno_name(HTTP_PARSER_ERRNO(&_requestParser)) << "\", \"" <<
  150.         http_errno_description(HTTP_PARSER_ERRNO(&_requestParser)) << '"';
  151.     if (HTTP_PARSER_ERRNO(&_requestParser) != HPE_OK) {
  152.         // TODO: Return bad request to the client
  153.         CRITICAL(log()) << "HTTP-request parsing error: " << http_errno_description(HTTP_PARSER_ERRNO(&_requestParser));
  154.         return;
  155.     }
  156.  
  157.     if (_requestHeadersComplete) {
  158.         _proxyAct = _proxy._proxyCallback(_requestMethod, _requestUri, _requestHeaders);
  159.         if (!_proxyAct.shouldProxy) {
  160.             // TODO: Send 'proxyAct.httpResponse' value to the client
  161.             return;
  162.         }
  163.     }
  164.  
  165.     if (_requestComplete) {
  166.         DEBUG(log()) << "HTTP-request is complete -> resolving " << _proxyAct.backendHost << " backend service FQDN";
  167.         _proxy._resolver->async_resolve(ip::tcp::resolver::query(_proxyAct.backendHost, "http"),
  168.                 boost::bind(&HttpTask::onBackendHostResolved, shared_from_this(), _1, _2));
  169.         return;
  170.     }
  171.  
  172.     // TODO: Special processing if nparsed < nreceived
  173.     DEBUG(log()) << "HTTP-request is not complete -> reading more data";
  174.     std::size_t capacity = HttpMessageBufferSize - _requestSize;
  175.     if (capacity == 0U) {
  176.         CRITICAL(log()) << "HTTP-request buffer overflow detected";
  177.         // TODO: Return error to client
  178.         return;
  179.     }
  180.     _clientSocket->async_receive(buffer(_messageBuffer + _requestSize, capacity), boost::bind(&HttpTask::onReadRequest,
  181.                 shared_from_this(), _1, _2));
  182. }
  183.  
  184. void HttpProxy::HttpTask::onBackendHostResolved(const error_code& error, ip::tcp::resolver::iterator iter)
  185. {
  186.     if (error) {
  187.         CRITICAL(log()) << "Error resolving \"" << _proxyAct.backendHost << "\" backend service FQDN: " << error.message();
  188.         return;
  189.     }
  190.     _proxy._backendEndpoint = iter->endpoint();
  191.     _proxy._backendEndpoint.port(_proxyAct.backendPort);
  192.     DEBUG(log()) << "Backend service FQDN \"" << _proxyAct.backendHost << "\" has been resolved -> connecting to " <<
  193.         _proxy._backendEndpoint << " backend service endpoint";
  194.     _backendSocket.reset(new ip::tcp::socket(*_proxy._ioService));
  195.     _backendSocket->async_connect(_proxy._backendEndpoint, boost::bind(&HttpTask::onBackendConnected, shared_from_this(), _1));
  196. }
  197.  
  198. void HttpProxy::HttpTask::onBackendConnected(const error_code& error)
  199. {
  200.     if (error) {
  201.         CRITICAL(log()) << "Error connecting to " << _proxy._backendEndpoint << " backend service endpoint: " << error.message();
  202.         return;
  203.     }
  204.     DEBUG(log()) << "Connected to " << _proxy._backendEndpoint << " backend service endpoint -> sending HTTP-request";
  205.     _backendSocket->async_send(buffer(_messageBuffer, _requestSize),
  206.             boost::bind(&HttpTask::onWriteRequest, shared_from_this(), _1, _2));
  207. }
  208.  
  209. void HttpProxy::HttpTask::onWriteRequest(const error_code& error, std::size_t nsent)
  210. {
  211.     if (error) {
  212.         CRITICAL(log()) << "Error sending HTTP-request to " << _proxy._backendEndpoint << " backend service: " << error.message();
  213.         return;
  214.     }
  215.     DEBUG(log()) << nsent << " bytes have been sent to backend service";
  216.     _bytesSentToBackend += nsent;
  217.     DEBUG(log()) << _bytesSentToBackend << " total bytes have been sent to backend service";
  218.     if (_bytesSentToBackend < _requestSize) {
  219.         DEBUG(log()) << "Sending the remaining " << (_requestSize - _bytesSentToBackend) << " bytes to backend service";
  220.         _backendSocket->async_send(buffer(_messageBuffer + _bytesSentToBackend, _requestSize - _bytesSentToBackend),
  221.                 boost::bind(&HttpTask::onWriteRequest, shared_from_this(), _1, _2));
  222.         return;
  223.     } else if (_bytesSentToBackend > _requestSize) {
  224.         CRITICAL(log()) << "Total bytes sent to " << _proxy._backendEndpoint << " backend service (" <<
  225.             _bytesSentToBackend << " is greater then HTTP-request size (" << _requestSize << ')';
  226.         return;
  227.     }
  228.  
  229.     DEBUG(log()) << "Client HTTP-request has been sent to backend service -> reading HTTP-response";
  230.     _backendSocket->async_receive(buffer(_messageBuffer), boost::bind(&HttpTask::onReadResponse,
  231.                 shared_from_this(), _1, _2));
  232. }
  233.  
  234. void HttpProxy::HttpTask::onReadResponse(const error_code& error, std::size_t nreceived)
  235. {
  236.     if (error) {
  237.         CRITICAL(log()) << "Error reading HTTP-response from " << _proxy._backendEndpoint << " backend service: " << error.message();
  238.         return;
  239.     }
  240.     DEBUG(log()) << nreceived << " bytes have been received from " << _proxy._backendEndpoint << " backend service";
  241.  
  242.     int nparsed = http_parser_execute(&_responseParser, &_responseParserSettings, _messageBuffer + _responseSize, nreceived);
  243.     DEBUG(log()) << nparsed << '/' << nreceived << " bytes of HTTP-response are parsed";
  244.  
  245.     _responseSize += nreceived;
  246.     DEBUG(log()) << _responseSize << " total bytes have been received/parsed from " << _proxy._backendEndpoint << " backend service";
  247.  
  248.     DEBUG(log()) << "HTTP-response parser errno: \"" << http_errno_name(HTTP_PARSER_ERRNO(&_responseParser)) << "\", \"" <<
  249.         http_errno_description(HTTP_PARSER_ERRNO(&_responseParser)) << '"';
  250.     if (HTTP_PARSER_ERRNO(&_responseParser) != HPE_OK) {
  251.         CRITICAL(log()) << "HTTP-response parsing error: " << http_errno_name(HTTP_PARSER_ERRNO(&_responseParser));
  252.         // TODO: Return error to the client
  253.         return;
  254.     }
  255.     if (_responseComplete) {
  256.         DEBUG(log()) << "HTTP-response is complete -> sending it back to client";
  257.         _clientSocket->async_send(buffer(_messageBuffer, _responseSize),
  258.                 boost::bind(&HttpTask::onWriteResponse, shared_from_this(), _1, _2));
  259.         return;
  260.     }
  261.  
  262.     // TODO: Special processing if nparsed < nreceived
  263.     DEBUG(log()) << "HTTP-response is not complete -> reading more data";
  264.     std::size_t capacity = HttpMessageBufferSize - _responseSize;
  265.     if (capacity == 0U) {
  266.         CRITICAL(log()) << "HTTP-response buffer overflow detected";
  267.         // TODO: Return error to client
  268.         return;
  269.     }
  270.     _backendSocket->async_receive(buffer(_messageBuffer + _responseSize, capacity), boost::bind(&HttpTask::onReadResponse,
  271.                 shared_from_this(), _1, _2));
  272. }
  273.  
  274. void HttpProxy::HttpTask::onWriteResponse(const error_code& error, std::size_t nsent)
  275. {
  276.     if (error) {
  277.         CRITICAL(log()) << "Error sending HTTP-response to client: " << error.message();
  278.         return;
  279.     }
  280.     DEBUG(log()) << nsent << " bytes have been sent to client";
  281.     _bytesSentToClient += nsent;
  282.     DEBUG(log()) << _bytesSentToClient << " total bytes have been sent to client";
  283.     if (_bytesSentToClient < _responseSize) {
  284.         DEBUG(log()) << "Sending the remaining " << (_responseSize - _bytesSentToClient) << " bytes to client";
  285.         _backendSocket->async_send(buffer(_messageBuffer + _bytesSentToClient, _responseSize - _bytesSentToClient),
  286.                 boost::bind(&HttpTask::onWriteRequest, shared_from_this(), _1, _2));
  287.         return;
  288.     } else if (_bytesSentToClient > _responseSize) {
  289.         CRITICAL(log()) << "Total bytes sent to client (" << _bytesSentToClient << " is greater then HTTP-response size (" << _responseSize << ')';
  290.         return;
  291.     }
  292. }
  293.  
  294. int HttpProxy::HttpTask::onUrlCallback(http_parser * parser, const char * at, size_t length)
  295. {
  296.     HttpProxy::HttpTask * httpTask = reinterpret_cast<HttpProxy::HttpTask *>(parser->data);
  297.     const char * rm = http_method_str(static_cast<http_method>(parser->method));
  298.     httpTask->_requestMethod = BufferChunk(rm, std::strlen(rm));
  299.     httpTask->_requestUri = BufferChunk(at, length);
  300.     DEBUG(log()) << "HTTP-request method is: \"" << std::string(httpTask->_requestMethod.ptr, httpTask->_requestMethod.len) << '"';
  301.     DEBUG(log()) << "HTTP-request URL is: \"" << std::string(at, length) << '"';
  302.     return 0;
  303. }
  304.  
  305. int HttpProxy::HttpTask::onRequestHeaderFieldCallback(http_parser* parser, const char *at, size_t length)
  306. {
  307.     HttpProxy::HttpTask * httpTask = reinterpret_cast<HttpProxy::HttpTask *>(parser->data);
  308.     DEBUG(log()) << "HttpProxy::HttpTask::onRequestHeaderFieldCallback(): Method called on : \"" << std::string(at, length) << "\" string";
  309.  
  310.     if (httpTask->_parsingRequestHeaderField) {
  311.         httpTask->_requestHeaderField.len += length;
  312.     } else {
  313.         // Adding previous header to httpTask->_requestHeaders
  314.         if (httpTask->_requestHeaderField.ptr != 0) {
  315.             httpTask->_requestHeaders.push_back(Header(httpTask->_requestHeaderField, httpTask->_requestHeaderValue));
  316.         }
  317.         httpTask->_requestHeaderField.ptr = at;
  318.         httpTask->_requestHeaderField.len = length;
  319.         httpTask->_parsingRequestHeaderField = true;
  320.     }
  321.  
  322.     return 0;
  323. }
  324.  
  325. int HttpProxy::HttpTask::onRequestHeaderValueCallback(http_parser* parser, const char *at, size_t length)
  326. {
  327.     HttpProxy::HttpTask * httpTask = reinterpret_cast<HttpProxy::HttpTask *>(parser->data);
  328.     DEBUG(log()) << "HttpProxy::HttpTask::onRequestHeaderValueCallback(): Method called on : \"" << std::string(at, length) << "\" string";
  329.  
  330.     if (httpTask->_parsingRequestHeaderField) {
  331.         httpTask->_requestHeaderValue.ptr = at;
  332.         httpTask->_requestHeaderValue.len = length;
  333.         httpTask->_parsingRequestHeaderField = false;
  334.     } else {
  335.         httpTask->_requestHeaderValue.len += length;
  336.     }
  337.  
  338.     return 0;
  339. }
  340.  
  341. int HttpProxy::HttpTask::onRequestHeadersCompleteCallback(http_parser* parser)
  342. {
  343.     HttpProxy::HttpTask * httpTask = reinterpret_cast<HttpProxy::HttpTask *>(parser->data);
  344.     if (httpTask->_requestHeaderField.ptr != 0) {
  345.         httpTask->_requestHeaders.push_back(Header(httpTask->_requestHeaderField, httpTask->_requestHeaderValue));
  346.     }
  347.     httpTask->_requestHeadersComplete = true;
  348.     DEBUG(log()) << httpTask->_requestHeaders.size() << " HTTP-request headers have beed received";
  349.     for (std::size_t i = 0U; i < httpTask->_requestHeaders.size(); ++i) {
  350.         DEBUG(log()) << "\t- " << std::string(httpTask->_requestHeaders[i].first.ptr, httpTask->_requestHeaders[i].first.len) <<
  351.             ": " << std::string(httpTask->_requestHeaders[i].second.ptr, httpTask->_requestHeaders[i].second.len);
  352.     }
  353.     return 0;
  354. }
  355.  
  356. int HttpProxy::HttpTask::onRequestCompleteCallback(http_parser* parser)
  357. {
  358.     HttpProxy::HttpTask * httpTask = reinterpret_cast<HttpProxy::HttpTask *>(parser->data);
  359.     httpTask->_requestComplete = true;
  360.     DEBUG(log()) << "Complete HTTP-request has beed received";
  361.     return 0;
  362. }
  363.  
  364. int HttpProxy::HttpTask::onResponseCompleteCallback(http_parser* parser)
  365. {
  366.     HttpProxy::HttpTask * httpTask = reinterpret_cast<HttpProxy::HttpTask *>(parser->data);
  367.     httpTask->_responseComplete = true;
  368.     DEBUG(log()) << "Complete HTTP-response has beed received";
  369.     return 0;
  370. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement