Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- class DistributedTellDispatcher(val role: Option[String]) extends Proxy with RandomSelector with ActorTracing {
- val path = self.path.elements.mkString("/")
- def receiver = random
- override protected def serviceName: String =
- NodeConfig.moduleName
- override def process: Receive = {
- case msg: TracingSupport if role.isEmpty || role.get == NodeConfig.role.toString =>
- trace.sample(msg, NodeConfig.moduleName)
- CommonServices.apiRouter.tell(msg, sender())
- case msg if role.isEmpty || role.get == NodeConfig.role.toString =>
- CommonServices.apiRouter.tell(msg, sender())
- }
- }
- class DistributedAskDispatcher(val role: Option[String]) extends Proxy with RandomSelector with ActorTracing {
- import context.dispatcher
- import me.selfish.common.CommonConfig.Timeouts.Background
- val path = self.path.elements.mkString("/")
- def receiver = random
- override protected def serviceName: String =
- NodeConfig.moduleName
- override def process: Receive = {
- case msg: TracingSupport if role.isEmpty || role.get == NodeConfig.role.toString =>
- trace.sample(msg, NodeConfig.moduleName)
- CommonServices.apiRouter ? msg map {
- case result =>
- trace.record(msg, "Ask service response: " + result)
- trace.finish(msg)
- result
- } recover {
- case e: ControlThrowable =>
- throw e
- case e: AskTimeoutException =>
- trace.record(msg, "Ask timeout in " + NodeConfig.role + " node")
- trace.finish(msg)
- ErrorResponse.ServiceUnavailable
- case e: Throwable =>
- trace.record(msg, "InternalServerError in " + NodeConfig.role + " node")
- trace.record(msg, e)
- trace.finish(msg)
- ErrorResponse.InternalServerError("distributed ask error " + msg)
- } pipeTo sender
- case msg if role.isEmpty || role.get == NodeConfig.role.toString =>
- CommonServices.apiRouter.tell(msg, sender())
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement