Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package akka.persistence.journal
- import akka.actor.{Actor, Terminated}
- import akka.persistence.JournalProtocol.{RecoverySuccess, ReplayMessagesFailure}
- import akka.persistence.journal.leveldb.LeveldbJournal._
- import akka.persistence.journal.leveldb.SharedLeveldbStore
- import akka.pattern._
- import scala.concurrent.Future
/**
 * A [[SharedLeveldbStore]] whose `receive` first tries the tagged-replay and
 * subscription messages handled by [[receivePluginInternal]] and only then
 * falls back to the behaviour inherited from the store.
 */
class MySharedLeveldbJournal extends SharedLeveldbStore {

  /** Plugin-internal messages take precedence over the inherited store protocol. */
  override def receive: PartialFunction[Any, Unit] =
    receivePluginInternal.orElse(super.receive)

  /**
   * Handles tagged replay requests, the three kinds of subscription requests
   * and the `Terminated` notifications of watched subscribers.
   */
  def receivePluginInternal: Receive = {
    case ReplayTaggedMessages(fromSequenceNr, toSequenceNr, max, tag, replyTo) =>
      import context.dispatcher

      // Start looking for the highest sequence number just below the requested
      // lower bound, but never below zero.
      val lookupFrom = math.max(0L, fromSequenceNr - 1)
      val highestKnown = asyncReadHighestSequenceNr(tagAsPersistenceId(tag), lookupFrom)

      val replayed = highestKnown.flatMap { highest =>
        val upperBound = math.min(toSequenceNr, highest)
        val nothingToReplay = highest == 0L || fromSequenceNr > upperBound

        if (nothingToReplay) {
          Future.successful(highest)
        } else {
          asyncReplayTaggedMessages(tag, fromSequenceNr, upperBound, max) {
            case ReplayedTaggedMessage(repr, replayedTag, offset) =>
              // Each adapted representation is forwarded to the requester
              // without a sender.
              adaptFromJournal(repr).foreach { adapted =>
                replyTo.tell(ReplayedTaggedMessage(adapted, replayedTag, offset), Actor.noSender)
              }
          }.map(_ => highest)
        }
      }

      // The requester receives either RecoverySuccess with the highest
      // sequence number or ReplayMessagesFailure with the cause.
      replayed
        .map(highest => RecoverySuccess(highest))
        .recover { case cause => ReplayMessagesFailure(cause) }
        .pipeTo(replyTo)

    case SubscribePersistenceId(persistenceId: String) =>
      val subscriber = sender()
      addPersistenceIdSubscriber(subscriber, persistenceId)
      context.watch(subscriber)

    case SubscribeAllPersistenceIds =>
      val subscriber = sender()
      addAllPersistenceIdsSubscriber(subscriber)
      context.watch(subscriber)

    case SubscribeTag(tag: String) =>
      val subscriber = sender()
      addTagSubscriber(subscriber, tag)
      context.watch(subscriber)

    case Terminated(ref) =>
      // A watched subscriber stopped; drop its registration.
      removeSubscriber(ref)
  }
}
Add Comment
Please, Sign In to add comment