Advertisement
Hulkstance

Untitled

Oct 3rd, 2022
45
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C# 3.71 KB | None | 0 0
  1. using System.Data;
  2. using Proto;
  3. using Sts.MarketData.Api.Actors.Exchanges;
  4. using Sts.MarketData.Api.Models;
  5.  
  6. namespace Sts.MarketData.Api.Actors;
  7.  
  /// <summary>
  ///     Local classic actor used as a proxy in front of the per-exchange subscriber actors.
  ///     There are multiple exchanges, each with different subscription method calls and
  ///     different parameters, so a factory (<see cref="ExchangeFactory"/>) is used to choose
  ///     the exchange-specific child for every subscription request.
  /// </summary>
  /// <remarks>
  ///     There are two approaches to keeping PIDs: 1) a dictionary; 2) PID.FromAddress.
  ///     1) Use the dictionary approach wherever there are many actors to keep track of,
  ///     e.g. where the name is not known in advance. This is the approach used by this actor.
  ///     2) Use PID.FromAddress to find known remote services, e.g. infrastructure actors
  ///     for cluster/remote.
  ///     It is also worth knowing that a PID caches the backing actor process behind the scenes,
  ///     so once it has been resolved it is kept. With FromAddress you get a new PID each time,
  ///     and every new PID has to resolve the actor process again.
  /// </remarks>
  internal sealed class SubscriptionProxy : IActor
  {
      // Child PIDs keyed by "<exchange>:<accountId>" (see SubscriberKey).
      // This state is lost when the actor restarts; only timers survive a restart.
      // To keep the state, either use persistence or never let the actor restart,
      // i.e. try/catch to prevent failures.
      private readonly Dictionary<string, PID> _subscribers = new();

      private readonly ILogger<SubscriptionProxy> _logger;
      private readonly IServiceProvider _serviceProvider;

      /// <summary>
      ///     Creates the proxy.
      /// </summary>
      /// <param name="logger">Logger used to report requests that cannot be routed.</param>
      /// <param name="serviceProvider">Service provider handed to the <see cref="ExchangeFactory"/>.</param>
      /// <exception cref="ArgumentNullException">
      ///     <paramref name="logger"/> or <paramref name="serviceProvider"/> is <c>null</c>.
      /// </exception>
      public SubscriptionProxy(ILogger<SubscriptionProxy> logger, IServiceProvider serviceProvider)
      {
          ArgumentNullException.ThrowIfNull(logger);
          ArgumentNullException.ThrowIfNull(serviceProvider);

          _logger = logger;
          _serviceProvider = serviceProvider;
      }

      /// <summary>
      ///     Dispatches the current message: subscription requests create a child, back-fill
      ///     requests are forwarded to the matching child, notifications are forwarded to every
      ///     child, and a termination notice stops this actor. Other messages are ignored.
      /// </summary>
      /// <param name="context">Actor context carrying the message being processed.</param>
      /// <returns>An already completed task; all handling is synchronous.</returns>
      public Task ReceiveAsync(IContext context)
      {
          switch (context.Message)
          {
              case Started:
                  break;

              case SubscriberRequest { Exchange: var exchange, AccountId: var accountId }:
                  CreateChild(context, exchange, accountId);
                  break;

              case OneOffBackFill { Exchange: var exchange, AccountId: var accountId }:
                  ForwardToChild(context, exchange, accountId);
                  break;

              case Notification:
                  SendToAllChildren(context);
                  break;

              // NOTE(review): any Terminated notice stops this proxy as a whole, not just the
              // actor that terminated - confirm that this is the intended supervision policy.
              case Terminated:
                  StopAll(context);
                  break;
          }

          return Task.CompletedTask;
      }

      /// <summary>
      ///     Builds the dictionary key identifying the child for an exchange/account pair.
      ///     Kept in one place so that registration and lookup can never disagree on the format.
      /// </summary>
      private static string SubscriberKey(string exchange, string accountId) => $"{exchange}:{accountId}";

      /// <summary>
      ///     Creates and registers the child for the given exchange/account pair,
      ///     unless one is already registered under the same key.
      /// </summary>
      private void CreateChild(IContext context, string exchange, string accountId)
      {
          var key = SubscriberKey(exchange, accountId);

          if (_subscribers.ContainsKey(key))
          {
              // Already subscribed: keep the existing child instead of creating a second one.
              return;
          }

          // NOTE(review): the meaning of the literal 5 is defined by ExchangeFactory, which is
          // not visible here - consider replacing it with a named constant once confirmed.
          var exchangeFactory = new ExchangeFactory(context, _serviceProvider, 5);
          var child = exchangeFactory.CreateExchange(exchange, accountId);

          _subscribers.Add(key, child);
      }

      /// <summary>
      ///     Forwards the current message to the child registered for the given
      ///     exchange/account pair, or logs that no such child exists.
      /// </summary>
      private void ForwardToChild(IContext context, string exchange, string accountId)
      {
          if (_subscribers.TryGetValue(SubscriberKey(exchange, accountId), out var pid))
          {
              context.Forward(pid);
              return;
          }

          // The lookup key is exchange + account, so report both; the account alone is
          // misleading when it is subscribed on a different exchange.
          _logger.LogInformation(
              "The PID for account id {AccountId} on exchange {Exchange} was not found",
              accountId,
              exchange);
      }

      /// <summary>
      ///     Fan-out: forwards the current message to every child of this actor.
      /// </summary>
      private void SendToAllChildren(IContext context)
      {
          foreach (var child in context.Children)
          {
              context.Forward(child);
          }
      }

      /// <summary>
      ///     Stops this actor, which terminates the parent and its children.
      /// </summary>
      private void StopAll(IContext context)
      {
          context.Stop(context.Self);
      }
  }
  106.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement