Advertisement
Guest User

Untitled

a guest
Oct 28th, 2016
63
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.72 KB | None | 0 0
  /**
   * Application entry point.
   *
   * Creates the actor system, registers an ActiveMQ component under the name "jms" in the
   * Camel context, starts the JMS producer and consumer actors, and binds the HTTP routes
   * of [[ReportEndpoint]] to localhost:8881.
   */
  object ReportGeneratorApplication extends App {

    implicit val system: ActorSystem = ActorSystem()
    // Implicit vals get explicit types so that implicit resolution does not depend on inference.
    implicit val executor: scala.concurrent.ExecutionContextExecutor = system.dispatcher
    implicit val materializer: ActorMaterializer = ActorMaterializer()

    // Configure the ActiveMQ component and register it with Camel as "jms".
    val camelExtension: Camel = CamelExtension(system)
    val amqc: ActiveMQComponent = ActiveMQComponent.activeMQComponent(env.getString("jms.url"))
    amqc.setUsePooledConnection(true)
    amqc.setAsyncConsumer(true)
    amqc.setTrustAllPackages(true)
    amqc.setConcurrentConsumers(1)
    camelExtension.context.addComponent("jms", amqc)

    val jmsProducer: ActorRef = system.actorOf(Props[ActiveMQProducerActor])

    // The consumer declares an implicit materializer parameter; it is supplied explicitly here
    // through the second argument list.
    val jmsConsumer: ActorRef =
      system.actorOf(Props(new ActiveMQConsumerActor()(materializer)), name = "jmsConsumer")

    val endpoint: ReportEndpoint = new ReportEndpoint(jmsProducer)

    // bindAndHandle returns a Future; without a failure handler a bind error (e.g. the port is
    // already taken) would be dropped and the process would keep running with no HTTP listener.
    Http().bindAndHandle(endpoint.routes, "localhost", 8881).failed.foreach { cause =>
      system.log.error(cause, "Could not bind HTTP server to localhost:8881")
      system.terminate()
    }
  }
  22.  
  // HTTP endpoint that exposes the report routes.
  // Constructor: jmsProducer is the actor that each incoming DataRequest is sent to with `!`;
  // the implicit system / executor / materializer are declared as vals on the class.
  // JsonSupport and Mongo are mixed in from elsewhere in the project; insertReport and the
  // DataRequest unmarshaller presumably come from them - TODO confirm.
  // The trailing "...." on the last visible line marks where the listing was cut off; the rest
  // of the class is not shown here.
  23. class ReportEndpoint(jmsProducer: ActorRef)
  24. (implicit val system:ActorSystem,
  25. implicit val executor: ExecutionContext,
  26. implicit val materializer : ActorMaterializer)
  27. extends JsonSupport with Mongo {
  28.  
  29.  
  // Routes under the "reports" prefix; the visible part handles POST reports/generate.
  30. val routes =
  31. pathPrefix("reports"){
  32. post {
  33. path("generate"){
  34. entity(as[DataRequest]) { request =>
  // Random UUID that is echoed back in the success message.
  // NOTE(review): in the visible code the id is neither passed to insertReport nor attached to
  // the queued message - confirm whether it should be.
  35. val id = java.util.UUID.randomUUID.toString
  36.  
  37. // **Enqueue the request into ActiveMq**
  38. jmsProducer ! request
  39. val future: Future[Seq[Completed]] = insertReport(request)
  // Completes the request from the insert future, answering with a text message on success.
  // NOTE(review): r.head throws if the Seq is empty, which fails the mapped future.
  // NOTE(review): the Seq is typed Seq[Completed], so the `case _` fallback appears reachable
  // only for a null element - confirm whether error handling was meant to cover a failed future.
  40. complete {
  41. future.map[ToResponseMarshallable](r => r.head match {
  42. case r : Completed => println(r); s"Reporte Generado con id $id"
  43. case _ => HttpResponse(StatusCodes.InternalServerError, entity = "Error al generar reporte")
  44. })
  45. }
  46. }
  47. }
  48. } ....
  49.  
  50. //Is this the correct way to pass the materializer??
  /**
   * Camel consumer actor that reads from the endpoint configured under "jms.queue" and hands
   * every [[DataRequest]] body to a child "reportActor".
   *
   * @param materializer stream materializer, forwarded to the child [[ReportBuilderActor]]
   */
  class ActiveMQConsumerActor(implicit materializer: ActorMaterializer) extends Consumer with Base {

    override def endpointUri: String = env.getString("jms.queue")

    val log = Logging(context.system, this)

    val reportActor: ActorRef =
      context.actorOf(Props(new ReportBuilderActor()(materializer)), name = "reportActor")

    override def receive: Receive = {
      case msg: CamelMessage =>
        msg.body match {
          case data: DataRequest =>
            // Hand the request straight to the child. Materializing a one-element stream per
            // message (as before) only forwarded the element and gave no throttling; an actor
            // already takes one message at a time from its mailbox, and a direct tell keeps
            // the requests in arrival order.
            // NOTE(review): if ReportBuilderActor starts asynchronous work and returns at once,
            // limiting to one running report needs an explicit done/ack protocol - confirm.
            reportActor ! data
          case _ => log.info("Invalid")
        }
      // The previous `case _ => UnhandledMessage` merely evaluated the companion object and
      // discarded it; unhandled() actually reports the message through the actor system.
      case other => unhandled(other)
    }
  }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement