Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- using System;
- using System.Collections.Generic;
- using System.Diagnostics;
- using System.Linq;
- using System.Threading;
- using System.Threading.Tasks;
- using AutoMapper;
- using Exchange.Constants;
- using Exchange.ExchangeApplicationContracts;
- using JetBrains.Annotations;
- using Microsoft.Extensions.Logging;
- using Pair;
- using Pair.Dtos;
- using Trade;
- using Trade.Dtos;
- using Volo.Abp.Uow;
- namespace TradeFeed
- {
- public class TradeFeed : ITradeFeed
- {
- private readonly ITradeRepository _tradeReposytory;
- private readonly IExchangeProvider _exchangeProvider;
- private readonly IPairCrudAppService _pairCrudAppService;
- private readonly IUnitOfWorkManager _unitOfWorkManager;
- private readonly ITradeFeedContext _context;
- private static readonly object _lockerPairRead = new();
- private static readonly object _lockerTradeSave = new();
- private static readonly object _lockerExchangeTradeLoad = new();
- private readonly IMapper _mapper;
- private readonly ITradeFeedRepository _tradeFeedRepository;
- public TradeFeed([NotNull] IExchangeProvider exchangeProvider,
- [NotNull] IPairCrudAppService pairCrudAppService,
- [NotNull] ITradeRepository tradeReposytory,
- [NotNull] IUnitOfWorkManager unitOfWorkManager,
- [NotNull] ITradeFeedContext context,
- [NotNull] ITradeFeedRepository tradeFeedRepository)
- {
- _exchangeProvider = exchangeProvider ?? throw new ArgumentNullException(nameof(exchangeProvider));
- _pairCrudAppService = pairCrudAppService ?? throw new ArgumentNullException(nameof(pairCrudAppService));
- _tradeReposytory = tradeReposytory ?? throw new ArgumentNullException(nameof(tradeReposytory));
- _unitOfWorkManager = unitOfWorkManager ?? throw new ArgumentNullException(nameof(unitOfWorkManager));
- _context = context ?? throw new ArgumentNullException(nameof(context));
- _tradeFeedRepository = tradeFeedRepository ?? throw new ArgumentNullException(nameof(tradeFeedRepository));
- }
- public string Name { get; } = "Мониторинг сделок по валютной паре";
- public Action Execute => Feed;
- public void Dispose()
- {
- throw new NotImplementedException();
- }
- public ITradeFeedContext Context { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }
- /// <summary>
- /// Собирает информацию о курсах валютных пар и сохраняет ее в БД для последующей аналитик
- /// </summary>
- [UnitOfWork(IsDisabled = false)]
- async void Feed()
- {
- //процесс получения фида проходит первый цикл импорта данных
- //в таком случае необходимо исключить из общего объема данных уже ранее загруженные сделки
- bool isFirstTimeRun = true;
- //Сделки загруженные на предыдущем запросе
- IDictionary<PairDto, IEnumerable<TradeDto>> previousTrades = new Dictionary<PairDto, IEnumerable<TradeDto>>();
- var exchange = _exchangeProvider.Get(_context.ExchangeId);
- IList<PairDto> pairs = new List<PairDto>();
- var pairIdList = _context.PairIds.Split(";");
- foreach (var pairId in pairIdList)
- {
- //TODO организовать агрегированный запрос на получение всех пар сразу
- pairs.Add(await _pairCrudAppService.GetAsync(pairId));
- }
- ///В цикле с установленной задержкой делать запросы на биржу по совершаемым
- /// сделкам отслеживаемой валютной пары.
- do
- {
- var trades = await exchange.ExchangeDriver.TradesAsync(pairs);
- if (trades.Count == 0)
- {
- break;
- }
- if (isFirstTimeRun)
- {
- //Должен исполняться только один раз при исполнении первой итеррации цикла
- //чтобы исключить дублирования загрузки
- int coefficientSelection = 2; // коэффициент выборки, для покрытия одинакового времи записи данных
- foreach (var keyValue in trades)
- {
- using (var uow = _unitOfWorkManager.Begin())
- {
- var loadedTradesByPair = (await _tradeReposytory
- .WithDetailsAsync(t => t.OrderBook, t => t.Pair))
- .Where(t => t.ExchangeId == _context.ExchangeId && t.PairId == keyValue.Key.Id)
- .OrderByDescending(t => t.Date)
- .Take(keyValue.Value.Count() * coefficientSelection)
- .ToList();
- previousTrades.Add(keyValue.Key, loadedTradesByPair.Select(t => _mapper.Map<TradeDto>(trades)));
- }
- }
- isFirstTimeRun = false;
- }
- ///Оставляем только новые сделки, т.к. в последнем запросе естественно
- /// будут сделки, котрые были уже получены в предпоследнем запросе
- var lastNewTrades = GetOnlyNewTrades(trades, previousTrades).ToList();
- var tradesToWrite = lastNewTrades.Select(t => _mapper.Map<TradeContext>(trades)).ToList();
- _ = await _tradeReposytory.InsertAsync(tradesToWrite, _context.ExchangeId);
- //Изменение задержки перед последующей загрузкой на основе анализа повторно скаченных данных
- ChangePause(lastNewTrades, trades);
- LogLevel.Information.ToString($"Count new trades loaded from {exchange.Id}: {lastNewTrades.Count} ");
- //сохраняем последние сделки, чтобы сравнить с ними загружаемые сделки на следующем шаге
- previousTrades = trades;
- await Task.Delay(_context.Pause);
- //Ведется контроль за выставленным состоянием бота и в случае необходимости
- //прекращает мониторинг
- //https://docs.microsoft.com/ru-ru/dotnet/standard/parallel-programming/task-cancellation
- if (Context.CancellationTokenSource.IsCancellationRequested)
- {
- ///TODO Предварительные операции по завершению работы мониторинга
- Context.CancellationTokenSource.Token.ThrowIfCancellationRequested();
- }
- } while (true);
- }
- /// <summary>
- /// Выбирает и возвращает только новые сделки по отношению к выборке
- /// на предыдущей итеррации. Нет необходимости коммитить в БД одни и те же сделки
- /// </summary>
- /// <param name="trades"></param>
- /// <param name="previousTrades"></param>
- /// <returns></returns>
- private IList<TradeDto> GetOnlyNewTrades(IDictionary<PairDto, IEnumerable<TradeDto>> trades, IDictionary<PairDto, IEnumerable<TradeDto>> previousTrades)
- {
- var allTrades = trades.SelectMany(x => x.Value);
- var allPreviousTrades = previousTrades.SelectMany(x => x.Value).ToList();
- List<TradeDto> newTrades = null;
- switch (_context.ExchangeId)
- {
- case "BitcoinExchange":
- newTrades = allTrades.Except(allPreviousTrades, new TradeBitcoinComparer()).ToList();
- break;
- default:
- newTrades = allTrades.Except(allPreviousTrades, new TradeComparer()).ToList();
- break;
- }
- return newTrades;
- }
- private class TradeBitcoinComparer : IEqualityComparer<TradeDto>
- {
- public bool Equals(TradeDto x, TradeDto y)
- {
- return x.PairId == y.PairId && x.Price == y.Price && x.Quantity == y.Quantity && x.TradeTypes == y.TradeTypes;
- }
- public int GetHashCode(TradeDto obj)
- {
- return (obj.PairId + "|" + obj.Price.ToString() + "|" + obj.Quantity.ToString() + "|" + obj.TradeTypes.ToString()).GetHashCode();
- }
- }
- /// <summary>
- /// Сравниватель двух сделок, загруженных из биржи
- /// </summary>
- private class TradeComparer : IEqualityComparer<TradeDto>
- {
- public bool Equals(TradeDto x, TradeDto y)
- {
- return x.TradeId == y.TradeId;
- }
- public int GetHashCode(TradeDto obj)
- {
- return obj.TradeId.GetHashCode();
- }
- }
- /// <summary>
- /// Вносит коррективы в величину паузы между двумя загрузками данных
- /// в зависимости от того, насколько быстро меняются данные.
- /// Настоящая реализация возможно несколько грубо отражает возможную динамику
- /// Этот алгоритм можно уточнять и делать более диференцированый и более динамичный
- /// </summary>
- /// <param name="countNewTrades">количество новых сделок</param>
- /// <param name="countTrades"></param>
- private void ChangePause(List<TradeDto> countNewTrades, IDictionary<PairDto, IEnumerable<TradeDto>> trades)
- {
- if (trades.Count == 0) return;
- double max = 0.0;
- double pairMax = 0.0;
- PairDto pair = null;
- foreach (var item in trades)
- {
- double count = countNewTrades.Where(x => x.PairId == item.Key.Id).Count();
- if (pairMax < count)
- {
- pairMax = count;
- max = item.Value.Count();
- pair = item.Key;
- }
- }
- if (pairMax / max < 0.2)
- {
- //Увеличиваем задержку в 2 раза
- Context.Pause *= 2;
- }
- else if (pairMax / max < 0.4)
- {
- //Увеличиваем задержку на 2 дельты
- Context.Pause += Context.PauseDelta * 2;
- }
- else if (pairMax / max < 0.5)
- {
- //Увеличиваем задержку на дельту
- Context.Pause += Context.PauseDelta;
- }
- else if (pairMax / max > 0.8)
- {
- //Уменьшаем задержку в 2 раза
- Context.Pause /= 2;
- }
- else if (pairMax / max > 0.6)
- {
- //Уменьшаем задержку на дельту
- Context.Pause -= Context.PauseDelta;
- }
- #if DEBUG
- Debug.WriteLine($"Количество новых ордеров: {pairMax} : {max} : {pair?.Name} с биржи: {_exchangeProvider.Get(Context.ExchangeId).BriefName} Задержка: {Context.Pause} милисек");
- #endif
- }
- [UnitOfWork(IsDisabled = true)]
- public virtual async Task StartAsync(ITradeFeedContext _context)
- {
- _context.Status = BotStatuses.Work;
- _context.StartTime = DateTime.UtcNow;
- _context.CancellationTokenSource = new CancellationTokenSource();
- //TODO сохранить tokenSource в контексте, чтобы потом воспользоваться им для
- //остановки задачи
- await _tradeFeedRepository.UpdateAsync((TradeFeedContext)_context);
- _context.Execute();
- }
- public virtual async Task StopAsync(ITradeFeedContext _context)
- {
- //bot.Dispose();
- _context.CancellationTokenSource.Cancel();
- _context.Status = BotStatuses.Stoped;
- await _tradeFeedRepository.UpdateAsync((TradeFeedContext)_context);
- }
- public virtual async Task PauseAsync(ITradeFeedContext _context)
- {
- /// 1. Все установленные заказы на покупку продажу на бирже остаются
- ///но бот уже не сможет делать новые заказы
- //2. Удалить бота из очереди исполнения задач
- /// 3. Изменить статус бота и произвести запись этих изменений в БД
- _context.Status = BotStatuses.OnPause;
- await _tradeFeedRepository.UpdateAsync((TradeFeedContext)_context);
- }
- public virtual async Task ResumeAsync(ITradeFeedContext _context)
- {
- /// 1. Все установленные заказы на покупку продажу на бирже остаются
- ///но бот уже не сможет делать новые заказы
- //2. Постановка в очередь исполнения задач
- //var create = CreateTradeFeedDto(tradeFeed.GetType(), tradeFeed);
- if (_context.StartTime == null)
- {
- // _queryManager.Enqueue(() => botTactics.Execute());
- }
- else
- {
- // _queryManager.Schedule(() => botTactics.Execute(), DateTime.Now - (DateTime)botTactic.StartTime);
- }
- /// 3. Изменить статус бота и произвести запись этих изменений в БД
- _context.Status = BotStatuses.Work;
- await _tradeFeedRepository.UpdateAsync((TradeFeedContext)_context);
- }
- }
- }
Add Comment
Please, Sign In to add comment