上海虹口龙之梦项目
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

524 lines
19 KiB

using System;
using System.Threading;
using UniRx.InternalUtil;
namespace UniRx
{
public static class Observer
{
internal static IObserver<T> CreateSubscribeObserver<T>(Action<T> onNext, Action<Exception> onError, Action onCompleted)
{
// need compare for avoid iOS AOT
if (onNext == Stubs<T>.Ignore)
{
return new Subscribe_<T>(onError, onCompleted);
}
else
{
return new Subscribe<T>(onNext, onError, onCompleted);
}
}
internal static IObserver<T> CreateSubscribeWithStateObserver<T, TState>(TState state, Action<T, TState> onNext, Action<Exception, TState> onError, Action<TState> onCompleted)
{
return new Subscribe<T, TState>(state, onNext, onError, onCompleted);
}
internal static IObserver<T> CreateSubscribeWithState2Observer<T, TState1, TState2>(TState1 state1, TState2 state2, Action<T, TState1, TState2> onNext, Action<Exception, TState1, TState2> onError, Action<TState1, TState2> onCompleted)
{
return new Subscribe<T, TState1, TState2>(state1, state2, onNext, onError, onCompleted);
}
internal static IObserver<T> CreateSubscribeWithState3Observer<T, TState1, TState2, TState3>(TState1 state1, TState2 state2, TState3 state3, Action<T, TState1, TState2, TState3> onNext, Action<Exception, TState1, TState2, TState3> onError, Action<TState1, TState2, TState3> onCompleted)
{
return new Subscribe<T, TState1, TState2, TState3>(state1, state2, state3, onNext, onError, onCompleted);
}
public static IObserver<T> Create<T>(Action<T> onNext)
{
return Create<T>(onNext, UniRx.Stubs.Throw, UniRx.Stubs.Nop);
}
public static IObserver<T> Create<T>(Action<T> onNext, Action<Exception> onError)
{
return Create<T>(onNext, onError, UniRx.Stubs.Nop);
}
public static IObserver<T> Create<T>(Action<T> onNext, Action onCompleted)
{
return Create<T>(onNext, UniRx.Stubs.Throw, onCompleted);
}
public static IObserver<T> Create<T>(Action<T> onNext, Action<Exception> onError, Action onCompleted)
{
// need compare for avoid iOS AOT
if (onNext == Stubs<T>.Ignore)
{
return new EmptyOnNextAnonymousObserver<T>(onError, onCompleted);
}
else
{
return new AnonymousObserver<T>(onNext, onError, onCompleted);
}
}
public static IObserver<T> CreateAutoDetachObserver<T>(IObserver<T> observer, IDisposable disposable)
{
return new AutoDetachObserver<T>(observer, disposable);
}
class AnonymousObserver<T> : IObserver<T>
{
readonly Action<T> onNext;
readonly Action<Exception> onError;
readonly Action onCompleted;
int isStopped = 0;
public AnonymousObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
{
this.onNext = onNext;
this.onError = onError;
this.onCompleted = onCompleted;
}
public void OnNext(T value)
{
if (isStopped == 0)
{
onNext(value);
}
}
public void OnError(Exception error)
{
if (Interlocked.Increment(ref isStopped) == 1)
{
onError(error);
}
}
public void OnCompleted()
{
if (Interlocked.Increment(ref isStopped) == 1)
{
onCompleted();
}
}
}
class EmptyOnNextAnonymousObserver<T> : IObserver<T>
{
readonly Action<Exception> onError;
readonly Action onCompleted;
int isStopped = 0;
public EmptyOnNextAnonymousObserver(Action<Exception> onError, Action onCompleted)
{
this.onError = onError;
this.onCompleted = onCompleted;
}
public void OnNext(T value)
{
}
public void OnError(Exception error)
{
if (Interlocked.Increment(ref isStopped) == 1)
{
onError(error);
}
}
public void OnCompleted()
{
if (Interlocked.Increment(ref isStopped) == 1)
{
onCompleted();
}
}
}
// same as AnonymousObserver...
class Subscribe<T> : IObserver<T>
{
readonly Action<T> onNext;
readonly Action<Exception> onError;
readonly Action onCompleted;
int isStopped = 0;
public Subscribe(Action<T> onNext, Action<Exception> onError, Action onCompleted)
{
this.onNext = onNext;
this.onError = onError;
this.onCompleted = onCompleted;
}
public void OnNext(T value)
{
if (isStopped == 0)
{
onNext(value);
}
}
public void OnError(Exception error)
{
if (Interlocked.Increment(ref isStopped) == 1)
{
onError(error);
}
}
public void OnCompleted()
{
if (Interlocked.Increment(ref isStopped) == 1)
{
onCompleted();
}
}
}
// same as EmptyOnNextAnonymousObserver...
class Subscribe_<T> : IObserver<T>
{
readonly Action<Exception> onError;
readonly Action onCompleted;
int isStopped = 0;
public Subscribe_(Action<Exception> onError, Action onCompleted)
{
this.onError = onError;
this.onCompleted = onCompleted;
}
public void OnNext(T value)
{
}
public void OnError(Exception error)
{
if (Interlocked.Increment(ref isStopped) == 1)
{
onError(error);
}
}
public void OnCompleted()
{
if (Interlocked.Increment(ref isStopped) == 1)
{
onCompleted();
}
}
}
// with state
class Subscribe<T, TState> : IObserver<T>
{
readonly TState state;
readonly Action<T, TState> onNext;
readonly Action<Exception, TState> onError;
readonly Action<TState> onCompleted;
int isStopped = 0;
public Subscribe(TState state, Action<T, TState> onNext, Action<Exception, TState> onError, Action<TState> onCompleted)
{
this.state = state;
this.onNext = onNext;
this.onError = onError;
this.onCompleted = onCompleted;
}
public void OnNext(T value)
{
if (isStopped == 0)
{
onNext(value, state);
}
}
public void OnError(Exception error)
{
if (Interlocked.Increment(ref isStopped) == 1)
{
onError(error, state);
}
}
public void OnCompleted()
{
if (Interlocked.Increment(ref isStopped) == 1)
{
onCompleted(state);
}
}
}
class Subscribe<T, TState1, TState2> : IObserver<T>
{
readonly TState1 state1;
readonly TState2 state2;
readonly Action<T, TState1, TState2> onNext;
readonly Action<Exception, TState1, TState2> onError;
readonly Action<TState1, TState2> onCompleted;
int isStopped = 0;
public Subscribe(TState1 state1, TState2 state2, Action<T, TState1, TState2> onNext, Action<Exception, TState1, TState2> onError, Action<TState1, TState2> onCompleted)
{
this.state1 = state1;
this.state2 = state2;
this.onNext = onNext;
this.onError = onError;
this.onCompleted = onCompleted;
}
public void OnNext(T value)
{
if (isStopped == 0)
{
onNext(value, state1, state2);
}
}
public void OnError(Exception error)
{
if (Interlocked.Increment(ref isStopped) == 1)
{
onError(error, state1, state2);
}
}
public void OnCompleted()
{
if (Interlocked.Increment(ref isStopped) == 1)
{
onCompleted(state1, state2);
}
}
}
class Subscribe<T, TState1, TState2, TState3> : IObserver<T>
{
readonly TState1 state1;
readonly TState2 state2;
readonly TState3 state3;
readonly Action<T, TState1, TState2, TState3> onNext;
readonly Action<Exception, TState1, TState2, TState3> onError;
readonly Action<TState1, TState2, TState3> onCompleted;
int isStopped = 0;
public Subscribe(TState1 state1, TState2 state2, TState3 state3, Action<T, TState1, TState2, TState3> onNext, Action<Exception, TState1, TState2, TState3> onError, Action<TState1, TState2, TState3> onCompleted)
{
this.state1 = state1;
this.state2 = state2;
this.state3 = state3;
this.onNext = onNext;
this.onError = onError;
this.onCompleted = onCompleted;
}
public void OnNext(T value)
{
if (isStopped == 0)
{
onNext(value, state1, state2, state3);
}
}
public void OnError(Exception error)
{
if (Interlocked.Increment(ref isStopped) == 1)
{
onError(error, state1, state2, state3);
}
}
public void OnCompleted()
{
if (Interlocked.Increment(ref isStopped) == 1)
{
onCompleted(state1, state2, state3);
}
}
}
class AutoDetachObserver<T> : UniRx.Operators.OperatorObserverBase<T, T>
{
public AutoDetachObserver(IObserver<T> observer, IDisposable cancel)
: base(observer, cancel)
{
}
public override void OnNext(T value)
{
try
{
base.observer.OnNext(value);
}
catch
{
Dispose();
throw;
}
}
public override void OnError(Exception error)
{
try { observer.OnError(error); }
finally { Dispose(); }
}
public override void OnCompleted()
{
try { observer.OnCompleted(); }
finally { Dispose(); }
}
}
}
public static partial class ObserverExtensions
{
public static IObserver<T> Synchronize<T>(this IObserver<T> observer)
{
return new UniRx.Operators.SynchronizedObserver<T>(observer, new object());
}
public static IObserver<T> Synchronize<T>(this IObserver<T> observer, object gate)
{
return new UniRx.Operators.SynchronizedObserver<T>(observer, gate);
}
}
public static partial class ObservableExtensions
{
public static IDisposable Subscribe<T>(this IObservable<T> source)
{
return source.Subscribe(UniRx.InternalUtil.ThrowObserver<T>.Instance);
}
public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext)
{
return source.Subscribe(Observer.CreateSubscribeObserver(onNext, Stubs.Throw, Stubs.Nop));
}
public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action<Exception> onError)
{
return source.Subscribe(Observer.CreateSubscribeObserver(onNext, onError, Stubs.Nop));
}
public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action onCompleted)
{
return source.Subscribe(Observer.CreateSubscribeObserver(onNext, Stubs.Throw, onCompleted));
}
public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action<Exception> onError, Action onCompleted)
{
return source.Subscribe(Observer.CreateSubscribeObserver(onNext, onError, onCompleted));
}
public static IDisposable SubscribeWithState<T, TState>(this IObservable<T> source, TState state, Action<T, TState> onNext)
{
return source.Subscribe(Observer.CreateSubscribeWithStateObserver(state, onNext, Stubs<TState>.Throw, Stubs<TState>.Ignore));
}
public static IDisposable SubscribeWithState<T, TState>(this IObservable<T> source, TState state, Action<T, TState> onNext, Action<Exception, TState> onError)
{
return source.Subscribe(Observer.CreateSubscribeWithStateObserver(state, onNext, onError, Stubs<TState>.Ignore));
}
public static IDisposable SubscribeWithState<T, TState>(this IObservable<T> source, TState state, Action<T, TState> onNext, Action<TState> onCompleted)
{
return source.Subscribe(Observer.CreateSubscribeWithStateObserver(state, onNext, Stubs<TState>.Throw, onCompleted));
}
public static IDisposable SubscribeWithState<T, TState>(this IObservable<T> source, TState state, Action<T, TState> onNext, Action<Exception, TState> onError, Action<TState> onCompleted)
{
return source.Subscribe(Observer.CreateSubscribeWithStateObserver(state, onNext, onError, onCompleted));
}
public static IDisposable SubscribeWithState2<T, TState1, TState2>(this IObservable<T> source, TState1 state1, TState2 state2, Action<T, TState1, TState2> onNext)
{
return source.Subscribe(Observer.CreateSubscribeWithState2Observer(state1, state2, onNext, Stubs<TState1, TState2>.Throw, Stubs<TState1, TState2>.Ignore));
}
public static IDisposable SubscribeWithState2<T, TState1, TState2>(this IObservable<T> source, TState1 state1, TState2 state2, Action<T, TState1, TState2> onNext, Action<Exception, TState1, TState2> onError)
{
return source.Subscribe(Observer.CreateSubscribeWithState2Observer(state1, state2, onNext, onError, Stubs<TState1, TState2>.Ignore));
}
public static IDisposable SubscribeWithState2<T, TState1, TState2>(this IObservable<T> source, TState1 state1, TState2 state2, Action<T, TState1, TState2> onNext, Action<TState1, TState2> onCompleted)
{
return source.Subscribe(Observer.CreateSubscribeWithState2Observer(state1, state2, onNext, Stubs<TState1, TState2>.Throw, onCompleted));
}
public static IDisposable SubscribeWithState2<T, TState1, TState2>(this IObservable<T> source, TState1 state1, TState2 state2, Action<T, TState1, TState2> onNext, Action<Exception, TState1, TState2> onError, Action<TState1, TState2> onCompleted)
{
return source.Subscribe(Observer.CreateSubscribeWithState2Observer(state1, state2, onNext, onError, onCompleted));
}
public static IDisposable SubscribeWithState3<T, TState1, TState2, TState3>(this IObservable<T> source, TState1 state1, TState2 state2, TState3 state3, Action<T, TState1, TState2, TState3> onNext)
{
return source.Subscribe(Observer.CreateSubscribeWithState3Observer(state1, state2, state3, onNext, Stubs<TState1, TState2, TState3>.Throw, Stubs<TState1, TState2, TState3>.Ignore));
}
public static IDisposable SubscribeWithState3<T, TState1, TState2, TState3>(this IObservable<T> source, TState1 state1, TState2 state2, TState3 state3, Action<T, TState1, TState2, TState3> onNext, Action<Exception, TState1, TState2, TState3> onError)
{
return source.Subscribe(Observer.CreateSubscribeWithState3Observer(state1, state2, state3, onNext, onError, Stubs<TState1, TState2, TState3>.Ignore));
}
public static IDisposable SubscribeWithState3<T, TState1, TState2, TState3>(this IObservable<T> source, TState1 state1, TState2 state2, TState3 state3, Action<T, TState1, TState2, TState3> onNext, Action<TState1, TState2, TState3> onCompleted)
{
return source.Subscribe(Observer.CreateSubscribeWithState3Observer(state1, state2, state3, onNext, Stubs<TState1, TState2, TState3>.Throw, onCompleted));
}
public static IDisposable SubscribeWithState3<T, TState1, TState2, TState3>(this IObservable<T> source, TState1 state1, TState2 state2, TState3 state3, Action<T, TState1, TState2, TState3> onNext, Action<Exception, TState1, TState2, TState3> onError, Action<TState1, TState2, TState3> onCompleted)
{
return source.Subscribe(Observer.CreateSubscribeWithState3Observer(state1, state2, state3, onNext, onError, onCompleted));
}
}
internal static class Stubs
{
public static readonly Action Nop = () => { };
public static readonly Action<Exception> Throw = ex => { ex.Throw(); };
// marker for CatchIgnore and Catch avoid iOS AOT problem.
public static IObservable<TSource> CatchIgnore<TSource>(Exception ex)
{
return Observable.Empty<TSource>();
}
}
internal static class Stubs<T>
{
public static readonly Action<T> Ignore = (T t) => { };
public static readonly Func<T, T> Identity = (T t) => t;
public static readonly Action<Exception, T> Throw = (ex, _) => { ex.Throw(); };
}
internal static class Stubs<T1, T2>
{
public static readonly Action<T1, T2> Ignore = (x, y) => { };
public static readonly Action<Exception, T1, T2> Throw = (ex, _, __) => { ex.Throw(); };
}
internal static class Stubs<T1, T2, T3>
{
public static readonly Action<T1, T2, T3> Ignore = (x, y, z) => { };
public static readonly Action<Exception, T1, T2, T3> Throw = (ex, _, __, ___) => { ex.Throw(); };
}
}