上海虹口龙之梦项目
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

328 lines
9.5 KiB

using System;
using System.Collections.Generic;
using UniRx.InternalUtil;
#if (NET_4_6 || NET_STANDARD_2_0)
using System.Runtime.CompilerServices;
using System.Threading;
#endif
namespace UniRx
{
public sealed class AsyncSubject<T> : ISubject<T>, IOptimizedObservable<T>, IDisposable
#if (NET_4_6 || NET_STANDARD_2_0)
, INotifyCompletion
#endif
{
object observerLock = new object();
T lastValue;
bool hasValue;
bool isStopped;
bool isDisposed;
Exception lastError;
IObserver<T> outObserver = EmptyObserver<T>.Instance;
public T Value
{
get
{
ThrowIfDisposed();
if (!isStopped) throw new InvalidOperationException("AsyncSubject is not completed yet");
if (lastError != null) lastError.Throw();
return lastValue;
}
}
public bool HasObservers
{
get
{
return !(outObserver is EmptyObserver<T>) && !isStopped && !isDisposed;
}
}
public bool IsCompleted { get { return isStopped; } }
public void OnCompleted()
{
IObserver<T> old;
T v;
bool hv;
lock (observerLock)
{
ThrowIfDisposed();
if (isStopped) return;
old = outObserver;
outObserver = EmptyObserver<T>.Instance;
isStopped = true;
v = lastValue;
hv = hasValue;
}
if (hv)
{
old.OnNext(v);
old.OnCompleted();
}
else
{
old.OnCompleted();
}
}
public void OnError(Exception error)
{
if (error == null) throw new ArgumentNullException("error");
IObserver<T> old;
lock (observerLock)
{
ThrowIfDisposed();
if (isStopped) return;
old = outObserver;
outObserver = EmptyObserver<T>.Instance;
isStopped = true;
lastError = error;
}
old.OnError(error);
}
public void OnNext(T value)
{
lock (observerLock)
{
ThrowIfDisposed();
if (isStopped) return;
this.hasValue = true;
this.lastValue = value;
}
}
public IDisposable Subscribe(IObserver<T> observer)
{
if (observer == null) throw new ArgumentNullException("observer");
var ex = default(Exception);
var v = default(T);
var hv = false;
lock (observerLock)
{
ThrowIfDisposed();
if (!isStopped)
{
var listObserver = outObserver as ListObserver<T>;
if (listObserver != null)
{
outObserver = listObserver.Add(observer);
}
else
{
var current = outObserver;
if (current is EmptyObserver<T>)
{
outObserver = observer;
}
else
{
outObserver = new ListObserver<T>(new ImmutableList<IObserver<T>>(new[] { current, observer }));
}
}
return new Subscription(this, observer);
}
ex = lastError;
v = lastValue;
hv = hasValue;
}
if (ex != null)
{
observer.OnError(ex);
}
else if (hv)
{
observer.OnNext(v);
observer.OnCompleted();
}
else
{
observer.OnCompleted();
}
return Disposable.Empty;
}
public void Dispose()
{
lock (observerLock)
{
isDisposed = true;
outObserver = DisposedObserver<T>.Instance;
lastError = null;
lastValue = default(T);
}
}
void ThrowIfDisposed()
{
if (isDisposed) throw new ObjectDisposedException("");
}
public bool IsRequiredSubscribeOnCurrentThread()
{
return false;
}
class Subscription : IDisposable
{
readonly object gate = new object();
AsyncSubject<T> parent;
IObserver<T> unsubscribeTarget;
public Subscription(AsyncSubject<T> parent, IObserver<T> unsubscribeTarget)
{
this.parent = parent;
this.unsubscribeTarget = unsubscribeTarget;
}
public void Dispose()
{
lock (gate)
{
if (parent != null)
{
lock (parent.observerLock)
{
var listObserver = parent.outObserver as ListObserver<T>;
if (listObserver != null)
{
parent.outObserver = listObserver.Remove(unsubscribeTarget);
}
else
{
parent.outObserver = EmptyObserver<T>.Instance;
}
unsubscribeTarget = null;
parent = null;
}
}
}
}
}
#if (NET_4_6 || NET_STANDARD_2_0)
/// <summary>
/// Gets an awaitable object for the current AsyncSubject.
/// </summary>
/// <returns>Object that can be awaited.</returns>
public AsyncSubject<T> GetAwaiter()
{
return this;
}
/// <summary>
/// Specifies a callback action that will be invoked when the subject completes.
/// </summary>
/// <param name="continuation">Callback action that will be invoked when the subject completes.</param>
/// <exception cref="ArgumentNullException"><paramref name="continuation"/> is null.</exception>
public void OnCompleted(Action continuation)
{
if (continuation == null)
throw new ArgumentNullException("continuation");
OnCompleted(continuation, true);
}
void OnCompleted(Action continuation, bool originalContext)
{
//
// [OK] Use of unsafe Subscribe: this type's Subscribe implementation is safe.
//
this.Subscribe/*Unsafe*/(new AwaitObserver(continuation, originalContext));
}
class AwaitObserver : IObserver<T>
{
private readonly SynchronizationContext _context;
private readonly Action _callback;
public AwaitObserver(Action callback, bool originalContext)
{
if (originalContext)
_context = SynchronizationContext.Current;
_callback = callback;
}
public void OnCompleted()
{
InvokeOnOriginalContext();
}
public void OnError(Exception error)
{
InvokeOnOriginalContext();
}
public void OnNext(T value)
{
}
private void InvokeOnOriginalContext()
{
if (_context != null)
{
//
// No need for OperationStarted and OperationCompleted calls here;
// this code is invoked through await support and will have a way
// to observe its start/complete behavior, either through returned
// Task objects or the async method builder's interaction with the
// SynchronizationContext object.
//
_context.Post(c => ((Action)c)(), _callback);
}
else
{
_callback();
}
}
}
/// <summary>
/// Gets the last element of the subject, potentially blocking until the subject completes successfully or exceptionally.
/// </summary>
/// <returns>The last element of the subject. Throws an InvalidOperationException if no element was received.</returns>
/// <exception cref="InvalidOperationException">The source sequence is empty.</exception>
[System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Design", "CA1024:UsePropertiesWhereAppropriate", Justification = "Await pattern for C# and VB compilers.")]
public T GetResult()
{
if (!isStopped)
{
var e = new ManualResetEvent(false);
OnCompleted(() => e.Set(), false);
e.WaitOne();
}
if (lastError != null)
{
lastError.Throw();
}
if (!hasValue)
throw new InvalidOperationException("NO_ELEMENTS");
return lastValue;
}
#endif
}
}