Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- module networking.connectionhandler;
- import core.thread;
- import networking.packets.packet;
- import networking.packets.packetregistry;
- import logger;
- import std.string;
- import std.socket;
- import std.datetime;
- import std.algorithm;
- import std.math;
- enum packetSize = 1024;
- class ConnectionHandler {
- private bool _running = false;
- private bool _connected = false;
- private bool _closeRequested = false;
- private Socket _socket;
- private Address _address;
- private Thread _receiveThread;
- private Thread _sendThread;
- alias PacketData = ubyte[];
- private PacketData[] _toSendQueue;
- private Object _queueSync;
- private SysTime _lastKeepAlive;
- private StopWatch _keepAliveSw;
- enum keepAliveDelaySecs = 2;
- enum statusPacketDelay = 2;
- enum maxTimeout = 15;
- this() {
- _queueSync = new Object();
- }
- /// Returns true if the networking thread is running.
- bool running() { return _running; }
- /// Returns true if the connection has been established.
- bool connected() { return _connected; }
- void connect(string hostname, ushort port) {
- _address = getAddress(hostname, port)[0];
- start();
- }
- /// Asynchronously closes the networking thread.
- void close() {
- if(!_running)
- throwException("The networking thread isn't running");
- _closeRequested = true;
- }
- void sendPacket(OutPacket packet) {
- BinaryWriter w = new BinaryWriter();
- w.write!uint(0); // Reserved for packet size
- w.write!uint(packet.id);
- packet.write(w);
- // Write the packet size, not including the size itself
- w.position = 0;
- w.write!uint(w.buffer.length - 4);
- // Sending packets bigger than max packet size
- if(w.buffer.length > packetSize) {
- static uint bigPacketId = 0;
- bigPacketId++;
- uint totalSize = w.buffer.length;
- alias chunkSize = BigPacket.chunkSize;
- uint totalChunks = cast(uint)ceil(totalSize / cast(float)chunkSize);
- log("Sending big packet of total size %s, no of cunks %s", totalSize, totalChunks);
- OutPacket[] packets;
- for(int chunkId = 0; chunkId < totalChunks; chunkId++) {
- int chunkMin = chunkId * chunkSize;
- int chunkMax = min(totalSize, (chunkId + 1) * chunkSize);
- ubyte[] chunk = w.buffer[chunkMin .. chunkMax];
- BigPacket bigPacket = new BigPacket(bigPacketId, chunkId, totalSize, chunk);
- packets ~= bigPacket;
- }
- foreach(bigPacket; packets)
- sendPacket(bigPacket);
- return;
- }
- synchronized(_queueSync) {
- _toSendQueue ~= w.buffer;
- }
- }
- private void start() {
- _running = true;
- _socket = new Socket(AddressFamily.INET, SocketType.STREAM);
- _socket.blocking = true;
- log("Connecting to %s", _address);
- while(true) {
- try {
- _socket.connect(_address);
- break;
- }
- catch(Throwable e) {
- log("Could not connect to the server. Retrying in a while.");
- Thread.sleep(seconds(5));
- }
- }
- _connected = true;
- log("Connected");
- _sendThread = new Thread(&networkingThreadFuncWrapper!thread_sendFunc);
- _sendThread.start();
- _receiveThread = new Thread(&networkingThreadFuncWrapper!thread_receiveFunc);
- _receiveThread.start();
- }
- private void networkingThreadFuncWrapper(alias Func)() {
- try
- Func();
- catch(Throwable e) {
- log(e.toString);
- onConnectionLost();
- }
- }
- private void onConnectionLost() {
- log("Connection lost");
- _connected = false;
- _socket.shutdown(SocketShutdown.BOTH);
- _socket.close();
- start();
- }
- private void thread_sendFunc() {
- _keepAliveSw.start();
- while(true) {
- synchronized(_queueSync) {
- while(_toSendQueue.length) {
- PacketData data = _toSendQueue[0];
- if(_socket.send(data, SocketFlags.NONE) == Socket.ERROR)
- throw new Exception("Could not send data.");
- _toSendQueue = _toSendQueue[1 .. $];
- // log("Sent packet of size %s", data.length);
- }
- }
- if(_keepAliveSw.peek.seconds >= statusPacketDelay) {
- _keepAliveSw.reset();
- sendPacket(new StatusPacket());
- }
- Thread.sleep(msecs(1));
- }
- }
- private void thread_receiveFunc() {
- while(true) {
- long bytesReceived = -1; // -1 is no data received
- ubyte[packetSize] buffer;
- Address from;
- bytesReceived = _socket.receiveFrom(buffer, SocketFlags.NONE, from);
- if(bytesReceived > 0) {
- log("Received data from %s, length %s", from, bytesReceived);
- BinaryReader r = new BinaryReader(buffer[0 .. cast(int)bytesReceived]);
- uint size = r.read!uint;
- uint id = r.read!uint;
- // Don't accept packets if connection not established
- if(!_connected) {
- // Handshake procedure
- if(id == 2) {
- log("Server wants to establish connection");
- HandshakePacket inPacket = new HandshakePacket();
- inPacket.read(r);
- if(inPacket.serverProtocol != app.protocolVersion) {
- sendPacket(new HandshakePacket(false, "Protocol versions aren't the same. Server is %s. Client is %s.".format(inPacket.serverProtocol, app.protocolVersion)));
- continue;
- }
- // TODO: Password verification
- sendPacket(new HandshakePacket(true, "At your service."));
- log("Connection established");
- _connected = true;
- _lastKeepAlive = Clock.currTime();
- _keepAliveSw.reset();
- _keepAliveSw.start();
- }
- continue;
- }
- if(id == 0) {
- _lastKeepAlive = Clock.currTime();
- continue;
- }
- auto packet = instantiatePacket(id);
- packet.read(r);
- }
- Thread.sleep(msecs(1));
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement