Guest User

Untitled

a guest
Aug 19th, 2021
26
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 14.99 KB | None | 0 0
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Diagnostics;
  4. using System.Linq;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. using AutoMapper;
  8. using Exchange.Constants;
  9. using Exchange.ExchangeApplicationContracts;
  10. using JetBrains.Annotations;
  11. using Microsoft.Extensions.Logging;
  12. using Pair;
  13. using Pair.Dtos;
  14. using Trade;
  15. using Trade.Dtos;
  16. using Volo.Abp.Uow;
  17.  
  18. namespace TradeFeed
  19. {
  20. public class TradeFeed : ITradeFeed
  21. {
  22. private readonly ITradeRepository _tradeReposytory;
  23. private readonly IExchangeProvider _exchangeProvider;
  24. private readonly IPairCrudAppService _pairCrudAppService;
  25. private readonly IUnitOfWorkManager _unitOfWorkManager;
  26. private readonly ITradeFeedContext _context;
  27. private static readonly object _lockerPairRead = new();
  28. private static readonly object _lockerTradeSave = new();
  29. private static readonly object _lockerExchangeTradeLoad = new();
  30. private readonly IMapper _mapper;
  31. private readonly ITradeFeedRepository _tradeFeedRepository;
  32.  
  33. public TradeFeed([NotNull] IExchangeProvider exchangeProvider,
  34. [NotNull] IPairCrudAppService pairCrudAppService,
  35. [NotNull] ITradeRepository tradeReposytory,
  36. [NotNull] IUnitOfWorkManager unitOfWorkManager,
  37. [NotNull] ITradeFeedContext context,
  38. [NotNull] ITradeFeedRepository tradeFeedRepository)
  39. {
  40. _exchangeProvider = exchangeProvider ?? throw new ArgumentNullException(nameof(exchangeProvider));
  41. _pairCrudAppService = pairCrudAppService ?? throw new ArgumentNullException(nameof(pairCrudAppService));
  42. _tradeReposytory = tradeReposytory ?? throw new ArgumentNullException(nameof(tradeReposytory));
  43. _unitOfWorkManager = unitOfWorkManager ?? throw new ArgumentNullException(nameof(unitOfWorkManager));
  44. _context = context ?? throw new ArgumentNullException(nameof(context));
  45. _tradeFeedRepository = tradeFeedRepository ?? throw new ArgumentNullException(nameof(tradeFeedRepository));
  46. }
  47.  
  48. public string Name { get; } = "Мониторинг сделок по валютной паре";
  49. public Action Execute => Feed;
  50.  
  51. public void Dispose()
  52. {
  53. throw new NotImplementedException();
  54. }
  55.  
  56. public ITradeFeedContext Context { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }
  57.  
  58. /// <summary>
  59. /// Собирает информацию о курсах валютных пар и сохраняет ее в БД для последующей аналитик
  60. /// </summary>
  61. [UnitOfWork(IsDisabled = false)]
  62. async void Feed()
  63. {
  64. //процесс получения фида проходит первый цикл импорта данных
  65. //в таком случае необходимо исключить из общего объема данных уже ранее загруженные сделки
  66. bool isFirstTimeRun = true;
  67.  
  68. //Сделки загруженные на предыдущем запросе
  69. IDictionary<PairDto, IEnumerable<TradeDto>> previousTrades = new Dictionary<PairDto, IEnumerable<TradeDto>>();
  70.  
  71. var exchange = _exchangeProvider.Get(_context.ExchangeId);
  72.  
  73. IList<PairDto> pairs = new List<PairDto>();
  74.  
  75. var pairIdList = _context.PairIds.Split(";");
  76.  
  77. foreach (var pairId in pairIdList)
  78. {
  79. //TODO организовать агрегированный запрос на получение всех пар сразу
  80. pairs.Add(await _pairCrudAppService.GetAsync(pairId));
  81. }
  82.  
  83. ///В цикле с установленной задержкой делать запросы на биржу по совершаемым
  84. /// сделкам отслеживаемой валютной пары.
  85. do
  86. {
  87. var trades = await exchange.ExchangeDriver.TradesAsync(pairs);
  88.  
  89. if (trades.Count == 0)
  90. {
  91. break;
  92. }
  93.  
  94. if (isFirstTimeRun)
  95. {
  96. //Должен исполняться только один раз при исполнении первой итеррации цикла
  97. //чтобы исключить дублирования загрузки
  98. int coefficientSelection = 2; // коэффициент выборки, для покрытия одинакового времи записи данных
  99.  
  100. foreach (var keyValue in trades)
  101. {
  102. using (var uow = _unitOfWorkManager.Begin())
  103. {
  104. var loadedTradesByPair = (await _tradeReposytory
  105. .WithDetailsAsync(t => t.OrderBook, t => t.Pair))
  106. .Where(t => t.ExchangeId == _context.ExchangeId && t.PairId == keyValue.Key.Id)
  107. .OrderByDescending(t => t.Date)
  108. .Take(keyValue.Value.Count() * coefficientSelection)
  109. .ToList();
  110.  
  111. previousTrades.Add(keyValue.Key, loadedTradesByPair.Select(t => _mapper.Map<TradeDto>(trades)));
  112. }
  113. }
  114. isFirstTimeRun = false;
  115. }
  116.  
  117. ///Оставляем только новые сделки, т.к. в последнем запросе естественно
  118. /// будут сделки, котрые были уже получены в предпоследнем запросе
  119. var lastNewTrades = GetOnlyNewTrades(trades, previousTrades).ToList();
  120.  
  121. var tradesToWrite = lastNewTrades.Select(t => _mapper.Map<TradeContext>(trades)).ToList();
  122.  
  123. _ = await _tradeReposytory.InsertAsync(tradesToWrite, _context.ExchangeId);
  124.  
  125. //Изменение задержки перед последующей загрузкой на основе анализа повторно скаченных данных
  126. ChangePause(lastNewTrades, trades);
  127.  
  128. LogLevel.Information.ToString($"Count new trades loaded from {exchange.Id}: {lastNewTrades.Count} ");
  129.  
  130. //сохраняем последние сделки, чтобы сравнить с ними загружаемые сделки на следующем шаге
  131. previousTrades = trades;
  132.  
  133. await Task.Delay(_context.Pause);
  134.  
  135. //Ведется контроль за выставленным состоянием бота и в случае необходимости
  136. //прекращает мониторинг
  137. //https://docs.microsoft.com/ru-ru/dotnet/standard/parallel-programming/task-cancellation
  138. if (Context.CancellationTokenSource.IsCancellationRequested)
  139. {
  140. ///TODO Предварительные операции по завершению работы мониторинга
  141.  
  142. Context.CancellationTokenSource.Token.ThrowIfCancellationRequested();
  143. }
  144.  
  145. } while (true);
  146. }
  147.  
  148. /// <summary>
  149. /// Выбирает и возвращает только новые сделки по отношению к выборке
  150. /// на предыдущей итеррации. Нет необходимости коммитить в БД одни и те же сделки
  151. /// </summary>
  152. /// <param name="trades"></param>
  153. /// <param name="previousTrades"></param>
  154. /// <returns></returns>
  155. private IList<TradeDto> GetOnlyNewTrades(IDictionary<PairDto, IEnumerable<TradeDto>> trades, IDictionary<PairDto, IEnumerable<TradeDto>> previousTrades)
  156. {
  157. var allTrades = trades.SelectMany(x => x.Value);
  158.  
  159. var allPreviousTrades = previousTrades.SelectMany(x => x.Value).ToList();
  160.  
  161. List<TradeDto> newTrades = null;
  162. switch (_context.ExchangeId)
  163. {
  164. case "BitcoinExchange":
  165. newTrades = allTrades.Except(allPreviousTrades, new TradeBitcoinComparer()).ToList();
  166. break;
  167. default:
  168. newTrades = allTrades.Except(allPreviousTrades, new TradeComparer()).ToList();
  169. break;
  170. }
  171.  
  172. return newTrades;
  173. }
  174.  
  175. private class TradeBitcoinComparer : IEqualityComparer<TradeDto>
  176. {
  177. public bool Equals(TradeDto x, TradeDto y)
  178. {
  179. return x.PairId == y.PairId && x.Price == y.Price && x.Quantity == y.Quantity && x.TradeTypes == y.TradeTypes;
  180. }
  181.  
  182. public int GetHashCode(TradeDto obj)
  183. {
  184. return (obj.PairId + "|" + obj.Price.ToString() + "|" + obj.Quantity.ToString() + "|" + obj.TradeTypes.ToString()).GetHashCode();
  185. }
  186. }
  187.  
  188. /// <summary>
  189. /// Сравниватель двух сделок, загруженных из биржи
  190. /// </summary>
  191. private class TradeComparer : IEqualityComparer<TradeDto>
  192. {
  193. public bool Equals(TradeDto x, TradeDto y)
  194. {
  195. return x.TradeId == y.TradeId;
  196. }
  197.  
  198. public int GetHashCode(TradeDto obj)
  199. {
  200. return obj.TradeId.GetHashCode();
  201. }
  202. }
  203.  
  204.  
  205. /// <summary>
  206. /// Вносит коррективы в величину паузы между двумя загрузками данных
  207. /// в зависимости от того, насколько быстро меняются данные.
  208. /// Настоящая реализация возможно несколько грубо отражает возможную динамику
  209. /// Этот алгоритм можно уточнять и делать более диференцированый и более динамичный
  210. /// </summary>
  211. /// <param name="countNewTrades">количество новых сделок</param>
  212. /// <param name="countTrades"></param>
  213. private void ChangePause(List<TradeDto> countNewTrades, IDictionary<PairDto, IEnumerable<TradeDto>> trades)
  214. {
  215. if (trades.Count == 0) return;
  216.  
  217. double max = 0.0;
  218. double pairMax = 0.0;
  219. PairDto pair = null;
  220. foreach (var item in trades)
  221. {
  222. double count = countNewTrades.Where(x => x.PairId == item.Key.Id).Count();
  223. if (pairMax < count)
  224. {
  225. pairMax = count;
  226. max = item.Value.Count();
  227. pair = item.Key;
  228. }
  229. }
  230.  
  231. if (pairMax / max < 0.2)
  232. {
  233. //Увеличиваем задержку в 2 раза
  234. Context.Pause *= 2;
  235. }
  236. else if (pairMax / max < 0.4)
  237. {
  238. //Увеличиваем задержку на 2 дельты
  239. Context.Pause += Context.PauseDelta * 2;
  240. }
  241. else if (pairMax / max < 0.5)
  242. {
  243. //Увеличиваем задержку на дельту
  244. Context.Pause += Context.PauseDelta;
  245. }
  246. else if (pairMax / max > 0.8)
  247. {
  248. //Уменьшаем задержку в 2 раза
  249. Context.Pause /= 2;
  250. }
  251. else if (pairMax / max > 0.6)
  252. {
  253. //Уменьшаем задержку на дельту
  254. Context.Pause -= Context.PauseDelta;
  255. }
  256. #if DEBUG
  257. Debug.WriteLine($"Количество новых ордеров: {pairMax} : {max} : {pair?.Name} с биржи: {_exchangeProvider.Get(Context.ExchangeId).BriefName} Задержка: {Context.Pause} милисек");
  258. #endif
  259. }
  260.  
  261.  
  262. [UnitOfWork(IsDisabled = true)]
  263. public virtual async Task StartAsync(ITradeFeedContext _context)
  264. {
  265. _context.Status = BotStatuses.Work;
  266. _context.StartTime = DateTime.UtcNow;
  267. _context.CancellationTokenSource = new CancellationTokenSource();
  268.  
  269. //TODO сохранить tokenSource в контексте, чтобы потом воспользоваться им для
  270. //остановки задачи
  271. await _tradeFeedRepository.UpdateAsync((TradeFeedContext)_context);
  272.  
  273. _context.Execute();
  274. }
  275.  
  276. public virtual async Task StopAsync(ITradeFeedContext _context)
  277. {
  278. //bot.Dispose();
  279. _context.CancellationTokenSource.Cancel();
  280.  
  281. _context.Status = BotStatuses.Stoped;
  282.  
  283. await _tradeFeedRepository.UpdateAsync((TradeFeedContext)_context);
  284. }
  285.  
  286. public virtual async Task PauseAsync(ITradeFeedContext _context)
  287. {
  288. /// 1. Все установленные заказы на покупку продажу на бирже остаются
  289. ///но бот уже не сможет делать новые заказы
  290.  
  291.  
  292. //2. Удалить бота из очереди исполнения задач
  293.  
  294. /// 3. Изменить статус бота и произвести запись этих изменений в БД
  295. _context.Status = BotStatuses.OnPause;
  296.  
  297. await _tradeFeedRepository.UpdateAsync((TradeFeedContext)_context);
  298. }
  299.  
  300. public virtual async Task ResumeAsync(ITradeFeedContext _context)
  301. {
  302. /// 1. Все установленные заказы на покупку продажу на бирже остаются
  303. ///но бот уже не сможет делать новые заказы
  304.  
  305.  
  306. //2. Постановка в очередь исполнения задач
  307. //var create = CreateTradeFeedDto(tradeFeed.GetType(), tradeFeed);
  308.  
  309. if (_context.StartTime == null)
  310. {
  311. // _queryManager.Enqueue(() => botTactics.Execute());
  312. }
  313. else
  314. {
  315. // _queryManager.Schedule(() => botTactics.Execute(), DateTime.Now - (DateTime)botTactic.StartTime);
  316. }
  317.  
  318. /// 3. Изменить статус бота и произвести запись этих изменений в БД
  319. _context.Status = BotStatuses.Work;
  320.  
  321. await _tradeFeedRepository.UpdateAsync((TradeFeedContext)_context);
  322. }
  323. }
  324. }
Add Comment
Please, Sign In to add comment