Guest User

Untitled

a guest
May 27th, 2018
78
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 5.14 KB | None | 0 0
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Threading;
  5. using Akka.Actor;
  6. using Akka.Persistence;
  7. using Akka.TestKit.NUnit3;
  8. using NUnit.Framework;
  9.  
  10. namespace AkkaTest.UnitTest
  11. {
  12. [TestFixture]
  13. internal sealed class SupervisorTest : TestKit
  14. {
  15. private IActorRef m_actor;
  16. private Supervisor m_actorConcrete;
  17.  
  18. [SetUp]
  19. public void SetUp()
  20. {
  21. var testActor = ActorOfAsTestActorRef<Supervisor>(Props.Create(() => new Supervisor()));
  22. m_actor = testActor.Ref;
  23. m_actorConcrete = testActor.UnderlyingActor;
  24. }
  25.  
  26. [TearDown]
  27. public void TearDown()
  28. {
  29. }
  30.  
  31. [Test]
  32. public void StartStashedWhenNotYetInitialized()
  33. {
  34. using (var mre = new ManualResetEventSlim(false))
  35. {
  36. m_actorConcrete.Done += mre.Set;
  37.  
  38. var itemId = Guid.NewGuid();
  39. m_actor.Tell(new Confirmable<Started>(123, new Started(itemId)));
  40. m_actor.Tell(new Confirmable<Pending>(124, new Pending(new List<Guid> {itemId})));
  41.  
  42. Assert.IsTrue(mre.Wait(TimeSpan.FromMinutes(2)));
  43. ExpectMsg<DeliveryConfirmation>(x => x.MessageId == 123);
  44. ExpectMsg<DeliveryConfirmation>(x => x.MessageId == 124);
  45. }
  46. }
  47.  
  48. internal sealed class Confirmable<T>
  49. {
  50. public Confirmable(long messageId, T message)
  51. {
  52. MessageId = messageId;
  53. Message = message;
  54. }
  55.  
  56. public long MessageId { get; }
  57.  
  58. public T Message { get; }
  59. }
  60.  
  61. internal sealed class DeliveryConfirmation
  62. {
  63. public DeliveryConfirmation(long messageId)
  64. {
  65. MessageId = messageId;
  66. }
  67.  
  68. public long MessageId { get; }
  69. }
  70.  
  71. internal sealed class Pending
  72. {
  73. public Pending(IEnumerable<Guid> information)
  74. {
  75. Ids = information ?? throw new ArgumentNullException(nameof(information));
  76. }
  77.  
  78. public IEnumerable<Guid> Ids { get; private set; }
  79. }
  80.  
  81. internal sealed class Started
  82. {
  83. public Started(Guid id)
  84. {
  85. Id = id;
  86. }
  87.  
  88. public Guid Id { get; private set; }
  89. }
  90.  
  91. internal abstract class ConfirmablePersistentActor : AtLeastOnceDeliveryReceiveActor
  92. {
  93. protected ConfirmablePersistentActor()
  94. {
  95. Recover<DeliveryConfirmation>(OnReceive, null);
  96. Command<DeliveryConfirmation>(OnCommand, null);
  97. Recover<SnapshotOffer>(offer => offer.Snapshot is AtLeastOnceDeliverySnapshot, offer =>
  98. {
  99. var snapshot = offer.Snapshot as AtLeastOnceDeliverySnapshot;
  100. SetDeliverySnapshot(snapshot);
  101. });
  102. Command<SaveSnapshotSuccess>(saved =>
  103. {
  104. var seqNo = saved.Metadata.SequenceNr;
  105. DeleteSnapshots(new SnapshotSelectionCriteria(seqNo, saved.Metadata.Timestamp.AddMilliseconds(-1)));
  106. });
  107. }
  108.  
  109. private void OnCommand(DeliveryConfirmation message)
  110. {
  111. Persist(message, OnReceive);
  112. }
  113.  
  114. private void OnReceive(DeliveryConfirmation message)
  115. {
  116. ConfirmDelivery(message.MessageId);
  117. }
  118.  
  119. protected void ConfirmedSend<T>(IActorRef destination, T message)
  120. {
  121. Deliver(destination.Path, id => new Confirmable<T>(id, message));
  122. SaveSnapshot(GetDeliverySnapshot());
  123. }
  124.  
  125. /*protected override void OnPersistFailure(Exception cause, object @event, long sequenceNr)
  126. {
  127. base.OnPersistFailure(cause, @event, sequenceNr);
  128. }
  129.  
  130. protected override void OnPersistRejected(Exception cause, object @event, long sequenceNr)
  131. {
  132. base.OnPersistRejected(cause, @event, sequenceNr);
  133. }
  134.  
  135. protected override void Unhandled(object message)
  136. {
  137. base.Unhandled(message);
  138. }*/
  139.  
  140. protected void RegisterConfirmable<T>(Action<T> handler)
  141. {
  142. Action<Confirmable<T>> confirmHandler = confirmable =>
  143. {
  144. Confirm(confirmable.MessageId);
  145. handler(confirmable.Message);
  146. };
  147. Recover<Confirmable<T>>(confirmHandler);
  148. Command<Confirmable<T>>(confirmable => Persist(confirmable, confirmHandler));
  149. }
  150.  
  151. private void Confirm(long messageId)
  152. {
  153. Sender.Tell(new DeliveryConfirmation(messageId), Self);
  154. }
  155. }
  156.  
  157. internal sealed class Supervisor : ConfirmablePersistentActor, IWithUnboundedStash
  158. // ReceiveActor, IWithUnboundedStash
  159. {
  160. private readonly List<Guid> m_ids;
  161.  
  162. public Supervisor()
  163. {
  164. m_ids = new List<Guid>();
  165.  
  166. /*Receive<Confirmable<Pending>>(OnReceive2, null);
  167. Receive<Confirmable<Started>>(OnReceive2, null);*/
  168.  
  169. PersistenceId = "Supervisor";
  170. RegisterConfirmable<Pending>(OnReceive);
  171. RegisterConfirmable<Started>(OnReceive);
  172. }
  173.  
  174. public event Action Done;
  175.  
  176. public override string PersistenceId { get; }
  177.  
  178. /*public new IStash Stash
  179. {
  180. get;
  181. set;
  182. }*/
  183.  
  184. protected override void PreStart()
  185. {
  186. base.PreStart();
  187. m_ids.Clear();
  188. }
  189.  
  190. private bool StashIfNotYetInitialized()
  191. {
  192. if (m_ids.Any())
  193. {
  194. return false;
  195. }
  196.  
  197. Stash.Stash();
  198. return true;
  199. }
  200.  
  201. /*private void OnReceive2(Confirmable<Pending> msg)
  202. {
  203. OnReceive(msg.Message);
  204. }
  205.  
  206. private void OnReceive2(Confirmable<Started> msg)
  207. {
  208. OnReceive(msg.Message);
  209. }*/
  210.  
  211. private void OnReceive(Pending pending)
  212. {
  213. foreach (var pendingId in pending.Ids)
  214. {
  215. if (m_ids.Contains(pendingId))
  216. {
  217. continue;
  218. }
  219.  
  220. m_ids.Add(pendingId);
  221. }
  222. Stash.UnstashAll();
  223. }
  224.  
  225. private void OnReceive(Started started)
  226. {
  227. if (StashIfNotYetInitialized())
  228. {
  229. return;
  230. }
  231. Done?.Invoke();
  232. }
  233. }
  234. }
  235. }
Add Comment
Please, Sign In to add comment