You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
328 lines
9.5 KiB
328 lines
9.5 KiB
using System; |
|
using System.Collections.Generic; |
|
using UniRx.InternalUtil; |
|
|
|
#if (NET_4_6 || NET_STANDARD_2_0) |
|
using System.Runtime.CompilerServices; |
|
using System.Threading; |
|
#endif |
|
|
|
namespace UniRx
{
    /// <summary>
    /// Subject that buffers only the most recent <c>OnNext</c> value and emits it, followed by
    /// the completion notification, when <c>OnCompleted()</c> is called. If the subject
    /// terminates with <c>OnError</c>, only the error is forwarded. Observers that subscribe
    /// after termination immediately receive the stored outcome.
    /// </summary>
    /// <remarks>
    /// All mutable state is guarded by <c>observerLock</c>; observer callbacks are always
    /// invoked after the lock has been released.
    /// </remarks>
    /// <typeparam name="T">Type of the values carried by the subject.</typeparam>
    public sealed class AsyncSubject<T> : ISubject<T>, IOptimizedObservable<T>, IDisposable
#if (NET_4_6 || NET_STANDARD_2_0)
        , INotifyCompletion
#endif
    {
        // Guards every field below. Also taken by the nested Subscription when unsubscribing.
        readonly object observerLock = new object();

        T lastValue;
        bool hasValue;
        bool isStopped;
        bool isDisposed;
        Exception lastError;

        // Current downstream target: the empty sentinel, a single observer, a ListObserver
        // fan-out, or the disposed sentinel after Dispose().
        IObserver<T> outObserver = EmptyObserver<T>.Instance;

        /// <summary>
        /// Gets the last value received before the subject terminated.
        /// </summary>
        /// <remarks>
        /// When the subject completed without ever receiving a value, the stored value is
        /// still <c>default(T)</c> and that is what is returned.
        /// </remarks>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        /// <exception cref="InvalidOperationException">The subject has not terminated yet.</exception>
        public T Value
        {
            get
            {
                Exception error;
                T value;

                // Take a consistent snapshot under the same lock the writers use, so a
                // concurrent Dispose() cannot clear the value between the checks and the read.
                lock (observerLock)
                {
                    ThrowIfDisposed();
                    if (!isStopped) throw new InvalidOperationException("AsyncSubject is not completed yet");

                    error = lastError;
                    value = lastValue;
                }

                if (error != null) error.Throw();
                return value;
            }
        }

        /// <summary>
        /// Gets whether at least one observer is currently attached to a live
        /// (neither terminated nor disposed) subject.
        /// </summary>
        public bool HasObservers
        {
            get
            {
                return !(outObserver is EmptyObserver<T>) && !isStopped && !isDisposed;
            }
        }

        /// <summary>
        /// Gets whether the subject has terminated, either through <c>OnCompleted()</c> or
        /// <c>OnError</c>. Also serves as the awaiter's <c>IsCompleted</c> member.
        /// </summary>
        public bool IsCompleted { get { return isStopped; } }

        /// <summary>
        /// Terminates the subject. Attached observers receive the last buffered value
        /// (if any) followed by the completion notification. Does nothing if the subject
        /// has already terminated.
        /// </summary>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public void OnCompleted()
        {
            IObserver<T> target;
            T value;
            bool publishValue;

            lock (observerLock)
            {
                ThrowIfDisposed();
                if (isStopped) return;

                target = outObserver;
                outObserver = EmptyObserver<T>.Instance;
                isStopped = true;
                value = lastValue;
                publishValue = hasValue;
            }

            // Notify outside the lock so observer callbacks can re-enter the subject.
            if (publishValue)
            {
                target.OnNext(value);
            }

            target.OnCompleted();
        }

        /// <summary>
        /// Terminates the subject with an error and forwards it to the attached observers.
        /// Does nothing if the subject has already terminated.
        /// </summary>
        /// <param name="error">The error to publish.</param>
        /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public void OnError(Exception error)
        {
            if (error == null) throw new ArgumentNullException("error");

            IObserver<T> target;

            lock (observerLock)
            {
                ThrowIfDisposed();
                if (isStopped) return;

                target = outObserver;
                outObserver = EmptyObserver<T>.Instance;
                isStopped = true;
                lastError = error;
            }

            target.OnError(error);
        }

        /// <summary>
        /// Records <paramref name="value"/> as the latest value. Nothing is forwarded to
        /// observers until the subject completes. Ignored once the subject has terminated.
        /// </summary>
        /// <param name="value">The value to remember.</param>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public void OnNext(T value)
        {
            lock (observerLock)
            {
                ThrowIfDisposed();
                if (isStopped) return;

                this.hasValue = true;
                this.lastValue = value;
            }
        }

        /// <summary>
        /// Attaches an observer. While the subject is live the observer is registered and a
        /// handle that detaches it is returned. Once the subject has terminated the stored
        /// outcome (error, or last value plus completion, or completion alone) is replayed
        /// synchronously and an empty disposable is returned.
        /// </summary>
        /// <param name="observer">The observer to attach.</param>
        /// <returns>A disposable that detaches the observer.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer == null) throw new ArgumentNullException("observer");

            Exception error;
            T value;
            bool publishValue;

            lock (observerLock)
            {
                ThrowIfDisposed();

                if (!isStopped)
                {
                    var current = outObserver;
                    var listObserver = current as ListObserver<T>;

                    if (listObserver != null)
                    {
                        // Already fanning out: extend the list.
                        outObserver = listObserver.Add(observer);
                    }
                    else if (current is EmptyObserver<T>)
                    {
                        // First observer: store it directly.
                        outObserver = observer;
                    }
                    else
                    {
                        // Second observer: promote the single observer to a list.
                        outObserver = new ListObserver<T>(new ImmutableList<IObserver<T>>(new[] { current, observer }));
                    }

                    return new Subscription(this, observer);
                }

                error = lastError;
                value = lastValue;
                publishValue = hasValue;
            }

            // Already terminated: replay the outcome outside the lock.
            if (error != null)
            {
                observer.OnError(error);
            }
            else
            {
                if (publishValue)
                {
                    observer.OnNext(value);
                }

                observer.OnCompleted();
            }

            return Disposable.Empty;
        }

        /// <summary>
        /// Disposes the subject: drops all observers without notifying them and clears the
        /// stored value and error. Subsequent <c>OnNext</c>/<c>OnError</c>/<c>OnCompleted()</c>/
        /// <c>Subscribe</c> calls throw <see cref="ObjectDisposedException"/>.
        /// </summary>
        public void Dispose()
        {
            lock (observerLock)
            {
                isDisposed = true;
                outObserver = DisposedObserver<T>.Instance;
                lastError = null;
                lastValue = default(T);
            }
        }

        /// <summary>Throws <see cref="ObjectDisposedException"/> if the subject was disposed.</summary>
        void ThrowIfDisposed()
        {
            if (isDisposed) throw new ObjectDisposedException("");
        }

        /// <summary>
        /// Always returns <c>false</c>.
        /// NOTE(review): presumably the <c>IOptimizedObservable&lt;T&gt;</c> hint that subscribing
        /// does not need to be scheduled on the current thread; confirm against the interface.
        /// </summary>
        public bool IsRequiredSubscribeOnCurrentThread()
        {
            return false;
        }

        /// <summary>
        /// Handle returned by <c>Subscribe</c>; disposing it detaches the observer from the
        /// parent subject. Disposing more than once is a no-op.
        /// </summary>
        class Subscription : IDisposable
        {
            readonly object gate = new object();
            AsyncSubject<T> parent;
            IObserver<T> unsubscribeTarget;

            public Subscription(AsyncSubject<T> parent, IObserver<T> unsubscribeTarget)
            {
                this.parent = parent;
                this.unsubscribeTarget = unsubscribeTarget;
            }

            public void Dispose()
            {
                lock (gate)
                {
                    if (parent == null) return; // already unsubscribed

                    lock (parent.observerLock)
                    {
                        var current = parent.outObserver;
                        var listObserver = current as ListObserver<T>;

                        if (listObserver != null)
                        {
                            parent.outObserver = listObserver.Remove(unsubscribeTarget);
                        }
                        else if (ReferenceEquals(current, unsubscribeTarget))
                        {
                            // Only clear the slot when it still holds our observer. After the
                            // subject terminated or was disposed the slot holds a sentinel
                            // (empty / disposed) that must not be overwritten.
                            parent.outObserver = EmptyObserver<T>.Instance;
                        }

                        unsubscribeTarget = null;
                        parent = null;
                    }
                }
            }
        }

#if (NET_4_6 || NET_STANDARD_2_0)

        /// <summary>
        /// Gets an awaitable object for the current AsyncSubject.
        /// </summary>
        /// <returns>Object that can be awaited.</returns>
        public AsyncSubject<T> GetAwaiter()
        {
            return this;
        }

        /// <summary>
        /// Specifies a callback action that will be invoked when the subject completes.
        /// </summary>
        /// <param name="continuation">Callback action that will be invoked when the subject completes.</param>
        /// <exception cref="ArgumentNullException"><paramref name="continuation"/> is null.</exception>
        public void OnCompleted(Action continuation)
        {
            if (continuation == null)
            {
                throw new ArgumentNullException("continuation");
            }

            OnCompleted(continuation, true);
        }

        /// <summary>
        /// Registers <paramref name="continuation"/> to run when the subject terminates.
        /// </summary>
        /// <param name="continuation">Callback to invoke on completion or error.</param>
        /// <param name="originalContext">
        /// When true, the callback is posted to the <see cref="SynchronizationContext"/> that is
        /// current at registration time (if any); otherwise it runs inline on the notifying thread.
        /// </param>
        void OnCompleted(Action continuation, bool originalContext)
        {
            //
            // [OK] Use of unsafe Subscribe: this type's Subscribe implementation is safe.
            //
            this.Subscribe/*Unsafe*/(new AwaitObserver(continuation, originalContext));
        }

        /// <summary>
        /// Observer that ignores values and runs a callback on either terminal notification.
        /// </summary>
        class AwaitObserver : IObserver<T>
        {
            private readonly SynchronizationContext _context;
            private readonly Action _callback;

            public AwaitObserver(Action callback, bool originalContext)
            {
                if (originalContext)
                {
                    _context = SynchronizationContext.Current;
                }

                _callback = callback;
            }

            public void OnCompleted()
            {
                InvokeOnOriginalContext();
            }

            public void OnError(Exception error)
            {
                InvokeOnOriginalContext();
            }

            public void OnNext(T value)
            {
            }

            private void InvokeOnOriginalContext()
            {
                if (_context != null)
                {
                    //
                    // No need for OperationStarted and OperationCompleted calls here;
                    // this code is invoked through await support and will have a way
                    // to observe its start/complete behavior, either through returned
                    // Task objects or the async method builder's interaction with the
                    // SynchronizationContext object.
                    //
                    _context.Post(c => ((Action)c)(), _callback);
                }
                else
                {
                    _callback();
                }
            }
        }

        /// <summary>
        /// Gets the last element of the subject, potentially blocking until the subject completes successfully or exceptionally.
        /// </summary>
        /// <remarks>
        /// If the subject is disposed while a caller is blocked here, the registered
        /// continuation is dropped by <c>Dispose()</c> and the wait is never released.
        /// </remarks>
        /// <returns>The last element of the subject. Throws an InvalidOperationException if no element was received.</returns>
        /// <exception cref="InvalidOperationException">The source sequence is empty.</exception>
        [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1024:UsePropertiesWhereAppropriate", Justification = "Await pattern for C# and VB compilers.")]
        public T GetResult()
        {
            if (!isStopped)
            {
                // Dispose the wait handle once the wait is over instead of leaking it.
                using (var completed = new ManualResetEvent(false))
                {
                    OnCompleted(() =>
                    {
                        try
                        {
                            completed.Set();
                        }
                        catch (ObjectDisposedException)
                        {
                            // Deliberately ignored: the waiter has already been released and
                            // has disposed the handle, so there is nobody left to signal.
                        }
                    }, false);

                    completed.WaitOne();
                }
            }

            Exception error;
            bool gotValue;
            T value;

            // Snapshot the outcome under the lock the writers use.
            lock (observerLock)
            {
                error = lastError;
                gotValue = hasValue;
                value = lastValue;
            }

            if (error != null)
            {
                error.Throw();
            }

            if (!gotValue)
            {
                throw new InvalidOperationException("NO_ELEMENTS");
            }

            return value;
        }
#endif
    }
}
|
|
|