Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- extern crate yaml_rust;
- use std::fs::File;
- use std::io::prelude::*;
- // mod utils;
- // use utils::arg_parse::CmdArgs;
- use structopt::StructOpt;
- use yaml_rust::{YamlLoader};
- use structopt::StructOpt;
- use std::path::PathBuf;
- // The imports below are for the AMQP (RabbitMQ) client
- use failure::Error;
- use futures::{future, Future, Stream};
- use lapin_futures as lapin;
- use crate::lapin::{BasicProperties, Client, ConnectionProperties};
- use crate::lapin::options::{BasicConsumeOptions, QueueDeclareOptions, ExchangeDeclareOptions};
- use crate::lapin::types::FieldTable;
- use log::{info, trace, warn, debug};
- use tokio;
- use tokio::runtime::Runtime;
- #[derive(StructOpt)]
- pub struct CmdArgs {
- #[structopt(
- short,
- long
- )]
- pub debug: bool,
- /// Config config file
- #[structopt(
- short,
- long,
- env = "PATH_CONFIG_RUST",
- parse(from_os_str)
- )]
- pub config: PathBuf,
- }
- fn main() {
- let args = CmdArgs::from_args();
- println!("{:?}", args.config);
- let mut f = File::open(&args.config).expect("Error while opening file");
- let mut contents = String::new();
- f.read_to_string(&mut contents).expect("Something went wrong while reading the file");
- static docs: Vec<yaml_rust::yaml::Yaml> = YamlLoader::load_from_str(contents).unwrap();
- let doc = &docs[0].as_hash().unwrap();
- for (key, value) in doc.into_iter() {
- if key.as_str().unwrap() == "rabbitmq-server" {
- Runtime::new().unwrap().block_on_all(
- Client::connect(&value.as_str().unwrap(), ConnectionProperties::default()).map_err(Error::from).and_then(|client| {
- // create_channel returns a future that is resolved
- // once the channel is successfully created
- client.create_channel().map_err(Error::from)
- }).and_then(move |channel| {
- let id = channel.id();
- info!("Connected to {}, channel_id: {} ...", &value.as_str().unwrap(), &id);
- let ch = channel.clone();
- channel.exchange_declare("services", "topic", ExchangeDeclareOptions::default(), FieldTable::default());
- channel.queue_declare("scoring", QueueDeclareOptions::default(), FieldTable::default()).and_then(move |queue| {
- info!("Declared queue {} ...", id);
- // basic_consume returns a future of a message
- // stream. Any time a message arrives for this consumer,
- // the for_each method would be called
- channel.basic_consume(&queue, "my_consumer", BasicConsumeOptions::default(), FieldTable::default())
- }).and_then(|stream| {
- info!("got consumer stream");
- stream.for_each(move |message| {
- debug!("got message: {:?}", message);
- info!("decoded message: {:?}", std::str::from_utf8(&message.data).unwrap());
- ch.basic_ack(message.delivery_tag, false)
- })
- }).map_err(Error::from)
- })
- ).expect("runtime failure");
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement