Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- fn early_return_fn_1(args: Args_0) -> impl Stream<Item = WhateverType, Error = ()> {
- //want to make this not early-return anymore with `my_stream_fn`
- let (tx, rx) = unbounded_channel();
- for _ in 0..10 {
- spawn(early_return_fn_2(other_args, Some(tx.clone())));
- }
- my_stream_fn(some_other_data, rx)
- }
- fn early_return_fn_2(
- args: Args_1,
- braodcaster: Option<UnboundedSender<ZST>>,
- ) -> impl Future<Item = (), Error = ()> {
- CACHE.future_lock(move |mut lock| {
- match lock.get_mut(&args.metadata) {
- Some(tx) => tx.try_send(args.message).map_err(|e| error!("{:?}", e)),
- None => {
- let (crossbeam_tx, crossbeam_rx) = unbounded();
- for _ in 0..5 {
- let parallel_braodcaster = braodcaster.clone();
- let cross_rx = crossbeam_rx.clone(); //we have another channel to send work to a crude threadpool
- spawn(
- poll_fn(move || match cross_rx.try_recv() {
- Ok(i) => Async::Ready(Some(i)),
- Err(TryRecvError::Empty) => Ok(Async::NotReady),
- _ => Err(()),
- })
- .fold(None, move |mut opt, msg| {
- let mut future : Box<
- dyn Future<Item = SomeType, Error = ()>
- + Send,
- > = /* stuff that does some work */();
- let mut local_broadcaster = parallel_braodcaster.clone(); //subscriber
- future.then(move |result| {
- let (keep, output) =
- match res { /* split result again in 2 options */ };
- GLOBAL_REGISTRY
- .update(args.op_id, args.metadata, output)
- //now I added this because I want to get updated after `update` is done
- .and_then(move |_| {
- if let Some(ref mut tx) = &mut local_broadcaster {
- let _ = tx.try_send(ZST); //send that the work has been done
- }
- Ok(())
- })
- .map(move |_| keep)
- })
- })
- .map(|_| ()),
- );
- }
- let r = tx.try_send(args.msg).map_err(|_| ());
- lock.put(args.metadata, tx);
- r
- }
- }
- })
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement