Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- using System.Data;
- using Proto;
- using Sts.MarketData.Api.Actors.Exchanges;
- using Sts.MarketData.Api.Models;
- namespace Sts.MarketData.Api.Actors;
/// <summary>
/// Local classic actor that acts as a proxy in front of the per-exchange subscription actors.
/// Several exchanges are supported; each has its own subscription method calls and takes
/// different parameters, so a factory is needed to choose the exchange and this proxy routes
/// messages to the actor that factory produced.
/// </summary>
/// <remarks>
/// PIDs can be tracked in two ways: with a dictionary, or with <c>PID.FromAddress</c>.
/// <list type="bullet">
/// <item>
/// <description>
/// <c>PID.FromAddress</c> suits well-known remote services, e.g. infrastructure actors for
/// cluster/remote.
/// </description>
/// </item>
/// <item>
/// <description>
/// A dictionary suits cases with many actors to keep track of, e.g. when the name is not
/// known in advance. That is the approach used here.
/// </description>
/// </item>
/// </list>
/// A PID caches its backing actor process once resolved and keeps it afterwards, whereas
/// <c>FromAddress</c> hands back a new PID each time, and every new PID has to resolve the
/// actor process again.
/// </remarks>
internal sealed class SubscriptionProxy : IActor
{
    // Child PIDs keyed by "<exchange>:<accountId>".
    // This map does not survive an actor restart (only timers do). To retain it, either
    // use persistence or make sure the actor never restarts, i.e. try/catch to prevent failures.
    private readonly Dictionary<string, PID> _subscribers = new();

    private readonly ILogger<SubscriptionProxy> _logger;

    private readonly IServiceProvider _serviceProvider;

    /// <summary>
    /// Creates the proxy.
    /// </summary>
    /// <param name="logger">Logger used to report lookups that find no child.</param>
    /// <param name="serviceProvider">Service provider handed on to <c>ExchangeFactory</c>.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="logger"/> or <paramref name="serviceProvider"/> is <c>null</c>.
    /// </exception>
    public SubscriptionProxy(ILogger<SubscriptionProxy> logger, IServiceProvider serviceProvider)
    {
        ArgumentNullException.ThrowIfNull(logger);
        ArgumentNullException.ThrowIfNull(serviceProvider);

        _serviceProvider = serviceProvider;
        _logger = logger;
    }

    /// <summary>
    /// Dispatches the current message to the matching handler. Message types not listed
    /// in the switch are ignored.
    /// </summary>
    /// <param name="context">Actor context carrying the message being processed.</param>
    /// <returns>An already-completed task; all handling here is synchronous.</returns>
    public Task ReceiveAsync(IContext context)
    {
        switch (context.Message)
        {
            case Started:
                // Nothing to set up on start.
                break;

            case SubscriberRequest request:
                CreateChild(context, request.Exchange, request.AccountId);
                break;

            case OneOffBackFill backFill:
                ForwardToChild(context, backFill.Exchange, backFill.AccountId);
                break;

            case Notification:
                SendToAllChildren(context);
                break;

            case Terminated:
                StopAll(context);
                break;
        }

        return Task.CompletedTask;
    }

    /// <summary>
    /// Builds the dictionary key under which a child is registered.
    /// </summary>
    private static string SubscriberKey(string exchange, string accountId) => $"{exchange}:{accountId}";

    /// <summary>
    /// Creates and registers a child for the exchange/account pair unless one is already registered.
    /// </summary>
    /// <param name="context">Actor context passed to the exchange factory.</param>
    /// <param name="exchange">Exchange identifier.</param>
    /// <param name="accountId">Account identifier.</param>
    private void CreateChild(IContext context, string exchange, string accountId)
    {
        var key = SubscriberKey(exchange, accountId);
        if (_subscribers.ContainsKey(key))
        {
            // A child for this pair is already registered; repeated requests are no-ops.
            return;
        }

        // NOTE(review): the meaning of the literal 5 is defined by ExchangeFactory and is not visible here.
        var factory = new ExchangeFactory(context, _serviceProvider, 5);
        var spawned = factory.CreateExchange(exchange, accountId);
        _subscribers.Add(key, spawned);
    }

    /// <summary>
    /// Forwards the current message to the child registered for the exchange/account pair,
    /// or logs an informational entry when no such child is registered.
    /// </summary>
    /// <param name="context">Actor context whose current message is forwarded.</param>
    /// <param name="exchange">Exchange identifier.</param>
    /// <param name="accountId">Account identifier.</param>
    private void ForwardToChild(IContext context, string exchange, string accountId)
    {
        if (!_subscribers.TryGetValue(SubscriberKey(exchange, accountId), out var target))
        {
            _logger.LogInformation("The PID for account id {AccountId} was not found", accountId);
            return;
        }

        context.Forward(target);
    }

    /// <summary>
    /// Fans the current message out to every child reported by the context.
    /// </summary>
    /// <param name="context">Actor context whose current message is forwarded.</param>
    private void SendToAllChildren(IContext context)
    {
        // Iterates context.Children rather than the _subscribers map.
        foreach (var recipient in context.Children)
        {
            context.Forward(recipient);
        }
    }

    /// <summary>
    /// Stops this actor; stopping the parent also terminates its children.
    /// </summary>
    /// <param name="context">Actor context used to stop <c>Self</c>.</param>
    private void StopAll(IContext context) => context.Stop(context.Self);
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement