Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #include <WinSock2.h>
- #include <WS2tcpip.h>
- // #include <Windows.h>
- #include <conio.h>
- #include <string>
- #include <vector>
- #include <iostream>
- #include <sstream>
- #include <list>
- #include "timer.h"
- static SOCKET Connect(const std::string &interfaceIP, const std::string &bindIP, unsigned int port)
- {
- // Create UDP socket fit for overlapped I/O.
- SOCKET socket = WSASocket(AF_INET, SOCK_DGRAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
- if (INVALID_SOCKET != socket)
- {
- // "Share" socket address.
- int sockOpt = 1;
- setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char*>(&sockOpt), sizeof(int));
- // Disable UDP checksum (if possible).
- sockOpt = 1;
- setsockopt(socket, IPPROTO_UDP, UDP_NOCHECKSUM, reinterpret_cast<char*>(&sockOpt), sizeof(int));
- // Enlarge (or embiggen, if you will) recv. buffer.
- sockOpt = 1024*512; // 0x80000;
- setsockopt(socket, SOL_SOCKET, SO_RCVBUF, reinterpret_cast<char *>(&sockOpt), sizeof(int));
- // Bind to interface(s).
- sockaddr_in address;
- memset(&address, 0, sizeof(sockaddr_in));
- address.sin_family = AF_INET;
- address.sin_port = htons(port);
- address.sin_addr.S_un.S_addr = inet_addr(interfaceIP.c_str());
- int addrLen = sizeof(sockaddr_in);
- if (-1 != bind(socket, reinterpret_cast<sockaddr *>(&address), addrLen))
- {
- // Join multicast.
- ip_mreq multicast;
- multicast.imr_multiaddr.S_un.S_addr = inet_addr(bindIP.c_str());
- multicast.imr_interface = address.sin_addr;
- if (-1 != setsockopt(socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, reinterpret_cast<char *>(&multicast), sizeof(ip_mreq)))
- {
- // char MTU[1500];
- // int bytesRecv = recvfrom(socket, MTU, 1500, 0, reinterpret_cast<sockaddr *>(&address), &addrLen);
- // if (bytesRecv > 0)
- {
- // Non-blocking socket, please.
- u_long mode = 1;
- ioctlsocket(socket, FIONBIO, &mode);
- std::cout << "Socket joined multicast: " << bindIP.c_str() << ":" << port << std::endl;
- return socket;
- }
- }
- }
- closesocket(socket);
- }
- std::cout << "Failed to join multicast: " << bindIP.c_str() << ":" << port << std::endl;
- return INVALID_SOCKET;
- }
- const int kMTUSize = 1500;
- class Stream
- {
- public:
- Stream(const std::string &interfaceIP, const std::string &bindIP, unsigned int port, HANDLE IOPort) :
- m_interfaceIP(interfaceIP)
- , m_bindIP(bindIP)
- , m_port(port)
- , m_IOPort(IOPort)
- , m_state(kDisconnected)
- , m_socket(INVALID_SOCKET)
- {
- memset(&m_wsaOvl, 0, sizeof(WSAOVERLAPPED));
- // Set this thing in motion.
- Connect();
- Recv();
- }
- ~Stream()
- {
- if (INVALID_SOCKET != m_socket)
- closesocket(m_socket);
- }
- void Connect()
- {
- if (kDisconnected == m_state)
- {
- // Set up connection.
- m_socket = ::Connect(m_interfaceIP, m_bindIP, m_port);
- if (INVALID_SOCKET != m_socket)
- {
- // Bind to global IO port.
- HANDLE IOPort = CreateIoCompletionPort(
- reinterpret_cast<HANDLE>(m_socket),
- m_IOPort,
- reinterpret_cast<ULONG_PTR>(this),
- 0);
- // Skip notifications for synchronous (immediate) reads.
- SetFileCompletionNotificationModes((HANDLE) m_socket, FILE_SKIP_COMPLETION_PORT_ON_SUCCESS);
- if (NULL == IOPort)
- {
- std::cout << "Can't attach to IO completion port for stream bound to: " << m_bindIP.c_str() << ":" << m_port << std::endl;
- // Failed: disconnect.
- closesocket(m_socket);
- m_socket = INVALID_SOCKET;
- }
- else
- {
- m_state = kConnected;
- }
- }
- }
- }
- size_t Recv()
- {
- if (kConnected == m_state)
- {
- // Connected: read!
- DWORD bytesRecv;
- BOOL result = ReadFile((HANDLE) m_socket, m_buffer, kMTUSize, &bytesRecv, &m_wsaOvl);
- if (FALSE == result && ERROR_IO_PENDING == GetLastError())
- {
- // Pending.
- return 0;
- }
- else if (TRUE == result)
- {
- // Synchronous.
- // return 0;
- // Recurse (only in conjunction w/FILE_SKIP_COMPLETION_PORT_ON_SUCCESS).
- return bytesRecv + Recv();
- }
- else
- {
- const DWORD lastErr = GetLastError();
- // Failed: disconnect.
- closesocket(m_socket);
- m_socket = INVALID_SOCKET;
- m_state = kDisconnected;
- std::cout << "Socket bound to: " << m_bindIP.c_str() << ":" << m_port << " closed (" << lastErr << ")." << std::endl;
- }
- }
- return 0;
- }
- bool IsConnected() const { return INVALID_SOCKET != m_socket; }
- private:
- const std::string m_interfaceIP;
- const std::string m_bindIP;
- const unsigned int m_port;
- const HANDLE m_IOPort;
- enum State
- {
- kDisconnected,
- kConnected
- } m_state;
- char m_buffer[kMTUSize];
- SOCKET m_socket;
- WSAOVERLAPPED m_wsaOvl;
- DWORD m_recvFlags;
- };
- class Worker
- {
- public:
- Worker(HANDLE IOPort, size_t ID) :
- m_IOPort(IOPort)
- , m_ID(ID)
- , m_hThread(INVALID_HANDLE_VALUE)
- , m_bytesRecv(0)
- , m_IOCount(0)
- {
- }
- ~Worker()
- {
- Stop();
- }
- HANDLE Start()
- {
- m_stopThread = false;
- m_hThread = CreateThread(NULL, 2*4096, reinterpret_cast<LPTHREAD_START_ROUTINE>(ThreadFunc), this, 0, NULL);
- return m_hThread;
- }
- size_t GetBytesRecv() const { return m_bytesRecv; }
- size_t GetIOCount() const { return m_IOCount; }
- private:
- const HANDLE m_IOPort;
- const size_t m_ID;
- HANDLE m_hThread;
- bool m_stopThread;
- size_t m_bytesRecv;
- size_t m_IOCount;
- void Stop()
- {
- DWORD exitCode = STILL_ACTIVE;
- while (STILL_ACTIVE == exitCode)
- GetExitCodeThread(m_hThread, &exitCode);
- CloseHandle(m_hThread);
- }
- static DWORD WINAPI ThreadFunc(LPVOID lpParameter)
- {
- return reinterpret_cast<Worker *>(lpParameter)->Thread();
- }
- DWORD Thread()
- {
- while (false == m_stopThread)
- {
- OVERLAPPED_ENTRY entries[1];
- ULONG numEntries = 0;
- const BOOL result = GetQueuedCompletionStatusEx(
- m_IOPort,
- entries,
- 1,
- &numEntries,
- INFINITE,
- FALSE);
- // const bool timeOut = WAIT_TIMEOUT == GetLastError();
- if (TRUE == result)
- {
- for (ULONG iEntry = 0; iEntry < numEntries; ++iEntry)
- {
- const OVERLAPPED_ENTRY &entry = entries[iEntry];
- if (NULL != entry.lpOverlapped)
- {
- m_bytesRecv += entry.dwNumberOfBytesTransferred;
- ++m_IOCount;
- // Issue more
- Stream *pInst = reinterpret_cast<Stream *>(entry.lpCompletionKey);
- m_bytesRecv += pInst->Recv();
- }
- else
- {
- // Asked to stop?
- if (this == reinterpret_cast<Worker *>(entry.lpCompletionKey))
- {
- m_stopThread = true;
- }
- }
- }
- }
- else
- {
- // This (most likely) means the port was closed, so terminate.
- // Attempts to repair a connection that broke on it's last Recv() are made by the main thread.
- m_stopThread = true;
- }
- }
- return 0;
- }
- };
- int main(int argC, char **argV)
- {
- WSADATA wsaData;
- if (0 != WSAStartup(MAKEWORD(2, 2), &wsaData))
- {
- std::cout << "Can not initialize WinSock" << std::endl;
- return -1;
- }
- const size_t kNumWorkers = 4;
- HANDLE IOPort = CreateIoCompletionPort(
- INVALID_HANDLE_VALUE,
- NULL,
- NULL,
- kNumWorkers);
- {
- // Create an amount of streams.
- const size_t kNumStreams = 100; // 28;
- Stream *pStreams[kNumStreams];
- #if 1
- // These (239.10.10.) are ~5 Mbit streams.
- for (size_t iStr = 0; iStr < kNumStreams; ++iStr)
- {
- std::stringstream bindIP;
- bindIP << "239.10.10." << iStr+1;
- pStreams[iStr] = new Stream("169.254.207.12", bindIP.str(), 1234, IOPort);
- }
- #else
- // 239.20.100.1/2/3/4 = ~38 Mbit
- for (size_t iStr = 0; iStr < kNumStreams; ++iStr)
- {
- std::stringstream bindIP;
- bindIP << "239.20.100." << 1+(iStr%4);
- pStreams[iStr] = new Stream("169.254.207.12", bindIP.str(), 1234, IOPort);
- }
- #endif
- // Fire up the proletariat.
- std::vector<Worker *> workers;
- HANDLE threads[kNumWorkers];
- for (size_t iWorker = 0; iWorker < kNumWorkers; ++iWorker)
- {
- workers.push_back(new Worker(IOPort, iWorker));
- threads[iWorker] = workers.back()->Start();
- }
- Timer timer;
- float prevTime = 0.f;
- int cycles = 0;
- size_t prevTraffic = 0;
- while (cycles < 100) // Do a few cycles...
- {
- const float curTime = timer.Get();
- if (curTime-prevTime > 1.f)
- {
- // Attempt to revive dead streams.
- // This is safe since a disconnected stream won't be touched by the proletariat.
- for (size_t iStr = 0; iStr < kNumStreams; ++iStr)
- {
- if (false == pStreams[iStr]->IsConnected())
- {
- pStreams[iStr]->Connect();
- pStreams[iStr]->Recv();
- }
- }
- // Statistics.
- size_t curTraffic = 0;
- for (size_t iWorker = 0; iWorker < kNumWorkers; ++iWorker)
- curTraffic += workers[iWorker]->GetBytesRecv();
- std::cout << "Current traffic: " << (curTraffic-prevTraffic)/(125.f*1024.f)/(timer.Get()-prevTime) << " Mbit/S." << std::endl;
- prevTime = curTime;
- ++cycles;
- prevTraffic = curTraffic;
- }
- SleepEx(500, true);
- }
- // Close IO port (terminates threads & IO).
- CloseHandle(IOPort);
- // Get rid of workers and do more statistics.
- size_t totalTraffic = 0, totalIO = 0;
- for (size_t iWorker = 0; iWorker < kNumWorkers; ++iWorker)
- {
- const size_t traffic = workers[iWorker]->GetBytesRecv();
- const size_t IO = workers[iWorker]->GetIOCount();
- std::cout << "Total traffic for worker #" << iWorker << ": " << traffic/1024 << " KB (" << IO << " IO events)." << std::endl;
- totalTraffic += traffic;
- totalIO += IO;
- delete workers[iWorker];
- }
- std::cout << "Total traffic: " << totalTraffic/(1024*1024) << " MB." << std::endl;
- std::cout << "Total IO events: " << totalIO << "." << std::endl;
- // Get rid of streams.
- for (size_t iStr = 0; iStr < kNumStreams; ++iStr)
- delete pStreams[iStr];
- }
- WSACleanup();
- getch();
- return 0;
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement