Advertisement
szymski

Untitled

Dec 22nd, 2017
143
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.45 KB | None | 0 0
  1. module networking.connectionhandler;
  2.  
  3. import core.thread;
  4. import networking.packets.packet;
  5. import networking.packets.packetregistry;
  6. import logger;
  7. import std.string;
  8. import std.socket;
  9. import std.datetime;
  10. import std.algorithm;
  11. import std.math;
  12.  
  /// Serialized size threshold, in bytes, above which sendPacket splits an
  /// outgoing packet into BigPacket chunks instead of queueing it whole.
  13. enum packetSize = 1024;
  14.  
  15. class ConnectionHandler {
  16. private bool _running = false;
  17. private bool _connected = false;
  18. private bool _closeRequested = false;
  19. private Socket _socket;
  20. private Address _address;
  21. private Thread _receiveThread;
  22. private Thread _sendThread;
  23.  
  24. alias PacketData = ubyte[];
  25. private PacketData[] _toSendQueue;
  26. private Object _queueSync;
  27.  
  28. private SysTime _lastKeepAlive;
  29. private StopWatch _keepAliveSw;
  30.  
  31. enum keepAliveDelaySecs = 2;
  32. enum statusPacketDelay = 2;
  33. enum maxTimeout = 15;
  34.  
  35. this() {
  36. _queueSync = new Object();
  37. }
  38.  
  39. /// Returns true if the networking thread is running.
  40. bool running() { return _running; }
  41.  
  42. /// Returns true if the connection has been established.
  43. bool connected() { return _connected; }
  44.  
  45. void connect(string hostname, ushort port) {
  46. _address = getAddress(hostname, port)[0];
  47. start();
  48. }
  49.  
  50. /// Asynchronously closes the networking thread.
  51. void close() {
  52. if(!_running)
  53. throwException("The networking thread isn't running");
  54.  
  55. _closeRequested = true;
  56. }
  57.  
  58. void sendPacket(OutPacket packet) {
  59. BinaryWriter w = new BinaryWriter();
  60. w.write!uint(0); // Reserved for packet size
  61. w.write!uint(packet.id);
  62. packet.write(w);
  63.  
  64. // Write the packet size, not including the size itself
  65. w.position = 0;
  66. w.write!uint(w.buffer.length - 4);
  67.  
  68. // Sending packets bigger than max packet size
  69. if(w.buffer.length > packetSize) {
  70. static uint bigPacketId = 0;
  71. bigPacketId++;
  72.  
  73. uint totalSize = w.buffer.length;
  74. alias chunkSize = BigPacket.chunkSize;
  75. uint totalChunks = cast(uint)ceil(totalSize / cast(float)chunkSize);
  76.  
  77. log("Sending big packet of total size %s, no of cunks %s", totalSize, totalChunks);
  78.  
  79. OutPacket[] packets;
  80.  
  81. for(int chunkId = 0; chunkId < totalChunks; chunkId++) {
  82. int chunkMin = chunkId * chunkSize;
  83. int chunkMax = min(totalSize, (chunkId + 1) * chunkSize);
  84. ubyte[] chunk = w.buffer[chunkMin .. chunkMax];
  85. BigPacket bigPacket = new BigPacket(bigPacketId, chunkId, totalSize, chunk);
  86. packets ~= bigPacket;
  87. }
  88.  
  89. foreach(bigPacket; packets)
  90. sendPacket(bigPacket);
  91.  
  92. return;
  93. }
  94.  
  95. synchronized(_queueSync) {
  96. _toSendQueue ~= w.buffer;
  97. }
  98. }
  99.  
  100. private void start() {
  101. _running = true;
  102.  
  103. _socket = new Socket(AddressFamily.INET, SocketType.STREAM);
  104. _socket.blocking = true;
  105. log("Connecting to %s", _address);
  106. while(true) {
  107. try {
  108. _socket.connect(_address);
  109. break;
  110. }
  111. catch(Throwable e) {
  112. log("Could not connect to the server. Retrying in a while.");
  113. Thread.sleep(seconds(5));
  114. }
  115. }
  116. _connected = true;
  117. log("Connected");
  118.  
  119. _sendThread = new Thread(&networkingThreadFuncWrapper!thread_sendFunc);
  120. _sendThread.start();
  121.  
  122. _receiveThread = new Thread(&networkingThreadFuncWrapper!thread_receiveFunc);
  123. _receiveThread.start();
  124. }
  125.  
  126. private void networkingThreadFuncWrapper(alias Func)() {
  127. try
  128. Func();
  129. catch(Throwable e) {
  130. log(e.toString);
  131. onConnectionLost();
  132. }
  133. }
  134.  
  135. private void onConnectionLost() {
  136. log("Connection lost");
  137.  
  138. _connected = false;
  139. _socket.shutdown(SocketShutdown.BOTH);
  140. _socket.close();
  141.  
  142. start();
  143. }
  144.  
  145. private void thread_sendFunc() {
  146. _keepAliveSw.start();
  147.  
  148. while(true) {
  149. synchronized(_queueSync) {
  150. while(_toSendQueue.length) {
  151. PacketData data = _toSendQueue[0];
  152. if(_socket.send(data, SocketFlags.NONE) == Socket.ERROR)
  153. throw new Exception("Could not send data.");
  154. _toSendQueue = _toSendQueue[1 .. $];
  155. // log("Sent packet of size %s", data.length);
  156. }
  157. }
  158.  
  159. if(_keepAliveSw.peek.seconds >= statusPacketDelay) {
  160. _keepAliveSw.reset();
  161. sendPacket(new StatusPacket());
  162. }
  163.  
  164. Thread.sleep(msecs(1));
  165. }
  166. }
  167.  
  168. private void thread_receiveFunc() {
  169. while(true) {
  170. long bytesReceived = -1; // -1 is no data received
  171. ubyte[packetSize] buffer;
  172. Address from;
  173.  
  174. bytesReceived = _socket.receiveFrom(buffer, SocketFlags.NONE, from);
  175. if(bytesReceived > 0) {
  176. log("Received data from %s, length %s", from, bytesReceived);
  177.  
  178. BinaryReader r = new BinaryReader(buffer[0 .. cast(int)bytesReceived]);
  179. uint size = r.read!uint;
  180. uint id = r.read!uint;
  181.  
  182. // Don't accept packets if connection not established
  183. if(!_connected) {
  184. // Handshake procedure
  185. if(id == 2) {
  186. log("Server wants to establish connection");
  187.  
  188. HandshakePacket inPacket = new HandshakePacket();
  189. inPacket.read(r);
  190.  
  191. if(inPacket.serverProtocol != app.protocolVersion) {
  192. sendPacket(new HandshakePacket(false, "Protocol versions aren't the same. Server is %s. Client is %s.".format(inPacket.serverProtocol, app.protocolVersion)));
  193. continue;
  194. }
  195.  
  196. // TODO: Password verification
  197.  
  198. sendPacket(new HandshakePacket(true, "At your service."));
  199. log("Connection established");
  200. _connected = true;
  201. _lastKeepAlive = Clock.currTime();
  202. _keepAliveSw.reset();
  203. _keepAliveSw.start();
  204. }
  205.  
  206. continue;
  207. }
  208.  
  209. if(id == 0) {
  210. _lastKeepAlive = Clock.currTime();
  211. continue;
  212. }
  213.  
  214. auto packet = instantiatePacket(id);
  215. packet.read(r);
  216. }
  217.  
  218. Thread.sleep(msecs(1));
  219. }
  220. }
  221. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement