Untitled

a guest
Jul 21st, 2018
/**
 * I was asked how to encode the following scenario with Akka Actors,
 * since with Scala Actors one would simply nest the receives.
 *
 * Requirements are as follows:
 *  - The first thing the actor needs to do is subscribe to a channel of events.
 *  - Then it must replay (process) all "old" events.
 *  - Then it has to wait for a GoAhead signal before it begins processing the new events.
 *  - It mustn't "miss" events that happen between catching up with the old events
 *    and getting the GoAhead signal.
 *
 * Note: this uses the Akka 1.x-style API (self is an ActorRef and become is called directly);
 * in Akka 2.x, become lives on context.
 */

import akka.actor._
import akka.actor.Actor._
import scala.collection.mutable.ListBuffer

class MyActor extends Actor {
  // If you need to store sender/senderFuture you can change it to ListBuffer[(Any, Channel)]
  val queue = new ListBuffer[Any]()

  // This method processes a message/event
  def process(msg: Any): Unit = println("processing: " + msg)

  // This method subscribes the actor to the event bus
  def subscribe(): Unit = {} // Your external stuff

  // This method retrieves all prior messages/events
  def allOldMessages(): List[Any] = List()

  override def preStart(): Unit = {
    // We override preStart to be sure that the first message the actor gets is 'Replay.
    // That message will only start to be processed _after_ the actor is started.
    self ! 'Replay
    // Then we subscribe to the stream of messages/events
    subscribe()
  }

  def receive = {
    case 'Replay => // Our first message should be a 'Replay message, all others are invalid
      allOldMessages() foreach process // Process all old messages/events
      become { // Switch behavior to look for the GoAhead signal
        case 'GoAhead => // When we get the GoAhead signal we process all our buffered messages/events
          queue foreach process
          queue.clear()
          become { // Then we change behavior to process incoming messages/events as they arrive
            case msg => process(msg)
          }
        case msg => // While we haven't gotten the GoAhead signal, buffer all incoming messages
          queue += msg // Here you have full control, you can handle overflow etc.
      }
  }
}
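
The three phases above (replay, buffer until GoAhead, then process live) can also be tried out without an actor system. The following is only a minimal sketch under assumptions: `ReplayThenBuffer`, `send`, `processed`, and the `Replay`/`GoAhead` case objects are hypothetical names that do not appear in the original, and a plain `var` holding the current handler stands in for `become`. Unlike the actor, it silently ignores anything other than `Replay` in the first phase.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical signals standing in for the 'Replay and 'GoAhead symbols.
case object Replay
case object GoAhead

// Hypothetical, Akka-free stand-in for MyActor: swapping `behavior` plays the role of become.
class ReplayThenBuffer(oldMessages: List[Any]) {
  // Records the order in which messages were processed, so it can be inspected.
  val processed = new ListBuffer[Any]()
  private val queue = new ListBuffer[Any]()

  private def process(msg: Any): Unit = processed += msg

  // Phase 3: process every incoming message as it arrives.
  private val live: Any => Unit = msg => process(msg)

  // Phase 2: buffer everything until GoAhead, then drain the buffer and go live.
  private val waiting: Any => Unit = {
    case GoAhead =>
      queue.foreach(process)
      queue.clear()
      behavior = live
    case msg =>
      queue += msg
  }

  // Phase 1: replay the old messages, then start buffering.
  private val replaying: Any => Unit = {
    case Replay =>
      oldMessages.foreach(process)
      behavior = waiting
    case _ => // Assumption of this sketch: anything else is ignored before the replay.
  }

  private var behavior: Any => Unit = replaying

  // Delivers one message to whichever handler is currently installed.
  def send(msg: Any): Unit = behavior(msg)
}
```

Sending `Replay`, then a few events, then `GoAhead` should leave `processed` holding the old events followed by the buffered ones in arrival order; anything sent afterwards is appended immediately.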