Advertisement
Guest User

Untitled

a guest
Oct 21st, 2019
80
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.72 KB | None | 0 0
  1. fn early_return_fn_1(args: Args_0) -> impl Stream<Item = WhateverType, Error = ()> {
  2. //want to make this not early-return anymore with `my_stream_fn`
  3. let (tx, rx) = unbounded_channel();
  4.  
  5. for _ in 0..10 {
  6. spawn(early_return_fn_2(other_args, Some(tx.clone())));
  7. }
  8.  
  9. my_stream_fn(some_other_data, rx)
  10. }
  11.  
  12. fn early_return_fn_2(
  13. args: Args_1,
  14. braodcaster: Option<UnboundedSender<ZST>>,
  15. ) -> impl Future<Item = (), Error = ()> {
  16. CACHE.future_lock(move |mut lock| {
  17. match lock.get_mut(&args.metadata) {
  18. Some(tx) => tx.try_send(args.message).map_err(|e| error!("{:?}", e)),
  19. None => {
  20. let (crossbeam_tx, crossbeam_rx) = unbounded();
  21.  
  22. for _ in 0..5 {
  23. let parallel_braodcaster = braodcaster.clone();
  24. let cross_rx = crossbeam_rx.clone(); //we have another channel to send work to a crude threadpool
  25. spawn(
  26. poll_fn(move || match cross_rx.try_recv() {
  27. Ok(i) => Async::Ready(Some(i)),
  28. Err(TryRecvError::Empty) => Ok(Async::NotReady),
  29. _ => Err(()),
  30. })
  31. .fold(None, move |mut opt, msg| {
  32. let mut future : Box<
  33. dyn Future<Item = SomeType, Error = ()>
  34. + Send,
  35. > = /* stuff that does some work */();
  36.  
  37. let mut local_broadcaster = parallel_braodcaster.clone(); //subscriber
  38.  
  39. future.then(move |result| {
  40. let (keep, output) =
  41. match res { /* split result again in 2 options */ };
  42.  
  43. GLOBAL_REGISTRY
  44. .update(args.op_id, args.metadata, output)
  45. //now I added this because I want to get updated after `update` is done
  46. .and_then(move |_| {
  47. if let Some(ref mut tx) = &mut local_broadcaster {
  48. let _ = tx.try_send(ZST); //send that the work has been done
  49. }
  50.  
  51. Ok(())
  52. })
  53. .map(move |_| keep)
  54. })
  55. })
  56. .map(|_| ()),
  57. );
  58. }
  59.  
  60. let r = tx.try_send(args.msg).map_err(|_| ());
  61. lock.put(args.metadata, tx);
  62. r
  63. }
  64. }
  65. })
  66. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement