Advertisement
Pro_Unit

Observer.cs

Dec 25th, 2023
849
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
C# 15.91 KB | None | 0 0
  1. using System;
  2. using System.Threading;
  3.  
  4. using UniRx.InternalUtil;
  5. using UniRx.Operators;
  6.  
  7. namespace UniRx
  8. {
  9.     public static class Observer
  10.     {
  11.         internal static IObserver<T> CreateSubscribeObserver<T>
  12.             (Action<T> onNext, Action<Exception> onError, Action onCompleted)
  13.         {
  14.             // need compare for avoid iOS AOT
  15.             if (onNext == Stubs<T>.Ignore)
  16.                 return new Subscribe_<T>(onError, onCompleted);
  17.             return new Subscribe<T>(onNext, onError, onCompleted);
  18.         }
  19.  
  20.         internal static IObserver<T> CreateSubscribeWithStateObserver<T, TState>
  21.             (TState state, Action<T, TState> onNext, Action<Exception, TState> onError, Action<TState> onCompleted) =>
  22.             new Subscribe<T, TState>(state, onNext, onError, onCompleted);
  23.  
  24.         internal static IObserver<T> CreateSubscribeWithState2Observer<T, TState1, TState2>
  25.         (
  26.             TState1 state1,
  27.             TState2 state2,
  28.             Action<T, TState1, TState2> onNext,
  29.             Action<Exception, TState1, TState2> onError,
  30.             Action<TState1, TState2> onCompleted
  31.         ) =>
  32.             new Subscribe<T, TState1, TState2>(state1, state2, onNext, onError, onCompleted);
  33.  
  34.         internal static IObserver<T> CreateSubscribeWithState3Observer<T, TState1, TState2, TState3>
  35.         (
  36.             TState1 state1,
  37.             TState2 state2,
  38.             TState3 state3,
  39.             Action<T, TState1, TState2, TState3> onNext,
  40.             Action<Exception, TState1, TState2, TState3> onError,
  41.             Action<TState1, TState2, TState3> onCompleted
  42.         ) =>
  43.             new Subscribe<T, TState1, TState2, TState3>(state1, state2, state3, onNext, onError, onCompleted);
  44.  
  45.         public static IObserver<T> Create<T>(Action<T> onNext) =>
  46.             Create(onNext, Stubs.Throw, Stubs.Nop);
  47.  
  48.         public static IObserver<T> Create<T>(Action<T> onNext, Action<Exception> onError) =>
  49.             Create(onNext, onError, Stubs.Nop);
  50.  
  51.         public static IObserver<T> Create<T>(Action<T> onNext, Action onCompleted) =>
  52.             Create(onNext, Stubs.Throw, onCompleted);
  53.  
  54.         public static IObserver<T> Create<T>(Action<T> onNext, Action<Exception> onError, Action onCompleted)
  55.         {
  56.             // need compare for avoid iOS AOT
  57.             if (onNext == Stubs<T>.Ignore)
  58.                 return new EmptyOnNextAnonymousObserver<T>(onError, onCompleted);
  59.             return new AnonymousObserver<T>(onNext, onError, onCompleted);
  60.         }
  61.  
  62.         public static IObserver<T> CreateAutoDetachObserver<T>(IObserver<T> observer, IDisposable disposable) =>
  63.             new AutoDetachObserver<T>(observer, disposable);
  64.  
  65.         private class AnonymousObserver<T> : IObserver<T>
  66.         {
  67.             private readonly Action onCompleted;
  68.             private readonly Action<Exception> onError;
  69.             private readonly Action<T> onNext;
  70.  
  71.             private int isStopped;
  72.  
  73.             public AnonymousObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
  74.             {
  75.                 this.onNext = onNext;
  76.                 this.onError = onError;
  77.                 this.onCompleted = onCompleted;
  78.             }
  79.  
  80.             public void OnNext(T value)
  81.             {
  82.                 if (isStopped == 0)
  83.                     onNext(value);
  84.             }
  85.  
  86.             public void OnError(Exception error)
  87.             {
  88.                 if (Interlocked.Increment(ref isStopped) == 1)
  89.                     onError(error);
  90.             }
  91.  
  92.             public void OnCompleted()
  93.             {
  94.                 if (Interlocked.Increment(ref isStopped) == 1)
  95.                     onCompleted();
  96.             }
  97.         }
  98.  
  99.         private class EmptyOnNextAnonymousObserver<T> : IObserver<T>
  100.         {
  101.             private readonly Action onCompleted;
  102.             private readonly Action<Exception> onError;
  103.  
  104.             private int isStopped;
  105.  
  106.             public EmptyOnNextAnonymousObserver(Action<Exception> onError, Action onCompleted)
  107.             {
  108.                 this.onError = onError;
  109.                 this.onCompleted = onCompleted;
  110.             }
  111.  
  112.             public void OnNext(T value) { }
  113.  
  114.             public void OnError(Exception error)
  115.             {
  116.                 if (Interlocked.Increment(ref isStopped) == 1)
  117.                     onError(error);
  118.             }
  119.  
  120.             public void OnCompleted()
  121.             {
  122.                 if (Interlocked.Increment(ref isStopped) == 1)
  123.                     onCompleted();
  124.             }
  125.         }
  126.  
  127.         // same as AnonymousObserver...
  128.         private class Subscribe<T> : IObserver<T>
  129.         {
  130.             private readonly Action onCompleted;
  131.             private readonly Action<Exception> onError;
  132.             private readonly Action<T> onNext;
  133.  
  134.             private int isStopped;
  135.  
  136.             public Subscribe(Action<T> onNext, Action<Exception> onError, Action onCompleted)
  137.             {
  138.                 this.onNext = onNext;
  139.                 this.onError = onError;
  140.                 this.onCompleted = onCompleted;
  141.             }
  142.  
  143.             public void OnNext(T value)
  144.             {
  145.                 if (isStopped == 0)
  146.                     onNext(value);
  147.             }
  148.  
  149.             public void OnError(Exception error)
  150.             {
  151.                 if (Interlocked.Increment(ref isStopped) == 1)
  152.                     onError(error);
  153.             }
  154.  
  155.             public void OnCompleted()
  156.             {
  157.                 if (Interlocked.Increment(ref isStopped) == 1)
  158.                     onCompleted();
  159.             }
  160.         }
  161.  
  162.         // same as EmptyOnNextAnonymousObserver...
  163.         private class Subscribe_<T> : IObserver<T>
  164.         {
  165.             private readonly Action onCompleted;
  166.             private readonly Action<Exception> onError;
  167.  
  168.             private int isStopped;
  169.  
  170.             public Subscribe_(Action<Exception> onError, Action onCompleted)
  171.             {
  172.                 this.onError = onError;
  173.                 this.onCompleted = onCompleted;
  174.             }
  175.  
  176.             public void OnNext(T value) { }
  177.  
  178.             public void OnError(Exception error)
  179.             {
  180.                 if (Interlocked.Increment(ref isStopped) == 1)
  181.                     onError(error);
  182.             }
  183.  
  184.             public void OnCompleted()
  185.             {
  186.                 if (Interlocked.Increment(ref isStopped) == 1)
  187.                     onCompleted();
  188.             }
  189.         }
  190.  
  191.         // with state
  192.         private class Subscribe<T, TState> : IObserver<T>
  193.         {
  194.             private readonly Action<TState> onCompleted;
  195.             private readonly Action<Exception, TState> onError;
  196.             private readonly Action<T, TState> onNext;
  197.             private readonly TState state;
  198.  
  199.             private int isStopped;
  200.  
  201.             public Subscribe
  202.                 (TState state, Action<T, TState> onNext, Action<Exception, TState> onError, Action<TState> onCompleted)
  203.             {
  204.                 this.state = state;
  205.                 this.onNext = onNext;
  206.                 this.onError = onError;
  207.                 this.onCompleted = onCompleted;
  208.             }
  209.  
  210.             public void OnNext(T value)
  211.             {
  212.                 if (isStopped == 0)
  213.                     onNext(value, state);
  214.             }
  215.  
  216.             public void OnError(Exception error)
  217.             {
  218.                 if (Interlocked.Increment(ref isStopped) == 1)
  219.                     onError(error, state);
  220.             }
  221.  
  222.             public void OnCompleted()
  223.             {
  224.                 if (Interlocked.Increment(ref isStopped) == 1)
  225.                     onCompleted(state);
  226.             }
  227.         }
  228.  
  229.         private class Subscribe<T, TState1, TState2> : IObserver<T>
  230.         {
  231.             private readonly Action<TState1, TState2> onCompleted;
  232.             private readonly Action<Exception, TState1, TState2> onError;
  233.             private readonly Action<T, TState1, TState2> onNext;
  234.             private readonly TState1 state1;
  235.             private readonly TState2 state2;
  236.  
  237.             private int isStopped;
  238.  
  239.             public Subscribe
  240.             (
  241.                 TState1 state1,
  242.                 TState2 state2,
  243.                 Action<T, TState1, TState2> onNext,
  244.                 Action<Exception, TState1, TState2> onError,
  245.                 Action<TState1, TState2> onCompleted
  246.             )
  247.             {
  248.                 this.state1 = state1;
  249.                 this.state2 = state2;
  250.                 this.onNext = onNext;
  251.                 this.onError = onError;
  252.                 this.onCompleted = onCompleted;
  253.             }
  254.  
  255.             public void OnNext(T value)
  256.             {
  257.                 if (isStopped == 0)
  258.                     onNext(value, state1, state2);
  259.             }
  260.  
  261.             public void OnError(Exception error)
  262.             {
  263.                 if (Interlocked.Increment(ref isStopped) == 1)
  264.                     onError(error, state1, state2);
  265.             }
  266.  
  267.             public void OnCompleted()
  268.             {
  269.                 if (Interlocked.Increment(ref isStopped) == 1)
  270.                     onCompleted(state1, state2);
  271.             }
  272.         }
  273.  
  274.         private class Subscribe<T, TState1, TState2, TState3> : IObserver<T>
  275.         {
  276.             private readonly Action<TState1, TState2, TState3> onCompleted;
  277.             private readonly Action<Exception, TState1, TState2, TState3> onError;
  278.             private readonly Action<T, TState1, TState2, TState3> onNext;
  279.             private readonly TState1 state1;
  280.             private readonly TState2 state2;
  281.             private readonly TState3 state3;
  282.  
  283.             private int isStopped;
  284.  
  285.             public Subscribe
  286.             (
  287.                 TState1 state1,
  288.                 TState2 state2,
  289.                 TState3 state3,
  290.                 Action<T, TState1, TState2, TState3> onNext,
  291.                 Action<Exception, TState1, TState2, TState3> onError,
  292.                 Action<TState1, TState2, TState3> onCompleted
  293.             )
  294.             {
  295.                 this.state1 = state1;
  296.                 this.state2 = state2;
  297.                 this.state3 = state3;
  298.                 this.onNext = onNext;
  299.                 this.onError = onError;
  300.                 this.onCompleted = onCompleted;
  301.             }
  302.  
  303.             public void OnNext(T value)
  304.             {
  305.                 if (isStopped == 0)
  306.                     onNext(value, state1, state2, state3);
  307.             }
  308.  
  309.             public void OnError(Exception error)
  310.             {
  311.                 if (Interlocked.Increment(ref isStopped) == 1)
  312.                     onError(error, state1, state2, state3);
  313.             }
  314.  
  315.             public void OnCompleted()
  316.             {
  317.                 if (Interlocked.Increment(ref isStopped) == 1)
  318.                     onCompleted(state1, state2, state3);
  319.             }
  320.         }
  321.  
  322.         private class AutoDetachObserver<T> : OperatorObserverBase<T, T>
  323.         {
  324.             public AutoDetachObserver(IObserver<T> observer, IDisposable cancel) : base(observer, cancel) { }
  325.  
  326.             public override void OnNext(T value)
  327.             {
  328.                 try
  329.                 {
  330.                     observer.OnNext(value);
  331.                 }
  332.                 catch
  333.                 {
  334.                     Dispose();
  335.                     throw;
  336.                 }
  337.             }
  338.  
  339.             public override void OnError(Exception error)
  340.             {
  341.                 try
  342.                 {
  343.                     observer.OnError(error);
  344.                 }
  345.                 finally
  346.                 {
  347.                     Dispose();
  348.                 }
  349.             }
  350.  
  351.             public override void OnCompleted()
  352.             {
  353.                 try
  354.                 {
  355.                     observer.OnCompleted();
  356.                 }
  357.                 finally
  358.                 {
  359.                     Dispose();
  360.                 }
  361.             }
  362.         }
  363.     }
  364.  
  365.     public static class ObserverExtensions
  366.     {
  367.         public static IObserver<T> Synchronize<T>(this IObserver<T> observer) =>
  368.             new SynchronizedObserver<T>(observer, new object());
  369.  
  370.         public static IObserver<T> Synchronize<T>(this IObserver<T> observer, object gate) =>
  371.             new SynchronizedObserver<T>(observer, gate);
  372.     }
  373.  
  374.     public static class ObservableExtensions
  375.     {
  376.         public static IDisposable Subscribe<T>(this IObservable<T> source) =>
  377.             source.Subscribe(ThrowObserver<T>.Instance);
  378.  
  379.         public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext) =>
  380.             source.Subscribe(Observer.CreateSubscribeObserver(onNext, Stubs.Throw, Stubs.Nop));
  381.  
  382.         public static IDisposable Subscribe<T>(this IObservable<T> source, Action onNext) =>
  383.             source.Subscribe(_ => onNext?.Invoke());
  384.  
  385.         public static IDisposable Subscribe<T>
  386.             (this IObservable<T> source, Action<T> onNext, Action<Exception> onError) =>
  387.             source.Subscribe(Observer.CreateSubscribeObserver(onNext, onError, Stubs.Nop));
  388.  
  389.         public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action onCompleted) =>
  390.             source.Subscribe(Observer.CreateSubscribeObserver(onNext, Stubs.Throw, onCompleted));
  391.  
  392.         public static IDisposable Subscribe<T>
  393.             (this IObservable<T> source, Action<T> onNext, Action<Exception> onError, Action onCompleted) =>
  394.             source.Subscribe(Observer.CreateSubscribeObserver(onNext, onError, onCompleted));
  395.  
  396.         public static IDisposable SubscribeWithState<T, TState>
  397.             (this IObservable<T> source, TState state, Action<T, TState> onNext) =>
  398.             source.Subscribe(
  399.                 Observer.CreateSubscribeWithStateObserver(state, onNext, Stubs<TState>.Throw, Stubs<TState>.Ignore));
  400.  
  401.         public static IDisposable SubscribeWithState<T, TState>
  402.             (this IObservable<T> source, TState state, Action<T, TState> onNext, Action<Exception, TState> onError) =>
  403.             source.Subscribe(Observer.CreateSubscribeWithStateObserver(state, onNext, onError, Stubs<TState>.Ignore));
  404.  
  405.         public static IDisposable SubscribeWithState<T, TState>
  406.             (this IObservable<T> source, TState state, Action<T, TState> onNext, Action<TState> onCompleted) =>
  407.             source.Subscribe(
  408.                 Observer.CreateSubscribeWithStateObserver(state, onNext, Stubs<TState>.Throw, onCompleted));
  409.  
  410.         public static IDisposable SubscribeWithState<T, TState>
  411.         (
  412.             this IObservable<T> source,
  413.             TState state,
  414.             Action<T, TState> onNext,
  415.             Action<Exception, TState> onError,
  416.             Action<TState> onCompleted
  417.         ) =>
  418.             source.Subscribe(Observer.CreateSubscribeWithStateObserver(state, onNext, onError, onCompleted));
  419.  
  420.         public static IDisposable SubscribeWithState2<T, TState1, TState2>
  421.             (this IObservable<T> source, TState1 state1, TState2 state2, Action<T, TState1, TState2> onNext) =>
  422.             source.Subscribe(
  423.                 Observer.CreateSubscribeWithState2Observer(
  424.                     state1,
  425.                     state2,
  426.                     onNext,
  427.                     Stubs<TState1, TState2>.Throw,
  428.                     Stubs<TState1, TState2>.Ignore));
  429.  
  430.         public static IDisposable SubscribeWithState2<T, TState1, TState2>
  431.         (
  432.             this IObservable<T> source,
  433.             TState1 state1,
  434.             TState2 state2,
  435.             Action<T, TState1, TState2> onNext,
  436.             Action<Exception, TState1, TState2> onError
  437.         ) =>
  438.             source.Subscribe(
  439.                 Observer.CreateSubscribeWithState2Observer(
  440.                     state1,
  441.                     state2,
  442.                     onNext,
  443.                     onError,
  444.                     Stubs<TState1, TState2>.Ignore));
  445.  
  446.         public static IDisposable SubscribeWithState2<T, TState1, TState2>
  447.         (
  448.             this IObservable<T> source,
  449.             TState1 state1,
  450.             TState2 state2,
  451.             Action<T, TState1, TState2> onNext,
  452.             Action<TState1, TState2> onCompleted
  453.         ) =>
  454.             source.Subscribe(
  455.                 Observer.CreateSubscribeWithState2Observer(
  456.                     state1,
  457.                     state2,
  458.                     onNext,
  459.                     Stubs<TState1, TState2>.Throw,
  460.                     onCompleted));
  461.  
  462.         public static IDisposable SubscribeWithState2<T, TState1, TState2>
  463.         (
  464.             this IObservable<T> source,
  465.             TState1 state1,
  466.             TState2 state2,
  467.             Action<T, TState1, TState2> onNext,
  468.             Action<Exception, TState1, TState2> onError,
  469.             Action<TState1, TState2> onCompleted
  470.         ) =>
  471.             source.Subscribe(Observer.CreateSubscribeWithState2Observer(state1, state2, onNext, onError, onCompleted));
  472.  
  473.         public static IDisposable SubscribeWithState3<T, TState1, TState2, TState3>
  474.         (
  475.             this IObservable<T> source,
  476.             TState1 state1,
  477.             TState2 state2,
  478.             TState3 state3,
  479.             Action<T, TState1, TState2, TState3> onNext
  480.         ) =>
  481.             source.Subscribe(
  482.                 Observer.CreateSubscribeWithState3Observer(
  483.                     state1,
  484.                     state2,
  485.                     state3,
  486.                     onNext,
  487.                     Stubs<TState1, TState2, TState3>.Throw,
  488.                     Stubs<TState1, TState2, TState3>.Ignore));
  489.  
  490.         public static IDisposable SubscribeWithState3<T, TState1, TState2, TState3>
  491.         (
  492.             this IObservable<T> source,
  493.             TState1 state1,
  494.             TState2 state2,
  495.             TState3 state3,
  496.             Action<T, TState1, TState2, TState3> onNext,
  497.             Action<Exception, TState1, TState2, TState3> onError
  498.         ) =>
  499.             source.Subscribe(
  500.                 Observer.CreateSubscribeWithState3Observer(
  501.                     state1,
  502.                     state2,
  503.                     state3,
  504.                     onNext,
  505.                     onError,
  506.                     Stubs<TState1, TState2, TState3>.Ignore));
  507.  
  508.         public static IDisposable SubscribeWithState3<T, TState1, TState2, TState3>
  509.         (
  510.             this IObservable<T> source,
  511.             TState1 state1,
  512.             TState2 state2,
  513.             TState3 state3,
  514.             Action<T, TState1, TState2, TState3> onNext,
  515.             Action<TState1, TState2, TState3> onCompleted
  516.         ) =>
  517.             source.Subscribe(
  518.                 Observer.CreateSubscribeWithState3Observer(
  519.                     state1,
  520.                     state2,
  521.                     state3,
  522.                     onNext,
  523.                     Stubs<TState1, TState2, TState3>.Throw,
  524.                     onCompleted));
  525.  
  526.         public static IDisposable SubscribeWithState3<T, TState1, TState2, TState3>
  527.         (
  528.             this IObservable<T> source,
  529.             TState1 state1,
  530.             TState2 state2,
  531.             TState3 state3,
  532.             Action<T, TState1, TState2, TState3> onNext,
  533.             Action<Exception, TState1, TState2, TState3> onError,
  534.             Action<TState1, TState2, TState3> onCompleted
  535.         ) =>
  536.             source.Subscribe(
  537.                 Observer.CreateSubscribeWithState3Observer(state1, state2, state3, onNext, onError, onCompleted));
  538.     }
  539.  
  540.     internal static class Stubs
  541.     {
  542.         public static readonly Action Nop = () => { };
  543.         public static readonly Action<Exception> Throw = ex => { ex.Throw(); };
  544.  
  545.         // marker for CatchIgnore and Catch avoid iOS AOT problem.
  546.         public static IObservable<TSource> CatchIgnore<TSource>(Exception ex) =>
  547.             Observable.Empty<TSource>();
  548.     }
  549.  
  550.     internal static class Stubs<T>
  551.     {
  552.         public static readonly Action<T> Ignore = t => { };
  553.         public static readonly Func<T, T> Identity = t => t;
  554.         public static readonly Action<Exception, T> Throw = (ex, _) => { ex.Throw(); };
  555.     }
  556.  
  557.     internal static class Stubs<T1, T2>
  558.     {
  559.         public static readonly Action<T1, T2> Ignore = (x, y) => { };
  560.         public static readonly Action<Exception, T1, T2> Throw = (ex, _, __) => { ex.Throw(); };
  561.     }
  562.  
  563.     internal static class Stubs<T1, T2, T3>
  564.     {
  565.         public static readonly Action<T1, T2, T3> Ignore = (x, y, z) => { };
  566.         public static readonly Action<Exception, T1, T2, T3> Throw = (ex, _, __, ___) => { ex.Throw(); };
  567.     }
  568. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement