Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
// --------- DOMAIN MODEL ------------

/** Lifecycle state of a single [[Task]]. */
sealed trait TaskStatus
case object Pending extends TaskStatus
case object Complete extends TaskStatus

/** A unit of work inside a [[Job]]; a task starts out [[Pending]] unless told otherwise. */
case class Task(id: String, status: TaskStatus = Pending)

/** A job identified by `id` and made up of `tasks`. */
case class Job(id: String, tasks: Seq[Task]) {

  /** True when every task is [[Complete]] (vacuously true for a job with no tasks). */
  def isFinished: Boolean = tasks.forall(_.status == Complete)

  /**
   * Returns a copy of this job in which any task sharing `task.id` is replaced by `task`.
   *
   * The supplied task is placed at the front of the sequence; the remaining tasks keep
   * their relative order. If no task with that id exists, `task` is simply added.
   */
  def finishTask(task: Task): Job =
    // `+:` rather than `::` — `tasks` is a Seq, and `::` is only defined on List.
    copy(tasks = task +: tasks.filterNot(_.id == task.id))
}
// --------- DATABASE ------------

/** Asynchronous persistence boundary for [[Job]] snapshots. */
trait JobDatabase {

  /** Looks up the job stored under `jobId`; the result arrives asynchronously. */
  def get(jobId: String): Future[Job]

  /** Persists `job`; the returned future yields a [[Job]] once the write is done. */
  def save(job: Job): Future[Job]
}
// Job Manager oversees ALL jobs
// Supervision not here, but assert that we want to restart jobs on failure
object JobManager {

  /** Request to persist `job` and start processing it. */
  case class Submit(job: Job)

  /** Internal notification that a submitted job has been written to the database. */
  private case class Persisted(job: Job)
}

/**
 * Accepts [[JobManager.Submit]] requests, saves the job through `db` and then spawns one
 * [[JobProcessor]] child per job.
 *
 * @param db persistence used to store the job before processing starts; it is also handed
 *           to every spawned [[JobProcessor]]
 */
class JobManager(db: JobDatabase) extends Actor {
  import JobManager._
  import akka.pattern.pipe
  import context.dispatcher // ExecutionContext for the Future transformations below

  def receive: Receive = {
    case Submit(job) =>
      // Never touch `context` from a Future callback: it runs on another thread.
      // Pipe the outcome back to ourselves and react to it as an ordinary message.
      db.save(job).map(Persisted(_)) pipeTo self

    case Persisted(job) =>
      // kick off a new job processing, we give the id as the name so it can load its state
      context.actorOf(Props(classOf[JobProcessor], db), job.id)

    case akka.actor.Status.Failure(e) =>
      // A failed save arrives here via pipeTo.
      ??? // doesn't matter for this discussion
  }
}
// Job Processor handles the processing of a long running individual job, saving state as each task is complete...
// He loads his state from the database when he starts
object JobProcessor {
  case object Process

  /** The job snapshot fetched from the database at start-up. */
  case class Loaded(job: Job)

  /** Confirmation that `job` has been written to the database. */
  case class Saved(job: Job)
}

/**
 * Drives a single job to completion. The actor's own name is used as the job id; the job
 * is fetched from `db` in `preStart`, one [[TaskProcessor]] child is started per unfinished
 * task, and every task completion is persisted before the job is considered done.
 *
 * @param db persistence used to load the job and to save it after each completed task
 */
class JobProcessor(db: JobDatabase) extends Actor with Stash {
  import akka.pattern.pipe
  import context.dispatcher // use the actor's dispatcher rather than the global pool

  val jobId: String = self.path.name

  // Latest in-memory view of the job; unset until the Loaded message arrives.
  var state: Job = _

  def receive: Receive = loading

  /** Initial behavior: wait for the job to arrive from the database. */
  def loading: Receive = {
    case JobProcessor.Loaded(job) =>
      // job was loaded from the database
      state = job
      if (state.isFinished)
        self ! PoisonPill // nothing left to do (e.g. reloaded after the final save)
      else
        // after a reload only the tasks that are not yet complete need to run
        state.tasks.filterNot(_.status == Complete).foreach(processTask)
      unstashAll()
      context.become(running)
    case akka.actor.Status.Failure(e) =>
      throw e // notify parent to restart us
    case _ =>
      stash() // keep anything that arrives early until the job is loaded
  }

  /** Behavior once the job is loaded: record completions and stop when all are saved. */
  def running: Receive = {
    case TaskProcessor.Completed(task) =>
      // Update the in-memory job first so that completions arriving before the previous
      // save is confirmed build on each other instead of on a stale snapshot.
      state = state.finishTask(task)
      db.save(state).map(JobProcessor.Saved(_)) pipeTo self
    case JobProcessor.Saved(job) =>
      // Stop only once the snapshot in which every task is complete has been persisted.
      if (job.isFinished)
        self ! PoisonPill
    case akka.actor.Status.Failure(e) =>
      throw e // notify parent to restart us
  }

  // Starts a child TaskProcessor and hands it the task.
  private def processTask(task: Task): Unit =
    context.actorOf(Props(classOf[TaskProcessor])) ! TaskProcessor.Process(task)

  override def preStart(): Unit = {
    // The load result (or its failure) comes back to this actor as a message.
    db.get(jobId).map(JobProcessor.Loaded(_)) pipeTo self
    super.preStart()
  }
}
// Processes an individual task. I am less concerned about this guy, so it does something simple.
object TaskProcessor {

  /** Request to process `task`. */
  case class Process(task: Task)

  /** Reply carrying the task after it has been processed. */
  case class Completed(task: Task)
}

/** Handles [[TaskProcessor.Process]] by pausing briefly and reporting the task as complete to its parent. */
class TaskProcessor extends Actor {

  def receive: Receive = {
    case TaskProcessor.Process(incoming) =>
      Thread.sleep(100) // placeholder work: blocks this thread for 100 ms
      val finished = incoming.copy(status = Complete)
      context.parent ! TaskProcessor.Completed(finished)
  }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement