You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
329 lines
9.5 KiB
329 lines
9.5 KiB
1 year ago
|
using System;
|
||
|
using System.Collections.Generic;
|
||
|
using UniRx.InternalUtil;
|
||
|
|
||
|
#if (NET_4_6 || NET_STANDARD_2_0)
|
||
|
using System.Runtime.CompilerServices;
|
||
|
using System.Threading;
|
||
|
#endif
|
||
|
|
||
|
namespace UniRx
|
||
|
{
|
||
|
public sealed class AsyncSubject<T> : ISubject<T>, IOptimizedObservable<T>, IDisposable
|
||
|
#if (NET_4_6 || NET_STANDARD_2_0)
|
||
|
, INotifyCompletion
|
||
|
#endif
|
||
|
{
|
||
|
object observerLock = new object();
|
||
|
|
||
|
T lastValue;
|
||
|
bool hasValue;
|
||
|
bool isStopped;
|
||
|
bool isDisposed;
|
||
|
Exception lastError;
|
||
|
IObserver<T> outObserver = EmptyObserver<T>.Instance;
|
||
|
|
||
|
public T Value
|
||
|
{
|
||
|
get
|
||
|
{
|
||
|
ThrowIfDisposed();
|
||
|
if (!isStopped) throw new InvalidOperationException("AsyncSubject is not completed yet");
|
||
|
if (lastError != null) lastError.Throw();
|
||
|
return lastValue;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
public bool HasObservers
|
||
|
{
|
||
|
get
|
||
|
{
|
||
|
return !(outObserver is EmptyObserver<T>) && !isStopped && !isDisposed;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
public bool IsCompleted { get { return isStopped; } }
|
||
|
|
||
|
public void OnCompleted()
|
||
|
{
|
||
|
IObserver<T> old;
|
||
|
T v;
|
||
|
bool hv;
|
||
|
lock (observerLock)
|
||
|
{
|
||
|
ThrowIfDisposed();
|
||
|
if (isStopped) return;
|
||
|
|
||
|
old = outObserver;
|
||
|
outObserver = EmptyObserver<T>.Instance;
|
||
|
isStopped = true;
|
||
|
v = lastValue;
|
||
|
hv = hasValue;
|
||
|
}
|
||
|
|
||
|
if (hv)
|
||
|
{
|
||
|
old.OnNext(v);
|
||
|
old.OnCompleted();
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
old.OnCompleted();
|
||
|
}
|
||
|
}
|
||
|
|
||
|
public void OnError(Exception error)
|
||
|
{
|
||
|
if (error == null) throw new ArgumentNullException("error");
|
||
|
|
||
|
IObserver<T> old;
|
||
|
lock (observerLock)
|
||
|
{
|
||
|
ThrowIfDisposed();
|
||
|
if (isStopped) return;
|
||
|
|
||
|
old = outObserver;
|
||
|
outObserver = EmptyObserver<T>.Instance;
|
||
|
isStopped = true;
|
||
|
lastError = error;
|
||
|
}
|
||
|
|
||
|
old.OnError(error);
|
||
|
}
|
||
|
|
||
|
public void OnNext(T value)
|
||
|
{
|
||
|
lock (observerLock)
|
||
|
{
|
||
|
ThrowIfDisposed();
|
||
|
if (isStopped) return;
|
||
|
|
||
|
this.hasValue = true;
|
||
|
this.lastValue = value;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
public IDisposable Subscribe(IObserver<T> observer)
|
||
|
{
|
||
|
if (observer == null) throw new ArgumentNullException("observer");
|
||
|
|
||
|
var ex = default(Exception);
|
||
|
var v = default(T);
|
||
|
var hv = false;
|
||
|
|
||
|
lock (observerLock)
|
||
|
{
|
||
|
ThrowIfDisposed();
|
||
|
if (!isStopped)
|
||
|
{
|
||
|
var listObserver = outObserver as ListObserver<T>;
|
||
|
if (listObserver != null)
|
||
|
{
|
||
|
outObserver = listObserver.Add(observer);
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
var current = outObserver;
|
||
|
if (current is EmptyObserver<T>)
|
||
|
{
|
||
|
outObserver = observer;
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
outObserver = new ListObserver<T>(new ImmutableList<IObserver<T>>(new[] { current, observer }));
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return new Subscription(this, observer);
|
||
|
}
|
||
|
|
||
|
ex = lastError;
|
||
|
v = lastValue;
|
||
|
hv = hasValue;
|
||
|
}
|
||
|
|
||
|
if (ex != null)
|
||
|
{
|
||
|
observer.OnError(ex);
|
||
|
}
|
||
|
else if (hv)
|
||
|
{
|
||
|
observer.OnNext(v);
|
||
|
observer.OnCompleted();
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
observer.OnCompleted();
|
||
|
}
|
||
|
|
||
|
return Disposable.Empty;
|
||
|
}
|
||
|
|
||
|
public void Dispose()
|
||
|
{
|
||
|
lock (observerLock)
|
||
|
{
|
||
|
isDisposed = true;
|
||
|
outObserver = DisposedObserver<T>.Instance;
|
||
|
lastError = null;
|
||
|
lastValue = default(T);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
void ThrowIfDisposed()
|
||
|
{
|
||
|
if (isDisposed) throw new ObjectDisposedException("");
|
||
|
}
|
||
|
|
||
|
public bool IsRequiredSubscribeOnCurrentThread()
|
||
|
{
|
||
|
return false;
|
||
|
}
|
||
|
|
||
|
class Subscription : IDisposable
|
||
|
{
|
||
|
readonly object gate = new object();
|
||
|
AsyncSubject<T> parent;
|
||
|
IObserver<T> unsubscribeTarget;
|
||
|
|
||
|
public Subscription(AsyncSubject<T> parent, IObserver<T> unsubscribeTarget)
|
||
|
{
|
||
|
this.parent = parent;
|
||
|
this.unsubscribeTarget = unsubscribeTarget;
|
||
|
}
|
||
|
|
||
|
public void Dispose()
|
||
|
{
|
||
|
lock (gate)
|
||
|
{
|
||
|
if (parent != null)
|
||
|
{
|
||
|
lock (parent.observerLock)
|
||
|
{
|
||
|
var listObserver = parent.outObserver as ListObserver<T>;
|
||
|
if (listObserver != null)
|
||
|
{
|
||
|
parent.outObserver = listObserver.Remove(unsubscribeTarget);
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
parent.outObserver = EmptyObserver<T>.Instance;
|
||
|
}
|
||
|
|
||
|
unsubscribeTarget = null;
|
||
|
parent = null;
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
|
||
|
#if (NET_4_6 || NET_STANDARD_2_0)
|
||
|
|
||
|
/// <summary>
|
||
|
/// Gets an awaitable object for the current AsyncSubject.
|
||
|
/// </summary>
|
||
|
/// <returns>Object that can be awaited.</returns>
|
||
|
public AsyncSubject<T> GetAwaiter()
|
||
|
{
|
||
|
return this;
|
||
|
}
|
||
|
|
||
|
/// <summary>
|
||
|
/// Specifies a callback action that will be invoked when the subject completes.
|
||
|
/// </summary>
|
||
|
/// <param name="continuation">Callback action that will be invoked when the subject completes.</param>
|
||
|
/// <exception cref="ArgumentNullException"><paramref name="continuation"/> is null.</exception>
|
||
|
public void OnCompleted(Action continuation)
|
||
|
{
|
||
|
if (continuation == null)
|
||
|
throw new ArgumentNullException("continuation");
|
||
|
|
||
|
OnCompleted(continuation, true);
|
||
|
}
|
||
|
|
||
|
void OnCompleted(Action continuation, bool originalContext)
|
||
|
{
|
||
|
//
|
||
|
// [OK] Use of unsafe Subscribe: this type's Subscribe implementation is safe.
|
||
|
//
|
||
|
this.Subscribe/*Unsafe*/(new AwaitObserver(continuation, originalContext));
|
||
|
}
|
||
|
|
||
|
class AwaitObserver : IObserver<T>
|
||
|
{
|
||
|
private readonly SynchronizationContext _context;
|
||
|
private readonly Action _callback;
|
||
|
|
||
|
public AwaitObserver(Action callback, bool originalContext)
|
||
|
{
|
||
|
if (originalContext)
|
||
|
_context = SynchronizationContext.Current;
|
||
|
|
||
|
_callback = callback;
|
||
|
}
|
||
|
|
||
|
public void OnCompleted()
|
||
|
{
|
||
|
InvokeOnOriginalContext();
|
||
|
}
|
||
|
|
||
|
public void OnError(Exception error)
|
||
|
{
|
||
|
InvokeOnOriginalContext();
|
||
|
}
|
||
|
|
||
|
public void OnNext(T value)
|
||
|
{
|
||
|
}
|
||
|
|
||
|
private void InvokeOnOriginalContext()
|
||
|
{
|
||
|
if (_context != null)
|
||
|
{
|
||
|
//
|
||
|
// No need for OperationStarted and OperationCompleted calls here;
|
||
|
// this code is invoked through await support and will have a way
|
||
|
// to observe its start/complete behavior, either through returned
|
||
|
// Task objects or the async method builder's interaction with the
|
||
|
// SynchronizationContext object.
|
||
|
//
|
||
|
_context.Post(c => ((Action)c)(), _callback);
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
_callback();
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/// <summary>
|
||
|
/// Gets the last element of the subject, potentially blocking until the subject completes successfully or exceptionally.
|
||
|
/// </summary>
|
||
|
/// <returns>The last element of the subject. Throws an InvalidOperationException if no element was received.</returns>
|
||
|
/// <exception cref="InvalidOperationException">The source sequence is empty.</exception>
|
||
|
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1024:UsePropertiesWhereAppropriate", Justification = "Await pattern for C# and VB compilers.")]
|
||
|
public T GetResult()
|
||
|
{
|
||
|
if (!isStopped)
|
||
|
{
|
||
|
var e = new ManualResetEvent(false);
|
||
|
OnCompleted(() => e.Set(), false);
|
||
|
e.WaitOne();
|
||
|
}
|
||
|
|
||
|
if (lastError != null)
|
||
|
{
|
||
|
lastError.Throw();
|
||
|
}
|
||
|
|
||
|
if (!hasValue)
|
||
|
throw new InvalidOperationException("NO_ELEMENTS");
|
||
|
|
||
|
return lastValue;
|
||
|
}
|
||
|
#endif
|
||
|
}
|
||
|
}
|