Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package recoverystresstest;
- import java.io.Serializable;
- import java.time.Duration;
- import java.util.concurrent.TimeUnit;
- import akka.actor.Props;
- import akka.actor.ReceiveTimeout;
- import akka.japi.pf.ReceiveBuilder;
- import akka.persistence.AbstractPersistentActor;
- import akka.persistence.RecoveryCompleted;
- public class ReplayActor extends AbstractPersistentActor
- {
- public static final class UpdateCounterCommand implements Serializable {private static final long serialVersionUID = 5873937109283373749L;}
- public static final class CommandReceived implements Serializable {private static final long serialVersionUID = 3573804342081210305L;}
- public static class UpdatedCounterStateEvent implements Serializable
- {
- private static final long serialVersionUID = 4292701424971430047L;
- private final int newCounter;
- public UpdatedCounterStateEvent(final int newCounter)
- {
- this.newCounter = newCounter;
- }
- public int getNewCounter()
- {
- return this.newCounter;
- }
- }
- public static final CommandReceived COMMAND_RECEIVED_DEFAULT_INSTANCE = new CommandReceived();
- private int counter;
- private final String persistenceId;
- public static Props propsForReplayActor(final String persistenceId, final Duration receiveTimeout)
- {
- return Props.create(ReplayActor.class, () -> new ReplayActor(persistenceId, receiveTimeout));
- }
- private ReplayActor(final String persistenceId, final Duration receiveTimeout)
- {
- assert receiveTimeout != null;
- assert persistenceId != null;
- this.counter = 0;
- this.persistenceId = persistenceId;
- context().setReceiveTimeout(scala.concurrent.duration.Duration.create(receiveTimeout.toNanos(), TimeUnit.NANOSECONDS));
- }
- @Override
- public Receive createReceive()
- {
- return ReceiveBuilder.create()
- .match(UpdateCounterCommand.class, this::onUpdateCounterCommand)
- .match(ReceiveTimeout.class, this::onReceiveTimeout)
- .match(String.class, e ->
- {
- System.err.println("test");
- })
- .matchAny(this::unhandled)
- .build();
- }
- private void onUpdateCounterCommand(final UpdateCounterCommand command)
- {
- final UpdatedCounterStateEvent stateEvent = new UpdatedCounterStateEvent(this.counter + 1);
- persist(stateEvent, (event) ->
- {
- respondCommandReceived();
- onStateEvent(event);
- });
- }
- private void onStateEvent(final UpdatedCounterStateEvent stateEvent)
- {
- this.counter = stateEvent.getNewCounter();
- }
- private void respondCommandReceived()
- {
- sender().tell(COMMAND_RECEIVED_DEFAULT_INSTANCE, self());
- }
- private void onReceiveTimeout(final ReceiveTimeout receiveTimeout) throws Exception
- {
- System.out.println("Actor timed out");
- context().stop(self());
- }
- @Override
- public Receive createReceiveRecover()
- {
- return new ReceiveBuilder()
- .match(UpdatedCounterStateEvent.class, this::onStateEvent)
- .match(RecoveryCompleted.class, r ->
- {
- System.out.println("Recovery Completed: " + persistenceId());
- })
- .matchAny(this::unhandled)
- .build();
- }
- @Override
- public String persistenceId()
- {
- return this.persistenceId;
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement