Advertisement
Hulkstance

Untitled

Sep 30th, 2022
41
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C# 3.85 KB | None | 0 0
  1. using Proto;
  2. using Sts.MarketData.Api.Models;
  3.  
  4. namespace Sts.MarketData.Api.Actors;
  5.  
  6. // The whole actor instance is recreated after a restart. In other words, we lose the state,
  7. // i.e. anything we have stored in the private fields will be gone. Timers survive restarts though.
  8. // If we want to keep the state, we should either use persistence, or never let the actor restart,
  9. // i.e. try/catch to prevent failures.
  10. //
  11. // The point of this local classic/Erlang actor is to act as a proxy/manager for the subscriptions.
  12. // All it does is log the PIDs of the subscribers which makes it possible to forward backfill request messages
  13. // to the appropriate PID.
  14. internal sealed class SubscriptionManagerActor : IActor
  15. {
  16.     // If this actor is restarted for some reason, its state will be lost.
  17.     private readonly IDictionary<string, PID> _accounts = new Dictionary<string, PID>();
  18.  
  19.     private readonly ILogger<SubscriptionManagerActor> _logger;
  20.     private readonly IServiceProvider _serviceProvider;
  21.  
  22.     public SubscriptionManagerActor(ILogger<SubscriptionManagerActor> logger, IServiceProvider serviceProvider)
  23.     {
  24.         ArgumentNullException.ThrowIfNull(logger);
  25.         ArgumentNullException.ThrowIfNull(serviceProvider);
  26.  
  27.         _logger = logger;
  28.         _serviceProvider = serviceProvider;
  29.     }
  30.  
  31.     public async Task ReceiveAsync(IContext context)
  32.     {
  33.         switch (context.Message)
  34.         {
  35.             case Started:
  36.                 break;
  37.  
  38.             case SubscriberRequest { Account: var account }:
  39.                 // the PIDs of these requests should be logged, so we could forward messages to them later on.
  40.                 await SubscribeActors(context, account);
  41.                 break;
  42.  
  43.             case OneOffBackFill { AccountId: var accountId }:
  44.                 // Forwards the BackFillRequest message to the appropriate PID, meaning this message is handled not here,
  45.                 // but in the SubscriptionActor. SubscriptionActor is a child of this actor.
  46.                 await RequestBackFill(context, accountId);
  47.                 break;
  48.  
  49.             case Terminated:
  50.                 StopAll(context);
  51.                 break;
  52.         }
  53.     }
  54.  
  55.     private Task SubscribeActors(IContext context, Account account)
  56.     {
  57.         _logger.LogInformation("Spawning a subscriber for {AccountId}", account.AccountId);
  58.  
  59.         // ActivatorUtilities.CreateInstance<T> lets us do partial dependency injection i.e. DI "accountId"
  60.         // and let the IoC framework resolve the ILogger<T> itself.
  61.         var props = Props
  62.             .FromProducer(() => ActivatorUtilities.CreateInstance<SubscriptionActor>(_serviceProvider, account.AccountId))
  63.             .WithChildSupervisorStrategy(new OneForOneStrategy((_, _) => SupervisorDirective.Restart, 5, null));
  64.  
  65.         // TODO: Please review. Since the actor is named, do we actually need to log the PIDs into the Dictionary<T, Y>?
  66.         // or we could use `var pid = PID.FromAddress(ActorSystem.NoHost, "DEV");`
  67.         var pid = context.SpawnNamed(props, account.AccountId);
  68.         _accounts[account.AccountId] = pid;
  69.  
  70.         return Task.CompletedTask;
  71.     }
  72.  
  73.     private Task RequestBackFill(IContext context, string accountId)
  74.     {
  75.         if (_accounts.TryGetValue(accountId, out var pid))
  76.         {
  77.             _logger.LogInformation("Forwarding backfill request to the SubscriptionActor");
  78.             context.Forward(pid);
  79.         }
  80.         else
  81.         {
  82.             _logger.LogInformation("A subscriber for this backfill request was not found");
  83.         }
  84.  
  85.         return Task.CompletedTask;
  86.     }
  87.  
  88.     private void StopAll(IContext context)
  89.     {
  90.         // Terminate all subscribers
  91.         foreach (var account in _accounts)
  92.         {
  93.             context.Stop(account.Value);
  94.         }
  95.  
  96.         // Terminate itself
  97.         context.Stop(context.Self);
  98.     }
  99. }
  100.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement