NLinker

Simple async service

Feb 2nd, 2020
624
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. // https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=fef597285e9b04c52512daf255ec988c
  2.  
  3. use num_cpus::get;
  4. use std::fmt::Display;
  5. use std::net::{TcpStream, ToSocketAddrs};
  6. use std::sync::mpsc::{Receiver, SyncSender};
  7. use std::sync::{mpsc, Arc};
  8. use std::thread::spawn;
  9. use tokio; // 0.2.9
  10. use tokio::runtime::Builder;
  11. use tokio::sync::Semaphore;
  12.  
  13. #[derive(Debug, Clone)]
  14. struct Response {
  15.     result: String,
  16.     hostname: String,
  17.     process_time: String,
  18.     status: bool,
  19. }
  20. async fn process_host<A>(
  21.     hostname: A,
  22.     tx: SyncSender<Response>,
  23.     connection_pool: Arc<Semaphore>,
  24. ) -> Response
  25. where
  26.     A: ToSocketAddrs + Display,
  27. {
  28.     let guard = connection_pool.acquire().await;
  29.     let res = Response {
  30.         result: "none".to_string(),
  31.         hostname: hostname.to_string(),
  32.         process_time: "sometime".to_string(),
  33.         status: false,
  34.     };
  35.     let tcp = match TcpStream::connect(&hostname) {
  36.         Ok(a) => a,
  37.         Err(e) => {
  38.             let res = Response {
  39.                 result: e.to_string(),
  40.                 hostname: hostname.to_string(),
  41.                 process_time: "sometime".to_string(),
  42.                 status: false,
  43.             };
  44.             tx.send(res.clone()).unwrap();
  45.             return res;
  46.         }
  47.     };
  48.     // chain of async calls.
  49.     res
  50. }
  51.  
  52. fn main() {
  53.     let hosts = vec!["1.1.1.1:22", "8.8.8.8:22" , "8.8.4.4:22"];
  54.     let num_of_threads = Arc::new(Semaphore::new(10));
  55.     let mut reactor = Builder::new()
  56.         .enable_all()
  57.         .threaded_scheduler()
  58.         .core_threads(get())
  59.         .build()
  60.         .unwrap();
  61.     let (tx, rx): (SyncSender<Response>, Receiver<Response>) = mpsc::sync_channel(0);
  62.     let queue_len =100;
  63.     spawn(move || incremental_save(rx,  queue_len));
  64.  
  65.     let tasks: Vec<_> = hosts.into_iter().map(|host| {
  66.         reactor.spawn(process_host(
  67.             host,
  68.             tx.clone(),
  69.             num_of_threads.clone(),
  70.         ))
  71.     }).collect();
  72.    
  73.     reactor.block_on(futures::future::join_all(tasks));
  74. }
  75. fn incremental_save(rx: Receiver<Response>, queue_len: u64) {
  76.     for _ in 0..=queue_len {
  77.         let received = match rx.recv() {
  78.             Ok(a) => a,
  79.             Err(e) => {
  80.                 eprintln!("incremental_save: {}", e);
  81.                 break;
  82.             }
  83.         };
  84.         println! {"{:?}", received};
  85.     }
  86. }
RAW Paste Data

Adblocker detected! Please consider disabling it...

We've detected AdBlock Plus or some other adblocking software preventing Pastebin.com from fully loading.

We don't have any obnoxious sound, or popup ads, we actively block these annoying types of ads!

Please add Pastebin.com to your ad blocker whitelist or disable your adblocking software.

×