Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #include <sys/mman.h>
- #include <sys/types.h>
- #include <sys/socket.h>
- #include <netinet/in.h>
- #include <netinet/tcp.h>
- #include <unistd.h>
- #include <sched.h>
- #include <exception>
- #include <string.h>
- #include <sys/epoll.h>
- #include <iostream>
- #include <array>
- #include <vector>
- int safe_call(int ret, const char * ctx) {
- // fprintf(stderr, "%s == %d, errno(%d) == %s\n", ctx, ret, errno, strerror(errno));
- if((ret == -1) && (errno != EINPROGRESS)) throw std::runtime_error(std::string(ctx) + " == " + std::to_string(ret) + ", errno(" + std::to_string(errno) + ')' + " == " + strerror(errno));
- return ret;
- }
- #define safe_call(call) safe_call(call, #call)
- int connect_to(in_addr_t addr, uint16_t port) {
- sockaddr_in saddr{};
- saddr.sin_family = AF_INET;
- saddr.sin_port = htons(port);
- saddr.sin_addr.s_addr = htonl(addr);
- int fd = safe_call(socket(AF_INET, SOCK_NONBLOCK | SOCK_STREAM, 0));
- safe_call(connect(fd, (__CONST_SOCKADDR_ARG)&saddr, sizeof(saddr)));
- return fd;
- }
- class alignas(64) client {
- public:
- void connect(in_addr_t addr, uint16_t port) {
- // fprintf(stderr, "%s\n", __PRETTY_FUNCTION__);
- this->addr = addr;
- this->port = port;
- fd = connect_to(addr, port);
- add_to_epoll();
- ++active_connections;
- }
- void close() {
- // fprintf(stderr, "%s\n", __PRETTY_FUNCTION__);
- remove_from_epoll();
- safe_call(::close(fd));
- --active_connections;
- }
- size_t get_rq() {
- return count;
- }
- client() = default;
- client(client && o) = delete;
- client(client & o) = delete;
- client & operator=(const client & o) = delete;
- client & operator=(client && o) = delete;
- ~client() = default;
- void run() {
- // fprintf(stderr, "%s\n", __PRETTY_FUNCTION__);
- size_t data = fd;
- safe_call(write(fd, &data, sizeof(data)));
- update_events(EPOLLIN | EPOLLRDHUP | EPOLLET);
- }
- protected:
- void add_to_epoll() {
- epoll_event eevent;
- eevent.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLET;
- eevent.data.ptr = (void *)this;
- safe_call(epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &eevent));
- }
- void update_events(uint32_t ev) {
- epoll_event eevent;
- eevent.events = ev;
- eevent.data.ptr = (void *)this;
- safe_call(epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &eevent));
- }
- void remove_from_epoll() {
- safe_call(epoll_ctl(epoll_fd, EPOLL_CTL_DEL, fd, nullptr));
- }
- void update_next() {
- // fprintf(stderr, "%s\n", __PRETTY_FUNCTION__);
- uint64_t * it = (uint64_t *)buff, * end = it + (sizeof(buff) / sizeof(uint64_t));
- do {
- if(*it != size_t(fd)) throw std::runtime_error{"bad data"};
- } while(++it < end);
- buff_cur = 0; memset(buff, 0, sizeof(buff)); ++count;
- }
- void update_buff() {
- // fprintf(stderr, "%s\n", __PRETTY_FUNCTION__);
- while(ssize_t r = read(fd, buff, (sizeof(buff) - buff_cur))) {
- if(r == -1) return;
- buff_cur += r;
- if(buff_cur == sizeof(buff)) update_next();
- }
- }
- protected:
- in_addr_t addr;
- uint16_t port;
- int fd;
- size_t buff_cur = 0, count = 0;
- uint8_t buff[1024];
- protected:
- static constexpr size_t epoll_size = 1024, epoll_event_size = 1024;
- static inline int epoll_fd = safe_call(epoll_create(epoll_size));
- static inline std::array<epoll_event, epoll_event_size> edata;
- static inline size_t active_connections = 0;
- public:
- static void run(size_t timeout) {
- while(1) {
- size_t n = safe_call(epoll_wait(epoll_fd, std::begin(edata), edata.size(), timeout));
- if(!active_connections) return;
- for(auto it = std::begin(edata), end = it + n; it < end; ++it) {
- ((client *)(it->data.ptr))->wait_handler(it->events);
- }
- }
- }
- protected:
- void wait_handler(uint32_t events) try {
- // fprintf(stderr, "%s, events == %x\n", __PRETTY_FUNCTION__, events);
- if(events & EPOLLOUT) run();
- if(events & EPOLLIN) update_buff();
- if(events & EPOLLRDHUP) close();
- } catch(const std::exception & e) { std::cerr << e.what() << std::endl; std::terminate();}
- };
- int main() {
- in_addr_t addr = INADDR_LOOPBACK; uint16_t port = 8888;
- std::vector<client> v(10000);
- for(auto & x : v) x.connect(addr, port);
- client::run(50);
- size_t rq = 0;
- for(auto & x : v) rq += x.get_rq();
- fprintf(stderr, "rq = %lu\n", rq);
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement