Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- using System;
- using System.Collections.Generic;
- using System.Collections.Concurrent;
- using System.Globalization;
- using System.Linq;
- using System.Text;
- using System.Threading;
- using System.Threading.Tasks;
- using csclientnet;
- using Marathon.Trading.BookOrderTracker.Common.Models;
- namespace Marathon.Trading.BookOrderTracker.ServerCore.DAL
- {
- public class ItgDataAccess
- {
/// <summary>
/// Selects the production (true) or simulation (false) server, partition and credentials.
/// The flag is only consulted while the constructor runs.
/// </summary>
public bool IS_PROD = true;

// Load budget of a single option client, and the load charged for symbols in _heavySymbolOverrides.
private const int CLIENT_SYMBOL_LOAD = 1;
private const int HEAVY_SYMBOL_LOAD = 4;

// NOTE(review): feed credentials are embedded in source. They should be moved to protected
// configuration and rotated; they are kept here only so existing behaviour is unchanged.
private const string SIM_USERNAME = "marathsim";
private const string SIM_PASSWORD = "Marath37";
private const string PROD_USERNAME = "mthndata";
private const string PROD_PASSWORD = "mAt63#da";

private int _disconnectResetPeriod;

/// <summary>
/// A symbol is unsubscribed once its disconnect counter becomes strictly greater than this value.
/// </summary>
public int DisconnectThreshold { get; set; }

/// <summary>
/// Period, in milliseconds, of the timer that zeroes the per-symbol disconnect counters.
/// Assigning a different value restarts the timer immediately with the new period.
/// </summary>
public int DisconnectResetPeriod
{
    get { return _disconnectResetPeriod; }
    set
    {
        if (_disconnectResetPeriod == value)
        {
            return;
        }

        _disconnectResetPeriod = value;

        // The timer does not exist yet when the constructor assigns the initial period.
        // Read the field once so a concurrent change of the field cannot cause a null dereference.
        Timer timer = _disconnectResetTimer;
        if (timer != null)
        {
            timer.Change(0, _disconnectResetPeriod);
        }
    }
}

// SIM_SERVER is not referenced in this class; non-production connections use CROSS_SERVER.
private const string SIM_SERVER = "tcp=sim-csp.itg.com:40003";
private const string CROSS_SERVER = "tcp=199.99.96.201:40003";
private const string PROD_SERVER = "tcp=csp-ny.itg.com:30115";

// Partition segment placed inside every subscription string.
private const String SIM_PART = "TEST";
private const String PROD_PART = "PROD";

/// <summary>Signature of the connection-level notifications below.</summary>
public delegate void ItgConnectionUpdate(String msg);

/// <summary>Raised with the disconnect reason text whenever any listener disconnects.</summary>
public event ItgConnectionUpdate ItgDisconnected;

/// <summary>Raised with the text "Reconnected" when a listener reconnects.</summary>
public event ItgConnectionUpdate ItgReconnected;

/// <summary>Raised with the error description when a listener reports an error.</summary>
public event ItgConnectionUpdate ItgError;

// Connection settings chosen by the constructor from the constants above.
private string SERVER;
private string PART;
private string USERNAME;
private string PASSWORD;

// Updates received from the listeners, waiting to be parsed by the processing task.
private BlockingCollection<ClientUpdate> _itgUpdateQueue;

/// <summary>Task that drains the update queue. Static, so it is shared by all instances.</summary>
public static Task _itgProcessTask;

// Each option client may carry a total load of up to CLIENT_SYMBOL_LOAD. A normal symbol has a
// load of 1; symbols in _heavySymbolOverrides have a load of HEAVY_SYMBOL_LOAD.
// Item1 of each pair is the current load, Item2 the client / session itself.
private ConcurrentDictionary<int, Pair<int, Client>> _clientDictionary;
private ConcurrentDictionary<int, Pair<int, ClientSession>> _sessionDictionary;
private ConcurrentDictionary<string, int> _disconnectCount; // tracks the number of disconnects for each symbol
private ConcurrentDictionary<string, int> _symbolToClientIdMap; // identifies the id of the client where the symbol is subscribed
private int _largestClientId; // next id to hand out when a new option client is created
private readonly HashSet<string> _heavySymbolOverrides = new HashSet<string>() { "SPY", "VXX" };

// Dedicated listener/session pair used for the underlying stock price subscriptions.
private Client stockListener;
private ClientSession stockSession;

private Logger _logger;
private Timer _disconnectResetTimer;

/// <summary>Not assigned anywhere in this class.</summary>
public Task updateTask;

// Consumers of parsed option quotes and underlyer prices.
private Action<Tuple<string, List<OptionOrderUpdate>>> _optionOrderUpdateDelegate;
private Action<Underlyer> _underlyerUpdateDelegate;

// Cancelled by Shutdown to stop the processing task.
private readonly CancellationTokenSource cancellationToken = new CancellationTokenSource();

/// <summary>Signature of <see cref="MessageReceived"/>.</summary>
public delegate void MessageHandler(string message);

/// <summary>Raised with human-readable status text (subscriptions, disconnects, errors).</summary>
public event MessageHandler MessageReceived;

/// <summary>Signature of <see cref="SymbolDisconnectedEvent"/>.</summary>
public delegate void DisconnectedHandler(string symbol);

/// <summary>Raised after a symbol has been unsubscribed because it exceeded <see cref="DisconnectThreshold"/>.</summary>
public event DisconnectedHandler SymbolDisconnectedEvent;
/// <summary>
/// Creates the data access object, connects the stock price session, starts the timer that
/// resets the per-symbol disconnect counters and starts the update processing task.
/// </summary>
/// <param name="optionOrderUpdateDelegate">Receives (OSI code, per-exchange quotes) for each option update.</param>
/// <param name="underlyerUpdateDelegate">Receives each stock price update.</param>
/// <param name="logger">Logger used for status and error lines.</param>
/// <param name="isProd">
/// True (the default, matching the previous hard-wired behaviour) to use the production
/// server and credentials; false to use the simulation settings. Previously the simulation
/// branch could never be selected because the flag was always true when it was read here.
/// </param>
public ItgDataAccess(Action<Tuple<string, List<OptionOrderUpdate>>> optionOrderUpdateDelegate, Action<Underlyer> underlyerUpdateDelegate, Logger logger, bool isProd = true)
{
    IS_PROD = isProd;
    if (IS_PROD)
    {
        SERVER = PROD_SERVER;
        PART = PROD_PART;
        USERNAME = PROD_USERNAME;
        PASSWORD = PROD_PASSWORD;
    }
    else
    {
        SERVER = CROSS_SERVER;
        PART = SIM_PART;
        USERNAME = SIM_USERNAME;
        PASSWORD = SIM_PASSWORD;
    }

    DisconnectThreshold = 10;
    DisconnectResetPeriod = 60000;

    _clientDictionary = new ConcurrentDictionary<int, Pair<int, Client>>();
    _sessionDictionary = new ConcurrentDictionary<int, Pair<int, ClientSession>>();
    _disconnectCount = new ConcurrentDictionary<string, int>();
    _symbolToClientIdMap = new ConcurrentDictionary<string, int>();
    _largestClientId = 0;
    _itgUpdateQueue = new BlockingCollection<ClientUpdate>();

    _logger = logger;
    _optionOrderUpdateDelegate = optionOrderUpdateDelegate;
    _underlyerUpdateDelegate = underlyerUpdateDelegate;

    var stockTuple = SetupSession();
    stockListener = stockTuple.Item1;
    stockSession = stockTuple.Item2;

    // Printed only once the stock session's Connect call has returned, not before it is attempted.
    Console.WriteLine("ITG connection established.");

    _disconnectResetTimer = new Timer(state =>
    {
        // Keys is a snapshot, so a symbol may be removed while we iterate. TryUpdate only
        // touches entries that still exist; an indexer assignment would silently re-create
        // the counter of a symbol that has just been unsubscribed.
        foreach (string key in _disconnectCount.Keys)
        {
            int current;
            while (_disconnectCount.TryGetValue(key, out current)
                && !_disconnectCount.TryUpdate(key, 0, current))
            {
                // Lost a race with a concurrent increment; retry with the fresh value.
            }
        }
    });
    _disconnectResetTimer.Change(0, DisconnectResetPeriod);

    ProcessItgUpdates();
}
/// <summary>
/// Creates a new listener/session pair, attaches the status, disconnect, error, reconnect
/// and update handlers, binds the listener to the session and connects the session.
/// </summary>
/// <returns>The listener (Item1) and its connected session (Item2).</returns>
private Tuple<Client, ClientSession> SetupSession()
{
    var session = new ClientSession();
    var listener = new Client();

    listener.SubscribeStatus += (sender, status) =>
    {
        PublishFeedMessage(String.Format("Listener subscribed to {0}. Status is {1}.", status.Subscription, status.Status));
    };

    listener.Disconnect += (s, e) =>
    {
        // Events are copied to a local before the null check so that a handler being
        // removed on another thread cannot cause a NullReferenceException.
        var disconnected = ItgDisconnected;
        if (disconnected != null)
        {
            disconnected(e.Reason.ToString());
        }

        // Identify which option client raised the event. The stock listener is not in the
        // dictionary, so for it the id stays -1 and no symbol is found below.
        int clientDictId = -1;
        foreach (KeyValuePair<int, Pair<int, Client>> kvp in _clientDictionary)
        {
            if (s == kvp.Value.Item2)
            {
                clientDictId = kvp.Key;
            }
        }

        // Single pass over the map; the first symbol hosted on that client is reported.
        string symbol = null;
        foreach (KeyValuePair<string, int> entry in _symbolToClientIdMap)
        {
            if (entry.Value == clientDictId)
            {
                symbol = entry.Key;
                break;
            }
        }

        if (symbol == null)
        {
            return;
        }

        var message = symbol + " - ITG feed disconnected: " + e.Reason.ToString();
        _logger.QueueLine(message);
        var messageReceived = MessageReceived;
        if (messageReceived != null)
        {
            messageReceived("Pre-Disconnect Update Buildup: " + _itgUpdateQueue.Count);
            messageReceived(message);
        }

        int disconnects;
        if (TryIncrementDisconnectCount(symbol, out disconnects) && disconnects > DisconnectThreshold)
        {
            UnsubscribeFromSymbol(symbol);
            if (messageReceived != null)
            {
                messageReceived("Too many disconnects. Disconnected " + symbol);
            }

            // Previously invoked unconditionally, which threw when nobody had subscribed.
            var symbolDisconnected = SymbolDisconnectedEvent;
            if (symbolDisconnected != null)
            {
                symbolDisconnected(symbol);
            }
        }
    };

    listener.Error += (s, e) =>
    {
        PublishFeedMessage("ITG encountered an error: " + e.Description + " Status: " + e.Status);
        var error = ItgError;
        if (error != null)
        {
            error(e.Description);
        }
    };

    listener.Reconnect += (s) =>
    {
        PublishFeedMessage("ITG feed automatically reconnected.");
        var reconnected = ItgReconnected;
        if (reconnected != null)
        {
            reconnected("Reconnected");
        }
    };

    listener.Update += OnItgDataUpdate;
    listener.SetClientSession(session);
    session.Connect(SERVER, USERNAME, PASSWORD);
    return new Tuple<Client, ClientSession>(listener, session);
}

/// <summary>Writes a status line to the logger and then forwards it to MessageReceived subscribers.</summary>
private void PublishFeedMessage(string message)
{
    _logger.QueueLine(message);
    var messageReceived = MessageReceived;
    if (messageReceived != null)
    {
        messageReceived(message);
    }
}

/// <summary>
/// Atomically adds one to the disconnect counter of a tracked symbol.
/// </summary>
/// <param name="symbol">Symbol whose counter is incremented.</param>
/// <param name="newCount">The counter value after the increment; 0 when the symbol is not tracked.</param>
/// <returns>False when the symbol has no counter (never subscribed, or already unsubscribed).</returns>
private bool TryIncrementDisconnectCount(string symbol, out int newCount)
{
    int current;
    while (_disconnectCount.TryGetValue(symbol, out current))
    {
        newCount = current + 1;
        if (_disconnectCount.TryUpdate(symbol, newCount, current))
        {
            return true;
        }
        // Another thread changed the counter in between; retry with the fresh value.
    }

    newCount = 0;
    return false;
}
/// <summary>
/// Update handler attached to every listener: hands the raw update to the queue that
/// ProcessItgUpdates drains, so no parsing is done inside the callback itself.
/// </summary>
/// <param name="sender">Listener that produced the update (not used).</param>
/// <param name="update">The update to enqueue.</param>
private void OnItgDataUpdate(Client sender, ClientUpdate update)
{
    this._itgUpdateQueue.Add(update);
}
/// <summary>
/// Starts the background task that drains the update queue, parses each update and
/// disposes it. The task ends when the cancellation token is cancelled.
/// </summary>
private void ProcessItgUpdates()
{
    _itgProcessTask = Task.Factory.StartNew(() =>
    {
        try
        {
            // Passing the token lets the blocking wait itself be cancelled. Checking the
            // token only after an item arrives would leave the task blocked forever on an
            // idle queue after shutdown.
            foreach (var update in _itgUpdateQueue.GetConsumingEnumerable(cancellationToken.Token))
            {
                try
                {
                    if (cancellationToken.IsCancellationRequested)
                    {
                        return;
                    }

                    string[] subscriptionStrings = update.Subscription.Split('.');
                    UpdateOrderData(subscriptionStrings, update.Update);
                }
                catch (Exception ex)
                {
                    // One bad update (or a throwing consumer delegate) must not stop the
                    // feed: log it and continue with the next item.
                    _logger.QueueLine("Error processing ITG update: " + ex);
                }
                finally
                {
                    // Runs on the normal, error and cancellation paths alike.
                    update.Dispose();
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation was requested while waiting for the next update; nothing to do.
        }
    }, CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
}
/// <summary>
/// Routes one update to the stock or option parser according to the first segment of its
/// subscription string, forwards the parsed result to the matching delegate and disposes
/// the message.
/// </summary>
/// <param name="subscriptionStrings">The subscription string split on '.'.</param>
/// <param name="message">Payload of the update; always disposed by this method.</param>
private void UpdateOrderData(string[] subscriptionStrings, Blob message)
{
    try
    {
        DateTime timestamp = DateTime.Now;
        string subscriptionType = subscriptionStrings[0];

        if (subscriptionType.Equals(pricesrvfields.PRICESRV_SUB_STOCK))
        {
            // The symbol is the last segment of a stock subscription string.
            string symbol = subscriptionStrings[subscriptionStrings.Length - 1];
            Underlyer underlyer = UpdateStockData(symbol, message);
            if (underlyer != null)
            {
                _underlyerUpdateDelegate(underlyer);
            }
        }
        else if (subscriptionType.Equals(pricesrvfields.PRICESRV_SUB_OPTION_SERIES)
            || subscriptionType.Equals(pricesrvfields.PRICESRV_SUB_OPTION_MONTH)
            || subscriptionType.Equals(pricesrvfields.PRICESRV_SUB_OPTION_SERIES_DELAYED))
        {
            string osiCode = "";
            var bboUpdates = new List<OptionOrderUpdate>();
            for (int i = 0; i < message.FieldCount; i++)
            {
                // Each field is either the OSI code or one exchange's quote; the first
                // non-empty code found is kept.
                string tempOsiCode = UpdateOptionData(timestamp, bboUpdates, message, i);
                if (osiCode.Equals("") && !tempOsiCode.Equals(""))
                {
                    osiCode = tempOsiCode;
                }
            }

            if (bboUpdates.Count > 0)
            {
                _optionOrderUpdateDelegate(Tuple.Create(osiCode, bboUpdates));
            }
        }
    }
    finally
    {
        // Previously skipped whenever parsing or a delegate threw, leaking the blob.
        message.Dispose();
    }
}
/// <summary>
/// Reads the last, bid and ask prices from a stock update.
/// </summary>
/// <param name="symbol">Symbol the update belongs to.</param>
/// <param name="message">Stock update payload.</param>
/// <returns>
/// An <c>Underlyer</c> carrying the three prices (0 for any price not present), or null
/// when all three are zero.
/// </returns>
private Underlyer UpdateStockData(string symbol, Blob message)
{
    double last = 0;
    double bid = 0;
    double ask = 0;

    for (int field = 0; field < message.FieldCount; field++)
    {
        switch (message.GetFieldDescription(field))
        {
            case pricesrvfields.FLD_LAST:
                last = message.GetDouble(field);
                break;
            case pricesrvfields.FLD_BID:
                bid = message.GetDouble(field);
                break;
            case pricesrvfields.FLD_ASK:
                ask = message.GetDouble(field);
                break;
        }
    }

    // Nothing to report when none of the prices carries a non-zero value.
    bool nothingPriced = last == 0 && bid == 0 && ask == 0;
    if (nothingPriced)
    {
        return null;
    }

    return new Underlyer(symbol, last, bid, ask);
}
/// <summary>
/// Handles one field of an option update. The OSI-code field is returned to the caller;
/// an exchange field is parsed into a quote and appended to <paramref name="bboUpdates"/>
/// when it carries data; any other field is ignored.
/// </summary>
/// <param name="timestamp">Accepted for the caller's convenience; not used by this method.</param>
/// <param name="bboUpdates">List that receives the per-exchange quotes.</param>
/// <param name="message">Option update payload.</param>
/// <param name="fieldNum">Index of the field to handle.</param>
/// <returns>The OSI code when the field holds it, otherwise an empty string.</returns>
private string UpdateOptionData(DateTime timestamp, List<OptionOrderUpdate> bboUpdates, Blob message, int fieldNum)
{
    string exchange;
    bool requireStrictlyPositive = false;

    // Map the field to the exchange label used in the resulting quote.
    switch (message.GetFieldDescription(fieldNum))
    {
        case pricesrvfields.FLD_OPRACODE:
            return message.GetString(fieldNum);
        case pricesrvfields.FLD_EXCH_AMEX:
            exchange = "AMEX";
            break;
        case pricesrvfields.FLD_EXCH_BOX:
            exchange = "BOX";
            break;
        case pricesrvfields.FLD_EXCH_CBOE:
            exchange = "CBOE";
            break;
        case pricesrvfields.FLD_EXCH_ISE:
            exchange = "ISE";
            break;
        case pricesrvfields.FLD_EXCH_PSE:
            exchange = "ARCA";
            // NOTE(review): ARCA is the only exchange filtered with "> 0" instead of ">= 0",
            // so all-zero ARCA quotes are dropped. Confirm whether this is intentional.
            requireStrictlyPositive = true;
            break;
        case pricesrvfields.FLD_EXCH_PHLX:
            exchange = "PHLX";
            break;
        case pricesrvfields.FLD_EXCH_NSDQ:
            exchange = "NSDQ";
            break;
        case pricesrvfields.FLD_EXCH_BATS:
            exchange = "BATS";
            break;
        case pricesrvfields.FLD_EXCH_C2OX:
            exchange = "C2OX";
            break;
        case pricesrvfields.FLD_EXCH_BXO:
            exchange = "BXO";
            break;
        case pricesrvfields.FLD_EXCH_MX:
            exchange = "MX";
            break;
        case pricesrvfields.FLD_EXCH_MIO:
            exchange = "MIAX";
            break;
        case pricesrvfields.FLD_EXCH_GMNI:
            exchange = "GMNI";
            break;
        case pricesrvfields.FLD_EXCH_EDGE:
            exchange = "EDGX";
            break;
        default:
            return "";
    }

    OptionOrderUpdate quote = GetExchangeData(message.GetBlob(fieldNum), exchange);

    // GetExchangeData leaves -1 in every value the exchange blob did not contain, so a
    // non-negative value means that at least one field was present.
    bool hasData = requireStrictlyPositive
        ? (quote.Bid > 0 || quote.BidSize > 0 || quote.Ask > 0 || quote.AskSize > 0)
        : (quote.Bid >= 0 || quote.BidSize >= 0 || quote.Ask >= 0 || quote.AskSize >= 0);
    if (hasData)
    {
        bboUpdates.Add(quote);
    }

    return "";
}
/// <summary>
/// Reads bid, ask, bid size and ask size from one exchange's blob and disposes the blob.
/// </summary>
/// <param name="message">Exchange blob; always disposed by this method.</param>
/// <param name="exchange">Exchange label stored in the resulting quote.</param>
/// <returns>A quote in which every value missing from the blob is left at -1.</returns>
private OptionOrderUpdate GetExchangeData(Blob message, string exchange)
{
    double bid = -1, ask = -1;
    int bidSize = -1, askSize = -1;

    try
    {
        for (int i = 0; i < message.FieldCount; i++)
        {
            switch (message.GetFieldDescription(i))
            {
                case pricesrvfields.FLD_BID:
                    bid = message.GetDouble(i);
                    break;
                case pricesrvfields.FLD_ASK:
                    ask = message.GetDouble(i);
                    break;
                case pricesrvfields.FLD_BIDSIZE:
                    bidSize = message.GetInt(i);
                    break;
                case pricesrvfields.FLD_ASKSIZE:
                    askSize = message.GetInt(i);
                    break;
            }
        }
    }
    finally
    {
        // Dispose even when a field read throws, so the blob is never leaked.
        message.Dispose();
    }

    return new OptionOrderUpdate(bid, ask, bidSize, askSize, exchange);
}
/// <summary>
/// Subscribes to the delayed option series and the stock price of a symbol. The option
/// subscription is placed on the first existing client with enough spare load, or on a
/// newly created client when none has room. Does nothing for a null/empty symbol or a
/// symbol that is already mapped to a client.
/// </summary>
/// <param name="symbol">Underlying symbol; upper-cased when building the subscription strings.</param>
public void SubscribeToSymbol(String symbol)
{
    if (String.IsNullOrEmpty(symbol))
    {
        return;
    }
    if (_symbolToClientIdMap.ContainsKey(symbol))
    {
        return;
    }

    int load = _heavySymbolOverrides.Contains(symbol) ? HEAVY_SYMBOL_LOAD : 1;

    // Look for an existing option client that can still take this symbol's load.
    int hostId = -1;
    foreach (KeyValuePair<int, Pair<int, Client>> candidate in _clientDictionary)
    {
        if (candidate.Value.Item1 + load > CLIENT_SYMBOL_LOAD)
        {
            continue;
        }
        hostId = candidate.Key;
        break;
    }

    Client optionClient;
    ClientSession optionSession;
    if (hostId != -1)
    {
        optionClient = _clientDictionary[hostId].Item2;
        optionSession = _sessionDictionary[hostId].Item2;
    }
    else
    {
        // No client has room: open a new listener/session pair and register it with zero load.
        Tuple<Client, ClientSession> created = SetupSession();
        optionClient = created.Item1;
        optionSession = created.Item2;
        hostId = _largestClientId;
        _clientDictionary.TryAdd(hostId, new Pair<int, Client>(0, optionClient));
        _sessionDictionary.TryAdd(hostId, new Pair<int, ClientSession>(0, optionSession));
        _largestClientId++;
    }

    string upperSymbol = symbol.ToUpper();

    // Option chain subscription on the chosen client, then the bookkeeping for it.
    string optionTopic = pricesrvfields.PRICESRV_SUB_OPTION_SERIES_DELAYED + "." + PART + "." + upperSymbol;
    optionClient.Subscribe(pricesrvfields.SERVICEID_PRICE, optionTopic, true);
    _symbolToClientIdMap.TryAdd(symbol, hostId);

    Pair<int, Client> hostClient = _clientDictionary[hostId];
    hostClient.Item1 += load;
    Pair<int, ClientSession> hostSession = _sessionDictionary[hostId];
    hostSession.Item1 += load;

    // Stock price subscription always goes through the dedicated stock listener.
    string stockTopic = pricesrvfields.PRICESRV_SUB_STOCK + "." + PART + ". ." + upperSymbol;
    stockListener.Subscribe(pricesrvfields.SERVICEID_PRICE, stockTopic, true);

    _disconnectCount.TryAdd(symbol, 0);
    _logger.QueueLine("Subscribed to: " + symbol);
}
/// <summary>
/// Cancels the option series and stock price subscriptions of a symbol, releases the load
/// it occupied on its option client and stops tracking its disconnect count. Does nothing
/// for a null/empty symbol or a symbol that is not currently subscribed.
/// </summary>
/// <param name="symbol">Symbol as it was passed to SubscribeToSymbol.</param>
public void UnsubscribeFromSymbol(String symbol)
{
    if (String.IsNullOrEmpty(symbol))
    {
        return;
    }

    // Removing the mapping is the fix: it used to be left behind, so the symbol could never
    // be subscribed again and a repeated call released the same load twice. TryRemove also
    // makes the "is it subscribed?" check and the claim a single atomic step.
    int clientId;
    if (!_symbolToClientIdMap.TryRemove(symbol, out clientId))
    {
        return;
    }

    try
    {
        int symbolLoad = (_heavySymbolOverrides.Contains(symbol)) ? HEAVY_SYMBOL_LOAD : 1;

        // Unsubscribe from the option chain
        string subscriptionStr = pricesrvfields.PRICESRV_SUB_OPTION_SERIES_DELAYED + "." + PART + "." + symbol.ToUpper();
        Pair<int, Client> optionClientPair;
        if (_clientDictionary.TryGetValue(clientId, out optionClientPair))
        {
            optionClientPair.Item2.Unsubscribe(pricesrvfields.SERVICEID_PRICE, subscriptionStr);
            optionClientPair.Item1 -= symbolLoad;
        }

        Pair<int, ClientSession> optionSessionPair;
        if (_sessionDictionary.TryGetValue(clientId, out optionSessionPair))
        {
            optionSessionPair.Item1 -= symbolLoad;
        }

        // Unsubscribe from the stock price
        subscriptionStr = pricesrvfields.PRICESRV_SUB_STOCK + "." + PART + ". ." + symbol.ToUpper();
        stockListener.Unsubscribe(pricesrvfields.SERVICEID_PRICE, subscriptionStr);

        int count;
        _disconnectCount.TryRemove(symbol, out count);

        var messageReceived = MessageReceived;
        if (messageReceived != null)
        {
            messageReceived("Unsubscribing from: " + symbol);
        }
        _logger.QueueLine("Unsubcribing from: " + symbol);
    }
    catch (Exception ex)
    {
        var messageReceived = MessageReceived;
        if (messageReceived != null)
        {
            messageReceived("Error unsubscribing from: " + symbol);
        }
        _logger.QueueLine("Error unsubscribing from: " + symbol);
        // Keep the cause in the log instead of discarding it.
        _logger.QueueLine(ex.ToString());
    }
}
/// <summary>
/// Cancels every subscription on all option clients and on the stock listener, then
/// disconnects and disposes every session. Each step is best-effort: a failure is logged
/// and the remaining clients and sessions are still processed.
/// </summary>
private void UnsubscribeAllAndDisconnect()
{
    foreach (KeyValuePair<int, Pair<int, Client>> kvp in _clientDictionary)
    {
        try
        {
            kvp.Value.Item2.CancelAllSubscriptions();
        }
        catch (Exception ex)
        {
            _logger.QueueLine("Error cancelling ITG option subscriptions: " + ex);
        }
        kvp.Value.Item1 = 0;
    }

    try
    {
        stockListener.CancelAllSubscriptions();
    }
    catch (Exception ex)
    {
        _logger.QueueLine("Error cancelling ITG stock subscriptions: " + ex);
    }

    foreach (KeyValuePair<int, Pair<int, ClientSession>> kvp in _sessionDictionary)
    {
        CloseSession(kvp.Value.Item2);
        // Keep the session load in step with the client load reset above.
        kvp.Value.Item1 = 0;
    }

    CloseSession(stockSession);
}

/// <summary>
/// Disconnects a session when it reports being connected and then disposes it. Errors are
/// logged rather than thrown, and a failed disconnect does not prevent the dispose.
/// </summary>
private void CloseSession(ClientSession session)
{
    try
    {
        if (session.Connected)
        {
            session.Disconnect();
        }
    }
    catch (Exception ex)
    {
        _logger.QueueLine("Error disconnecting ITG session: " + ex);
    }

    try
    {
        session.Dispose();
    }
    catch (Exception ex)
    {
        _logger.QueueLine("Error disposing ITG session: " + ex);
    }
}
/// <summary>
/// Tears down all subscriptions and sessions, stops the disconnect-counter reset timer and
/// signals the update processing task to stop.
/// </summary>
public void Shutdown()
{
    try
    {
        UnsubscribeAllAndDisconnect();
    }
    finally
    {
        // Runs even if the teardown above throws, so the background work is always stopped.
        // The field is cleared before disposing because the DisconnectResetPeriod setter
        // skips a null timer but would throw on a disposed one.
        Timer timer = _disconnectResetTimer;
        _disconnectResetTimer = null;
        if (timer != null)
        {
            timer.Dispose();
        }

        cancellationToken.Cancel();
    }
}
/// <summary>
/// Builds the "yyyyMM" codes of the current month and the three months after it.
/// </summary>
/// <returns>Four strings, index 0 being the current month.</returns>
private string[] GetFrontMonths()
{
    // Read the clock once so all four entries are derived from the same instant; separate
    // reads could straddle a month boundary and yield a duplicated or skipped month.
    DateTime now = DateTime.Now;

    string[] frontMonths = new string[4];
    for (int offset = 0; offset < frontMonths.Length; offset++)
    {
        // Invariant culture keeps the Gregorian year and ASCII digits whatever the
        // machine's regional settings are.
        frontMonths[offset] = now.AddMonths(offset).ToString("yyyyMM", CultureInfo.InvariantCulture);
    }

    return frontMonths;
}
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement