Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- using System;
- using System.Threading;
- using UniRx.InternalUtil;
- using UniRx.Operators;
- namespace UniRx
- {
- public static class Observer
- {
- internal static IObserver<T> CreateSubscribeObserver<T>
- (Action<T> onNext, Action<Exception> onError, Action onCompleted)
- {
- // need compare for avoid iOS AOT
- if (onNext == Stubs<T>.Ignore)
- return new Subscribe_<T>(onError, onCompleted);
- return new Subscribe<T>(onNext, onError, onCompleted);
- }
- internal static IObserver<T> CreateSubscribeWithStateObserver<T, TState>
- (TState state, Action<T, TState> onNext, Action<Exception, TState> onError, Action<TState> onCompleted) =>
- new Subscribe<T, TState>(state, onNext, onError, onCompleted);
- internal static IObserver<T> CreateSubscribeWithState2Observer<T, TState1, TState2>
- (
- TState1 state1,
- TState2 state2,
- Action<T, TState1, TState2> onNext,
- Action<Exception, TState1, TState2> onError,
- Action<TState1, TState2> onCompleted
- ) =>
- new Subscribe<T, TState1, TState2>(state1, state2, onNext, onError, onCompleted);
- internal static IObserver<T> CreateSubscribeWithState3Observer<T, TState1, TState2, TState3>
- (
- TState1 state1,
- TState2 state2,
- TState3 state3,
- Action<T, TState1, TState2, TState3> onNext,
- Action<Exception, TState1, TState2, TState3> onError,
- Action<TState1, TState2, TState3> onCompleted
- ) =>
- new Subscribe<T, TState1, TState2, TState3>(state1, state2, state3, onNext, onError, onCompleted);
- public static IObserver<T> Create<T>(Action<T> onNext) =>
- Create(onNext, Stubs.Throw, Stubs.Nop);
- public static IObserver<T> Create<T>(Action<T> onNext, Action<Exception> onError) =>
- Create(onNext, onError, Stubs.Nop);
- public static IObserver<T> Create<T>(Action<T> onNext, Action onCompleted) =>
- Create(onNext, Stubs.Throw, onCompleted);
- public static IObserver<T> Create<T>(Action<T> onNext, Action<Exception> onError, Action onCompleted)
- {
- // need compare for avoid iOS AOT
- if (onNext == Stubs<T>.Ignore)
- return new EmptyOnNextAnonymousObserver<T>(onError, onCompleted);
- return new AnonymousObserver<T>(onNext, onError, onCompleted);
- }
- public static IObserver<T> CreateAutoDetachObserver<T>(IObserver<T> observer, IDisposable disposable) =>
- new AutoDetachObserver<T>(observer, disposable);
- private class AnonymousObserver<T> : IObserver<T>
- {
- private readonly Action onCompleted;
- private readonly Action<Exception> onError;
- private readonly Action<T> onNext;
- private int isStopped;
- public AnonymousObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
- {
- this.onNext = onNext;
- this.onError = onError;
- this.onCompleted = onCompleted;
- }
- public void OnNext(T value)
- {
- if (isStopped == 0)
- onNext(value);
- }
- public void OnError(Exception error)
- {
- if (Interlocked.Increment(ref isStopped) == 1)
- onError(error);
- }
- public void OnCompleted()
- {
- if (Interlocked.Increment(ref isStopped) == 1)
- onCompleted();
- }
- }
- private class EmptyOnNextAnonymousObserver<T> : IObserver<T>
- {
- private readonly Action onCompleted;
- private readonly Action<Exception> onError;
- private int isStopped;
- public EmptyOnNextAnonymousObserver(Action<Exception> onError, Action onCompleted)
- {
- this.onError = onError;
- this.onCompleted = onCompleted;
- }
- public void OnNext(T value) { }
- public void OnError(Exception error)
- {
- if (Interlocked.Increment(ref isStopped) == 1)
- onError(error);
- }
- public void OnCompleted()
- {
- if (Interlocked.Increment(ref isStopped) == 1)
- onCompleted();
- }
- }
- // same as AnonymousObserver...
- private class Subscribe<T> : IObserver<T>
- {
- private readonly Action onCompleted;
- private readonly Action<Exception> onError;
- private readonly Action<T> onNext;
- private int isStopped;
- public Subscribe(Action<T> onNext, Action<Exception> onError, Action onCompleted)
- {
- this.onNext = onNext;
- this.onError = onError;
- this.onCompleted = onCompleted;
- }
- public void OnNext(T value)
- {
- if (isStopped == 0)
- onNext(value);
- }
- public void OnError(Exception error)
- {
- if (Interlocked.Increment(ref isStopped) == 1)
- onError(error);
- }
- public void OnCompleted()
- {
- if (Interlocked.Increment(ref isStopped) == 1)
- onCompleted();
- }
- }
- // same as EmptyOnNextAnonymousObserver...
- private class Subscribe_<T> : IObserver<T>
- {
- private readonly Action onCompleted;
- private readonly Action<Exception> onError;
- private int isStopped;
- public Subscribe_(Action<Exception> onError, Action onCompleted)
- {
- this.onError = onError;
- this.onCompleted = onCompleted;
- }
- public void OnNext(T value) { }
- public void OnError(Exception error)
- {
- if (Interlocked.Increment(ref isStopped) == 1)
- onError(error);
- }
- public void OnCompleted()
- {
- if (Interlocked.Increment(ref isStopped) == 1)
- onCompleted();
- }
- }
- // with state
- private class Subscribe<T, TState> : IObserver<T>
- {
- private readonly Action<TState> onCompleted;
- private readonly Action<Exception, TState> onError;
- private readonly Action<T, TState> onNext;
- private readonly TState state;
- private int isStopped;
- public Subscribe
- (TState state, Action<T, TState> onNext, Action<Exception, TState> onError, Action<TState> onCompleted)
- {
- this.state = state;
- this.onNext = onNext;
- this.onError = onError;
- this.onCompleted = onCompleted;
- }
- public void OnNext(T value)
- {
- if (isStopped == 0)
- onNext(value, state);
- }
- public void OnError(Exception error)
- {
- if (Interlocked.Increment(ref isStopped) == 1)
- onError(error, state);
- }
- public void OnCompleted()
- {
- if (Interlocked.Increment(ref isStopped) == 1)
- onCompleted(state);
- }
- }
- private class Subscribe<T, TState1, TState2> : IObserver<T>
- {
- private readonly Action<TState1, TState2> onCompleted;
- private readonly Action<Exception, TState1, TState2> onError;
- private readonly Action<T, TState1, TState2> onNext;
- private readonly TState1 state1;
- private readonly TState2 state2;
- private int isStopped;
- public Subscribe
- (
- TState1 state1,
- TState2 state2,
- Action<T, TState1, TState2> onNext,
- Action<Exception, TState1, TState2> onError,
- Action<TState1, TState2> onCompleted
- )
- {
- this.state1 = state1;
- this.state2 = state2;
- this.onNext = onNext;
- this.onError = onError;
- this.onCompleted = onCompleted;
- }
- public void OnNext(T value)
- {
- if (isStopped == 0)
- onNext(value, state1, state2);
- }
- public void OnError(Exception error)
- {
- if (Interlocked.Increment(ref isStopped) == 1)
- onError(error, state1, state2);
- }
- public void OnCompleted()
- {
- if (Interlocked.Increment(ref isStopped) == 1)
- onCompleted(state1, state2);
- }
- }
- private class Subscribe<T, TState1, TState2, TState3> : IObserver<T>
- {
- private readonly Action<TState1, TState2, TState3> onCompleted;
- private readonly Action<Exception, TState1, TState2, TState3> onError;
- private readonly Action<T, TState1, TState2, TState3> onNext;
- private readonly TState1 state1;
- private readonly TState2 state2;
- private readonly TState3 state3;
- private int isStopped;
- public Subscribe
- (
- TState1 state1,
- TState2 state2,
- TState3 state3,
- Action<T, TState1, TState2, TState3> onNext,
- Action<Exception, TState1, TState2, TState3> onError,
- Action<TState1, TState2, TState3> onCompleted
- )
- {
- this.state1 = state1;
- this.state2 = state2;
- this.state3 = state3;
- this.onNext = onNext;
- this.onError = onError;
- this.onCompleted = onCompleted;
- }
- public void OnNext(T value)
- {
- if (isStopped == 0)
- onNext(value, state1, state2, state3);
- }
- public void OnError(Exception error)
- {
- if (Interlocked.Increment(ref isStopped) == 1)
- onError(error, state1, state2, state3);
- }
- public void OnCompleted()
- {
- if (Interlocked.Increment(ref isStopped) == 1)
- onCompleted(state1, state2, state3);
- }
- }
- private class AutoDetachObserver<T> : OperatorObserverBase<T, T>
- {
- public AutoDetachObserver(IObserver<T> observer, IDisposable cancel) : base(observer, cancel) { }
- public override void OnNext(T value)
- {
- try
- {
- observer.OnNext(value);
- }
- catch
- {
- Dispose();
- throw;
- }
- }
- public override void OnError(Exception error)
- {
- try
- {
- observer.OnError(error);
- }
- finally
- {
- Dispose();
- }
- }
- public override void OnCompleted()
- {
- try
- {
- observer.OnCompleted();
- }
- finally
- {
- Dispose();
- }
- }
- }
- }
- public static class ObserverExtensions
- {
- public static IObserver<T> Synchronize<T>(this IObserver<T> observer) =>
- new SynchronizedObserver<T>(observer, new object());
- public static IObserver<T> Synchronize<T>(this IObserver<T> observer, object gate) =>
- new SynchronizedObserver<T>(observer, gate);
- }
- public static class ObservableExtensions
- {
- public static IDisposable Subscribe<T>(this IObservable<T> source) =>
- source.Subscribe(ThrowObserver<T>.Instance);
- public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext) =>
- source.Subscribe(Observer.CreateSubscribeObserver(onNext, Stubs.Throw, Stubs.Nop));
- public static IDisposable Subscribe<T>(this IObservable<T> source, Action onNext) =>
- source.Subscribe(_ => onNext?.Invoke());
- public static IDisposable Subscribe<T>
- (this IObservable<T> source, Action<T> onNext, Action<Exception> onError) =>
- source.Subscribe(Observer.CreateSubscribeObserver(onNext, onError, Stubs.Nop));
- public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action onCompleted) =>
- source.Subscribe(Observer.CreateSubscribeObserver(onNext, Stubs.Throw, onCompleted));
- public static IDisposable Subscribe<T>
- (this IObservable<T> source, Action<T> onNext, Action<Exception> onError, Action onCompleted) =>
- source.Subscribe(Observer.CreateSubscribeObserver(onNext, onError, onCompleted));
- public static IDisposable SubscribeWithState<T, TState>
- (this IObservable<T> source, TState state, Action<T, TState> onNext) =>
- source.Subscribe(
- Observer.CreateSubscribeWithStateObserver(state, onNext, Stubs<TState>.Throw, Stubs<TState>.Ignore));
- public static IDisposable SubscribeWithState<T, TState>
- (this IObservable<T> source, TState state, Action<T, TState> onNext, Action<Exception, TState> onError) =>
- source.Subscribe(Observer.CreateSubscribeWithStateObserver(state, onNext, onError, Stubs<TState>.Ignore));
- public static IDisposable SubscribeWithState<T, TState>
- (this IObservable<T> source, TState state, Action<T, TState> onNext, Action<TState> onCompleted) =>
- source.Subscribe(
- Observer.CreateSubscribeWithStateObserver(state, onNext, Stubs<TState>.Throw, onCompleted));
- public static IDisposable SubscribeWithState<T, TState>
- (
- this IObservable<T> source,
- TState state,
- Action<T, TState> onNext,
- Action<Exception, TState> onError,
- Action<TState> onCompleted
- ) =>
- source.Subscribe(Observer.CreateSubscribeWithStateObserver(state, onNext, onError, onCompleted));
- public static IDisposable SubscribeWithState2<T, TState1, TState2>
- (this IObservable<T> source, TState1 state1, TState2 state2, Action<T, TState1, TState2> onNext) =>
- source.Subscribe(
- Observer.CreateSubscribeWithState2Observer(
- state1,
- state2,
- onNext,
- Stubs<TState1, TState2>.Throw,
- Stubs<TState1, TState2>.Ignore));
- public static IDisposable SubscribeWithState2<T, TState1, TState2>
- (
- this IObservable<T> source,
- TState1 state1,
- TState2 state2,
- Action<T, TState1, TState2> onNext,
- Action<Exception, TState1, TState2> onError
- ) =>
- source.Subscribe(
- Observer.CreateSubscribeWithState2Observer(
- state1,
- state2,
- onNext,
- onError,
- Stubs<TState1, TState2>.Ignore));
- public static IDisposable SubscribeWithState2<T, TState1, TState2>
- (
- this IObservable<T> source,
- TState1 state1,
- TState2 state2,
- Action<T, TState1, TState2> onNext,
- Action<TState1, TState2> onCompleted
- ) =>
- source.Subscribe(
- Observer.CreateSubscribeWithState2Observer(
- state1,
- state2,
- onNext,
- Stubs<TState1, TState2>.Throw,
- onCompleted));
- public static IDisposable SubscribeWithState2<T, TState1, TState2>
- (
- this IObservable<T> source,
- TState1 state1,
- TState2 state2,
- Action<T, TState1, TState2> onNext,
- Action<Exception, TState1, TState2> onError,
- Action<TState1, TState2> onCompleted
- ) =>
- source.Subscribe(Observer.CreateSubscribeWithState2Observer(state1, state2, onNext, onError, onCompleted));
- public static IDisposable SubscribeWithState3<T, TState1, TState2, TState3>
- (
- this IObservable<T> source,
- TState1 state1,
- TState2 state2,
- TState3 state3,
- Action<T, TState1, TState2, TState3> onNext
- ) =>
- source.Subscribe(
- Observer.CreateSubscribeWithState3Observer(
- state1,
- state2,
- state3,
- onNext,
- Stubs<TState1, TState2, TState3>.Throw,
- Stubs<TState1, TState2, TState3>.Ignore));
- public static IDisposable SubscribeWithState3<T, TState1, TState2, TState3>
- (
- this IObservable<T> source,
- TState1 state1,
- TState2 state2,
- TState3 state3,
- Action<T, TState1, TState2, TState3> onNext,
- Action<Exception, TState1, TState2, TState3> onError
- ) =>
- source.Subscribe(
- Observer.CreateSubscribeWithState3Observer(
- state1,
- state2,
- state3,
- onNext,
- onError,
- Stubs<TState1, TState2, TState3>.Ignore));
- public static IDisposable SubscribeWithState3<T, TState1, TState2, TState3>
- (
- this IObservable<T> source,
- TState1 state1,
- TState2 state2,
- TState3 state3,
- Action<T, TState1, TState2, TState3> onNext,
- Action<TState1, TState2, TState3> onCompleted
- ) =>
- source.Subscribe(
- Observer.CreateSubscribeWithState3Observer(
- state1,
- state2,
- state3,
- onNext,
- Stubs<TState1, TState2, TState3>.Throw,
- onCompleted));
- public static IDisposable SubscribeWithState3<T, TState1, TState2, TState3>
- (
- this IObservable<T> source,
- TState1 state1,
- TState2 state2,
- TState3 state3,
- Action<T, TState1, TState2, TState3> onNext,
- Action<Exception, TState1, TState2, TState3> onError,
- Action<TState1, TState2, TState3> onCompleted
- ) =>
- source.Subscribe(
- Observer.CreateSubscribeWithState3Observer(state1, state2, state3, onNext, onError, onCompleted));
- }
- internal static class Stubs
- {
- public static readonly Action Nop = () => { };
- public static readonly Action<Exception> Throw = ex => { ex.Throw(); };
- // marker for CatchIgnore and Catch avoid iOS AOT problem.
- public static IObservable<TSource> CatchIgnore<TSource>(Exception ex) =>
- Observable.Empty<TSource>();
- }
- internal static class Stubs<T>
- {
- public static readonly Action<T> Ignore = t => { };
- public static readonly Func<T, T> Identity = t => t;
- public static readonly Action<Exception, T> Throw = (ex, _) => { ex.Throw(); };
- }
- internal static class Stubs<T1, T2>
- {
- public static readonly Action<T1, T2> Ignore = (x, y) => { };
- public static readonly Action<Exception, T1, T2> Throw = (ex, _, __) => { ex.Throw(); };
- }
- internal static class Stubs<T1, T2, T3>
- {
- public static readonly Action<T1, T2, T3> Ignore = (x, y, z) => { };
- public static readonly Action<Exception, T1, T2, T3> Throw = (ex, _, __, ___) => { ex.Throw(); };
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement