Guest User

Untitled

a guest
Jul 20th, 2018
86
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.92 KB | None | 0 0
  1. package akka.persistence.journal
  2.  
  3. import akka.actor.{Actor, Terminated}
  4. import akka.persistence.JournalProtocol.{RecoverySuccess, ReplayMessagesFailure}
  5. import akka.persistence.journal.leveldb.LeveldbJournal._
  6. import akka.persistence.journal.leveldb.SharedLeveldbStore
  7. import akka.pattern._
  8. import scala.concurrent.Future
  9.  
  /**
   * A [[SharedLeveldbStore]] that additionally handles the tagged-replay and
   * subscription messages brought into scope by `LeveldbJournal._`.
   *
   * The extra handlers live in [[receivePluginInternal]]; any message they do
   * not match is passed on to the behaviour inherited from the store.
   */
  class MySharedLeveldbJournal extends SharedLeveldbStore {

    /** Tries [[receivePluginInternal]] first, then falls back to the inherited `receive`. */
    override def receive: PartialFunction[Any, Unit] = receivePluginInternal orElse super.receive

    /**
     * Handlers for tagged replay and for subscriber bookkeeping.
     *
     *  - `ReplayTaggedMessages`: replays the events stored under a tag to `replyTo`,
     *    then pipes either `RecoverySuccess(highSeqNr)` or `ReplayMessagesFailure(cause)`
     *    to `replyTo`.
     *  - `SubscribePersistenceId`, `SubscribeAllPersistenceIds`, `SubscribeTag`:
     *    register the sender as a subscriber and death-watch it.
     *  - `Terminated`: removes the terminated actor from the subscribers.
     */
    def receivePluginInternal: Receive = {
      case ReplayTaggedMessages(fromSequenceNr, toSequenceNr, max, tag, replyTo) =>
        import context.dispatcher
        // Start the highest-sequence-number lookup one below the requested start,
        // clamped so it never goes negative.
        val readHighestSequenceNrFrom = math.max(0L, fromSequenceNr - 1)
        asyncReadHighestSequenceNr(tagAsPersistenceId(tag), readHighestSequenceNrFrom)
          .flatMap { highSeqNr =>
            // Never replay past what is actually stored for this tag.
            val toSeqNr = math.min(toSequenceNr, highSeqNr)
            if (highSeqNr == 0L || fromSequenceNr > toSeqNr) {
              // Nothing stored, or the requested range is empty: skip the replay.
              Future.successful(highSeqNr)
            } else {
              asyncReplayTaggedMessages(tag, fromSequenceNr, toSeqNr, max) {
                case ReplayedTaggedMessage(persistentRepr, replayedTag, offset) =>
                  // One stored representation may adapt to zero or more representations;
                  // each is forwarded with the replayed tag and offset, and no sender.
                  adaptFromJournal(persistentRepr).foreach { adaptedPersistentRepr =>
                    replyTo.tell(
                      ReplayedTaggedMessage(adaptedPersistentRepr, replayedTag, offset),
                      Actor.noSender)
                  }
              }.map(_ => highSeqNr)
            }
          }.map { highSeqNr =>
            RecoverySuccess(highSeqNr)
          }.recover {
            // Every failure becomes a reply, so `replyTo` always receives a final message.
            case e => ReplayMessagesFailure(e)
          }.pipeTo(replyTo)

      case SubscribePersistenceId(persistenceId: String) =>
        val subscriber = sender()
        addPersistenceIdSubscriber(subscriber, persistenceId)
        context.watch(subscriber)

      case SubscribeAllPersistenceIds =>
        val subscriber = sender()
        addAllPersistenceIdsSubscriber(subscriber)
        context.watch(subscriber)

      case SubscribeTag(tag: String) =>
        val subscriber = sender()
        addTagSubscriber(subscriber, tag)
        context.watch(subscriber)

      case Terminated(ref) =>
        removeSubscriber(ref)
    }
  }
Add Comment
Please, Sign In to add comment