Advertisement
Guest User

Untitled

a guest
Apr 2nd, 2015
266
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Scala 3.57 KB | None | 0 0
  1.  
  2. import akka.actor.Actor.Receive
  3. import akka.actor._
  4. import akka.pattern.pipe
  5. import com.github.levkhomich.akka.tracing.{TracingActorLogging, ActorTracing}
  6.  
  7. import scala.concurrent.{ExecutionContext, Future}
  8.  
  9.  
  10.  
  /**
   * Companion object of the `RequestProcessor` actor class; it declares the
   * control messages that the class's default handlers match on.
   */
  object RequestProcessor extends StagingSupport {

      /**
       * Carries a response value. `RequestProcessor.handleComplete` reacts to
       * it by completing the request with `response`.
       */
      case class Complete(response: Any)

      /**
       * Signal that `RequestProcessor` schedules to itself in `preStart` and
       * matches in `handleTimeoutWith`.
       */
      case object Timeout
  }
  16.  
  17.  
  /**
   * Mixin that contributes the `Stage` type alias.
   */
  trait StagingSupport {

      /** A stage is a function that takes one `Receive` and yields another. */
      type Stage = Receive ⇒ Receive
  }
  21.  
  22.  
  /**
   * Base actor that handles one "root" request of type `T` and replies to its sender.
   *
   * Behaviour implemented by this class:
   *  - `preStart` schedules a single [[RequestProcessor.Timeout]] message to `self`
   *    after `CommonConfig.Timeouts.Background.duration`; `postStop` cancels it.
   *  - The initial behaviour is `stages(processRequest)`. With the default `stages`
   *    this is `handleInitRequest`: the first message that is an instance of `T` is
   *    stored together with its sender, the actor switches to `processRequest`, and
   *    the request is sent to `self` again with the original sender.
   *  - Messages not handled by the current behaviour end up in `unhandled`, which
   *    tries `handleDefaults` (`Complete` and `Timeout`) and otherwise forwards to
   *    the standard `unhandled` and logs the message via `handleUnexpected`.
   *  - `complete` sends the response to the stored sender (if any) and stops the actor.
   *
   * @tparam T type of the root request; its `Manifest` is used for the runtime
   *           instance check in `handleInitRequest`
   */
  abstract class RequestProcessor[+T: Manifest]
          extends Actor with ActorTracing with TracingActorLogging with StagingSupport
  {
      import RequestProcessor._

      /** Sender of the root request; `None` until `handleInitRequest` has matched a message. */
      var _rootSender: Option[ActorRef] = None

      /** The root request; `None` until `handleInitRequest` has matched a message. */
      var _rootRequest: Option[Any] = None

      /**
       * Sender of the root request.
       *
       * @throws NoSuchElementException if no init request has been recorded yet
       */
      def rootSender: ActorRef =
          _rootSender.getOrElse(
              throw new NoSuchElementException("rootSender is not available: no init request has been received yet"))

      /**
       * The root request, cast to `T`.
       *
       * @throws NoSuchElementException if no init request has been recorded yet
       */
      def rootRequest: T =
          _rootRequest.getOrElse(
              throw new NoSuchElementException("rootRequest is not available: no init request has been received yet")
          ).asInstanceOf[T]

      /** Handle of the timeout task scheduled in `preStart`, kept so `postStop` can cancel it. */
      private var timeoutSchedule: Option[Cancellable] = None

      /** Schedules a one-shot `Timeout` message to `self`. */
      override def preStart() : Unit = {
          super.preStart()
          // TODO: provide additional timeouts for HTTP
          import context.dispatcher
          timeoutSchedule = Some(
              context.system.scheduler.scheduleOnce(CommonConfig.Timeouts.Background.duration) {
                  self ! Timeout
              })
      }

      /** Cancels the pending timeout task, if one was scheduled. */
      override def postStop(): Unit = {
          timeoutSchedule.foreach(_.cancel())
          super.postStop()
      }


      /** Initial behaviour: `processRequest` wrapped by `stages`. */
      override def receive: Receive = stages(processRequest)

      /** Identity helper that lets a partial-function literal be typed as `Receive`. */
      protected def Receive(receive: Receive): Receive = receive

      /** Stage applied to `processRequest` to build the initial behaviour; defaults to `handleInitRequest`. */
      protected def stages: Stage = handleInitRequest


      /**
       * Switches to `nextStage` and sends the root request to `self` again,
       * using the root sender as the sender of that message.
       *
       * @param nextStage behaviour to install via `context.become`
       */
      protected def stage(nextStage: Receive): Unit = {
          context.become(nextStage)
          self.tell(rootRequest, rootSender)
      }


      /**
       * Builds the behaviour that waits for the init request.
       *
       * It matches only messages that are instances of `T`'s runtime class, stores the
       * message and its sender, then moves on to `nextStage` via `stage`.
       *
       * @param nextStage behaviour to switch to once the init request has been recorded
       */
      def handleInitRequest(nextStage: Receive): Receive = {
          case request if manifest[T].runtimeClass.isInstance(request) ⇒
              log.debug(s"Received init request: $request")
              _rootSender = Some(sender())
              _rootRequest = Some(request)
              stage(nextStage)
              log.debug("Initialization complete going to the next stage")
      }

      /** Behaviour that processes the root request; supplied by subclasses. */
      def processRequest: Receive

      /** Default handlers tried by `unhandled`: `Complete` first, then `Timeout`. */
      protected def handleDefaults: Receive = handleComplete orElse handleTimeout

      @deprecated("Default handling now implemented by overriding unhandled method", "")
      protected def withHandleDefaults(f: Receive): Receive = f

      /** Logs any message at error level together with the path of its sender. */
      protected def handleUnexpected: Receive = {
          case message ⇒
              log.error(s"Handled unexpected message ${message.getClass.getName} from ${sender().path}")
      }

      /** Default timeout handling: complete the request with the `Timeout` object. */
      protected def handleTimeout: Receive = handleTimeoutWith(complete(Timeout))

      /**
       * Builds a handler for `Timeout` that logs a warning and then runs `f`.
       *
       * @param f by-name action evaluated each time a `Timeout` is handled
       */
      protected def handleTimeoutWith(f: ⇒ Unit): Receive = {
          case Timeout ⇒
              log.warning(s"Timeout while processing ${_rootRequest}")
              f
      }

      /** Handler for `Complete(response)`: completes the request with the wrapped response. */
      protected def handleComplete: Receive = {
          case Complete(response) ⇒ complete(response)
      }

      @deprecated("use become instead", "")
      protected def becomeWithHandleDefaults(receive: Receive): Unit = context.become(receive)


      /**
       * Applies `handleDefaults` to messages the current behaviour did not handle.
       * Anything else is passed to the inherited `unhandled` and then logged by
       * `handleUnexpected`.
       */
      override def unhandled(message: Any): Unit = {
          val fallback: Receive = {
              case other ⇒
                  super.unhandled(other)
                  handleUnexpected(other)
          }
          (handleDefaults orElse fallback)(message)
      }

      /**
       * Sends `response` to the recorded root sender (if there is one) and stops this actor.
       *
       * @param response value to deliver to the root sender
       */
      protected def complete(response: Any): Unit = {
          log.debug(s"Complete with: $response")
          _rootSender.foreach(_.tell(response, self))
          context.stop(self)
      }


      /**
       * Pipes the outcome of `future` to `self` and switches to a behaviour that
       * completes the request with whatever message arrives next.
       *
       * @param future           computation whose outcome is piped to this actor
       * @param executionContext execution context required by `pipeTo`
       */
      protected def completeWithResult(future: Future[Any])(implicit executionContext: ExecutionContext): Unit = {
          future pipeTo self
          // After this point every incoming message, whatever its type, is treated as the final response.
          context.become {
              case result ⇒ complete(result)
          }
      }
  }
  123.  
  124.  
  125. // TODO: Implement TimeoutGuardian to check unhandled timeouts
  126. //object TimeoutGuardian {
  127. //  case class RespondTo(actorRef: ActorRef)
  128. //}
  129. //
  130. //class TimeoutGuardian(guarded: ActorRef) extends Actor with ActorLogging {
  131. //  import TimeoutGuardian._
  132. //  import RequestProcessor._
  133. //
  134. //  private var _respondTo: Option[ActorRef] = None
  135. //
  136. //  override def receive = {
  137. //      case Timeout ⇒
  138. //
  139. //  }
  140. //}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement