Advertisement
Guest User

UDP read test

a guest
Jan 15th, 2014
330
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C++ 9.76 KB | None | 0 0
  1.  
  2. #include <WinSock2.h>
  3. #include <WS2tcpip.h>
  4. // #include <Windows.h>
  5.  
  6. #include <conio.h>
  7. #include <string>
  8. #include <vector>
  9. #include <iostream>
  10. #include <sstream>
  11. #include <list>
  12.  
  13. #include "timer.h"
  14.  
  15. static SOCKET Connect(const std::string &interfaceIP, const std::string &bindIP, unsigned int port)
  16. {
  17.     // Create UDP socket fit for overlapped I/O.
  18.     SOCKET socket = WSASocket(AF_INET, SOCK_DGRAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
  19.     if (INVALID_SOCKET != socket)
  20.     {
  21.         // "Share" socket address.
  22.         int sockOpt = 1;
  23.         setsockopt(socket, SOL_SOCKET, SO_REUSEADDR, reinterpret_cast<char*>(&sockOpt), sizeof(int));
  24.    
  25.         // Disable UDP checksum (if possible).
  26.         sockOpt = 1;
  27.         setsockopt(socket, IPPROTO_UDP, UDP_NOCHECKSUM, reinterpret_cast<char*>(&sockOpt), sizeof(int));
  28.        
  29.         // Enlarge (or embiggen, if you will) recv. buffer.
  30.         sockOpt = 1024*512; // 0x80000;
  31.         setsockopt(socket, SOL_SOCKET, SO_RCVBUF, reinterpret_cast<char *>(&sockOpt), sizeof(int));
  32.  
  33.         // Bind to interface(s).
  34.         sockaddr_in address;
  35.         memset(&address, 0, sizeof(sockaddr_in));
  36.         address.sin_family = AF_INET;
  37.         address.sin_port = htons(port);
  38.         address.sin_addr.S_un.S_addr = inet_addr(interfaceIP.c_str());
  39.         int addrLen = sizeof(sockaddr_in);
  40.         if (-1 != bind(socket, reinterpret_cast<sockaddr *>(&address), addrLen))
  41.         {
  42.             // Join multicast.
  43.             ip_mreq multicast;
  44.             multicast.imr_multiaddr.S_un.S_addr = inet_addr(bindIP.c_str());
  45.             multicast.imr_interface = address.sin_addr;
  46.  
  47.             if (-1 != setsockopt(socket, IPPROTO_IP, IP_ADD_MEMBERSHIP, reinterpret_cast<char *>(&multicast), sizeof(ip_mreq)))
  48.             {
  49. //              char MTU[1500];
  50. //              int bytesRecv = recvfrom(socket, MTU, 1500, 0, reinterpret_cast<sockaddr *>(&address), &addrLen);
  51. //              if (bytesRecv > 0)
  52.                 {
  53.                     // Non-blocking socket, please.
  54.                     u_long mode = 1;
  55.                     ioctlsocket(socket, FIONBIO, &mode);
  56.  
  57.                     std::cout << "Socket joined multicast: " << bindIP.c_str() << ":" << port << std::endl;
  58.                     return socket;
  59.                 }
  60.             }
  61.         }
  62.  
  63.         closesocket(socket);
  64.     }
  65.  
  66.     std::cout << "Failed to join multicast: " << bindIP.c_str() << ":" << port << std::endl;
  67.  
  68.     return INVALID_SOCKET;
  69. }
  70.  
  71. const int kMTUSize = 1500;
  72.  
  73. class Stream
  74. {
  75. public:
  76.     Stream(const std::string &interfaceIP, const std::string &bindIP, unsigned int port, HANDLE IOPort) :
  77.         m_interfaceIP(interfaceIP)
  78. ,       m_bindIP(bindIP)
  79. ,       m_port(port)
  80. ,       m_IOPort(IOPort)
  81. ,       m_state(kDisconnected)
  82. ,       m_socket(INVALID_SOCKET)
  83.     {
  84.         memset(&m_wsaOvl, 0, sizeof(WSAOVERLAPPED));
  85.  
  86.         // Set this thing in motion.
  87.         Connect();
  88.         Recv();
  89.     }
  90.  
  91.     ~Stream()
  92.     {
  93.         if (INVALID_SOCKET != m_socket)
  94.             closesocket(m_socket);
  95.     }
  96.  
  97.     void Connect()
  98.     {
  99.         if (kDisconnected == m_state)
  100.         {
  101.             // Set up connection.
  102.             m_socket = ::Connect(m_interfaceIP, m_bindIP, m_port);
  103.             if (INVALID_SOCKET != m_socket)
  104.             {
  105.                 // Bind to global IO port.
  106.                 HANDLE IOPort = CreateIoCompletionPort(
  107.                     reinterpret_cast<HANDLE>(m_socket),
  108.                     m_IOPort,                          
  109.                     reinterpret_cast<ULONG_PTR>(this),
  110.                     0);                                
  111.                
  112.                 // Skip notifications for synchronous (immediate) reads.
  113.                 SetFileCompletionNotificationModes((HANDLE) m_socket, FILE_SKIP_COMPLETION_PORT_ON_SUCCESS);
  114.  
  115.                 if (NULL == IOPort)
  116.                 {
  117.                     std::cout << "Can't attach to IO completion port for stream bound to: " << m_bindIP.c_str() << ":" << m_port << std::endl;
  118.  
  119.                     // Failed: disconnect.
  120.                     closesocket(m_socket);
  121.                     m_socket = INVALID_SOCKET;
  122.                 }
  123.                 else
  124.                 {
  125.                     m_state = kConnected;
  126.                 }
  127.             }
  128.         }
  129.     }
  130.  
  131.     size_t Recv()
  132.     {
  133.         if (kConnected == m_state)
  134.         {
  135.             // Connected: read!
  136.             DWORD bytesRecv;
  137.             BOOL result = ReadFile((HANDLE) m_socket, m_buffer, kMTUSize, &bytesRecv, &m_wsaOvl);
  138.             if (FALSE == result && ERROR_IO_PENDING == GetLastError())
  139.             {
  140.                 // Pending.
  141.                 return 0;
  142.             }
  143.             else if (TRUE == result)
  144.             {
  145.                 // Synchronous.
  146. //              return 0;
  147.  
  148.                 // Recurse (only in conjunction w/FILE_SKIP_COMPLETION_PORT_ON_SUCCESS).
  149.                 return bytesRecv + Recv();
  150.             }
  151.             else
  152.             {
  153.                 const DWORD lastErr = GetLastError();
  154.  
  155.                 // Failed: disconnect.
  156.                 closesocket(m_socket);
  157.                 m_socket = INVALID_SOCKET;
  158.                 m_state = kDisconnected;
  159.  
  160.                 std::cout << "Socket bound to: " << m_bindIP.c_str() << ":" << m_port << " closed (" << lastErr << ")." << std::endl;
  161.             }
  162.         }
  163.  
  164.         return 0;
  165.     }
  166.  
  167.     bool IsConnected() const { return INVALID_SOCKET != m_socket; }
  168.  
  169. private:
  170.     const std::string m_interfaceIP;
  171.     const std::string m_bindIP;
  172.     const unsigned int m_port;
  173.     const HANDLE m_IOPort;
  174.  
  175.     enum State
  176.     {
  177.         kDisconnected,
  178.         kConnected
  179.     } m_state;
  180.  
  181.     char m_buffer[kMTUSize];
  182.     SOCKET m_socket;
  183.  
  184.     WSAOVERLAPPED m_wsaOvl;
  185.     DWORD m_recvFlags;
  186. };
  187.  
  188. class Worker
  189. {
  190. public:
  191.     Worker(HANDLE IOPort, size_t ID) :
  192.         m_IOPort(IOPort)
  193. ,       m_ID(ID)
  194. ,       m_hThread(INVALID_HANDLE_VALUE)
  195. ,       m_bytesRecv(0)
  196. ,       m_IOCount(0)
  197.     {
  198.     }
  199.  
  200.     ~Worker()
  201.     {
  202.         Stop();
  203.     }
  204.  
  205.     HANDLE Start()
  206.     {
  207.         m_stopThread = false;
  208.         m_hThread = CreateThread(NULL, 2*4096, reinterpret_cast<LPTHREAD_START_ROUTINE>(ThreadFunc), this, 0, NULL);
  209.         return m_hThread;
  210.     }
  211.  
  212.     size_t GetBytesRecv() const { return m_bytesRecv; }
  213.     size_t GetIOCount() const { return m_IOCount; }
  214.  
  215. private:
  216.     const HANDLE m_IOPort;
  217.     const size_t m_ID;
  218.    
  219.     HANDLE m_hThread;
  220.     bool m_stopThread;
  221.  
  222.     size_t m_bytesRecv;
  223.     size_t m_IOCount;
  224.  
  225.     void Stop()
  226.     {
  227.         DWORD exitCode = STILL_ACTIVE;
  228.         while (STILL_ACTIVE == exitCode)
  229.             GetExitCodeThread(m_hThread, &exitCode);
  230.  
  231.         CloseHandle(m_hThread);
  232.     }
  233.  
  234.     static DWORD WINAPI ThreadFunc(LPVOID lpParameter)
  235.     {
  236.         return reinterpret_cast<Worker *>(lpParameter)->Thread();
  237.     }
  238.  
  239.     DWORD Thread()
  240.     {
  241.         while (false == m_stopThread)
  242.         {
  243.             OVERLAPPED_ENTRY entries[1];
  244.             ULONG numEntries = 0;
  245.  
  246.             const BOOL result = GetQueuedCompletionStatusEx(
  247.                 m_IOPort,
  248.                 entries,
  249.                 1,          
  250.                 &numEntries,
  251.                 INFINITE,
  252.                 FALSE);
  253. //          const bool timeOut = WAIT_TIMEOUT == GetLastError();
  254.             if (TRUE == result)
  255.             {
  256.                 for (ULONG iEntry = 0; iEntry < numEntries; ++iEntry)
  257.                 {
  258.                     const OVERLAPPED_ENTRY &entry = entries[iEntry];
  259.                     if (NULL != entry.lpOverlapped)
  260.                     {
  261.                         m_bytesRecv += entry.dwNumberOfBytesTransferred;
  262.                         ++m_IOCount;
  263.  
  264.                         // Issue more
  265.                         Stream *pInst = reinterpret_cast<Stream *>(entry.lpCompletionKey);
  266.                         m_bytesRecv += pInst->Recv();
  267.                     }
  268.                     else
  269.                     {
  270.                         // Asked to stop?
  271.                         if (this == reinterpret_cast<Worker *>(entry.lpCompletionKey))
  272.                         {
  273.                             m_stopThread = true;
  274.                         }
  275.                     }
  276.                 }
  277.             }
  278.             else
  279.             {
  280.                 // This (most likely) means the port was closed, so terminate.
  281.                 // Attempts to repair a connection that broke on it's last Recv() are made by the main thread.
  282.                 m_stopThread = true;
  283.             }
  284.         }
  285.  
  286.         return 0;
  287.     }
  288. };
  289.  
  290. int main(int argC, char **argV)
  291. {
  292.     WSADATA wsaData;
  293.     if (0 != WSAStartup(MAKEWORD(2, 2), &wsaData))
  294.     {
  295.         std::cout << "Can not initialize WinSock" << std::endl;
  296.         return -1;
  297.     }
  298.  
  299.     const size_t kNumWorkers = 4;
  300.  
  301.     HANDLE IOPort = CreateIoCompletionPort(
  302.         INVALID_HANDLE_VALUE,
  303.         NULL,
  304.         NULL,
  305.         kNumWorkers);
  306.     {
  307.         // Create an amount of streams.
  308.         const size_t kNumStreams = 100; // 28;
  309.         Stream *pStreams[kNumStreams];
  310.  
  311. #if 1
  312.         // These (239.10.10.) are ~5 Mbit streams.
  313.         for (size_t iStr = 0; iStr < kNumStreams; ++iStr)
  314.         {
  315.             std::stringstream bindIP;
  316.             bindIP << "239.10.10." << iStr+1;
  317.             pStreams[iStr] = new Stream("169.254.207.12", bindIP.str(), 1234, IOPort);
  318.         }
  319. #else
  320.         // 239.20.100.1/2/3/4 = ~38 Mbit
  321.         for (size_t iStr = 0; iStr < kNumStreams; ++iStr)
  322.         {
  323.             std::stringstream bindIP;
  324.             bindIP << "239.20.100." << 1+(iStr%4);
  325.             pStreams[iStr] = new Stream("169.254.207.12", bindIP.str(), 1234, IOPort);
  326.         }
  327. #endif
  328.  
  329.         // Fire up the proletariat.
  330.         std::vector<Worker *> workers;
  331.         HANDLE threads[kNumWorkers];
  332.         for (size_t iWorker = 0; iWorker < kNumWorkers; ++iWorker)
  333.         {
  334.             workers.push_back(new Worker(IOPort, iWorker));
  335.             threads[iWorker] = workers.back()->Start();
  336.         }
  337.  
  338.         Timer timer;
  339.         float prevTime = 0.f;
  340.         int cycles = 0;
  341.         size_t prevTraffic = 0;
  342.  
  343.         while (cycles < 100) // Do a few cycles...
  344.         {
  345.             const float curTime = timer.Get();
  346.             if (curTime-prevTime > 1.f)
  347.             {
  348.                 // Attempt to revive dead streams.
  349.                 // This is safe since a disconnected stream won't be touched by the proletariat.
  350.                 for (size_t iStr = 0; iStr < kNumStreams; ++iStr)
  351.                 {
  352.                     if (false == pStreams[iStr]->IsConnected())
  353.                     {
  354.                         pStreams[iStr]->Connect();
  355.                         pStreams[iStr]->Recv();
  356.                     }
  357.                 }
  358.  
  359.                 // Statistics.
  360.                 size_t curTraffic = 0;
  361.                 for (size_t iWorker = 0; iWorker < kNumWorkers; ++iWorker)
  362.                     curTraffic += workers[iWorker]->GetBytesRecv();
  363.                
  364.                 std::cout << "Current traffic: " << (curTraffic-prevTraffic)/(125.f*1024.f)/(timer.Get()-prevTime) << " Mbit/S." << std::endl;
  365.  
  366.                 prevTime = curTime;
  367.                 ++cycles;
  368.                 prevTraffic = curTraffic;
  369.             }
  370.  
  371.             SleepEx(500, true);
  372.         }
  373.  
  374.         // Close IO port (terminates threads & IO).
  375.         CloseHandle(IOPort);
  376.  
  377.         // Get rid of workers and do more statistics.
  378.         size_t totalTraffic = 0, totalIO = 0;
  379.         for (size_t iWorker = 0; iWorker < kNumWorkers; ++iWorker)
  380.         {
  381.             const size_t traffic = workers[iWorker]->GetBytesRecv();
  382.             const size_t IO = workers[iWorker]->GetIOCount();
  383.             std::cout << "Total traffic for worker #" << iWorker << ": " << traffic/1024 << " KB (" << IO << " IO events)." << std::endl;
  384.  
  385.             totalTraffic += traffic;
  386.             totalIO += IO;
  387.            
  388.             delete workers[iWorker];
  389.         }
  390.  
  391.         std::cout << "Total traffic: " << totalTraffic/(1024*1024) << " MB." << std::endl;
  392.         std::cout << "Total IO events: " << totalIO << "." << std::endl;
  393.        
  394.         // Get rid of streams.
  395.         for (size_t iStr = 0; iStr < kNumStreams; ++iStr)
  396.             delete pStreams[iStr];
  397.     }
  398.  
  399.     WSACleanup();
  400.  
  401.     getch();
  402.  
  403.     return 0;
  404. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement