Advertisement
Guest User

Untitled

a guest
Feb 5th, 2016
50
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.90 KB | None | 0 0
  1. // --------- DOMAIN MODEL ------------
  2. sealed trait TaskStatus
  3. case object Pending extends TaskStatus
  4. case object Complete extends TaskStatus
  5.  
  6. case class Task(id: String, status: TaskStatus = Pending)
  7.  
  8. case class Job(id: String, tasks: Seq[Task]) {
  9. def isFinished = tasks.forall(_.status == Complete)
  10.  
  11. def finishTask(task: Task) = copy(tasks = task :: tasks.filterNot(_.id == task.id))
  12. }
  13.  
  14. // --------- DATABASE ------------
  15. trait JobDatabase {
  16.  
  17. def save(job: Job): Future[Job]
  18.  
  19. def get(jobId: String): Future[Job]
  20. }
  21.  
  22. // Job Manager oversees ALL jobs
  23. // Supervision not here, but assert that we want to restart jobs on failure
  24. object JobManager {
  25.  
  26. case class Submit(job: Job)
  27. }
  28. class JobManager(db: JobDatabase) extends Actor {
  29.  
  30. import JobManager._
  31.  
  32. def receive = {
  33. case Submit(job: Job) =>
  34. db.save(job).onComplete {
  35. case Success(job) =>
  36. // kick off a new job processing, we give the id as the name so it can load its state
  37. context.actorOf(Props(classOf[JobProcessor], db), job.id)
  38.  
  39. case Failure(e) => ??? // doesn't matter for this discussion
  40. }
  41. }
  42. }
  43.  
  44. // Job Processor handles the processing of a long running individual job, saving state as each task is complete...
  45. // He loads his state from the database when he starts
  46. object JobProcessor {
  47.  
  48. case object Process
  49. case class Loaded(job: Job)
  50. case class Saved(job: Job)
  51. }
  52. class JobProcessor(db: JobDatabase) extends Actor with Stash {
  53.  
  54. import akka.pattern.pipe
  55. import scala.concurrent.ExecutionContext.Implicits.global
  56.  
  57. val jobId = self.path.name
  58.  
  59. var state: Job = _
  60.  
  61. def receive = loading
  62.  
  63. def loading: Receive = {
  64. case JobProcessor.Loaded(job) =>
  65. // job was loaded from the database
  66. state = job
  67. state.tasks.foreach(processTask)
  68. unstashAll()
  69. context.become(running, false)
  70.  
  71. case akka.actor.Status.Failure(e) =>
  72. throw e // notify parent to restart us
  73. }
  74.  
  75. def running: Receive = {
  76. case TaskProcessor.Completed(task) =>
  77. // update the job to mark this task as finished and save that
  78. db.save(state.finishTask(task)).map(JobProcessor.Saved) pipeTo self
  79.  
  80. case JobProcessor.Saved(job) =>
  81. state = job
  82. if (state.isFinished)
  83. self ! PoisonPill
  84.  
  85. case akka.actor.Status.Failure(e) =>
  86. throw e // notify parent to restart us
  87. }
  88.  
  89. private def processTask(task: Task) =
  90. context.actorOf(Props(classOf[TaskProcessor])) ! TaskProcessor.Process(task)
  91.  
  92. override def preStart(): Unit = {
  93. db.get(jobId).map(JobProcessor.Loaded) pipeTo self
  94. super.preStart()
  95. }
  96. }
  97.  
  98. // Processes an individual task, I am less concernted about this guy, so do something simple
  99. object TaskProcessor {
  100. case class Process(task: Task)
  101. case class Completed(task: Task)
  102. }
  103. class TaskProcessor extends Actor {
  104.  
  105. def receive = {
  106. case TaskProcessor.Process(task) =>
  107. Thread.sleep(100)
  108. context.parent ! TaskProcessor.Completed(task.copy(status = Complete))
  109. }
  110. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement