Guest User

Untitled

a guest
Nov 17th, 2017
86
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 20.29 KB | None | 0 0
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Net;
  5. using System.Net.Sockets;
  6. using System.Text;
  7. using System.Threading;
  8. using System.Threading.Tasks;
  9.  
  10. using System.Collections.Concurrent;
  11.  
  12. namespace IntegrationTests
  13. {
  14. public class Client : IDisposable
  15. {
  16. static private ulong _lastId = 1;
  17. private ulong _id;
  18. private bool _disposed;
  19.  
  20. protected class ReceiveContext
  21. {
  22. public const int _bufferSize = 1024;
  23. public byte[] _buffer = new byte[_bufferSize]; // Contains bytes from one receive
  24. public StringBuilder _stringBuilder = new StringBuilder(); // Contains bytes for multiple receives in order to build message up to delim
  25. };
  26.  
  27. protected enum ConnectionState
  28. {
  29. DISCONNECTED
  30. , LOGGING_IN
  31. , CONNECTED
  32. , ERROR
  33. , SHUTTING_DOWN
  34. , SHUTDOWN
  35. };
  36.  
  37. protected string _host;
  38. protected int _port;
  39. protected TimeSpan _timeout;
  40. protected TimeSpan _reconnectAttemptInterval;
  41. protected string _sessionId;
  42. protected Socket _socket;
  43. protected object _lockNumOutstandingAsyncOps;
  44. protected int _numOutstandingAsyncOps;
  45. protected int _connectionState;
  46.  
  47. /// <summary>
  48. /// Manages the state machine that is our connection state
  49. /// Issues Connect, Disconnect, and Receive calls
  50. /// </summary>
  51. protected Thread _processingThread;
  52.  
  53. /// <summary>
  54. /// Startup should not return until we are connected and logged in, or it has failed in the attempt for the duration of the timeout
  55. /// </summary>
  56. protected ManualResetEvent _connectedEvent;
  57.  
  58. /// <summary>
  59. /// We need a signalling mechanism outside of the connection state, because we do not want
  60. /// Startup, Shutdown, and the ProcessingThread competing to set the state.
  61. /// This flag is set from the thread calling Startup and Shutdown,
  62. /// while ConnectionState is set from the Processing thread or async send/receive calls and callbacks resulting from them
  63. /// </summary>
  64. protected ManualResetEvent _shutdownEvent;
  65.  
  66. BlockingCollection<string> _messageQueue;
  67.  
  68. public Client(string host, int port, string sessionId, TimeSpan timeoutMilliseconds, TimeSpan reconnectAttemptInterval)
  69. {
  70. _id = _lastId++;
  71. _disposed = false;
  72. _host = host;
  73. _port = port;
  74. _sessionId = sessionId;
  75. _timeout = timeoutMilliseconds;
  76. _reconnectAttemptInterval = reconnectAttemptInterval;
  77. _socket = null;
  78. _lockNumOutstandingAsyncOps = new object();
  79. _numOutstandingAsyncOps = 0;
  80. _connectionState = (int)ConnectionState.DISCONNECTED;
  81. _processingThread = new Thread(ProcessingThreadProc);
  82. _connectedEvent = new ManualResetEvent(false);
  83. _shutdownEvent = new ManualResetEvent(false);
  84. _messageQueue = new BlockingCollection<string>();
  85. }
  86.  
  87. public void Dispose()
  88. {
  89. Dispose(true);
  90. GC.SuppressFinalize(this);
  91. }
  92.  
  93. protected virtual void Dispose(bool disposing)
  94. {
  95. if(_disposed)
  96. {
  97. return;
  98. }
  99.  
  100. if (disposing)
  101. {
  102. Shutdown();
  103. _messageQueue.Dispose();
  104. }
  105.  
  106. _disposed = true;
  107. }
  108.  
  109. /// <summary>
  110. /// Coming up with names that don't match the type is annoying
  111. /// </summary>
  112. protected ConnectionState ConnectionStateProperty
  113. {
  114. get { return (ConnectionState)_connectionState; }
  115. set { Interlocked.Exchange(ref _connectionState, (int)value); }
  116. }
  117.  
  118.  
  119. //--------------------------------------------------------------------------------------------------
  120. public void Startup()
  121. {
  122. if (_processingThread.IsAlive)
  123. {
  124. // Error
  125. throw new ApplicationException(string.Format("Client #{0} Call to Startup, but processing thread is already started"));
  126. }
  127.  
  128. Debug.WriteLine(string.Format("Client #{0} Starting up.", _id));
  129.  
  130. // We want this call to be synchronous, so wait until we are actually connected to return
  131. // That way, the caller will not start trying to send data until we connect
  132. _connectedEvent.Reset();
  133. _processingThread.Start();
  134.  
  135. if (_connectedEvent.WaitOne(_timeout))
  136. {
  137. Debug.WriteLine(string.Format("Client #{0} Started.", _id));
  138. }
  139. else
  140. {
  141. _shutdownEvent.Set();
  142. throw new ApplicationException(string.Format("Client #{0} Startup timed out.", _id));
  143. }
  144. }
  145.  
  146. //--------------------------------------------------------------------------------------------------
  147. public void Shutdown()
  148. {
  149. Debug.WriteLine(string.Format("Client #{0} Shutting Down...", _id));
  150.  
  151. // Signal out processing thread that we want to shutdown
  152. _shutdownEvent.Set();
  153.  
  154. // Wait for the thread to complete
  155. if (_processingThread.IsAlive)
  156. {
  157. _processingThread.Join();
  158. }
  159.  
  160. // Done
  161. Debug.WriteLine(string.Format("Client #{0} Shutdown Complete...", _id));
  162. }
  163.  
  164. //--------------------------------------------------------------------------------------------------
  165. protected void ProcessingThreadProc()
  166. {
  167. try
  168. {
  169. System.Threading.Thread.CurrentThread.Name = "Processing Thread";
  170.  
  171. while(!_shutdownEvent.WaitOne(0))
  172. {
  173. switch (ConnectionStateProperty)
  174. {
  175. case ConnectionState.DISCONNECTED:
  176. {
  177. // Connect to the server
  178. if (!Connect())
  179. {
  180. // Try again, after a little wait, until we connect
  181. System.Threading.Thread.Sleep(_reconnectAttemptInterval);
  182. }
  183. else
  184. {
  185. // Start reading from the socket
  186. lock (_lockNumOutstandingAsyncOps)
  187. {
  188. ReceiveContext context = new ReceiveContext();
  189. _numOutstandingAsyncOps = 1;
  190. _socket.BeginReceive(context._buffer, 0, ReceiveContext._bufferSize, SocketFlags.None, new AsyncCallback(OnReceive), context);
  191. }
  192.  
  193. // Successfully connected
  194. ConnectionStateProperty = ConnectionState.LOGGING_IN;
  195. }
  196.  
  197. break;
  198. }
  199.  
  200. case ConnectionState.LOGGING_IN:
  201. {
  202. // Send the Login request
  203. string loginRequest = string.Format("loginstuff{0}", _sessionId);
  204. var data = Encoding.ASCII.GetBytes(loginRequest);
  205.  
  206. lock (_lockNumOutstandingAsyncOps)
  207. {
  208. Debug.WriteLine(string.Format("Client #{0} Sending Login Request: {1}"
  209. , _id, loginRequest));
  210.  
  211. ++_numOutstandingAsyncOps;
  212. _socket.BeginSend(data, 0, data.Length, 0, new AsyncCallback(OnSend), _socket);
  213. }
  214.  
  215. ConnectionStateProperty = ConnectionState.CONNECTED;
  216.  
  217. // Signal Startup(), if it happend to be waiting
  218. _connectedEvent.Set();
  219.  
  220. break;
  221. }
  222.  
  223. case ConnectionState.ERROR:
  224. {
  225. // If anything went wrong we are going to go to the disconnected state which will cause a reconnection on the next loop iteration
  226. Disconnect();
  227. ConnectionStateProperty = ConnectionState.DISCONNECTED;
  228.  
  229. break;
  230. }
  231.  
  232. case ConnectionState.CONNECTED:
  233. {
  234. // TODO - We don't wan't to keep looping with no yield
  235. // We should wait to do the next iteration until
  236. // A) There is an item on the queue to be processed or
  237. // B) The connection state has changed
  238. // Currently, I only wait on A and we are polling for B
  239. string message;
  240. if( !_messageQueue.TryTake(out message, _timeout) )
  241. {
  242. // Timed out
  243. // Nothing to process, go on to the next iteration in order to handle connection status changes
  244. break;
  245. }
  246. else
  247. {
  248. if (!string.IsNullOrEmpty(message))
  249. {
  250. Debug.WriteLine(string.Format("Client #{0} Processed Received Data: {1}"
  251. , _id, message));
  252. }
  253. }
  254.  
  255. break;
  256. }
  257. }
  258. }
  259.  
  260. // We were signalled to shutdown
  261. // Disconnect the socket
  262. ConnectionStateProperty = ConnectionState.SHUTTING_DOWN;
  263. Disconnect();
  264. ConnectionStateProperty = ConnectionState.SHUTDOWN;
  265. }
  266. catch (Exception e)
  267. {
  268. Debug.WriteLine(string.Format("Client #{0} Unhandled exception caught in processing thread. Exception: {1}"
  269. , _id, e.ToString()));
  270. }
  271.  
  272. Debug.WriteLine(string.Format("Client #{0} Processing Thread exiting."
  273. , _id));
  274. }
  275.  
  276. public void MakeRequest(string thingy)
  277. {
  278. if (ConnectionStateProperty != ConnectionState.CONNECTED)
  279. {
  280. // Error - Cannot log in while not connected
  281. throw new ApplicationException(string.Format("Client #{0} MakeRequest was called for thingy{1}, but client is not connected."
  282. , _id, thingy));
  283. }
  284.  
  285. string message = string.Format("requeststuff{0}", thingy);
  286. var data = Encoding.ASCII.GetBytes(message);
  287.  
  288. lock (_lockNumOutstandingAsyncOps)
  289. {
  290. Debug.WriteLine(string.Format("Client #{0} Sending Request: {1}"
  291. , _id, message));
  292.  
  293. ++_numOutstandingAsyncOps;
  294. _socket.BeginSend(data, 0, data.Length, 0, new AsyncCallback(OnSend), _socket);
  295. }
  296. }
  297.  
  298. //--------------------------------------------------------------------------------------------------
  299. protected bool Connect()
  300. {
  301. Disconnect();
  302.  
  303. IPHostEntry ipHostInfo = Dns.GetHostEntry(_host);
  304. IPAddress[] ipV4Addresses = ipHostInfo.AddressList.Where(x => x.AddressFamily == AddressFamily.InterNetwork).ToArray();
  305. IPAddress[] ipV6Addresses = ipHostInfo.AddressList.Where(x => x.AddressFamily == AddressFamily.InterNetworkV6).ToArray();
  306. IPEndPoint endpoint = new IPEndPoint(ipV4Addresses[0], _port);
  307.  
  308. _socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
  309. _socket.ReceiveTimeout = _timeout.Milliseconds;
  310. _socket.SendTimeout = _timeout.Milliseconds;
  311.  
  312. try
  313. {
  314. _socket.Connect(endpoint);
  315. }
  316. catch (Exception e)
  317. {
  318. // Error
  319. Debug.WriteLine(string.Format("Client #{0} Exception caught while attempting to Connect. Exception: {1}"
  320. , _id, e.ToString()));
  321. return false;
  322. }
  323.  
  324. Debug.WriteLine(string.Format("Client #{0} connected to: {1}", _id, _socket.RemoteEndPoint.ToString()));
  325. return true;
  326. }
  327.  
  328. protected void Disconnect()
  329. {
  330. Debug.WriteLine(string.Format("Client #{0} Disconnecting...", _id));
  331.  
  332. // We need to wait here until all outstanding async operations complete
  333. // In order to avoid getting 'Object was disposed' exceptions in those async ops that use the socket
  334. lock (_lockNumOutstandingAsyncOps)
  335. {
  336. while (_numOutstandingAsyncOps > 0)
  337. {
  338. Monitor.Wait(_lockNumOutstandingAsyncOps);
  339. }
  340.  
  341. if (_socket != null)
  342. {
  343. if (_socket.Connected)
  344. {
  345. _socket.Shutdown(SocketShutdown.Both);
  346. }
  347.  
  348. _socket.Close();
  349. _socket = null;
  350. }
  351. }
  352.  
  353. Debug.WriteLine(string.Format("Client #{0} Disconnected...", _id));
  354. }
  355.  
  356. protected void OnReceive(IAsyncResult asyncResult)
  357. {
  358. ReceiveContext context = (ReceiveContext)asyncResult.AsyncState;
  359. string data = null;
  360.  
  361. try
  362. {
  363. int bytesReceived = _socket.EndReceive(asyncResult);
  364. data = Encoding.ASCII.GetString(context._buffer, 0, bytesReceived);
  365.  
  366. // If the remote host shuts down the Socket connection with the Shutdown method, and all available data has been received,
  367. // the EndReceive method will complete immediately and return zero bytes
  368. if (bytesReceived > 0)
  369. {
  370. StringBuilder stringBuilder = context._stringBuilder.Append(data);
  371.  
  372. int index = -1;
  373. do
  374. {
  375. index = stringBuilder.ToString().IndexOf("#");
  376. if (index != -1)
  377. {
  378. string message = stringBuilder.ToString().Substring(0, index + 1);
  379. stringBuilder.Remove(0, index + 1);
  380.  
  381. if (!_messageQueue.TryAdd(message, _timeout))
  382. {
  383. Debug.WriteLine(string.Format("Client #{0} Timed out while attempting to queue Received Data: {1}"
  384. , _id, message));
  385. }
  386. }
  387. } while (index != -1);
  388. }
  389. }
  390. catch (Exception e)
  391. {
  392. // Error
  393. Debug.WriteLine(string.Format("Client #{0} Exception caught OnReceive. Exception: {1}"
  394. , _id, e.ToString()));
  395.  
  396. ConnectionStateProperty = ConnectionState.ERROR;
  397. }
  398. finally
  399. {
  400. lock (_lockNumOutstandingAsyncOps)
  401. {
  402. --_numOutstandingAsyncOps;
  403. Monitor.Pulse(_lockNumOutstandingAsyncOps);
  404. }
  405. }
  406.  
  407. // Issue the next async receive
  408. if (ConnectionStateProperty == ConnectionState.CONNECTED)
  409. {
  410. lock (_lockNumOutstandingAsyncOps)
  411. {
  412. try
  413. {
  414. ++_numOutstandingAsyncOps;
  415.  
  416. ReceiveContext newContext = new ReceiveContext();
  417. _socket.BeginReceive(newContext._buffer, 0, ReceiveContext._bufferSize, SocketFlags.None, new AsyncCallback(OnReceive), newContext);
  418. }
  419. catch(Exception e)
  420. {
  421. // Error
  422. Debug.WriteLine(string.Format("Client #{0} Exception caught OnReceive. Exception: {1}"
  423. , _id, e.ToString()));
  424.  
  425. --_numOutstandingAsyncOps;
  426. ConnectionStateProperty = ConnectionState.ERROR;
  427. }
  428. }
  429. }
  430. }
  431.  
  432. protected void OnSend(IAsyncResult asyncResult)
  433. {
  434. try
  435. {
  436. int bytesSent = _socket.EndSend(asyncResult);
  437. }
  438. catch (Exception e)
  439. {
  440. Debug.WriteLine(string.Format("Client #{0} Exception caught OnSend. Exception: {1}"
  441. , _id, e.ToString()));
  442.  
  443. ConnectionStateProperty = ConnectionState.ERROR;
  444. }
  445. finally
  446. {
  447. lock (_lockNumOutstandingAsyncOps)
  448. {
  449. --_numOutstandingAsyncOps;
  450. Monitor.Pulse(_lockNumOutstandingAsyncOps);
  451. }
  452. }
  453. }
  454. }
  455. }
  456.  
  457. using System;
  458. using System.Collections.Generic;
  459. using System.Configuration;
  460. using System.Linq;
  461. using System.Text;
  462. using System.Threading.Tasks;
  463.  
  464. namespace IntegrationTests
  465. {
  466. class Program
  467. {
  468. static void Main(string[] args)
  469. {
  470. string server = ConfigurationManager.AppSettings["server"];
  471. int port = int.Parse(ConfigurationManager.AppSettings["port"]);
  472. int numClients = int.Parse(ConfigurationManager.AppSettings["numberOfClients"]);
  473. TimeSpan clientLifetime = TimeSpan.Parse(ConfigurationManager.AppSettings["clientLifetime"]);
  474. TimeSpan timeout = TimeSpan.Parse(ConfigurationManager.AppSettings["timeout"]);
  475. TimeSpan reconnectInterval = TimeSpan.Parse(ConfigurationManager.AppSettings["reconnectInterval"]);
  476. List<string> clientIds = ConfigurationManager.GetSection("clientIds") as List<string>;
  477.  
  478. try
  479. {
  480. Task[] tasks = new Task[numClients];
  481. for(int count = 0; count < numClients; ++count)
  482. {
  483. var copyOfcount = count;
  484. tasks[count] = Task.Factory.StartNew(() =>
  485. {
  486. try
  487. {
  488. Client client = new Client(server, port, clientIds[copyOfcount], timeout, reconnectInterval);
  489. client.Startup();
  490. client.MakeRequest("Request");
  491.  
  492. System.Threading.Thread.Sleep(clientLifetime);
  493.  
  494. client.Shutdown();
  495. }
  496. catch(Exception e)
  497. {
  498. Debug.WriteLine(string.Format("Caught an exception in task procedure. Exception: {0}"
  499. , e.ToString()));
  500. }
  501. });
  502. }
  503. Task.WaitAll(tasks);
  504. }
  505. catch (Exception e)
  506. {
  507. Debug.WriteLine(string.Format("Caught an exception in main. Exception: {0}"
  508. , e.ToString()));
  509. }
  510. }
  511. }
  512. }
Add Comment
Please, Sign In to add comment