SHARE
TWEET

Untitled

a guest Oct 21st, 2019 68 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. fn early_return_fn_1(args: Args_0) -> impl Stream<Item = WhateverType, Error = ()> {
  2.     //want to make this not early-return anymore with `my_stream_fn`
  3.     let (tx, rx) = unbounded_channel();
  4.  
  5.     for _ in 0..10 {
  6.         spawn(early_return_fn_2(other_args, Some(tx.clone())));
  7.     }
  8.  
  9.     my_stream_fn(some_other_data, rx)
  10. }
  11.  
  12. fn early_return_fn_2(
  13.     args: Args_1,
  14.     braodcaster: Option<UnboundedSender<ZST>>,
  15. ) -> impl Future<Item = (), Error = ()> {
  16.     CACHE.future_lock(move |mut lock| {
  17.         match lock.get_mut(&args.metadata) {
  18.             Some(tx) => tx.try_send(args.message).map_err(|e| error!("{:?}", e)),
  19.             None => {
  20.                 let (crossbeam_tx, crossbeam_rx) = unbounded();
  21.  
  22.                 for _ in 0..5 {
  23.                     let parallel_braodcaster = braodcaster.clone();
  24.                     let cross_rx = crossbeam_rx.clone(); //we have another channel to send work to a crude threadpool
  25.                     spawn(
  26.                         poll_fn(move || match cross_rx.try_recv() {
  27.                             Ok(i) => Async::Ready(Some(i)),
  28.                             Err(TryRecvError::Empty) => Ok(Async::NotReady),
  29.                             _ => Err(()),
  30.                         })
  31.                         .fold(None, move |mut opt, msg| {
  32.                             let mut future : Box<
  33.                                 dyn Future<Item = SomeType, Error = ()>
  34.                                     + Send,
  35.                             > = /* stuff that does some work */();
  36.  
  37.                             let mut local_broadcaster = parallel_braodcaster.clone(); //subscriber
  38.  
  39.                             future.then(move |result| {
  40.                                 let (keep, output) =
  41.                                     match res { /* split result again in 2 options */ };
  42.  
  43.                                 GLOBAL_REGISTRY
  44.                                     .update(args.op_id, args.metadata, output)
  45.                                     //now I added this because I want to get updated after `update` is done
  46.                                     .and_then(move |_| {
  47.                                         if let Some(ref mut tx) = &mut local_broadcaster {
  48.                                             let _ = tx.try_send(ZST); //send that the work has been done
  49.                                         }
  50.  
  51.                                         Ok(())
  52.                                     })
  53.                                     .map(move |_| keep)
  54.                             })
  55.                         })
  56.                         .map(|_| ()),
  57.                     );
  58.                 }
  59.  
  60.                 let r = tx.try_send(args.msg).map_err(|_| ());
  61.                 lock.put(args.metadata, tx);
  62.                 r
  63.             }
  64.         }
  65.     })
  66. }
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top