Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- brew install rabbitmq
- rabbitmqctl add_user abhi abhi
- rabbitmqctl set_user_tags abhi administrator
- rabbitmqctl add_vhost myvhost
- rabbitmqctl set_permissions -p myvhost abhi ".*" ".*" ".*"
- rabbitmqadmin declare exchange --vhost=myvhost name=exchange1 type=direct --user=abhi --password=abhi
- rabbitmqadmin -u abhi -p abhi -V myvhost declare queue name=myqueue
- rabbitmqadmin -u abhi -p abhi -V myvhost declare binding source=exchange1 destination=myqueue routing_key=foo.bar durable=true
- CONFIG_FILE=/usr/local/etc/rabbitmq/rabbitmq
- NODE_IP_ADDRESS=10.zz.yy.xxx
- NODENAME=rabbit@mydevbox
- implicit val actorSystem = ActorSystem()
- implicit val actorMaterializer = ActorMaterializer()
- val queueName = "myqueue"
- val queueDeclaration = QueueDeclaration(queueName, durable = true)
- val uri = "amqp://abhi:abhi@mydevbox:5672/myvhost"
- val settings = AmqpSinkSettings(AmqpConnectionUri(uri)).withRoutingKey("foo.bar").withDeclarations(queueDeclaration)
- val amqpSink = AmqpSink.simple(settings)
- val resource = getClass.getResource("/countrycapital.csv")
- val path = Paths.get(resource.toURI)
- val source = FileTailSource.lines(path, 8092, 100 millis).map{x => println(x); x}.map(ByteString(_))
- val graph = RunnableGraph.fromGraph(GraphDSL.create(amqpSink){implicit builder =>
- s =>
- import GraphDSL.Implicits._
- source ~> s.in
- ClosedShape
- })
- val future = graph.run()
- future.onComplete{_ => actorSystem.terminate()}
- Await.result(actorSystem.whenTerminated, Duration.Inf)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement