Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
#pragma once

#include <algorithm>
#include <cassert>
#include <cstdarg>
#include <cstdio>
#include <cstring>
#include <functional>
#include <mutex>
#include <stdlib.h>
#include <thread>
#include <vector>

#include <Windows.h>

#include "../Shared/Defer.h"

#ifdef max
#undef max
#endif
- struct IPC_Packet
- {
- DWORD to; // recipient
- DWORD from; // sender
- DWORD id; // the id of this packet
- DWORD size; // size of data
- BYTE data[ 0 ]; // actual data
- };
- inline DWORD SizeOfIpcPacket( IPC_Packet *data )
- {
- return data->size + sizeof( IPC_Packet );
- }
- template <typename T>
- T *TrailingAllocate( DWORD extraSize )
- {
- return (T *)( new BYTE[ sizeof( T ) + extraSize ] );
- }
- // TODO: why is this not in the IPC_Peer class?
- struct IPC_Header
- {
- HANDLE mutex;
- DWORD lastClient;
- // client --> server packets
- DWORD incomingSize;
- // server --> client packets
- DWORD outgoingSize;
- DWORD lastId;
- };
- #define IPC_HEADER_SIZE ( sizeof( IPC_Header ) )
// TODO: list
// - add an internal method where packets are ACKed when received (could be compiled only in debug builds?)
// - packets shouldn't be dropped, but if they are then the user should know about it
// - should the incoming & outgoing buffer sizes be the same?
// - Linux implementation
// - track and clean up packets that haven't been collected
/// Selects which end of the channel an IPC_Peer instantiation implements.
enum class IPC_Type
{
    Server, // creates the shared mapping and always has peer id 0
    Client  // opens an existing mapping and is assigned an id from the header
};
- template <IPC_Type Type>
- class IPC_Peer
- {
- // integral peer data
- IPC_Header *header; // represents the entire buffer
- PVOID incoming; // recieved
- PVOID outgoing; // sent
- DWORD totalSize;
- HANDLE fileHandle;
- bool valid = false;
- // the id of this peer
- DWORD peerId;
- char objName[ MAX_PATH ];
- std::function<void( IPC_Packet * )> callback;
- // debug function
- void Warning( const char *fmt, ... )
- {
- va_list vlist;
- char buf[ 1024 ];
- va_start( vlist, fmt );
- vsnprintf( buf, 1024, fmt, vlist );
- va_end( vlist );
- OutputDebugStringA( buf );
- printf( buf );
- }
- public:
- IPC_Peer( const char *objectName, DWORD incomingSize, DWORD outgoingSize, std::function<void( IPC_Packet * )> callback );
- ~IPC_Peer()
- {
- if ( Type == IPC_Type::Client )
- {
- }
- // This is so that we dont cause a deadlock
- if ( hasMutex == true )
- ReleaseMutex();
- if ( valid )
- {
- UnmapViewOfFile( header );
- CloseHandle( fileHandle );
- }
- }
- bool IsValid()
- {
- return valid;
- }
- void ProcessIncoming()
- {
- assert( valid );
- ObtainMutex();
- Defer( ReleaseMutex() );
- DWORD unprocessedOffset = 0;
- BYTE *unprocessed = new BYTE[ header->incomingSize ];
- Defer( delete[] unprocessed );
- memset( unprocessed, 0, header->incomingSize );
- DWORD offset = 0;
- while ( true )
- {
- IPC_Packet *dat = (IPC_Packet *)( (DWORD)incoming + offset );
- if ( dat->size == 0 )
- break; // end of data
- DWORD packetSize = SizeOfIpcPacket( dat );
- offset += packetSize;
- if ( peerId == dat->to )
- {
- if ( Type == IPC_Type::Client )
- Warning( "Client: Recieve %d --> %d (%d)\n", dat->from, dat->to, dat->id );
- else if ( Type == IPC_Type::Server )
- Warning( "Server: Recieve %d --> %d (%d)\n", dat->from, dat->to, dat->id );
- callback( dat );
- }
- else
- {
- // write to unprocessed buffer
- if ( unprocessedOffset + packetSize > header->incomingSize )
- {
- Warning( "Unprocessed Buffer Overflow\n\nPURGING!!!!\n\n" );
- memset( unprocessed, 0, header->incomingSize );
- break;
- }
- memcpy( unprocessed + unprocessedOffset, dat, packetSize );
- unprocessedOffset += packetSize;
- }
- }
- // all incoming packets for us have been dealt with
- // copy unprocessed packets back into buffer
- memcpy( incoming, unprocessed, header->incomingSize );
- }
- void SendPacket( IPC_Packet *p )
- {
- assert( valid );
- ObtainMutex();
- Defer( ReleaseMutex() );
- header->lastId += 1;
- p->id = header->lastId;
- if ( Type == IPC_Type::Client )
- Warning( "Client: Send %d --> %d (%d)\n", p->from, p->to, p->id );
- else if ( Type == IPC_Type::Server )
- Warning( "Server: Send %d --> %d (%d)\n", p->from, p->to, p->id );
- // get the last outgoing packet
- DWORD offset = 0;
- IPC_Packet *dat = (IPC_Packet *)outgoing;
- while ( true )
- {
- if ( dat->size == 0 )
- break; // end of data
- offset += SizeOfIpcPacket( dat );
- dat = (IPC_Packet *)( (DWORD)outgoing + offset );
- }
- // if this goes off then you overflowed the buffer
- assert( ( offset + p->size ) < header->outgoingSize );
- // fill out packet
- memcpy( dat, p, SizeOfIpcPacket( p ) );
- }
- // Create and send a packet
- template <typename T>
- void CreateAndSendPacket( T *data );
- template <typename T>
- void CreateAndSendPacketTo( T *data, DWORD target );
- void SetCallback( std::function<void( IPC_Packet * )> c )
- {
- callback = c;
- }
- // TODO: remove
- static void AllocConsole()
- {
- ::AllocConsole();
- freopen( "CONIN$", "r", stdin );
- freopen( "CONOUT$", "w", stdout );
- freopen( "CONOUT$", "w", stderr );
- }
- private:
- bool hasMutex = false;
- void ObtainMutex()
- {
- // we alrady have the mutex so dont wait for it (which would result in deadlock)
- if ( hasMutex == true )
- return;
- while ( WaitForSingleObject( header->mutex, 2 * 1000 ) == WAIT_TIMEOUT )
- {
- Warning( "Timeout waiting for mutex '%s'\n", objName );
- }
- hasMutex = true;
- }
- public:
- void ReleaseMutex()
- {
- hasMutex = false;
- if (::ReleaseMutex( header->mutex ) == 0 )
- {
- Warning( "Error releasing mutex: %d\n", GetLastError() );
- }
- }
- };
- template <>
- inline IPC_Peer<IPC_Type::Server>::IPC_Peer( const char *objectName, DWORD incomingSize, DWORD outgoingSize, std::function<void( IPC_Packet * )> callback )
- : totalSize( incomingSize + outgoingSize + IPC_HEADER_SIZE ), callback( callback )
- {
- sprintf( objName, "Local\\%s", objectName );
- fileHandle = CreateFileMappingA(
- INVALID_HANDLE_VALUE, // use paging file
- NULL, // default security
- PAGE_READWRITE, // read/write access
- 0, // maximum object size (high-order DWORD)
- totalSize + 50, // maximum object size (low-order DWORD)
- objName ); // name of mapping object
- if ( fileHandle == NULL || fileHandle == INVALID_HANDLE_VALUE )
- {
- this->Warning( "Could not create file mapping object (%d).\n",
- GetLastError() );
- return;
- }
- header = (IPC_Header *)MapViewOfFile( fileHandle, FILE_MAP_ALL_ACCESS, 0, 0, totalSize );
- if ( header == NULL )
- {
- this->Warning( "Could not map header (%d).\n", GetLastError() );
- CloseHandle( fileHandle );
- return;
- }
- incoming = ( (BYTE *)header ) + IPC_HEADER_SIZE;
- if ( incoming == NULL )
- {
- this->Warning( "Could not map incoming (%d).\n", GetLastError() );
- CloseHandle( fileHandle );
- return;
- }
- outgoing = ( (BYTE *)header ) + IPC_HEADER_SIZE + incomingSize;
- if ( outgoing == NULL )
- {
- this->Warning( "Could not map outgoing (%d).\n", GetLastError() );
- CloseHandle( fileHandle );
- return;
- }
- peerId = 0;
- // clean out buffers
- memset( incoming, 0, incomingSize );
- memset( outgoing, 0, outgoingSize );
- header->mutex = CreateMutexA( NULL, FALSE, NULL );
- header->lastClient = 1;
- header->incomingSize = incomingSize;
- header->outgoingSize = outgoingSize;
- valid = true;
- }
- template <>
- inline IPC_Peer<IPC_Type::Client>::IPC_Peer( const char *objectName, DWORD incomingSize, DWORD outgoingSize, std::function<void( IPC_Packet * )> callback )
- : totalSize( incomingSize + outgoingSize + IPC_HEADER_SIZE ), callback( callback )
- {
- sprintf( objName, "Local\\%s", objectName );
- fileHandle = OpenFileMappingA(
- FILE_MAP_ALL_ACCESS, // read/write access
- FALSE, // do not inherit the name
- objName ); // name of mapping object
- if ( fileHandle == NULL || fileHandle == INVALID_HANDLE_VALUE )
- {
- this->Warning( "Could not create file mapping object (%d).\n",
- GetLastError() );
- return;
- }
- header = (IPC_Header *)MapViewOfFile( fileHandle, FILE_MAP_ALL_ACCESS, 0, 0, totalSize );
- if ( header == NULL )
- {
- this->Warning( "Could not map header (%d).\n", GetLastError() );
- CloseHandle( fileHandle );
- return;
- }
- // in the client the incoming and outgoing buffers are flipped
- incoming = ( (BYTE *)header ) + IPC_HEADER_SIZE + incomingSize;
- if ( incoming == NULL )
- {
- this->Warning( "Could not map incoming (%d).\n", GetLastError() );
- CloseHandle( fileHandle );
- return;
- }
- outgoing = ( (BYTE *)header ) + IPC_HEADER_SIZE;
- if ( outgoing == NULL )
- {
- this->Warning( "Could not map outgoing (%d).\n", GetLastError() );
- CloseHandle( fileHandle );
- return;
- }
- ObtainMutex();
- {
- peerId = header->lastClient;
- header->lastClient += 1;
- }
- ReleaseMutex();
- valid = true;
- }
- // Create and send a packet
- template <>
- template <typename T>
- inline void IPC_Peer<IPC_Type::Client>::CreateAndSendPacket( T *data )
- {
- assert( valid );
- IPC_Packet *p = TrailingAllocate<IPC_Packet>( sizeof( T ) );
- Defer( delete[] p );
- p->from = peerId;
- p->to = 0;
- p->size = sizeof( T );
- memcpy( p->data, data, sizeof( T ) );
- SendPacket( p );
- }
- template <>
- template <typename T>
- inline void IPC_Peer<IPC_Type::Server>::CreateAndSendPacketTo( T *data, DWORD target )
- {
- assert( valid );
- IPC_Packet *p = TrailingAllocate<IPC_Packet>( sizeof( T ) );
- Defer( delete[] p );
- p->from = peerId;
- p->to = target;
- p->size = sizeof( T );
- memcpy( p->data, data, sizeof( T ) );
- SendPacket( p );
- }
- using IPC_Server = IPC_Peer<IPC_Type::Server>;
- using IPC_Client = IPC_Peer<IPC_Type::Client>;
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement