Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- using System;
- using System.Collections.Generic;
- using System.Threading;
- using System.Linq;
- using System.Linq.Expressions;
- using System.Threading.Tasks;
- using Akka.Actor;
- using Akka.Util.Internal;
- using CoinMM.Messages;
- using Newtonsoft.Json.Schema;
- namespace CoinMM
- {
/// <summary>
/// Minimal broker actor. It recognises <see cref="Order"/> messages but does not
/// yet act on them; every other message type is ignored without being reported
/// as unhandled.
/// </summary>
public class Broker : UntypedActor
{
    /// <summary>
    /// Entry point for every message delivered to this actor.
    /// </summary>
    /// <param name="message">The incoming message; only <see cref="Order"/> instances are of interest.</param>
    protected override void OnReceive(object message)
    {
        if (!(message is Order))
        {
            // Not an order: nothing to do.
            return;
        }

        // Send order to api..
    }
}
/// <summary>
/// Broker actor that keeps per-symbol subscriber lists, periodically fetches market
/// data for every subscribed symbol through the supplied fetch function, caches the
/// most recent <see cref="MarketData"/> per symbol and forwards each update to the
/// subscribers of that symbol.
/// </summary>
public class BrokerWithMarketData : ReceiveActor
{
    private readonly Func<string, Task<MarketData>> marketDataFunction;

    /// <summary>Subscribers registered per symbol.</summary>
    protected Dictionary<string, List<IActorRef>> subscriptions = new Dictionary<string, List<IActorRef>>();

    /// <summary>Most recently received market data per symbol.</summary>
    protected Dictionary<string, MarketData> marketDataDict = new Dictionary<string, MarketData>();

    private readonly string name;
    private readonly string[] supportedSymbols;

    // Handle for the repeating poll trigger so it can be cancelled in PostStop.
    private readonly ICancelable pollingSchedule;

    /// <summary>
    /// Internal message that triggers one polling round. The handler reads the live
    /// subscription table, so the list carried here is informational only.
    /// </summary>
    class GetMarketDataRequest
    {
        public List<string> subscriptions;

        public GetMarketDataRequest(List<string> subscriptions)
        {
            this.subscriptions = subscriptions;
        }
    }

    /// <summary>
    /// Creates the broker, registers its message handlers and schedules the repeating
    /// poll trigger (first after 2 seconds, then every 30 seconds).
    /// </summary>
    /// <param name="supportedSymbols">Symbols this broker accepts subscriptions for; null is treated as "none".</param>
    /// <param name="marketDataFunction">Function invoked once per subscribed symbol in each polling round.</param>
    /// <param name="name">Broker name, used in log output and returned for any string message.</param>
    public BrokerWithMarketData(string[] supportedSymbols, Func<string, Task<MarketData>> marketDataFunction, string name)
    {
        this.name = name;
        this.marketDataFunction = marketDataFunction;
        this.supportedSymbols = supportedSymbols ?? new string[0];

        Receive<string>(OnName);
        Receive<MarketDataListRequest>(OnMarketDataListRequest);
        Receive<MarketDataSubscription>(OnMarketDataSubscription);
        Receive<GetMarketDataRequest>(OnGetMarketDataRequest);
        Receive<List<MarketData>>(OnMarketDataReceived);
        Receive<Order>(OnOrder);

        // Use the cancelable variant: a plain repeating tell keeps firing after the
        // actor has stopped, and a restart would add a second timer on top of it.
        pollingSchedule = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(
            TimeSpan.FromSeconds(2),
            TimeSpan.FromSeconds(30),
            Self,
            new GetMarketDataRequest(this.subscriptions.Keys.ToList()),
            ActorRefs.NoSender);
    }

    /// <summary>
    /// Cancels the repeating poll trigger when the actor stops.
    /// </summary>
    protected override void PostStop()
    {
        pollingSchedule?.Cancel();
        base.PostStop();
    }

    /// <summary>
    /// Replies to any string message with this broker's name.
    /// </summary>
    /// <param name="d">The received string; its content is not inspected.</param>
    /// <returns>Always true.</returns>
    bool OnName(string d)
    {
        Sender.Tell(this.name, Self);
        return true;
    }

    /*
    MarketStatus ConvertToMarketStatus(MarketData marketData)
    {
        if (marketData != null)
        {
            MarketStatus marketStatus = new MarketStatus
            {
                symbol = marketData.symbol,
                Buy = new MarketPrice {Price = marketData.bid, Qty = 10.0m},
                Sell = new MarketPrice {Price = marketData.ask, Qty = 10.0m}
            };
            //marketStatus.Buy.broker = name;
            //marketStatus.Sell.broker = name;
            return marketStatus;
        }
        return null;
    }*/

    /// <summary>
    /// Tells whether <paramref name="symbol"/> is one of the supported symbols.
    /// </summary>
    /// <param name="symbol">Symbol to look up; null is never supported.</param>
    /// <returns>True when the symbol is in the supported list.</returns>
    bool IsSupported(string symbol)
    {
        return symbol != null && Array.IndexOf(supportedSymbols, symbol) >= 0;
    }

    /// <summary>
    /// Registers the subscriber carried by the message for the requested symbol and
    /// replies to the sender with a <see cref="MarketDataSubscriptionResult"/>.
    /// Messages without a symbol are ignored and get no reply.
    /// </summary>
    /// <param name="subscription">Subscription request.</param>
    /// <returns>Always true.</returns>
    protected bool OnMarketDataSubscription(MarketDataSubscription subscription)
    {
        if (subscription == null || subscription.symbol == null)
        {
            return true;
        }

        if (!IsSupported(subscription.symbol))
        {
            Console.WriteLine($"Ignoring subscription because broker does not support it: {subscription.symbol} @ {name}");
            Sender.Tell(new MarketDataSubscriptionResult(subscription.symbol, false));
            return true;
        }

        if (subscription.subscriber == null)
        {
            // A null entry in the subscriber list would throw while forwarding data,
            // so the request is rejected instead of being stored.
            Sender.Tell(new MarketDataSubscriptionResult(subscription.symbol, false));
            return true;
        }

        // Restore the previous colour afterwards so later console output is unaffected.
        var previousColor = Console.ForegroundColor;
        Console.ForegroundColor = ConsoleColor.DarkMagenta;
        try
        {
            Console.WriteLine($"MarketData Adding subscriber for {subscription.symbol} @ {name} : {subscription.subscriber} ");
        }
        finally
        {
            Console.ForegroundColor = previousColor;
        }

        List<IActorRef> subscribers;
        if (!subscriptions.TryGetValue(subscription.symbol, out subscribers))
        {
            subscribers = new List<IActorRef>();
            subscriptions.Add(subscription.symbol, subscribers);
        }

        // The reference supplied in the message is stored, not Sender; the two may differ.
        // A repeated subscription is acknowledged but not stored twice, so the
        // subscriber does not receive each update more than once.
        if (!subscribers.Contains(subscription.subscriber))
        {
            subscribers.Add(subscription.subscriber);
        }

        // The acknowledgement goes to Sender, which is not necessarily the stored subscriber.
        Sender.Tell(new MarketDataSubscriptionResult(subscription.symbol, true));
        return true;
    }

    /// <summary>
    /// Runs one polling round: invokes the fetch function for every subscribed symbol,
    /// waits for all results and sends them to this actor as a list of market data.
    /// </summary>
    /// <remarks>
    /// The wait is blocking, so no other message is processed until every fetch has
    /// completed; a failing fetch propagates out of this handler.
    /// </remarks>
    /// <param name="request">Poll trigger; its payload is not used.</param>
    /// <returns>Always true.</returns>
    bool OnGetMarketDataRequest(GetMarketDataRequest request)
    {
        var subscriptionSymbols = this.subscriptions.Keys.ToList();
        Console.WriteLine($"{name} is getting market Data for {subscriptionSymbols.Count} subscriptions..");

        // Materialise the tasks exactly once. A deferred query would invoke the fetch
        // function again on every enumeration, issuing each request twice.
        var tasks = new List<Task<MarketData>>(subscriptionSymbols.Count);
        foreach (var symbol in subscriptionSymbols)
        {
            var task = marketDataFunction(symbol);
            if (task == null)
            {
                continue;
            }

            // Only a task that was constructed but never scheduled may be started;
            // calling Start on a running or promise-style task throws.
            if (task.Status == TaskStatus.Created)
            {
                task.Start();
            }

            tasks.Add(task);
        }

        Task.WaitAll(tasks.ToArray());
        Console.WriteLine("All Market Data was received");

        var marketDataList = tasks
            .Select(task => task.Result)
            .ToList();
        Self.Tell(marketDataList);
        return true;
    }

    /// <summary>
    /// Replies to the sender with the most recent market data of every cached symbol.
    /// </summary>
    /// <param name="request">The request; it carries no data that is read here.</param>
    /// <returns>Always true.</returns>
    bool OnMarketDataListRequest(MarketDataListRequest request)
    {
        var marketDataList = this.marketDataDict.Values.ToList();
        Sender.Tell(new MarketDataListResponse { marketData = marketDataList });
        return true;
    }

    /// <summary>
    /// Stores each received entry as the latest data for its symbol and forwards it
    /// to every subscriber of that symbol. Entries that are null or have no symbol
    /// are skipped; a failure on one entry is logged and does not stop the rest.
    /// </summary>
    /// <param name="marketDataList">Result of a polling round; may be null.</param>
    /// <returns>Always true.</returns>
    bool OnMarketDataReceived(List<MarketData> marketDataList)
    {
        Console.WriteLine($"Broker {name} has received data. Forwarding data to subscribers..");
        if (marketDataList == null)
        {
            return true;
        }

        foreach (var marketData in marketDataList)
        {
            if (marketData == null || marketData.symbol == null)
            {
                continue;
            }

            try
            {
                // Update LastMarketData dictionary (the indexer inserts or overwrites).
                marketDataDict[marketData.symbol] = marketData;

                // Send MarketData to all subscribers that are subscribed for the symbol.
                List<IActorRef> subscribers;
                if (!subscriptions.TryGetValue(marketData.symbol, out subscribers))
                {
                    continue;
                }

                if (subscribers.Count == 0)
                {
                    Console.WriteLine($"Broker {name}:{marketData.symbol} has no subscribers!");
                }

                foreach (var subscriber in subscribers)
                {
                    Console.WriteLine($"Sending Market data to subscriber {subscriber} {marketData.symbol} {marketData.last}..");
                    subscriber.Tell(marketData);
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Broker {name} SendMarketData Exception {ex.Message}.");
            }
        }

        return true;
    }

    /// <summary>
    /// Handles an order. Currently it only logs the order's symbol, limit price and
    /// quantity; nothing is sent anywhere.
    /// </summary>
    /// <param name="order">The order to process; null is ignored.</param>
    /// <returns>Always true.</returns>
    protected bool OnOrder(Order order)
    {
        if (order != null)
        {
            // Send order to api..
            Console.WriteLine($"Sending Order {order.symbol} {order.limitPrice} {order.qty}");
        }

        return true;
    }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement