Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- using System;
- using System.Collections.Generic;
- using System.Linq;
- using System.Net;
- using System.Net.Sockets;
- using System.Text;
- using System.Threading;
- using System.Threading.Tasks;
- using System.Collections.Concurrent;
- namespace IntegrationTests
- {
- public class Client : IDisposable
- {
- static private ulong _lastId = 1;
- private ulong _id;
- private bool _disposed;
- protected class ReceiveContext
- {
- public const int _bufferSize = 1024;
- public byte[] _buffer = new byte[_bufferSize]; // Contains bytes from one receive
- public StringBuilder _stringBuilder = new StringBuilder(); // Contains bytes for multiple receives in order to build message up to delim
- };
- protected enum ConnectionState
- {
- DISCONNECTED
- , LOGGING_IN
- , CONNECTED
- , ERROR
- , SHUTTING_DOWN
- , SHUTDOWN
- };
- protected string _host;
- protected int _port;
- protected TimeSpan _timeout;
- protected TimeSpan _reconnectAttemptInterval;
- protected string _sessionId;
- protected Socket _socket;
- protected object _lockNumOutstandingAsyncOps;
- protected int _numOutstandingAsyncOps;
- protected int _connectionState;
- /// <summary>
- /// Manages the state machine that is our connection state
- /// Issues Connect, Disconnect, and Receive calls
- /// </summary>
- protected Thread _processingThread;
- /// <summary>
- /// Startup should not return until we are connected and logged in, or it has failed in the attempt for the duration of the timeout
- /// </summary>
- protected ManualResetEvent _connectedEvent;
- /// <summary>
- /// We need a signalling mechanism outside of the connection state, because we do not want
- /// Startup, Shutdown, and the ProcessingThread competing to set the state.
- /// This flag is set from the thread calling Startup and Shutdown,
- /// while ConnectionState is set from the Processing thread or async send/receive calls and callbacks resulting from them
- /// </summary>
- protected ManualResetEvent _shutdownEvent;
- BlockingCollection<string> _messageQueue;
- public Client(string host, int port, string sessionId, TimeSpan timeoutMilliseconds, TimeSpan reconnectAttemptInterval)
- {
- _id = _lastId++;
- _disposed = false;
- _host = host;
- _port = port;
- _sessionId = sessionId;
- _timeout = timeoutMilliseconds;
- _reconnectAttemptInterval = reconnectAttemptInterval;
- _socket = null;
- _lockNumOutstandingAsyncOps = new object();
- _numOutstandingAsyncOps = 0;
- _connectionState = (int)ConnectionState.DISCONNECTED;
- _processingThread = new Thread(ProcessingThreadProc);
- _connectedEvent = new ManualResetEvent(false);
- _shutdownEvent = new ManualResetEvent(false);
- _messageQueue = new BlockingCollection<string>();
- }
- public void Dispose()
- {
- Dispose(true);
- GC.SuppressFinalize(this);
- }
- protected virtual void Dispose(bool disposing)
- {
- if(_disposed)
- {
- return;
- }
- if (disposing)
- {
- Shutdown();
- _messageQueue.Dispose();
- }
- _disposed = true;
- }
- /// <summary>
- /// Coming up with names that don't match the type is annoying
- /// </summary>
- protected ConnectionState ConnectionStateProperty
- {
- get { return (ConnectionState)_connectionState; }
- set { Interlocked.Exchange(ref _connectionState, (int)value); }
- }
- //--------------------------------------------------------------------------------------------------
- public void Startup()
- {
- if (_processingThread.IsAlive)
- {
- // Error
- throw new ApplicationException(string.Format("Client #{0} Call to Startup, but processing thread is already started"));
- }
- Debug.WriteLine(string.Format("Client #{0} Starting up.", _id));
- // We want this call to be synchronous, so wait until we are actually connected to return
- // That way, the caller will not start trying to send data until we connect
- _connectedEvent.Reset();
- _processingThread.Start();
- if (_connectedEvent.WaitOne(_timeout))
- {
- Debug.WriteLine(string.Format("Client #{0} Started.", _id));
- }
- else
- {
- _shutdownEvent.Set();
- throw new ApplicationException(string.Format("Client #{0} Startup timed out.", _id));
- }
- }
- //--------------------------------------------------------------------------------------------------
- public void Shutdown()
- {
- Debug.WriteLine(string.Format("Client #{0} Shutting Down...", _id));
- // Signal out processing thread that we want to shutdown
- _shutdownEvent.Set();
- // Wait for the thread to complete
- if (_processingThread.IsAlive)
- {
- _processingThread.Join();
- }
- // Done
- Debug.WriteLine(string.Format("Client #{0} Shutdown Complete...", _id));
- }
- //--------------------------------------------------------------------------------------------------
- protected void ProcessingThreadProc()
- {
- try
- {
- System.Threading.Thread.CurrentThread.Name = "Processing Thread";
- while(!_shutdownEvent.WaitOne(0))
- {
- switch (ConnectionStateProperty)
- {
- case ConnectionState.DISCONNECTED:
- {
- // Connect to the server
- if (!Connect())
- {
- // Try again, after a little wait, until we connect
- System.Threading.Thread.Sleep(_reconnectAttemptInterval);
- }
- else
- {
- // Start reading from the socket
- lock (_lockNumOutstandingAsyncOps)
- {
- ReceiveContext context = new ReceiveContext();
- _numOutstandingAsyncOps = 1;
- _socket.BeginReceive(context._buffer, 0, ReceiveContext._bufferSize, SocketFlags.None, new AsyncCallback(OnReceive), context);
- }
- // Successfully connected
- ConnectionStateProperty = ConnectionState.LOGGING_IN;
- }
- break;
- }
- case ConnectionState.LOGGING_IN:
- {
- // Send the Login request
- string loginRequest = string.Format("loginstuff{0}", _sessionId);
- var data = Encoding.ASCII.GetBytes(loginRequest);
- lock (_lockNumOutstandingAsyncOps)
- {
- Debug.WriteLine(string.Format("Client #{0} Sending Login Request: {1}"
- , _id, loginRequest));
- ++_numOutstandingAsyncOps;
- _socket.BeginSend(data, 0, data.Length, 0, new AsyncCallback(OnSend), _socket);
- }
- ConnectionStateProperty = ConnectionState.CONNECTED;
- // Signal Startup(), if it happend to be waiting
- _connectedEvent.Set();
- break;
- }
- case ConnectionState.ERROR:
- {
- // If anything went wrong we are going to go to the disconnected state which will cause a reconnection on the next loop iteration
- Disconnect();
- ConnectionStateProperty = ConnectionState.DISCONNECTED;
- break;
- }
- case ConnectionState.CONNECTED:
- {
- // TODO - We don't wan't to keep looping with no yield
- // We should wait to do the next iteration until
- // A) There is an item on the queue to be processed or
- // B) The connection state has changed
- // Currently, I only wait on A and we are polling for B
- string message;
- if( !_messageQueue.TryTake(out message, _timeout) )
- {
- // Timed out
- // Nothing to process, go on to the next iteration in order to handle connection status changes
- break;
- }
- else
- {
- if (!string.IsNullOrEmpty(message))
- {
- Debug.WriteLine(string.Format("Client #{0} Processed Received Data: {1}"
- , _id, message));
- }
- }
- break;
- }
- }
- }
- // We were signalled to shutdown
- // Disconnect the socket
- ConnectionStateProperty = ConnectionState.SHUTTING_DOWN;
- Disconnect();
- ConnectionStateProperty = ConnectionState.SHUTDOWN;
- }
- catch (Exception e)
- {
- Debug.WriteLine(string.Format("Client #{0} Unhandled exception caught in processing thread. Exception: {1}"
- , _id, e.ToString()));
- }
- Debug.WriteLine(string.Format("Client #{0} Processing Thread exiting."
- , _id));
- }
- public void MakeRequest(string thingy)
- {
- if (ConnectionStateProperty != ConnectionState.CONNECTED)
- {
- // Error - Cannot log in while not connected
- throw new ApplicationException(string.Format("Client #{0} MakeRequest was called for thingy{1}, but client is not connected."
- , _id, thingy));
- }
- string message = string.Format("requeststuff{0}", thingy);
- var data = Encoding.ASCII.GetBytes(message);
- lock (_lockNumOutstandingAsyncOps)
- {
- Debug.WriteLine(string.Format("Client #{0} Sending Request: {1}"
- , _id, message));
- ++_numOutstandingAsyncOps;
- _socket.BeginSend(data, 0, data.Length, 0, new AsyncCallback(OnSend), _socket);
- }
- }
- //--------------------------------------------------------------------------------------------------
- protected bool Connect()
- {
- Disconnect();
- IPHostEntry ipHostInfo = Dns.GetHostEntry(_host);
- IPAddress[] ipV4Addresses = ipHostInfo.AddressList.Where(x => x.AddressFamily == AddressFamily.InterNetwork).ToArray();
- IPAddress[] ipV6Addresses = ipHostInfo.AddressList.Where(x => x.AddressFamily == AddressFamily.InterNetworkV6).ToArray();
- IPEndPoint endpoint = new IPEndPoint(ipV4Addresses[0], _port);
- _socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
- _socket.ReceiveTimeout = _timeout.Milliseconds;
- _socket.SendTimeout = _timeout.Milliseconds;
- try
- {
- _socket.Connect(endpoint);
- }
- catch (Exception e)
- {
- // Error
- Debug.WriteLine(string.Format("Client #{0} Exception caught while attempting to Connect. Exception: {1}"
- , _id, e.ToString()));
- return false;
- }
- Debug.WriteLine(string.Format("Client #{0} connected to: {1}", _id, _socket.RemoteEndPoint.ToString()));
- return true;
- }
- protected void Disconnect()
- {
- Debug.WriteLine(string.Format("Client #{0} Disconnecting...", _id));
- // We need to wait here until all outstanding async operations complete
- // In order to avoid getting 'Object was disposed' exceptions in those async ops that use the socket
- lock (_lockNumOutstandingAsyncOps)
- {
- while (_numOutstandingAsyncOps > 0)
- {
- Monitor.Wait(_lockNumOutstandingAsyncOps);
- }
- if (_socket != null)
- {
- if (_socket.Connected)
- {
- _socket.Shutdown(SocketShutdown.Both);
- }
- _socket.Close();
- _socket = null;
- }
- }
- Debug.WriteLine(string.Format("Client #{0} Disconnected...", _id));
- }
- protected void OnReceive(IAsyncResult asyncResult)
- {
- ReceiveContext context = (ReceiveContext)asyncResult.AsyncState;
- string data = null;
- try
- {
- int bytesReceived = _socket.EndReceive(asyncResult);
- data = Encoding.ASCII.GetString(context._buffer, 0, bytesReceived);
- // If the remote host shuts down the Socket connection with the Shutdown method, and all available data has been received,
- // the EndReceive method will complete immediately and return zero bytes
- if (bytesReceived > 0)
- {
- StringBuilder stringBuilder = context._stringBuilder.Append(data);
- int index = -1;
- do
- {
- index = stringBuilder.ToString().IndexOf("#");
- if (index != -1)
- {
- string message = stringBuilder.ToString().Substring(0, index + 1);
- stringBuilder.Remove(0, index + 1);
- if (!_messageQueue.TryAdd(message, _timeout))
- {
- Debug.WriteLine(string.Format("Client #{0} Timed out while attempting to queue Received Data: {1}"
- , _id, message));
- }
- }
- } while (index != -1);
- }
- }
- catch (Exception e)
- {
- // Error
- Debug.WriteLine(string.Format("Client #{0} Exception caught OnReceive. Exception: {1}"
- , _id, e.ToString()));
- ConnectionStateProperty = ConnectionState.ERROR;
- }
- finally
- {
- lock (_lockNumOutstandingAsyncOps)
- {
- --_numOutstandingAsyncOps;
- Monitor.Pulse(_lockNumOutstandingAsyncOps);
- }
- }
- // Issue the next async receive
- if (ConnectionStateProperty == ConnectionState.CONNECTED)
- {
- lock (_lockNumOutstandingAsyncOps)
- {
- try
- {
- ++_numOutstandingAsyncOps;
- ReceiveContext newContext = new ReceiveContext();
- _socket.BeginReceive(newContext._buffer, 0, ReceiveContext._bufferSize, SocketFlags.None, new AsyncCallback(OnReceive), newContext);
- }
- catch(Exception e)
- {
- // Error
- Debug.WriteLine(string.Format("Client #{0} Exception caught OnReceive. Exception: {1}"
- , _id, e.ToString()));
- --_numOutstandingAsyncOps;
- ConnectionStateProperty = ConnectionState.ERROR;
- }
- }
- }
- }
- protected void OnSend(IAsyncResult asyncResult)
- {
- try
- {
- int bytesSent = _socket.EndSend(asyncResult);
- }
- catch (Exception e)
- {
- Debug.WriteLine(string.Format("Client #{0} Exception caught OnSend. Exception: {1}"
- , _id, e.ToString()));
- ConnectionStateProperty = ConnectionState.ERROR;
- }
- finally
- {
- lock (_lockNumOutstandingAsyncOps)
- {
- --_numOutstandingAsyncOps;
- Monitor.Pulse(_lockNumOutstandingAsyncOps);
- }
- }
- }
- }
- }
- using System;
- using System.Collections.Generic;
- using System.Configuration;
- using System.Linq;
- using System.Text;
- using System.Threading.Tasks;
- namespace IntegrationTests
- {
- class Program
- {
- static void Main(string[] args)
- {
- string server = ConfigurationManager.AppSettings["server"];
- int port = int.Parse(ConfigurationManager.AppSettings["port"]);
- int numClients = int.Parse(ConfigurationManager.AppSettings["numberOfClients"]);
- TimeSpan clientLifetime = TimeSpan.Parse(ConfigurationManager.AppSettings["clientLifetime"]);
- TimeSpan timeout = TimeSpan.Parse(ConfigurationManager.AppSettings["timeout"]);
- TimeSpan reconnectInterval = TimeSpan.Parse(ConfigurationManager.AppSettings["reconnectInterval"]);
- List<string> clientIds = ConfigurationManager.GetSection("clientIds") as List<string>;
- try
- {
- Task[] tasks = new Task[numClients];
- for(int count = 0; count < numClients; ++count)
- {
- var copyOfcount = count;
- tasks[count] = Task.Factory.StartNew(() =>
- {
- try
- {
- Client client = new Client(server, port, clientIds[copyOfcount], timeout, reconnectInterval);
- client.Startup();
- client.MakeRequest("Request");
- System.Threading.Thread.Sleep(clientLifetime);
- client.Shutdown();
- }
- catch(Exception e)
- {
- Debug.WriteLine(string.Format("Caught an exception in task procedure. Exception: {0}"
- , e.ToString()));
- }
- });
- }
- Task.WaitAll(tasks);
- }
- catch (Exception e)
- {
- Debug.WriteLine(string.Format("Caught an exception in main. Exception: {0}"
- , e.ToString()));
- }
- }
- }
- }
Add Comment
Please, Sign In to add comment