Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #include "HttpProxy.h"
- #include "Log.h"
- #include <boost/bind.hpp>
- #include <boost/noncopyable.hpp>
- #include <boost/enable_shared_from_this.hpp>
- HttpProxy::HttpProxy() :
- _proxyCallback{},
- _ioService{},
- _resolver{},
- _ioServiceRunning(false),
- _acceptor{},
- _acceptSocket{},
- _ioThreadPool{},
- _backendEndpoint{}
- {}
- HttpProxy::~HttpProxy()
- {}
- void HttpProxy::start(std::size_t listenPort, std::size_t threadsAmount, ProxyCallback cb)
- {
- _proxyCallback = cb;
- _ioService.reset(new io_service());
- _resolver.reset(new ip::tcp::resolver(*_ioService));
- _acceptor.reset(new ip::tcp::acceptor(*_ioService, ip::tcp::endpoint(ip::tcp::v4(), listenPort)));
- _acceptor->set_option(ip::tcp::acceptor::reuse_address(true));
- _acceptSocket.reset(new ip::tcp::socket(*_ioService));
- _acceptor->async_accept(*_acceptSocket, boost::bind(&HttpProxy::acceptHandler, this, _1));
- DEBUG(log()) << "Starting service";
- for (size_t i = 0; i < threadsAmount; ++i) {
- _ioThreadPool.create_thread([this] () {
- try {
- _ioService->run();
- } catch (std::exception& e) {
- CRITICAL(log()) << "I/O service execution error: " << e.what();
- }
- });
- }
- _ioServiceRunning = true;
- DEBUG(log()) << "Service started";
- }
- void HttpProxy::stop()
- {
- _ioServiceRunning = false;
- DEBUG(log()) << "Stopping accepting requests";
- _acceptor->cancel();
- _acceptor->close();
- // TODO: Awaiting for all current sessions to complete?
- DEBUG(log()) << "Stopping I/O-service";
- _ioService->stop();
- _ioThreadPool.join_all();
- }
- void HttpProxy::acceptHandler(const error_code& error)
- {
- if (error) {
- if (_ioServiceRunning) {
- CRITICAL(log()) << "Accepting connection error: " << error.message();
- } else {
- DEBUG(log()) << "Accepting connection stopped";
- }
- return;
- }
- DEBUG(log()) << "Connection accepted";
- boost::shared_ptr<HttpTask> task(new HttpTask(*this, _acceptSocket));
- _acceptSocket.reset(new ip::tcp::socket(*_ioService));
- _acceptor->async_accept(*_acceptSocket, boost::bind(&HttpProxy::acceptHandler, this, _1));
- task->async_execute();
- }
- //------------------------------------------------------------------------------
- HttpProxy::HttpTask::HttpTask(HttpProxy& throttle, std::unique_ptr<ip::tcp::socket>& s) :
- _proxy(throttle),
- _proxyAct{false, std::string{}, 0U, std::string{}},
- _clientSocket{std::move(s)},
- _requestParser{},
- _requestParserSettings{},
- _requestMethod{},
- _requestUri{},
- _parsingRequestHeaderField{false},
- _requestHeaderField{},
- _requestHeaderValue{},
- _requestHeaders{InitialHeadersContainerSize}, // Reserve elements
- _requestHeadersComplete{false},
- _requestComplete{false},
- _messageBuffer{},
- _requestSize{0U},
- _bytesSentToBackend{0U},
- _backendSocket{},
- _responseParser{},
- _responseParserSettings{},
- _responseComplete{false},
- _responseSize{0U},
- _bytesSentToClient{0U}
- {
- _requestHeaders.clear();
- http_parser_init(&_requestParser, HTTP_REQUEST);
- _requestParser.data = this;
- _requestParserSettings.on_url = HttpProxy::HttpTask::onUrlCallback;
- _requestParserSettings.on_header_field = HttpProxy::HttpTask::onRequestHeaderFieldCallback;
- _requestParserSettings.on_header_value = HttpProxy::HttpTask::onRequestHeaderValueCallback;
- _requestParserSettings.on_headers_complete = HttpProxy::HttpTask::onRequestHeadersCompleteCallback;
- _requestParserSettings.on_message_complete = HttpProxy::HttpTask::onRequestCompleteCallback;
- http_parser_init(&_responseParser, HTTP_RESPONSE);
- _responseParser.data = this;
- _responseParserSettings.on_message_complete = HttpProxy::HttpTask::onResponseCompleteCallback;
- DEBUG(log()) << "Serving of client has been started";
- }
- HttpProxy::HttpTask::~HttpTask()
- {
- _clientSocket->close();
- // TODO: Close backend service socket if opened
- DEBUG(log()) << "Serving of client has been completed";
- }
- void HttpProxy::HttpTask::async_execute()
- {
- _clientSocket->async_receive(buffer(_messageBuffer), boost::bind(&HttpTask::onReadRequest,
- shared_from_this(), _1, _2));
- }
- void HttpProxy::HttpTask::onReadRequest(const error_code& error, std::size_t nreceived)
- {
- if (error) {
- CRITICAL(log()) << "Error reading HTTP-request from the client: " << error.message();
- return;
- }
- DEBUG(log()) << nreceived << " bytes have been received from the client";
- int nparsed = http_parser_execute(&_requestParser, &_requestParserSettings, _messageBuffer + _requestSize, nreceived);
- _requestSize += nreceived;
- DEBUG(log()) << _requestSize << " total bytes have been received/parsed from the client";
- DEBUG(log()) << "HTTP-request parser errno: \"" << http_errno_name(HTTP_PARSER_ERRNO(&_requestParser)) << "\", \"" <<
- http_errno_description(HTTP_PARSER_ERRNO(&_requestParser)) << '"';
- if (HTTP_PARSER_ERRNO(&_requestParser) != HPE_OK) {
- // TODO: Return bad request to the client
- CRITICAL(log()) << "HTTP-request parsing error: " << http_errno_description(HTTP_PARSER_ERRNO(&_requestParser));
- return;
- }
- if (_requestHeadersComplete) {
- _proxyAct = _proxy._proxyCallback(_requestMethod, _requestUri, _requestHeaders);
- if (!_proxyAct.shouldProxy) {
- // TODO: Send 'proxyAct.httpResponse' value to the client
- return;
- }
- }
- if (_requestComplete) {
- DEBUG(log()) << "HTTP-request is complete -> resolving " << _proxyAct.backendHost << " backend service FQDN";
- _proxy._resolver->async_resolve(ip::tcp::resolver::query(_proxyAct.backendHost, "http"),
- boost::bind(&HttpTask::onBackendHostResolved, shared_from_this(), _1, _2));
- return;
- }
- // TODO: Special processing if nparsed < nreceived
- DEBUG(log()) << "HTTP-request is not complete -> reading more data";
- std::size_t capacity = HttpMessageBufferSize - _requestSize;
- if (capacity == 0U) {
- CRITICAL(log()) << "HTTP-request buffer overflow detected";
- // TODO: Return error to client
- return;
- }
- _clientSocket->async_receive(buffer(_messageBuffer + _requestSize, capacity), boost::bind(&HttpTask::onReadRequest,
- shared_from_this(), _1, _2));
- }
- void HttpProxy::HttpTask::onBackendHostResolved(const error_code& error, ip::tcp::resolver::iterator iter)
- {
- if (error) {
- CRITICAL(log()) << "Error resolving \"" << _proxyAct.backendHost << "\" backend service FQDN: " << error.message();
- return;
- }
- _proxy._backendEndpoint = iter->endpoint();
- _proxy._backendEndpoint.port(_proxyAct.backendPort);
- DEBUG(log()) << "Backend service FQDN \"" << _proxyAct.backendHost << "\" has been resolved -> connecting to " <<
- _proxy._backendEndpoint << " backend service endpoint";
- _backendSocket.reset(new ip::tcp::socket(*_proxy._ioService));
- _backendSocket->async_connect(_proxy._backendEndpoint, boost::bind(&HttpTask::onBackendConnected, shared_from_this(), _1));
- }
- void HttpProxy::HttpTask::onBackendConnected(const error_code& error)
- {
- if (error) {
- CRITICAL(log()) << "Error connecting to " << _proxy._backendEndpoint << " backend service endpoint: " << error.message();
- return;
- }
- DEBUG(log()) << "Connected to " << _proxy._backendEndpoint << " backend service endpoint -> sending HTTP-request";
- _backendSocket->async_send(buffer(_messageBuffer, _requestSize),
- boost::bind(&HttpTask::onWriteRequest, shared_from_this(), _1, _2));
- }
- void HttpProxy::HttpTask::onWriteRequest(const error_code& error, std::size_t nsent)
- {
- if (error) {
- CRITICAL(log()) << "Error sending HTTP-request to " << _proxy._backendEndpoint << " backend service: " << error.message();
- return;
- }
- DEBUG(log()) << nsent << " bytes have been sent to backend service";
- _bytesSentToBackend += nsent;
- DEBUG(log()) << _bytesSentToBackend << " total bytes have been sent to backend service";
- if (_bytesSentToBackend < _requestSize) {
- DEBUG(log()) << "Sending the remaining " << (_requestSize - _bytesSentToBackend) << " bytes to backend service";
- _backendSocket->async_send(buffer(_messageBuffer + _bytesSentToBackend, _requestSize - _bytesSentToBackend),
- boost::bind(&HttpTask::onWriteRequest, shared_from_this(), _1, _2));
- return;
- } else if (_bytesSentToBackend > _requestSize) {
- CRITICAL(log()) << "Total bytes sent to " << _proxy._backendEndpoint << " backend service (" <<
- _bytesSentToBackend << " is greater then HTTP-request size (" << _requestSize << ')';
- return;
- }
- DEBUG(log()) << "Client HTTP-request has been sent to backend service -> reading HTTP-response";
- _backendSocket->async_receive(buffer(_messageBuffer), boost::bind(&HttpTask::onReadResponse,
- shared_from_this(), _1, _2));
- }
- void HttpProxy::HttpTask::onReadResponse(const error_code& error, std::size_t nreceived)
- {
- if (error) {
- CRITICAL(log()) << "Error reading HTTP-response from " << _proxy._backendEndpoint << " backend service: " << error.message();
- return;
- }
- DEBUG(log()) << nreceived << " bytes have been received from " << _proxy._backendEndpoint << " backend service";
- int nparsed = http_parser_execute(&_responseParser, &_responseParserSettings, _messageBuffer + _responseSize, nreceived);
- DEBUG(log()) << nparsed << '/' << nreceived << " bytes of HTTP-response are parsed";
- _responseSize += nreceived;
- DEBUG(log()) << _responseSize << " total bytes have been received/parsed from " << _proxy._backendEndpoint << " backend service";
- DEBUG(log()) << "HTTP-response parser errno: \"" << http_errno_name(HTTP_PARSER_ERRNO(&_responseParser)) << "\", \"" <<
- http_errno_description(HTTP_PARSER_ERRNO(&_responseParser)) << '"';
- if (HTTP_PARSER_ERRNO(&_responseParser) != HPE_OK) {
- CRITICAL(log()) << "HTTP-response parsing error: " << http_errno_name(HTTP_PARSER_ERRNO(&_responseParser));
- // TODO: Return error to the client
- return;
- }
- if (_responseComplete) {
- DEBUG(log()) << "HTTP-response is complete -> sending it back to client";
- _clientSocket->async_send(buffer(_messageBuffer, _responseSize),
- boost::bind(&HttpTask::onWriteResponse, shared_from_this(), _1, _2));
- return;
- }
- // TODO: Special processing if nparsed < nreceived
- DEBUG(log()) << "HTTP-response is not complete -> reading more data";
- std::size_t capacity = HttpMessageBufferSize - _responseSize;
- if (capacity == 0U) {
- CRITICAL(log()) << "HTTP-response buffer overflow detected";
- // TODO: Return error to client
- return;
- }
- _backendSocket->async_receive(buffer(_messageBuffer + _responseSize, capacity), boost::bind(&HttpTask::onReadResponse,
- shared_from_this(), _1, _2));
- }
- void HttpProxy::HttpTask::onWriteResponse(const error_code& error, std::size_t nsent)
- {
- if (error) {
- CRITICAL(log()) << "Error sending HTTP-response to client: " << error.message();
- return;
- }
- DEBUG(log()) << nsent << " bytes have been sent to client";
- _bytesSentToClient += nsent;
- DEBUG(log()) << _bytesSentToClient << " total bytes have been sent to client";
- if (_bytesSentToClient < _responseSize) {
- DEBUG(log()) << "Sending the remaining " << (_responseSize - _bytesSentToClient) << " bytes to client";
- _backendSocket->async_send(buffer(_messageBuffer + _bytesSentToClient, _responseSize - _bytesSentToClient),
- boost::bind(&HttpTask::onWriteRequest, shared_from_this(), _1, _2));
- return;
- } else if (_bytesSentToClient > _responseSize) {
- CRITICAL(log()) << "Total bytes sent to client (" << _bytesSentToClient << " is greater then HTTP-response size (" << _responseSize << ')';
- return;
- }
- }
- int HttpProxy::HttpTask::onUrlCallback(http_parser * parser, const char * at, size_t length)
- {
- HttpProxy::HttpTask * httpTask = reinterpret_cast<HttpProxy::HttpTask *>(parser->data);
- const char * rm = http_method_str(static_cast<http_method>(parser->method));
- httpTask->_requestMethod = BufferChunk(rm, std::strlen(rm));
- httpTask->_requestUri = BufferChunk(at, length);
- DEBUG(log()) << "HTTP-request method is: \"" << std::string(httpTask->_requestMethod.ptr, httpTask->_requestMethod.len) << '"';
- DEBUG(log()) << "HTTP-request URL is: \"" << std::string(at, length) << '"';
- return 0;
- }
- int HttpProxy::HttpTask::onRequestHeaderFieldCallback(http_parser* parser, const char *at, size_t length)
- {
- HttpProxy::HttpTask * httpTask = reinterpret_cast<HttpProxy::HttpTask *>(parser->data);
- DEBUG(log()) << "HttpProxy::HttpTask::onRequestHeaderFieldCallback(): Method called on : \"" << std::string(at, length) << "\" string";
- if (httpTask->_parsingRequestHeaderField) {
- httpTask->_requestHeaderField.len += length;
- } else {
- // Adding previous header to httpTask->_requestHeaders
- if (httpTask->_requestHeaderField.ptr != 0) {
- httpTask->_requestHeaders.push_back(Header(httpTask->_requestHeaderField, httpTask->_requestHeaderValue));
- }
- httpTask->_requestHeaderField.ptr = at;
- httpTask->_requestHeaderField.len = length;
- httpTask->_parsingRequestHeaderField = true;
- }
- return 0;
- }
- int HttpProxy::HttpTask::onRequestHeaderValueCallback(http_parser* parser, const char *at, size_t length)
- {
- HttpProxy::HttpTask * httpTask = reinterpret_cast<HttpProxy::HttpTask *>(parser->data);
- DEBUG(log()) << "HttpProxy::HttpTask::onRequestHeaderValueCallback(): Method called on : \"" << std::string(at, length) << "\" string";
- if (httpTask->_parsingRequestHeaderField) {
- httpTask->_requestHeaderValue.ptr = at;
- httpTask->_requestHeaderValue.len = length;
- httpTask->_parsingRequestHeaderField = false;
- } else {
- httpTask->_requestHeaderValue.len += length;
- }
- return 0;
- }
- int HttpProxy::HttpTask::onRequestHeadersCompleteCallback(http_parser* parser)
- {
- HttpProxy::HttpTask * httpTask = reinterpret_cast<HttpProxy::HttpTask *>(parser->data);
- if (httpTask->_requestHeaderField.ptr != 0) {
- httpTask->_requestHeaders.push_back(Header(httpTask->_requestHeaderField, httpTask->_requestHeaderValue));
- }
- httpTask->_requestHeadersComplete = true;
- DEBUG(log()) << httpTask->_requestHeaders.size() << " HTTP-request headers have beed received";
- for (std::size_t i = 0U; i < httpTask->_requestHeaders.size(); ++i) {
- DEBUG(log()) << "\t- " << std::string(httpTask->_requestHeaders[i].first.ptr, httpTask->_requestHeaders[i].first.len) <<
- ": " << std::string(httpTask->_requestHeaders[i].second.ptr, httpTask->_requestHeaders[i].second.len);
- }
- return 0;
- }
- int HttpProxy::HttpTask::onRequestCompleteCallback(http_parser* parser)
- {
- HttpProxy::HttpTask * httpTask = reinterpret_cast<HttpProxy::HttpTask *>(parser->data);
- httpTask->_requestComplete = true;
- DEBUG(log()) << "Complete HTTP-request has beed received";
- return 0;
- }
- int HttpProxy::HttpTask::onResponseCompleteCallback(http_parser* parser)
- {
- HttpProxy::HttpTask * httpTask = reinterpret_cast<HttpProxy::HttpTask *>(parser->data);
- httpTask->_responseComplete = true;
- DEBUG(log()) << "Complete HTTP-response has beed received";
- return 0;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement