NLinker

Lapin 0.32 example

Feb 27th, 2020
368
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Rust 2.23 KB | None | 0 0
  1. use lapin::{Channel, ConsumerDelegate, Connection, ConnectionProperties};
  2. use lapin::message::DeliveryResult;
  3. use lapin::options::{BasicAckOptions, QueueDeclareOptions, BasicConsumeOptions, BasicPublishOptions};
  4. use lapin::types::FieldTable;
  5. use lapin::protocol::BasicProperties;
  6.  
  7. #[derive(Clone, Debug)]
  8. struct Subscriber {
  9.     channel: Channel,
  10. }
  11.  
  12. impl ConsumerDelegate for Subscriber {
  13.     fn on_new_delivery(&self, delivery: DeliveryResult) {
  14.         if let Some(delivery) = delivery.unwrap() {
  15.             self.channel
  16.                 .basic_ack(delivery.delivery_tag, BasicAckOptions::default())
  17.                 .wait()
  18.                 .expect("basic_ack");
  19.         }
  20.     }
  21. }
  22.  
  23. fn my_test() {
  24.     let addr =
  25.         std::env::var("AMQP_ADDR").unwrap_or_else(|_| "amqp://127.0.0.1:5672/%2f".into());
  26.     let conn = Connection::connect(&addr, ConnectionProperties::default())
  27.         .wait()
  28.         .expect("connection error");
  29.  
  30.     println!("CONNECTED");
  31.  
  32.     let channel_a = conn.create_channel().wait().expect("create_channel");
  33.     let channel_b = conn.create_channel().wait().expect("create_channel");
  34.  
  35.     channel_a
  36.         .queue_declare(
  37.             "hello",
  38.             QueueDeclareOptions::default(),
  39.             FieldTable::default(),
  40.         )
  41.         .wait()
  42.         .expect("queue_declare");
  43.     let _queue = channel_b
  44.         .queue_declare(
  45.             "hello",
  46.             QueueDeclareOptions::default(),
  47.             FieldTable::default(),
  48.         )
  49.         .wait()
  50.         .expect("queue_declare");
  51.     println!("will consume");
  52.     channel_b
  53.         .clone()
  54.         .basic_consume(
  55.             "hello",
  56.             "my_consumer",
  57.             BasicConsumeOptions::default(),
  58.             FieldTable::default(),
  59.         )
  60.         .wait()
  61.         .expect("basic_consume")
  62.         .set_delegate(Box::new(Subscriber { channel: channel_b }));
  63.  
  64.     let payload = b"Hello world!";
  65.  
  66.     loop {
  67.         channel_a
  68.             .basic_publish(
  69.                 "",
  70.                 "hello",
  71.                 BasicPublishOptions::default(),
  72.                 payload.to_vec(),
  73.                 BasicProperties::default(),
  74.             )
  75.             .wait()
  76.             .expect("basic_publish");
  77.     }
  78. }
Add Comment
Please, Sign In to add comment