Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
/** Entry point of the report generator service.
  *
  * Start-up sequence:
  *  1. create the actor system, execution context and stream materializer;
  *  2. register an ActiveMQ component with the Camel extension under the `jms` scheme,
  *     using the broker URL read from the `jms.url` configuration key;
  *  3. start the JMS producer and consumer actors;
  *  4. bind the HTTP routes of [[ReportEndpoint]] to `localhost:8881`.
  */
object ReportGeneratorApplication extends App {
  implicit val system: ActorSystem = ActorSystem()
  // Explicit types on public implicits keep implicit resolution predictable.
  implicit val executor: scala.concurrent.ExecutionContextExecutor = system.dispatcher
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  val camelExtension: Camel = CamelExtension(system)

  // Configure the ActiveMQ component before any Camel consumer/producer actor is created.
  val amqc: ActiveMQComponent = ActiveMQComponent.activeMQComponent(env.getString("jms.url"))
  amqc.setUsePooledConnection(true)
  amqc.setAsyncConsumer(true)
  amqc.setTrustAllPackages(true)
  amqc.setConcurrentConsumers(1)
  camelExtension.context.addComponent("jms", amqc)

  val jmsProducer: ActorRef = system.actorOf(Props[ActiveMQProducerActor])

  // The materializer is handed to the actor through its implicit constructor
  // parameter list, supplied explicitly here so the dependency is visible.
  val jmsConsumer: ActorRef =
    system.actorOf(Props(new ActiveMQConsumerActor()(materializer)), name = "jmsConsumer")

  val endpoint: ReportEndpoint = new ReportEndpoint(jmsProducer)

  // bindAndHandle returns a Future; without a failure handler a failed bind
  // (for example, port already in use) would go completely unnoticed.
  Http()
    .bindAndHandle(endpoint.routes, "localhost", 8881)
    .failed
    .foreach { cause =>
      system.log.error(cause, "Failed to bind HTTP server to localhost:8881")
    }
}
- class ReportEndpoint(jmsProducer: ActorRef) // HTTP routes for report generation; every accepted request is sent to `jmsProducer`.
- (implicit val system:ActorSystem,
- implicit val executor: ExecutionContext,
- implicit val materializer : ActorMaterializer)
- extends JsonSupport with Mongo {
- val routes = // Route tree: POST /reports/generate with a DataRequest entity.
- pathPrefix("reports"){
- post {
- path("generate"){
- entity(as[DataRequest]) { request =>
- val id = java.util.UUID.randomUUID.toString // NOTE(review): only echoed in the response text below; not passed to the producer or to insertReport here.
- // Enqueue the request into ActiveMQ (fire-and-forget tell, issued before the insert below is started).
- jmsProducer ! request
- val future: Future[Seq[Completed]] = insertReport(request) // insertReport is not defined in the visible class body; presumably inherited from Mongo (TODO confirm).
- complete {
- future.map[ToResponseMarshallable](r => r.head match { // NOTE(review): r.head throws NoSuchElementException if the Seq is empty, which fails the future.
- case r : Completed => println(r); s"Reporte Generado con id $id" // this `r` shadows the outer lambda parameter
- case _ => HttpResponse(StatusCodes.InternalServerError, entity = "Error al generar reporte") // NOTE(review): elements are statically typed Completed, so this branch looks reachable only for null.
- })
- }
- }
- }
- } ....
/** Camel consumer actor that reads messages from the endpoint configured under
  * `jms.queue` and forwards every [[DataRequest]] body to a child
  * [[ReportBuilderActor]] named `reportActor`.
  *
  * On passing the materializer: an implicit constructor parameter works, and the
  * creator must then build the actor with `Props(new ActiveMQConsumerActor()(materializer))`
  * (or rely on an implicit in scope). The same materializer is handed on to the child.
  *
  * @param materializer stream materializer passed through to the child report actor
  */
class ActiveMQConsumerActor(implicit materializer: ActorMaterializer) extends Consumer with Base {

  override def endpointUri: String = env.getString("jms.queue")

  val log = Logging(context.system, this)

  val reportActor: ActorRef =
    context.actorOf(Props(new ReportBuilderActor()(materializer)), name = "reportActor")

  override def receive: Receive = {
    case msg: CamelMessage =>
      msg.body match {
        case data: DataRequest =>
          // Previously each request was wrapped in its own one-element stream
          // (Source.single(...).buffer(1, backpressure)...run()). Because a new
          // stream was materialized per message, the buffer never limited anything
          // across messages. A plain tell is equivalent and cheaper. The child actor
          // handles its mailbox one message at a time, which gives "one task at a
          // time" as long as ReportBuilderActor does its work synchronously inside
          // receive (TODO confirm; it is not visible here).
          reportActor ! data
        case _ => log.info("Invalid")
      }
    // The original evaluated the `UnhandledMessage` companion object, a no-op that
    // silently dropped the message; report it through the standard hook instead.
    case other => unhandled(other)
  }
}
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement