Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- using Proto;
- using Sts.MarketData.Api.Models;
- namespace Sts.MarketData.Api.Actors;
- // The whole actor instance is recreated after a restart. In other words, we lose the state,
- // i.e. anything we have stored in the private fields will be gone. Timers survive restarts though.
- // If we want to keep the state, we should either use persistence, or never let the actor restart,
- // i.e. try/catch to prevent failures.
- //
- // The point of this local classic/Erlang actor is to act as a proxy/manager for the subscriptions.
- // All it does is log the PIDs of the subscribers which makes it possible to forward backfill request messages
- // to the appropriate PID.
- internal sealed class SubscriptionManagerActor : IActor
- {
- private readonly ILogger<SubscriptionManagerActor> _logger;
- private readonly IServiceProvider _serviceProvider;
- public SubscriptionManagerActor(ILogger<SubscriptionManagerActor> logger, IServiceProvider serviceProvider)
- {
- ArgumentNullException.ThrowIfNull(logger);
- ArgumentNullException.ThrowIfNull(serviceProvider);
- _logger = logger;
- _serviceProvider = serviceProvider;
- }
- public async Task ReceiveAsync(IContext context)
- {
- switch (context.Message)
- {
- case Started:
- break;
- case SubscriberRequest { Account: var account }:
- // the PIDs of these requests should be logged, so we could forward messages to them later on.
- await SubscribeActors(context, account);
- break;
- case OneOffBackFill { AccountId: var accountId }:
- // Forwards the BackFillRequest message to the appropriate PID, meaning this message is handled not here,
- // but in the SubscriptionActor. SubscriptionActor is a child of this actor.
- await RequestBackFill(context, accountId);
- break;
- case Terminated:
- await context.StopAsync(context.Self);
- break;
- }
- }
- private Task SubscribeActors(IContext context, Account account)
- {
- _logger.LogInformation("Spawning a subscriber for {AccountId}", account.AccountId);
- // ActivatorUtilities.CreateInstance<T> lets us do partial dependency injection i.e. DI "accountId"
- // and let the IoC framework resolve the ILogger<T> itself.
- var props = Props
- .FromProducer(() => ActivatorUtilities.CreateInstance<TestActor>(_serviceProvider, account.AccountId))
- .WithChildSupervisorStrategy(new OneForOneStrategy((_, _) => SupervisorDirective.Restart, 5, null));
- // We could log the child actors aka the subscribers into a Dictionary<string, PID>
- // and use the PIDs to forward messages and so on and so forth.
- context.SpawnNamed(props, account.AccountId);
- return Task.CompletedTask;
- }
- private Task RequestBackFill(IContext context, string accountId)
- {
- var pid = PID.FromAddress(ActorSystem.NoHost, $"subscription-manager/{accountId}");
- _logger.LogInformation("Forwarding backfill request to the SubscriptionActor");
- context.Forward(pid);
- return Task.CompletedTask;
- }
- }
- internal sealed class TestActor : IActor
- {
- private readonly ILogger<TestActor> _logger;
- private readonly string _accountId;
- public TestActor(ILogger<TestActor> logger, string accountId)
- {
- ArgumentNullException.ThrowIfNull(logger);
- ArgumentNullException.ThrowIfNull(accountId);
- _logger = logger;
- _accountId = accountId;
- }
- public Task ReceiveAsync(IContext context)
- {
- switch (context.Message)
- {
- case Started:
- _logger.LogInformation("TEST ACTOR SPAWNED");
- break;
- case OneOffBackFill backFill:
- _logger.LogInformation("{AccountId} received backfill: {@BackFill}", _accountId, backFill);
- break;
- }
- return Task.CompletedTask;
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement