Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- using Proto;
- using Sts.MarketData.Api.Models;
- namespace Sts.MarketData.Api.Actors;
- // The whole actor instance is recreated after a restart. In other words, we lose the state,
- // i.e. anything we have stored in the private fields will be gone. Timers survive restarts though.
- // If we want to keep the state, we should either use persistence, or never let the actor restart,
- // i.e. try/catch to prevent failures.
- //
- // The point of this local classic/Erlang actor is to act as a proxy/manager for the subscriptions.
- // All it does is log the PIDs of the subscribers which makes it possible to forward backfill request messages
- // to the appropriate PID.
/// <summary>
/// Local proxy/manager actor for account subscriptions. It spawns one
/// <c>SubscriptionActor</c> child per account, remembers the child's PID keyed by
/// account id, and forwards one-off backfill requests to the matching child.
/// </summary>
/// <remarks>
/// The account-to-PID map lives only in memory: if this actor is restarted for some
/// reason, that state is lost.
/// </remarks>
internal sealed class SubscriptionManagerActor : IActor
{
    // Account id -> PID of the SubscriptionActor spawned for that account.
    // If this actor is restarted for some reason, its state will be lost.
    private readonly IDictionary<string, PID> _accounts = new Dictionary<string, PID>();
    private readonly ILogger<SubscriptionManagerActor> _logger;
    private readonly IServiceProvider _serviceProvider;

    /// <summary>Creates the manager actor.</summary>
    /// <param name="logger">Logger used for subscription and forwarding diagnostics.</param>
    /// <param name="serviceProvider">
    /// Provider used to resolve the remaining constructor dependencies of each spawned <c>SubscriptionActor</c>.
    /// </param>
    /// <exception cref="ArgumentNullException">Either argument is <c>null</c>.</exception>
    public SubscriptionManagerActor(ILogger<SubscriptionManagerActor> logger, IServiceProvider serviceProvider)
    {
        ArgumentNullException.ThrowIfNull(logger);
        ArgumentNullException.ThrowIfNull(serviceProvider);

        _logger = logger;
        _serviceProvider = serviceProvider;
    }

    /// <summary>
    /// Dispatches the current message: subscription requests spawn a child, backfill requests are
    /// forwarded to the child registered for the account, and <c>Terminated</c> shuts everything down.
    /// Any other message is ignored.
    /// </summary>
    /// <param name="context">Actor context carrying the message being processed.</param>
    public async Task ReceiveAsync(IContext context)
    {
        switch (context.Message)
        {
            case Started:
                break;

            case SubscriberRequest { Account: var account }:
                // The PID of the spawned subscriber is recorded so messages can be forwarded to it later on.
                await SubscribeActors(context, account);
                break;

            case OneOffBackFill { AccountId: var accountId }:
                // Forwards the backfill message to the appropriate PID, meaning this message is handled not here,
                // but in the SubscriptionActor. SubscriptionActor is a child of this actor.
                await RequestBackFill(context, accountId);
                break;

            case Terminated:
                // NOTE(review): in Proto.Actor, Terminated appears to be delivered when a child/watched actor
                // has stopped, not when this actor itself is asked to stop. As written, a single subscriber
                // terminating shuts down every subscriber and this manager. Confirm this is intended; if not,
                // only the terminated PID should be removed from _accounts.
                StopAll(context);
                break;
        }
    }

    /// <summary>
    /// Spawns a named <c>SubscriptionActor</c> child for <paramref name="account"/> and records its PID.
    /// A repeated request for an account that already has a recorded subscriber is ignored.
    /// </summary>
    /// <param name="context">Actor context used to spawn the child.</param>
    /// <param name="account">Account to subscribe; its <c>AccountId</c> is used as the child's name.</param>
    /// <returns>A completed task; all work is done synchronously.</returns>
    private Task SubscribeActors(IContext context, Account account)
    {
        // The child is spawned under the account id as its name. Spawning a second child with a name
        // that is already registered is rejected by Proto.Actor with an exception, which would fail this
        // actor and, on restart, lose the whole account map. Make the request idempotent instead.
        if (_accounts.ContainsKey(account.AccountId))
        {
            _logger.LogInformation(
                "A subscriber for {AccountId} already exists, ignoring the duplicate request",
                account.AccountId);
            return Task.CompletedTask;
        }

        _logger.LogInformation("Spawning a subscriber for {AccountId}", account.AccountId);

        // ActivatorUtilities.CreateInstance<T> lets us do partial dependency injection i.e. DI "accountId"
        // and let the IoC framework resolve the ILogger<T> itself.
        //
        // NOTE(review): WithChildSupervisorStrategy is set on the SubscriptionActor's props, so it presumably
        // governs how SubscriptionActor supervises *its own* children rather than how this manager supervises
        // SubscriptionActor - confirm against the Proto.Actor supervision docs.
        var props = Props
            .FromProducer(() => ActivatorUtilities.CreateInstance<SubscriptionActor>(_serviceProvider, account.AccountId))
            .WithChildSupervisorStrategy(new OneForOneStrategy((_, _) => SupervisorDirective.Restart, 5, null));

        // TODO: Please review. Since the actor is named, do we actually need to log the PIDs into the Dictionary<T, Y>?
        // or we could use `var pid = PID.FromAddress(ActorSystem.NoHost, "DEV");`
        var pid = context.SpawnNamed(props, account.AccountId);
        _accounts[account.AccountId] = pid;

        return Task.CompletedTask;
    }

    /// <summary>
    /// Forwards the message currently being processed to the subscriber recorded for
    /// <paramref name="accountId"/>, or logs that no such subscriber exists.
    /// </summary>
    /// <param name="context">Actor context whose current message is forwarded.</param>
    /// <param name="accountId">Account whose subscriber should receive the backfill request.</param>
    /// <returns>A completed task; all work is done synchronously.</returns>
    private Task RequestBackFill(IContext context, string accountId)
    {
        if (_accounts.TryGetValue(accountId, out var pid))
        {
            _logger.LogInformation("Forwarding backfill request to the SubscriptionActor");
            context.Forward(pid);
        }
        else
        {
            _logger.LogInformation("A subscriber for this backfill request was not found");
        }

        return Task.CompletedTask;
    }

    /// <summary>Stops every recorded subscriber and then this actor.</summary>
    /// <param name="context">Actor context used to issue the stop requests.</param>
    private void StopAll(IContext context)
    {
        // Terminate all subscribers
        foreach (var account in _accounts)
        {
            context.Stop(account.Value);
        }

        // Terminate itself
        context.Stop(context.Self);
    }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement