Guest User

ReplayActor.java

a guest
Aug 22nd, 2018
30
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. package recoverystresstest;
  2.  
  3. import java.io.Serializable;
  4. import java.time.Duration;
  5. import java.util.concurrent.TimeUnit;
  6.  
  7. import akka.actor.Props;
  8. import akka.actor.ReceiveTimeout;
  9. import akka.japi.pf.ReceiveBuilder;
  10. import akka.persistence.AbstractPersistentActor;
  11. import akka.persistence.RecoveryCompleted;
  12.  
  13.  
  14. public class ReplayActor extends AbstractPersistentActor
  15. {
  16.  
  17.     public static final class UpdateCounterCommand implements Serializable {private static final long serialVersionUID = 5873937109283373749L;}
  18.     public static final class CommandReceived implements Serializable {private static final long serialVersionUID = 3573804342081210305L;}
  19.     public static class UpdatedCounterStateEvent implements Serializable
  20.     {
  21.         private static final long serialVersionUID = 4292701424971430047L;
  22.         private final int newCounter;
  23.  
  24.         public UpdatedCounterStateEvent(final int newCounter)
  25.         {
  26.             this.newCounter = newCounter;
  27.         }
  28.  
  29.         public int getNewCounter()
  30.         {
  31.             return this.newCounter;
  32.         }
  33.     }
  34.  
  35.     public static final CommandReceived COMMAND_RECEIVED_DEFAULT_INSTANCE = new CommandReceived();
  36.     private int counter;
  37.     private final String persistenceId;
  38.  
  39.  
  40.     public static Props propsForReplayActor(final String persistenceId, final Duration receiveTimeout)
  41.     {
  42.         return Props.create(ReplayActor.class, () -> new ReplayActor(persistenceId, receiveTimeout));
  43.     }
  44.  
  45.  
  46.     private ReplayActor(final String persistenceId, final Duration receiveTimeout)
  47.     {
  48.         assert receiveTimeout != null;
  49.         assert persistenceId != null;
  50.  
  51.         this.counter = 0;
  52.         this.persistenceId = persistenceId;
  53.         context().setReceiveTimeout(scala.concurrent.duration.Duration.create(receiveTimeout.toNanos(), TimeUnit.NANOSECONDS));
  54.     }
  55.  
  56.     @Override
  57.     public Receive createReceive()
  58.     {
  59.         return ReceiveBuilder.create()
  60.                 .match(UpdateCounterCommand.class, this::onUpdateCounterCommand)
  61.                 .match(ReceiveTimeout.class, this::onReceiveTimeout)
  62.                 .match(String.class, e ->
  63.                 {
  64.                     System.err.println("test");
  65.                 })
  66.                 .matchAny(this::unhandled)
  67.                 .build();
  68.     }
  69.  
  70.     private void onUpdateCounterCommand(final UpdateCounterCommand command)
  71.     {
  72.         final UpdatedCounterStateEvent stateEvent = new UpdatedCounterStateEvent(this.counter + 1);
  73.  
  74.         persist(stateEvent, (event) ->
  75.         {
  76.             respondCommandReceived();
  77.             onStateEvent(event);
  78.         });
  79.     }
  80.  
  81.     private void onStateEvent(final UpdatedCounterStateEvent stateEvent)
  82.     {
  83.         this.counter = stateEvent.getNewCounter();
  84.     }
  85.  
  86.     private void respondCommandReceived()
  87.     {
  88.         sender().tell(COMMAND_RECEIVED_DEFAULT_INSTANCE, self());
  89.     }
  90.  
  91.     private void onReceiveTimeout(final ReceiveTimeout receiveTimeout) throws Exception
  92.     {
  93.         System.out.println("Actor timed out");
  94.         context().stop(self());
  95.     }
  96.  
  97.     @Override
  98.     public Receive createReceiveRecover()
  99.     {
  100.         return new ReceiveBuilder()
  101.                 .match(UpdatedCounterStateEvent.class, this::onStateEvent)
  102.                 .match(RecoveryCompleted.class, r ->
  103.                 {
  104.                     System.out.println("Recovery Completed: " + persistenceId());
  105.                 })
  106.                 .matchAny(this::unhandled)
  107.                 .build();
  108.     }
  109.  
  110.     @Override
  111.     public String persistenceId()
  112.     {
  113.         return this.persistenceId;
  114.     }
  115. }
RAW Paste Data