Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import akka.actor.Actor.Receive
- import akka.actor._
- import akka.pattern.pipe
- import com.github.levkhomich.akka.tracing.{TracingActorLogging, ActorTracing}
- import scala.concurrent.{ExecutionContext, Future}
/**
 * Companion of the `RequestProcessor` actor; declares the messages that drive its completion.
 */
object RequestProcessor extends StagingSupport {

  /**
   * Tells a processor to finish: the default handling passes `response` to `complete`.
   *
   * @param response value to send back to the sender of the initial request
   */
  case class Complete(response: Any)

  /** Message a processor sends to itself from the timer it schedules in `preStart`. */
  case object Timeout
}
/** Mix-in that supplies the `Stage` type alias. */
trait StagingSupport {

  /** A stage is a function that takes a `Receive` behaviour and returns a `Receive` behaviour. */
  type Stage = Receive => Receive
}
/**
 * Base actor that handles one request of type `T` and answers the sender of that request.
 *
 * Life cycle, as implemented below:
 *  - `preStart` schedules a one-off `Timeout` message to `self` after
 *    `CommonConfig.Timeouts.Background.duration`; `postStop` cancels that timer.
 *  - The initial behaviour is `stages(processRequest)`. By default `stages` is
 *    `handleInitRequest`, which records the first message that is an instance of `T` together
 *    with its sender, switches to `processRequest` and re-sends the request to `self` on behalf
 *    of the recorded sender.
 *  - Messages the current behaviour does not handle end up in `unhandled`, which deals with
 *    `Complete` and `Timeout` and logs anything else as unexpected.
 *  - `complete` replies to the recorded sender (if any) and stops the actor.
 *
 * @tparam T type of the initial request; its `Manifest` drives the runtime instance check
 */
abstract class RequestProcessor[+T: Manifest]
  extends Actor
  with ActorTracing
  with TracingActorLogging
  with StagingSupport {

  import RequestProcessor._

  /** Sender of the initial request; `None` until `handleInitRequest` has recorded it. */
  var _rootSender: Option[ActorRef] = None

  /** The initial request; `None` until `handleInitRequest` has recorded it. */
  var _rootRequest: Option[Any] = None

  /** Handle of the timer created in `preStart`, kept so that `postStop` can cancel it. */
  private var timeoutSchedule: Option[Cancellable] = None

  /** Sender of the initial request. Calls `Option.get`, so it throws while nothing is recorded. */
  def rootSender: ActorRef = _rootSender.get

  /** The initial request cast to `T`. Calls `Option.get`, so it throws while nothing is recorded. */
  def rootRequest: T = _rootRequest.get.asInstanceOf[T]

  /** Schedules the one-off `Timeout` message to `self` and remembers the timer handle. */
  override def preStart(): Unit = {
    super.preStart()
    // TODO: provide additional timeouts for HTTP
    import context.dispatcher
    val delay = CommonConfig.Timeouts.Background.duration
    val pending = context.system.scheduler.scheduleOnce(delay) {
      self ! Timeout
    }
    timeoutSchedule = Some(pending)
  }

  /** Cancels the timer created in `preStart`, if there is one. */
  override def postStop(): Unit = {
    for (pending <- timeoutSchedule) pending.cancel()
    super.postStop()
  }

  /** Initial behaviour: the stage chain wrapped around `processRequest`. */
  override def receive: Receive = stages(processRequest)

  /** Identity helper that returns the given behaviour typed as `Receive`. */
  protected def Receive(receive: Receive): Receive = receive

  /** Stage chain applied to `processRequest`; by default only `handleInitRequest`. */
  protected def stages: Stage = nextStage => handleInitRequest(nextStage)

  /**
   * Installs `nextStage` as the current behaviour and re-sends the recorded request to `self`,
   * using the recorded sender as the message sender.
   *
   * @param nextStage behaviour to switch to
   */
  protected def stage(nextStage: Receive): Unit = {
    context.become(nextStage)
    self.tell(rootRequest, rootSender)
  }

  /**
   * Behaviour that accepts only messages that are instances of `T`'s runtime class. It records
   * the message and its sender, then moves on to `nextStage` via `stage`.
   *
   * @param nextStage behaviour to install once the request has been recorded
   */
  def handleInitRequest(nextStage: Receive): Receive = {
    val requestClass = manifest[T].runtimeClass
    val initial: Receive = {
      case request if requestClass.isInstance(request) =>
        log.debug(s"Received init request: $request")
        _rootSender = Some(sender())
        _rootRequest = Some(request)
        stage(nextStage)
        log.debug("Initialization complete going to the next stage")
    }
    initial
  }

  /** Behaviour that processes the request; supplied by subclasses. */
  def processRequest: Receive

  /** Default handling used by `unhandled`: `Complete` first, then `Timeout`. */
  protected def handleDefaults: Receive = handleComplete.orElse(handleTimeout)

  /** Returns `f` unchanged. */
  @deprecated("Default handling now implemented by overriding unhandled method", "")
  protected def withHandleDefaults(f: Receive): Receive = f

  /** Catch-all behaviour that logs the class of the message and the path of its sender. */
  protected def handleUnexpected: Receive = {
    case message =>
      log.error(s"Handled unexpected message ${message.getClass.getName} from ${sender().path}")
  }

  /** Timeout handling that completes the request with the `Timeout` object as the response. */
  protected def handleTimeout: Receive = handleTimeoutWith(complete(Timeout))

  /**
   * Builds a behaviour that, on `Timeout`, logs a warning and then evaluates `f`.
   *
   * @param f by-name action, evaluated each time a `Timeout` is handled
   */
  protected def handleTimeoutWith(f: => Unit): Receive = {
    case Timeout =>
      log.warning(s"Timeout while processing ${_rootRequest}")
      f
  }

  /** Behaviour that turns a `Complete(response)` message into a call to `complete(response)`. */
  protected def handleComplete: Receive = {
    case Complete(response) => complete(response)
  }

  /** Plain `context.become(receive)`. */
  @deprecated("use become instead", "")
  protected def becomeWithHandleDefaults(receive: Receive): Unit = context.become(receive)

  /**
   * Invoked for messages the current behaviour does not handle. Tries `handleDefaults` first;
   * otherwise delegates to `super.unhandled` and then to `handleUnexpected`.
   */
  override def unhandled(message: Any): Unit = {
    val fallback: Receive = {
      case other =>
        super.unhandled(other)
        handleUnexpected(message)
    }
    handleDefaults.applyOrElse(message, fallback)
  }

  /**
   * Sends `response` to the recorded sender, if one has been recorded, and stops this actor.
   *
   * @param response value to reply with
   */
  protected def complete(response: Any): Unit = {
    log.debug(s"Complete with: $response")
    for (target <- _rootSender) target.tell(response, self)
    context.stop(self)
  }

  /**
   * Pipes the outcome of `future` to `self` and switches to a behaviour that completes with
   * whatever message arrives next.
   *
   * @param future           computation whose outcome is piped to this actor
   * @param executionContext execution context used for piping the future
   */
  protected def completeWithResult(future: Future[Any])(implicit executionContext: ExecutionContext): Unit = {
    pipe(future).pipeTo(self)
    val awaitingOutcome: Receive = {
      case outcome => complete(outcome)
    }
    context.become(awaitingOutcome)
  }
}
- // TODO: Implement TimeoutGuardian to check unhandled timeouts
- //object TimeoutGuardian {
- // case class RespondTo(actorRef: ActorRef)
- //}
- //
- //class TimeoutGuardian(guarded: ActorRef) extends Actor with ActorLogging {
- // import TimeoutGuardian._
- // import RequestProcessor._
- //
- // private var _respondTo: Option[ActorRef] = None
- //
- // override def receive = {
- // case Timeout ⇒
- //
- // }
- //}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement