Advertisement
Guest User

Untitled

a guest
Oct 19th, 2017
80
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 6.31 KB | None | 0 0
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Threading;
  4. using System.Linq;
  5. using System.Linq.Expressions;
  6. using System.Threading.Tasks;
  7. using Akka.Actor;
  8. using Akka.Util.Internal;
  9. using CoinMM.Messages;
  10. using Newtonsoft.Json.Schema;
  11.  
  12. namespace CoinMM
  13. {
  14. public class Broker : UntypedActor
  15. {
  16. protected override void OnReceive(object message)
  17. {
  18. var order = message as Order;
  19. if (order != null)
  20. {
  21. // Send order to api..
  22. }
  23. }
  24. }
  25.  
  26. public class BrokerWithMarketData : ReceiveActor
  27. {
  28. private Func<string, Task<MarketData>> marketDataFunction;
  29. protected Dictionary<string,List<IActorRef>> subscriptions = new Dictionary<string,List<IActorRef>>();
  30. protected Dictionary<string,MarketData> marketDataDict = new Dictionary<string,MarketData>();
  31.  
  32. private string name;
  33. string[] supportedSymbols;
  34.  
  35. class GetMarketDataRequest
  36. {
  37. public List<string> subscriptions;
  38.  
  39. public GetMarketDataRequest(List<string> subscriptions)
  40. {
  41. this.subscriptions = subscriptions;
  42. }
  43. }
  44.  
  45.  
  46. public BrokerWithMarketData( string[] supportedSymbols, Func<string, Task<MarketData>> marketDataFunction, string name)
  47. {
  48. this.name = name;
  49. this.marketDataFunction = marketDataFunction;
  50. this.supportedSymbols = supportedSymbols;
  51.  
  52. Receive<string>(OnName);
  53.  
  54. Receive<MarketDataListRequest>(OnMarketDataListRequest);
  55. Receive<MarketDataSubscription>(OnMarketDataSubscription);
  56.  
  57. Receive<GetMarketDataRequest>(OnGetMarketDataRequest);
  58. Receive<List<MarketData>>(OnMarketDataReceived);
  59.  
  60. Receive<Order>(OnOrder);
  61.  
  62.  
  63. Context.System.Scheduler.ScheduleTellRepeatedly(
  64. TimeSpan.FromSeconds(2),
  65. TimeSpan.FromSeconds(30),
  66. Self,
  67. new GetMarketDataRequest(this.subscriptions.Keys.ToList()), ActorRefs.NoSender); //or ActorRefs.Nobody or som
  68. }
  69.  
  70. bool OnName (string d)
  71. {
  72. Sender.Tell(this.name, Self);
  73. return true;
  74. }
  75.  
  76. /*
  77.  
  78. MarketStatus ConvertToMarketStatus(MarketData marketData)
  79. {
  80. if (marketData != null)
  81. {
  82. MarketStatus marketStatus = new MarketStatus
  83. {
  84. symbol = marketData.symbol,
  85. Buy = new MarketPrice {Price = marketData.bid, Qty = 10.0m},
  86. Sell = new MarketPrice {Price = marketData.ask, Qty = 10.0m}
  87. };
  88.  
  89. //marketStatus.Buy.broker = name;
  90. //marketStatus.Sell.broker = name;
  91. return marketStatus;
  92. }
  93. return null;
  94. }*/
  95.  
  96.  
  97. bool IsSupported(string symbol)
  98. {
  99. var x = supportedSymbols.FirstOrDefault(s => s == symbol);
  100. return x != null;
  101. }
  102.  
  103. protected bool OnMarketDataSubscription(MarketDataSubscription subscription)
  104. {
  105. if ( (subscription != null) && (subscription.symbol != null) )
  106. {
  107. if (IsSupported(subscription.symbol))
  108. {
  109. Console.ForegroundColor = ConsoleColor.DarkMagenta;
  110. Console.WriteLine($"MarketData Adding subscriber for {subscription.symbol} @ {name} : {subscription.subscriber} ");
  111. List<IActorRef> subscribers;
  112. if (!subscriptions.TryGetValue(subscription.symbol, out subscribers))
  113. {
  114. subscribers = new List<IActorRef>();
  115. subscriptions.Add(subscription.symbol, subscribers);
  116. }
  117.  
  118. subscribers.Add(subscription.subscriber); // This adds subscription with the Reference supplied by the subscriber; this can be different to SENDER (happens if sender runs in a different thread)
  119.  
  120. // Sender needs to be replied to, BECAUSE different Threads in the Actors have a different "#" actor reference.
  121. Sender.Tell(new MarketDataSubscriptionResult(subscription.symbol, true));
  122.  
  123. }
  124. else
  125. {
  126. Console.WriteLine($"Ignoring subscription because broker does not support it: {subscription.symbol} @ {name}");
  127. Sender.Tell(new MarketDataSubscriptionResult(subscription.symbol, false));
  128. }
  129.  
  130. }
  131. return true;
  132. }
  133.  
  134.  
  135.  
  136.  
  137.  
  138. bool OnGetMarketDataRequest (GetMarketDataRequest request)
  139. {
  140. // This runs in main thread, and has access to Self and variables.
  141. var subscriptionSymbols = this.subscriptions.Keys.ToList();
  142. Console.WriteLine($"{name} is getting market Data for {subscriptionSymbols.Count} subscriptions..");
  143.  
  144. var tasks = subscriptionSymbols.Select( symbol => {
  145. var task = marketDataFunction(symbol);
  146. task.Start();
  147. return task;
  148. });
  149. Task.WaitAll(tasks.ToArray() );
  150. Console.WriteLine("All Market Data was received");
  151.  
  152.  
  153.  
  154. var marketDataList = tasks
  155. .Select( task => task.Result)
  156. .ToList();
  157.  
  158. Self.Tell(marketDataList);
  159.  
  160. return true;
  161. }
  162.  
  163. bool OnMarketDataListRequest(MarketDataListRequest request)
  164. {
  165. var marketDataList = this.marketDataDict.Values.ToList();
  166. Sender.Tell( new MarketDataListResponse { marketData = marketDataList} );
  167. return true;
  168. }
  169.  
  170.  
  171. bool OnMarketDataReceived( List<MarketData> marketDataList)
  172. {
  173. Console.WriteLine($"Broker {name} has received data. Forwarding data to subscribers..");
  174. //return true;
  175.  
  176. if (marketDataList != null)
  177. {
  178.  
  179. try
  180. {
  181. marketDataList.ForEach(marketData =>
  182. {
  183. // Update LastMarketData dictionary
  184. if (marketDataDict.ContainsKey(marketData.symbol))
  185. {
  186. marketDataDict[marketData.symbol] = marketData;
  187. }
  188. else
  189. {
  190. marketDataDict.Add(marketData.symbol , marketData);;
  191. }
  192.  
  193.  
  194. // Send MarketData to all subscribers tht are subscribed for the symbol
  195. List<IActorRef> subscribers;
  196. if (subscriptions.TryGetValue(marketData.symbol, out subscribers))
  197. {
  198. if (subscribers.Count == 0)
  199. {
  200. Console.WriteLine($"Broker {name}:{marketData.symbol} has no subscribers!");
  201. }
  202. subscribers.ForEach(subscriber =>
  203. {
  204. Console.WriteLine($"Sending Market data to subscriber {subscriber} {marketData.symbol} {marketData.last}..");
  205. subscriber.Tell(marketData);
  206. });
  207. }
  208. });
  209. }
  210. catch (Exception ex)
  211. {
  212. Console.WriteLine($"Broker {name} SendMarketData Exception {ex.Message}.");
  213. }
  214. }
  215. return true;
  216. }
  217.  
  218. protected bool OnOrder(Order order)
  219. {
  220. if (order != null)
  221. {
  222. // Send order to api..
  223. Console.WriteLine("Sending Order " + order.symbol + order.limitPrice + order.qty);
  224. }
  225. return true;
  226. }
  227.  
  228.  
  229. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement