Advertisement
Hulkstance

Untitled

Sep 30th, 2022
43
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C# 4.04 KB | None | 0 0
  1. using Proto;
  2. using Sts.MarketData.Api.Models;
  3.  
  4. namespace Sts.MarketData.Api.Actors;
  5.  
  6. // The whole actor instance is recreated after a restart. In other words, we lose the state,
  7. // i.e. anything we have stored in the private fields will be gone. Timers survive restarts though.
  8. // If we want to keep the state, we should either use persistence, or never let the actor restart,
  9. // i.e. try/catch to prevent failures.
  10. //
  11. // The point of this local classic/Erlang actor is to act as a proxy/manager for the subscriptions.
  12. // All it does is log the PIDs of the subscribers which makes it possible to forward backfill request messages
  13. // to the appropriate PID.
  14. internal sealed class SubscriptionManagerActor : IActor
  15. {
  16.     private readonly ILogger<SubscriptionManagerActor> _logger;
  17.     private readonly IServiceProvider _serviceProvider;
  18.  
  19.     public SubscriptionManagerActor(ILogger<SubscriptionManagerActor> logger, IServiceProvider serviceProvider)
  20.     {
  21.         ArgumentNullException.ThrowIfNull(logger);
  22.         ArgumentNullException.ThrowIfNull(serviceProvider);
  23.  
  24.         _logger = logger;
  25.         _serviceProvider = serviceProvider;
  26.     }
  27.  
  28.     public async Task ReceiveAsync(IContext context)
  29.     {
  30.         switch (context.Message)
  31.         {
  32.             case Started:
  33.                 break;
  34.  
  35.             case SubscriberRequest { Account: var account }:
  36.                 // the PIDs of these requests should be logged, so we could forward messages to them later on.
  37.                 await SubscribeActors(context, account);
  38.                 break;
  39.  
  40.             case OneOffBackFill { AccountId: var accountId }:
  41.                 // Forwards the BackFillRequest message to the appropriate PID, meaning this message is handled not here,
  42.                 // but in the SubscriptionActor. SubscriptionActor is a child of this actor.
  43.                 await RequestBackFill(context, accountId);
  44.                 break;
  45.  
  46.             case Terminated:
  47.                 await context.StopAsync(context.Self);
  48.                 break;
  49.         }
  50.     }
  51.  
  52.     private Task SubscribeActors(IContext context, Account account)
  53.     {
  54.         _logger.LogInformation("Spawning a subscriber for {AccountId}", account.AccountId);
  55.  
  56.         // ActivatorUtilities.CreateInstance<T> lets us do partial dependency injection i.e. DI "accountId"
  57.         // and let the IoC framework resolve the ILogger<T> itself.
  58.         var props = Props
  59.             .FromProducer(() => ActivatorUtilities.CreateInstance<TestActor>(_serviceProvider, account.AccountId))
  60.             .WithChildSupervisorStrategy(new OneForOneStrategy((_, _) => SupervisorDirective.Restart, 5, null));
  61.  
  62.         // We could log the child actors aka the subscribers into a Dictionary<string, PID>
  63.         // and use the PIDs to forward messages and so on and so forth.
  64.         context.SpawnNamed(props, account.AccountId);
  65.  
  66.         return Task.CompletedTask;
  67.     }
  68.  
  69.     private Task RequestBackFill(IContext context, string accountId)
  70.     {
  71.         var pid = PID.FromAddress(ActorSystem.NoHost, $"subscription-manager/{accountId}");
  72.  
  73.         _logger.LogInformation("Forwarding backfill request to the SubscriptionActor");
  74.         context.Forward(pid);
  75.  
  76.         return Task.CompletedTask;
  77.     }
  78. }
  79.  
  80. internal sealed class TestActor : IActor
  81. {
  82.     private readonly ILogger<TestActor> _logger;
  83.     private readonly string _accountId;
  84.  
  85.     public TestActor(ILogger<TestActor> logger, string accountId)
  86.     {
  87.         ArgumentNullException.ThrowIfNull(logger);
  88.         ArgumentNullException.ThrowIfNull(accountId);
  89.  
  90.         _logger = logger;
  91.         _accountId = accountId;
  92.     }
  93.  
  94.     public Task ReceiveAsync(IContext context)
  95.     {
  96.         switch (context.Message)
  97.         {
  98.             case Started:
  99.                 _logger.LogInformation("TEST ACTOR SPAWNED");
  100.                 break;
  101.  
  102.             case OneOffBackFill backFill:
  103.                 _logger.LogInformation("{AccountId} received backfill: {@BackFill}", _accountId, backFill);
  104.                 break;
  105.         }
  106.  
  107.         return Task.CompletedTask;
  108.     }
  109. }
  110.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement