Not a member of Pastebin yet? Sign up — it unlocks many cool features!
- #pragma once
- #define ASIO_STANDALONE
- #define ASIO_HAS_STD_ADDRESSOF
- #define ASIO_HAS_STD_ARRAY
- #define ASIO_HAS_CSTDINT
- #define ASIO_HAS_STD_SHARED_PTR
- #define ASIO_HAS_STD_TYPE_TRAITS
#include<asio.hpp>
#include<asio/placeholders.hpp>
#include<array>
#include<atomic>
#include<chrono>
#include<condition_variable>
#include<cstddef>
#include<exception>
#include<fstream>
#include<functional>
#include<iomanip>
#include<iostream>
#include<map>
#include<memory>
#include<mutex>
#include<random>
#include<set>
#include<sstream>
#include<string>
#include<thread>
#include<tuple>
#include<utility>
#include<vector>
#include "Concurrent Queue.h"
#include "Concurrent Map.h"
- namespace server {
/// Raises `prime` to the power `iterations` using unsigned 64-bit arithmetic.
///
/// All multiplications wrap modulo 2^64, so the result is
/// `prime^iterations mod 2^64`. Usable in constant expressions.
///
/// Implemented with square-and-multiply: the number of multiplications grows
/// with the bit length of `iterations` rather than with its value, which keeps
/// compile-time evaluation cheap even for large exponents.
///
/// @param prime       Base of the power (any value; it does not have to be prime).
/// @param iterations  Exponent; 0 yields 1.
/// @return            `prime` multiplied by itself `iterations` times, wrapped to 64 bits.
constexpr unsigned long long power_function(const unsigned long long prime, const unsigned long long iterations) {
    unsigned long long result = 1;
    unsigned long long base = prime;
    for (unsigned long long remaining = iterations; remaining > 0; remaining >>= 1) {
        // Fold in the current power of the base whenever the matching exponent bit is set.
        if (remaining & 1ULL) {
            result *= base;
        }
        base *= base;
    }
    return result;
}
/// Produces a seed value from the steady clock.
///
/// @return The tick count of `std::chrono::steady_clock::now()` measured from
///         the clock's epoch, converted to `unsigned long long`.
inline unsigned long long getSeed() {
    const auto since_epoch = std::chrono::steady_clock::now().time_since_epoch();
    return static_cast<unsigned long long>(since_epoch.count());
}
- union connection_id {
- private:
- static constexpr unsigned long long prime = power_function(-363, 30'000);
- unsigned long long permute_seed(unsigned long long seed) {
- return (seed + 1) * 59 * prime;
- }
- public:
- unsigned long long l;
- unsigned char c[8];
- connection_id() :
- l(0)
- {
- }
- connection_id(unsigned long long seed) :
- l(permute_seed(seed))
- {
- }
- bool operator<(const connection_id & c) const {
- return l < c.l;
- }
- bool operator==(const connection_id & c) const {
- return l == c.l;
- }
- connection_id & operator=(unsigned long long seed) {
- l = permute_seed(seed);
- return *this;
- }
- connection_id & operator=(const connection_id & id) {
- if (this == &id) return *this;
- l = id.l;
- return *this;
- }
- std::string str() const {
- std::stringstream ss;
- static const std::set<int> dash_indexes = {0, 5, 10};
- ss << std::hex;
- ss << std::uppercase;
- for (int i = 0; i < 16; i++) {
- int byte_index = i / 2;
- int byte_half = (i + 1) % 2;
- unsigned int value = (c[byte_index] >> (byte_half * 4)) & 0xF;
- ss << value;
- if (dash_indexes.count(i)) ss << "-";
- }
- return ss.str();
- }
- operator std::string() const {
- return str();
- }
- friend std::ostream & operator<<(std::ostream & os, const connection_id & id);
- };
- std::ostream & operator<<(std::ostream & os, const connection_id & id) {
- return os << id.str();
- }
- typedef std::unique_ptr<asio::ip::tcp::socket> socket_ptr;
- static const size_t MAX_BUFFER_SIZE = 1'048'576;
- typedef std::array<unsigned char, MAX_BUFFER_SIZE> data_buffer_type;
- typedef std::unique_ptr<data_buffer_type> data_buffer;
- class connection {
- private:
- socket_ptr socket;
- connection_id id;
- std::string ip_address;
- std::string port;
- public:
- data_buffer read_buffer;
- connection(unsigned long long seed, asio::io_service& service, std::string ip, std::string _port) :
- id(seed),
- socket(socket_ptr(new asio::ip::tcp::socket(service))),
- ip_address(ip),
- port(_port),
- read_buffer(new data_buffer_type())
- {
- }
- connection(const connection_id & _id, asio::io_service & service, std::string ip, std::string _port) :
- id(_id),
- socket(socket_ptr(new asio::ip::tcp::socket(service))),
- ip_address(ip),
- port(_port),
- read_buffer(new data_buffer_type())
- {
- }
- bool operator<(const connection & c) const {
- return id < c.id;
- }
- bool operator==(const connection & c) const {
- return id == c.id;
- }
- asio::ip::tcp::socket & get_socket() const {
- return *socket;
- }
- std::string get_ip_address() {
- return ip_address;
- }
- std::string get_port() {
- return port;
- }
- connection_id get_id() const { return id; }
- void regen_id(unsigned long long seed) {
- id = seed;
- }
- };
- typedef std::vector<unsigned char> data_vector;
- typedef std::pair<connection_id, data_vector> data_pair;
- typedef concurrent::queue<data_pair> data_queue;
- typedef std::shared_ptr<connection> connection_ptr;
- typedef concurrent::map<connection_id, connection_ptr> connection_map;
- typedef unsigned long long buffer_id;
- typedef concurrent::map<buffer_id, data_vector> write_buffer_map;
- typedef std::unique_ptr<asio::io_service> service_ptr;
- typedef std::unique_ptr<asio::io_service::work> work_ptr;
- typedef std::unique_ptr<asio::ip::tcp::acceptor> acceptor_ptr;
- class basic_server {
- private:
- service_ptr service;
- work_ptr work;
- acceptor_ptr acceptor;
- connection_id client_connection;
- connection_map connections;
- std::mutex mutex;
- std::condition_variable cond;
- volatile bool running;
- data_queue read_queue;
- write_buffer_map write_buffers;
- buffer_id current_buffer_id;
- std::ofstream lout;
- std::thread service_thread;
- buffer_id get_next_buffer_id() {
- return current_buffer_id++;
- }
- void service_function() {
- asio::error_code ec;
- while (!running) std::this_thread::yield();
- while (running) {
- try {
- service->run(ec);
- break;
- }
- catch (std::exception & e) {
- lout << "ERROR: " << e.what() << std::endl;
- }
- }
- }
- void accept(const asio::error_code & ec, const connection_id & id) {
- if (ec) {
- connections.erase(id);
- return;
- }
- using namespace std::chrono_literals;
- connection_ptr conn = connections[id];
- conn->get_socket().async_read_some(
- asio::buffer(
- &conn->read_buffer->front(),
- MAX_BUFFER_SIZE
- ),
- std::bind(
- &basic_server::read,
- this,
- std::placeholders::_1,
- std::placeholders::_2,
- id
- )
- );
- connection_id nid(getSeed());
- while (connections.contains(nid)) {
- std::this_thread::sleep_for(std::chrono::milliseconds(1));
- nid = getSeed();
- }
- connections[nid] = connection_ptr(new connection(nid, *service, conn->get_ip_address(), conn->get_port()));
- connection_ptr nconn = connections[nid];
- acceptor->async_accept(
- nconn->get_socket(),
- std::bind(
- &basic_server::accept,
- this,
- std::placeholders::_1,
- nid
- )
- );
- }
- void connect(const asio::error_code & ec, const connection_id & id) {
- if (ec) {
- connections.erase(id);
- return;
- }
- connection_ptr conn = connections[id];
- conn->get_socket().async_read_some(
- asio::buffer(
- &conn->read_buffer->front(),
- MAX_BUFFER_SIZE
- ),
- std::bind(
- &basic_server::read,
- this,
- std::placeholders::_1,
- std::placeholders::_2,
- id
- )
- );
- }
- void read(const asio::error_code & ec, size_t bytes_read, const connection_id & id) {
- if (ec) {
- connections.erase(id);
- return;
- }
- connection_ptr conn = connections[id];
- data_vector queue_data;
- queue_data.insert(queue_data.end(), conn->read_buffer->begin(), conn->read_buffer->begin() + bytes_read);
- read_queue.push(data_pair(id, queue_data));
- conn->get_socket().async_read_some(
- asio::buffer(
- &conn->read_buffer->front(),
- MAX_BUFFER_SIZE
- ),
- std::bind(
- &basic_server::read,
- this,
- std::placeholders::_1,
- std::placeholders::_2,
- id
- )
- );
- }
- void write(const asio::error_code & ec, size_t bytes_written, const connection_id & id, buffer_id buf_id) {
- write_buffers.erase(buf_id);
- if (ec) {
- connections.erase(id);
- }
- }
- public:
- basic_server() :
- service(new asio::io_service()),
- work(new asio::io_service::work(*service)),
- acceptor(new asio::ip::tcp::acceptor(*service)),
- running(true),
- service_thread(std::bind(&basic_server::service_function, this)),
- lout("server.log"),
- current_buffer_id(0)
- {
- }
- void start_listening(const std::string & ip_address, const std::string & port) {
- asio::ip::tcp::resolver resolver(*service);
- asio::ip::tcp::resolver::query query(ip_address, port);
- asio::ip::tcp::endpoint endpoint = *(resolver.resolve(query));
- acceptor->open(endpoint.protocol());
- acceptor->bind(endpoint);
- acceptor->listen(asio::socket_base::max_connections);
- connection_id id(getSeed());
- connections[id] = connection_ptr(new connection(id, *service, ip_address, port));
- connection_ptr conn = connections[id];
- acceptor->async_accept(
- conn->get_socket(),
- std::bind(
- &basic_server::accept,
- this,
- std::placeholders::_1,
- id
- )
- );
- }
- void start_connecting(const std::string & ip_address, const std::string & port) {
- asio::ip::tcp::resolver resolver(*service);
- asio::ip::tcp::resolver::query query(ip_address, port);
- asio::ip::tcp::endpoint endpoint = *(resolver.resolve(query));
- connection_id id(getSeed());
- connections[id] = connection_ptr(new connection(id, *service, ip_address, port));
- connection_ptr conn = connections[id];
- client_connection = id;
- conn->get_socket().async_connect(
- endpoint,
- std::bind(
- &basic_server::connect,
- this,
- std::placeholders::_1,
- id
- )
- );
- }
- ~basic_server() {
- stop();
- service_thread.join();
- }
- bool read_from_queue(data_pair & data) {
- return read_queue.try_pop(data);
- }
- void write_to_connection(const connection_id & id, const data_vector & data) {
- if (connections.contains(id)) {
- buffer_id bid = get_next_buffer_id();
- write_buffers[bid] = data;
- data_vector & saved_buffer = write_buffers[bid];
- connection_ptr conn = connections[id];
- conn->get_socket().async_write_some(
- asio::buffer(
- &saved_buffer.front(),
- saved_buffer.size()
- ),
- std::bind(
- &basic_server::write,
- this,
- std::placeholders::_1,
- std::placeholders::_2,
- id,
- bid
- )
- );
- }
- }
- void write_to_connection(const connection_id & id, const void * data, size_t size_of_data) {
- data_vector vector(size_of_data);
- const unsigned char * data_ptr = static_cast<const unsigned char *>(data);
- std::copy(data_ptr, data_ptr + size_of_data, vector.begin());
- write_to_connection(id, vector);
- }
- void client_write(const data_vector & data) {
- connection_id id = client_connection;
- write_to_connection(id, data);
- }
- void stop() {
- running = false;
- work.reset();
- service->stop();
- read_queue.wake_all();
- }
- std::set<connection_id> get_all_connection_ids() const {
- return connections.getKeySet();
- }
- bool is_connected(const connection_id & id) const {
- return connections.contains(id);
- }
- const connection_ptr get_connection(const connection_id & id) const {
- return connections.at(id);
- }
- };
- }
Add Comment
Please, Sign In to add comment