Advertisement
Hulkstance

Untitled

Oct 1st, 2022
49
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C# 2.80 KB | None | 0 0
  1. using MktData.Api.Actors.Exchanges;
  2. using MktData.Api.Models;
  3. using Proto;
  4.  
  5. namespace MktData.Api.Actors;
  6.  
  7. // The whole actor instance is recreated on restart, so any in-memory state (e.g. fields) is lost.
  8. // To keep the state we should either use persistence or never let the actor restart, i.e. use try/catch to prevent failures.
  9. //
  10. // This actor spawns child "classic" local actors supposed to coordinate the virtual actors via PID.
  /// <summary>
  /// Parent actor that manages one child subscription actor per (exchange, account) pair.
  /// It spawns children on <c>SubscriberRequest</c>, routes <c>OneOffBackFill</c> to the matching
  /// child, fans <c>Notification</c> out to every child, and stops itself on <c>Terminated</c>.
  /// </summary>
  internal sealed class SubscriptionManager : IActor
  {
      private readonly ILogger<SubscriptionManager> _logger;
      private readonly IServiceProvider _serviceProvider;

      /// <summary>Creates the manager.</summary>
      /// <param name="logger">Logger used to report duplicate subscriptions and unknown children.</param>
      /// <param name="serviceProvider">Provider used to resolve the dependencies of child actors.</param>
      public SubscriptionManager(ILogger<SubscriptionManager> logger, IServiceProvider serviceProvider)
      {
          _logger = logger;
          _serviceProvider = serviceProvider;
      }

      /// <summary>
      /// Dispatches the current message. Messages that match no case are ignored.
      /// </summary>
      /// <param name="context">The actor context carrying the current message.</param>
      /// <returns>A completed task; all handling is synchronous.</returns>
      public Task ReceiveAsync(IContext context)
      {
          // We forward the messages to child actors because this is a subscription manager,
          // i.e. it is meant for managing subscriptions for all exchanges.
          switch (context.Message)
          {
              case Started:
                  break;

              case SubscriberRequest { Exchange: var exchange, AccountId: var accountId }:
                  CreateChild(context, exchange, accountId);
                  break;

              case OneOffBackFill { Exchange: var exchange, AccountId: var accountId }:
                  ForwardToChild(context, exchange, accountId);
                  break;

              case Notification:
                  SendToAllChildren(context);
                  break;

              case Terminated:
                  StopAll(context);
                  break;
          }

          return Task.CompletedTask;
      }

      /// <summary>
      /// Returns the child for the given exchange/account, spawning it when it does not exist yet.
      /// </summary>
      /// <param name="context">The actor context of this manager.</param>
      /// <param name="exchange">Exchange name; used as the first part of the child name.</param>
      /// <param name="accountId">Account id; passed to the child's constructor and used in its name.</param>
      /// <returns>The PID of the existing or newly spawned child.</returns>
      private PID CreateChild(IContext context, string exchange, string accountId)
      {
          var childPid = ChildPid(context, exchange, accountId);

          // Spawning a second child under an already-used name throws, which would fault this
          // actor. A repeated request therefore reuses the child that is already running.
          if (ChildExists(context, childPid.Id))
          {
              _logger.LogDebug(
                  "Subscription actor for {Exchange}:{AccountId} already exists; reusing it",
                  exchange,
                  accountId);
              return childPid;
          }

          // NOTE(review): every exchange currently gets an FtxSubscriptionActor; `exchange` only
          // affects the child name. Confirm whether other exchanges need their own actor type.
          var childProps = Props.FromProducer(() =>
              // `ActivatorUtilities.CreateInstance` is used as opposed to `new FtxSubscriptionActor(...)`
              // because we pass `accountId` explicitly and let the IoC container inject the rest
              // (e.g. the logger).
              ActivatorUtilities.CreateInstance<FtxSubscriptionActor>(_serviceProvider, accountId)
          );
          return context.SpawnNamed(childProps, ChildName(exchange, accountId));
      }

      /// <summary>
      /// Forwards the current message to the child for the given exchange/account.
      /// </summary>
      /// <param name="context">The actor context carrying the message to forward.</param>
      /// <param name="exchange">Exchange name identifying the child.</param>
      /// <param name="accountId">Account id identifying the child.</param>
      private void ForwardToChild(IContext context, string exchange, string accountId)
      {
          var pid = ChildPid(context, exchange, accountId);

          if (!ChildExists(context, pid.Id))
          {
              // Still forward, so the runtime's handling of undeliverable messages is unchanged;
              // the warning just makes the routing miss visible.
              _logger.LogWarning(
                  "No subscription actor found for {Exchange}:{AccountId}; message {MessageType} will not be delivered",
                  exchange,
                  accountId,
                  context.Message?.GetType().Name);
          }

          context.Forward(pid);
      }

      // Fan-out mechanism: forwards the current message to every child.
      private void SendToAllChildren(IContext context)
      {
          foreach (var child in context.Children)
          {
              context.Forward(child);
          }
      }

      // Terminates parent and its children
      private void StopAll(IContext context)
      {
          context.Stop(context.Self);
      }

      // Single source of truth for the child naming scheme used by both spawn and lookup.
      private static string ChildName(string exchange, string accountId) => $"{exchange}:{accountId}";

      // Builds the child's PID from this actor's own address and id instead of hard-coded values,
      // so the lookup keeps working if the manager is spawned under another name or address.
      private static PID ChildPid(IContext context, string exchange, string accountId) =>
          PID.FromAddress(context.Self.Address, $"{context.Self.Id}/{ChildName(exchange, accountId)}");

      // Returns true when one of this actor's current children has the given id.
      private static bool ChildExists(IContext context, string childId)
      {
          foreach (var child in context.Children)
          {
              if (string.Equals(child.Id, childId, StringComparison.Ordinal))
              {
                  return true;
              }
          }

          return false;
      }
  }
  82.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement