Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
#include <arpa/inet.h>
#include <errno.h>
#include <netdb.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/un.h>
#include <unistd.h>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <csignal>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <iostream>
#include <list>
#include <memory>
#include <mutex>
#include <set>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>
#include <vector>
/// Growable byte buffer used for every socket payload in this file.
using Buffer = std::vector<char>;

/// Appends `size` bytes starting at `begin` to the end of `*buffer`.
///
/// @param buffer destination; a null pointer makes the call a no-op.
/// @param begin  first byte to copy; may point to const data. A null pointer
///               makes the call a no-op.
/// @param size   number of bytes to copy; zero makes the call a no-op.
void appendTo(Buffer *buffer, const char *begin, size_t size) {
    // Guard first: `begin + size` must not be formed from a null pointer.
    if (buffer == nullptr || begin == nullptr || size == 0) {
        return;
    }
    buffer->insert(buffer->end(), begin, begin + size);
}
/// Holds either "everything is fine" or a stored std::runtime_error that is
/// thrown later, when the owner asks for it via check().
class OkOrException {
public:
    /// Starts in the good state with an empty placeholder exception.
    OkOrException() = default;

    /// Stores a copy of `exception` and switches to the failed state.
    void setException(const std::runtime_error &exception) {
        exception_ = exception;
        is_good_ = false;
    }

    /// Returns normally in the good state; otherwise throws a copy of the
    /// stored exception.
    void check() const {
        if (is_good_) {
            return;
        }
        throw exception_;
    }

private:
    bool is_good_ = true;
    std::runtime_error exception_{""};
};

/// Short name used by the socket classes for their deferred-error state.
using Status = OkOrException;
/// An IPv4 address in textual form together with a TCP port.
struct InternetAddress {
private:
    std::string address_;
    int port_;
public:
    /// @param address IPv4 address text; accepted forms are those of inet_aton().
    /// @param port    TCP port in host byte order.
    explicit InternetAddress(std::string address, int port) :
        address_(std::move(address)),
        port_(port)
    {}

    /// Builds the socket address for the stored address and port.
    ///
    /// @return an AF_INET address (the bytes of a sockaddr_in) in a generic sockaddr.
    /// @throws std::runtime_error if the port is outside 0..65535 or the
    ///         address text cannot be parsed.
    struct sockaddr getSocketAddress() const {
        if (port_ < 0 || port_ > 65535) {
            throw std::runtime_error("Invalid port number");
        }
        // Value-initialisation zeroes every field, including sin_zero and any
        // platform-specific members the code does not set explicitly.
        struct sockaddr_in address{};
        address.sin_family = AF_INET;
        address.sin_port = htons(static_cast<uint16_t>(port_));
        // inet_aton reports failure explicitly; inet_addr's error value is
        // indistinguishable from 255.255.255.255.
        if (inet_aton(address_.c_str(), &address.sin_addr) == 0) {
            throw std::runtime_error("Invalid IPv4 address");
        }
        static_assert(sizeof(struct sockaddr) >= sizeof(struct sockaddr_in),
                      "sockaddr must be able to hold a sockaddr_in");
        // Copy the bytes instead of reading the object through a pointer of
        // an unrelated type.
        struct sockaddr generic{};
        std::memcpy(&generic, &address, sizeof(address));
        return generic;
    }
};
- class ConnectionSocket {
- private:
- int connection_fd_;
- Status status_;
- bool have_resources_;
- static constexpr int BUFFER_SIZE = 1024;
- public:
- ConnectionSocket()
- : have_resources_(true) {
- connection_fd_ = ::socket(AF_INET, SOCK_STREAM, 0);
- }
- ConnectionSocket(int connection_fd) :
- connection_fd_(connection_fd),
- have_resources_(true)
- {}
- ConnectionSocket(ConnectionSocket &&other) {
- connection_fd_ = other.connection_fd_;
- status_ = std::move(other.status_);
- other.have_resources_ = false;
- have_resources_ = true;
- }
- ConnectionSocket(const ConnectionSocket &) = delete;
- ConnectionSocket &operator=(const ConnectionSocket &) = delete;
- std::string getIP() const {
- struct sockaddr_in addr;
- socklen_t addrlen;
- getpeername(connection_fd_, (sockaddr*) &addr, &addrlen);
- return inet_ntoa(addr.sin_addr);
- }
- Buffer read() {
- char buffer[BUFFER_SIZE];
- Buffer result;
- while (true) {
- memset(buffer, 0, BUFFER_SIZE);
- int readed = ::recv(connection_fd_, buffer, BUFFER_SIZE, 0);
- appendTo(&result, buffer, BUFFER_SIZE);
- if (readed < BUFFER_SIZE) {
- break;
- }
- }
- return std::move(result);
- }
- void write(Buffer buffer) {
- send(connection_fd_, buffer.data(), buffer.size(), 0);
- if (errno != 0) {
- perror("Failed to write data");
- }
- }
- void readToSocket(ConnectionSocket &dest) {
- char buf[BUFFER_SIZE];
- while (true) {
- int dataLength = recv(connection_fd_, buf, BUFFER_SIZE, 0);
- if (dataLength <= 0) {
- break;
- }
- Buffer output(buf, buf + dataLength);
- dest.write(output);
- }
- }
- void connect(const std::string &host, int port) {
- struct sockaddr_in address = { AF_INET, htons(port) };
- struct hostent *h = gethostbyname(host.c_str());
- if (!h) {
- throw std::runtime_error("Failed to create socket");
- }
- if (inet_aton(inet_ntoa(*((struct in_addr *)h->h_addr)), &address.sin_addr) <= 0) {
- throw std::runtime_error("Failed to connect");
- }
- if (::connect(connection_fd_, (struct sockaddr *) &address, sizeof(address)) < 0) {
- throw std::runtime_error("Failed to connect");
- }
- }
- ~ConnectionSocket() {
- if (have_resources_) {
- ::close(connection_fd_);
- }
- }
- };
- class ServerSocket {
- private:
- int fd_;
- Status status_;
- void initSocket() {
- fd_ = ::socket(AF_INET, SOCK_STREAM, 0);
- if (fd_ < 0) {
- throw std::runtime_error("Failed to create socket");
- }
- }
- void bindToAddress(const InternetAddress &internet_address) {
- struct sockaddr address = internet_address.getSocketAddress();
- int bound = ::bind(fd_, (struct sockaddr *) &address, sizeof(address));
- if (bound == -1) {
- throw std::runtime_error("Failed to bind socket");
- }
- }
- void listenTo() {
- int listen_result = ::listen(fd_, 10);
- if (listen_result == -1) {
- throw std::runtime_error("Failed to listen");
- }
- }
- void setReuseAddress() {
- int yes = 1;
- if (::setsockopt(fd_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)) == -1 ) {
- throw std::runtime_error("Failed to set socket options");
- }
- }
- void closeSocket() {
- ::close(fd_);
- }
- public:
- ServerSocket(const InternetAddress &internet_address) {
- try {
- initSocket();
- setReuseAddress();
- bindToAddress(internet_address);
- listenTo();
- } catch(const std::runtime_error &exception) {
- status_.setException(exception);
- }
- }
- ConnectionSocket newConnection() {
- int new_fd = ::accept(fd_, 0, 0);
- if (new_fd < 0) {
- throw std::runtime_error("Failed to accept socket");
- }
- return std::move(ConnectionSocket(new_fd));
- }
- ServerSocket(const ServerSocket &) = delete;
- ServerSocket &operator=(const ServerSocket &) = delete;
- void check() const {
- status_.check();
- }
- ~ServerSocket() {
- closeSocket();
- }
- };
/// Extracts the second space-delimited token of `buffer`; for an HTTP request
/// line such as "GET <url> HTTP/1.1" this is the request target.
///
/// @param buffer raw request bytes (std::vector<char>, the type this file
///               names Buffer).
/// @return the characters between the first and the second space; may be empty.
/// @throws std::runtime_error if the buffer does not contain two spaces.
std::string getURL(const std::vector<char> &buffer) {
    const auto method_end = std::find(buffer.begin(), buffer.end(), ' ');
    if (method_end == buffer.end()) {
        throw std::runtime_error("Malformed HTTP request line");
    }
    const auto url_begin = method_end + 1;
    const auto url_end = std::find(url_begin, buffer.end(), ' ');
    // The request comes from the network: never scan past the received bytes
    // looking for a delimiter that is not there.
    if (url_end == buffer.end()) {
        throw std::runtime_error("Malformed HTTP request line");
    }
    return std::string(url_begin, url_end);
}
/// Returns the part of `URL` that follows its first seven characters (the
/// length of "http://") up to, but not including, the next '/'.
///
/// The prefix is skipped by length only; it is not checked to be "http://".
///
/// @return that substring (the whole remainder when no '/' follows), or an
///         empty string when `URL` has no characters after the prefix.
std::string getHost(const std::string &URL) {
    const std::size_t scheme_length = 7;  // strlen("http://")
    if (URL.size() <= scheme_length) {
        return std::string();
    }
    // std::string handles any length, so an oversized host in a hostile
    // request cannot overrun a fixed-size buffer.
    const std::size_t host_end = URL.find('/', scheme_length);
    if (host_end == std::string::npos) {
        return URL.substr(scheme_length);
    }
    return URL.substr(scheme_length, host_end - scheme_length);
}
/// Unit of work executed by TThreadPool.
class TTask {
public:
    virtual ~TTask() = default;

    /// Runs the task. `stopFlag` is the pool's stop flag, passed by reference.
    virtual void Do(std::atomic<bool> &stopFlag) = 0;
};

/// Fixed-size pool of worker threads that run queued TTask objects in FIFO order.
class TThreadPool {
private:
    typedef std::shared_ptr< TTask > TTaskPtr;

    std::atomic<bool> StopFlag;
    std::vector<std::thread> Workers;
    std::mutex Mutex;                  // guards Tasks and every wait predicate
    std::condition_variable Condition;
    std::list<TTaskPtr> Tasks;
    std::atomic<size_t> ActiveTasks;

    /// Marks one task as finished and wakes waiters when none remain active.
    /// The counter changes under Mutex so that a thread in WaitAllTasks cannot
    /// test its predicate and then miss the notification.
    void FinishTask() {
        std::lock_guard<std::mutex> guard(Mutex);
        if (--ActiveTasks == 0)
            Condition.notify_all();
    }

    /// Pops and runs the front task.
    /// Precondition: `lock` is held and Tasks is not empty.
    /// Postcondition: `lock` is released, also when the task throws.
    void DoTopTask(std::unique_lock<std::mutex> &lock) {
        TTaskPtr task = std::move(Tasks.front());
        Tasks.pop_front();
        ++ActiveTasks;
        lock.unlock();
        try {
            task->Do(StopFlag);
        } catch (...) {
            FinishTask();  // keep the active count right on failure too
            throw;
        }
        FinishTask();
    }

    /// Worker loop: runs tasks until the pool is stopped.
    void ThreadFunc() {
        while (!StopFlag) {
            std::unique_lock<std::mutex> lock(Mutex);
            Condition.wait(lock, [this] { return StopFlag || !Tasks.empty(); });
            if (StopFlag)
                break;
            try {
                DoTopTask(lock);
            } catch (const std::exception &ex) {
                // An exception leaving a thread's entry function ends the
                // whole process, so report it and keep the worker alive.
                std::cerr << "Task failed: " << ex.what() << std::endl;
            } catch (...) {
                std::cerr << "Task failed with an unknown exception" << std::endl;
            }
        }
    }
public:
    /// Starts `workersCount` worker threads.
    TThreadPool(size_t workersCount)
        : StopFlag(false)
        , ActiveTasks(0)
    {
        Workers.reserve(workersCount);
        try {
            for (size_t i = 0; i < workersCount; ++i)
                Workers.emplace_back([this]() { ThreadFunc(); });
        } catch (...) {
            // Destroying a joinable std::thread terminates the program, so
            // join the workers that did start before reporting the failure.
            Stop(false);
            throw;
        }
    }

    TThreadPool(const TThreadPool &) = delete;
    TThreadPool &operator=(const TThreadPool &) = delete;

    ~TThreadPool() {
        try {
            Stop(false);
        } catch (...) {
        }
    }

    /// Stops the pool and joins the workers. May be called more than once.
    /// @param finishTasks when true, tasks still queued are run on the calling
    ///        thread before joining (a task's exception propagates to the
    ///        caller); when false they are left unexecuted.
    void Stop(bool finishTasks) {
        {
            // Set the flag under Mutex: a worker that has just evaluated its
            // wait predicate then cannot miss the notification below.
            std::lock_guard<std::mutex> guard(Mutex);
            StopFlag.store(true);
        }
        Condition.notify_all();
        if (finishTasks) {
            std::unique_lock<std::mutex> lock(Mutex);
            while (!Tasks.empty()) {
                DoTopTask(lock);
                lock.lock();
            }
        }
        for (std::thread &worker : Workers) {
            if (worker.joinable())
                worker.join();
        }
    }

    /// Queues `task` for execution.
    /// @return false if `task` is null or the pool has been stopped.
    bool AddTask(const TTaskPtr &task) {
        if (!task)
            return false;
        std::lock_guard<std::mutex> lock(Mutex);
        if (StopFlag)
            return false;
        Tasks.push_back(task);
        Condition.notify_one();
        return true;
    }

    /// Blocks until the queue is empty and no task is running, or the pool is
    /// stopped.
    /// @return true if all tasks finished, false if the pool was stopped.
    bool WaitAllTasks() {
        std::unique_lock<std::mutex> lock(Mutex);
        Condition.wait(lock, [this] {
            return StopFlag || (Tasks.empty() && ActiveTasks == 0);
        });
        return !StopFlag;
    }
};
- class MyTask : public TTask {
- private:
- static constexpr int PORT = 80;
- ConnectionSocket connectionSocket;
- public:
- explicit MyTask(ConnectionSocket &&other)
- : connectionSocket(std::move(other))
- {}
- virtual void Do(std::atomic<bool> &stopFlag) {
- try {
- Buffer buffer = connectionSocket.read();
- ConnectionSocket workSocket;
- std::string URL = getURL(buffer);
- std::string host = getHost(URL);
- std::cout << "Client IP : " << connectionSocket.getIP() << std::endl;
- std::cout << "Requested URL: " << URL << std::endl;
- workSocket.connect(host, PORT);
- workSocket.write(buffer);
- workSocket.readToSocket(connectionSocket);
- } catch (const std::exception &ex) {
- std::cerr << ex.what() << std::endl;
- }
- }
- };
- int main() {
- static constexpr char LOCALHOST[] = "127.0.0.1";
- static constexpr int DEFAULT_PORT = 8992;
- static constexpr size_t POOL_SIZE = 128;
- InternetAddress address = InternetAddress(LOCALHOST, DEFAULT_PORT);
- ServerSocket serverSocket(address);
- try {
- serverSocket.check();
- } catch (const std::exception &ex) {
- std::cerr << ex.what() << std::endl;
- return 1;
- }
- TThreadPool pool(POOL_SIZE);
- try {
- while (true) {
- pool.AddTask(std::shared_ptr<TTask>(new MyTask(serverSocket.newConnection())));
- }
- } catch (const std::exception &ex) {
- std::cerr << ex.what() << std::endl;
- return 1;
- }
- return 0;
- }
Advertisement
Add Comment
Please, Sign In to add comment