Advertisement
Guest User

Untitled

a guest
Sep 19th, 2019
153
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.78 KB | None | 0 0
  1. extern crate yaml_rust;
  2.  
  3. use std::fs::File;
  4. use std::io::prelude::*;
  5.  
  6. // mod utils;
  7. // use utils::arg_parse::CmdArgs;
  8.  
  9. use structopt::StructOpt;
  10. use yaml_rust::{YamlLoader};
  11. use structopt::StructOpt;
  12. use std::path::PathBuf;
  13.  
  14. // Imports for the AMQP (RabbitMQ) client
  15.  
  16. use failure::Error;
  17. use futures::{future, Future, Stream};
  18. use lapin_futures as lapin;
  19. use crate::lapin::{BasicProperties, Client, ConnectionProperties};
  20. use crate::lapin::options::{BasicConsumeOptions, QueueDeclareOptions, ExchangeDeclareOptions};
  21. use crate::lapin::types::FieldTable;
  22. use log::{info, trace, warn, debug};
  23. use tokio;
  24. use tokio::runtime::Runtime;
  25.  
  26.  
  27. #[derive(StructOpt)]
  28. pub struct CmdArgs {
  29.  
  30. #[structopt(
  31. short,
  32. long
  33. )]
  34. pub debug: bool,
  35. /// Config config file
  36. #[structopt(
  37. short,
  38. long,
  39. env = "PATH_CONFIG_RUST",
  40. parse(from_os_str)
  41. )]
  42. pub config: PathBuf,
  43. }
  44.  
  45.  
  46.  
  47.  
  48. fn main() {
  49. let args = CmdArgs::from_args();
  50. println!("{:?}", args.config);
  51. let mut f = File::open(&args.config).expect("Error while opening file");
  52. let mut contents = String::new();
  53. f.read_to_string(&mut contents).expect("Something went wrong while reading the file");
  54.  
  55. static docs: Vec<yaml_rust::yaml::Yaml> = YamlLoader::load_from_str(contents).unwrap();
  56. let doc = &docs[0].as_hash().unwrap();
  57.  
  58. for (key, value) in doc.into_iter() {
  59. if key.as_str().unwrap() == "rabbitmq-server" {
  60. Runtime::new().unwrap().block_on_all(
  61. Client::connect(&value.as_str().unwrap(), ConnectionProperties::default()).map_err(Error::from).and_then(|client| {
  62. // create_channel returns a future that is resolved
  63. // once the channel is successfully created
  64. client.create_channel().map_err(Error::from)
  65. }).and_then(move |channel| {
  66. let id = channel.id();
  67. info!("Connected to {}, channel_id: {} ...", &value.as_str().unwrap(), &id);
  68.  
  69. let ch = channel.clone();
  70. channel.exchange_declare("services", "topic", ExchangeDeclareOptions::default(), FieldTable::default());
  71. channel.queue_declare("scoring", QueueDeclareOptions::default(), FieldTable::default()).and_then(move |queue| {
  72. info!("Declared queue {} ...", id);
  73.  
  74. // basic_consume returns a future of a message
  75. // stream. Any time a message arrives for this consumer,
  76. // the for_each method would be called
  77. channel.basic_consume(&queue, "my_consumer", BasicConsumeOptions::default(), FieldTable::default())
  78. }).and_then(|stream| {
  79. info!("got consumer stream");
  80.  
  81. stream.for_each(move |message| {
  82. debug!("got message: {:?}", message);
  83. info!("decoded message: {:?}", std::str::from_utf8(&message.data).unwrap());
  84. ch.basic_ack(message.delivery_tag, false)
  85. })
  86. }).map_err(Error::from)
  87. })
  88. ).expect("runtime failure");
  89.  
  90. }
  91. }
  92. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement