上海虹口龙之梦项目
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

251 lines
7.1 KiB

using System;
using System.Collections.Generic;
using UniRx.InternalUtil;
namespace UniRx
{
public sealed class ReplaySubject<T> : ISubject<T>, IOptimizedObservable<T>, IDisposable
{
object observerLock = new object();
bool isStopped;
bool isDisposed;
Exception lastError;
IObserver<T> outObserver = EmptyObserver<T>.Instance;
readonly int bufferSize;
readonly TimeSpan window;
readonly DateTimeOffset startTime;
readonly IScheduler scheduler;
Queue<TimeInterval<T>> queue = new Queue<TimeInterval<T>>();
public ReplaySubject()
: this(int.MaxValue, TimeSpan.MaxValue, Scheduler.DefaultSchedulers.Iteration)
{
}
public ReplaySubject(IScheduler scheduler)
: this(int.MaxValue, TimeSpan.MaxValue, scheduler)
{
}
public ReplaySubject(int bufferSize)
: this(bufferSize, TimeSpan.MaxValue, Scheduler.DefaultSchedulers.Iteration)
{
}
public ReplaySubject(int bufferSize, IScheduler scheduler)
: this(bufferSize, TimeSpan.MaxValue, scheduler)
{
}
public ReplaySubject(TimeSpan window)
: this(int.MaxValue, window, Scheduler.DefaultSchedulers.Iteration)
{
}
public ReplaySubject(TimeSpan window, IScheduler scheduler)
: this(int.MaxValue, window, scheduler)
{
}
// full constructor
public ReplaySubject(int bufferSize, TimeSpan window, IScheduler scheduler)
{
if (bufferSize < 0) throw new ArgumentOutOfRangeException("bufferSize");
if (window < TimeSpan.Zero) throw new ArgumentOutOfRangeException("window");
if (scheduler == null) throw new ArgumentNullException("scheduler");
this.bufferSize = bufferSize;
this.window = window;
this.scheduler = scheduler;
startTime = scheduler.Now;
}
void Trim()
{
var elapsedTime = Scheduler.Normalize(scheduler.Now - startTime);
while (queue.Count > bufferSize)
{
queue.Dequeue();
}
while (queue.Count > 0 && elapsedTime.Subtract(queue.Peek().Interval).CompareTo(window) > 0)
{
queue.Dequeue();
}
}
public void OnCompleted()
{
IObserver<T> old;
lock (observerLock)
{
ThrowIfDisposed();
if (isStopped) return;
old = outObserver;
outObserver = EmptyObserver<T>.Instance;
isStopped = true;
Trim();
}
old.OnCompleted();
}
public void OnError(Exception error)
{
if (error == null) throw new ArgumentNullException("error");
IObserver<T> old;
lock (observerLock)
{
ThrowIfDisposed();
if (isStopped) return;
old = outObserver;
outObserver = EmptyObserver<T>.Instance;
isStopped = true;
lastError = error;
Trim();
}
old.OnError(error);
}
public void OnNext(T value)
{
IObserver<T> current;
lock (observerLock)
{
ThrowIfDisposed();
if (isStopped) return;
// enQ
queue.Enqueue(new TimeInterval<T>(value, scheduler.Now - startTime));
Trim();
current = outObserver;
}
current.OnNext(value);
}
public IDisposable Subscribe(IObserver<T> observer)
{
if (observer == null) throw new ArgumentNullException("observer");
var ex = default(Exception);
var subscription = default(Subscription);
lock (observerLock)
{
ThrowIfDisposed();
if (!isStopped)
{
var listObserver = outObserver as ListObserver<T>;
if (listObserver != null)
{
outObserver = listObserver.Add(observer);
}
else
{
var current = outObserver;
if (current is EmptyObserver<T>)
{
outObserver = observer;
}
else
{
outObserver = new ListObserver<T>(new ImmutableList<IObserver<T>>(new[] { current, observer }));
}
}
subscription = new Subscription(this, observer);
}
ex = lastError;
Trim();
foreach (var item in queue)
{
observer.OnNext(item.Value);
}
}
if (subscription != null)
{
return subscription;
}
else if (ex != null)
{
observer.OnError(ex);
}
else
{
observer.OnCompleted();
}
return Disposable.Empty;
}
public void Dispose()
{
lock (observerLock)
{
isDisposed = true;
outObserver = DisposedObserver<T>.Instance;
lastError = null;
queue = null;
}
}
void ThrowIfDisposed()
{
if (isDisposed) throw new ObjectDisposedException("");
}
public bool IsRequiredSubscribeOnCurrentThread()
{
return false;
}
class Subscription : IDisposable
{
readonly object gate = new object();
ReplaySubject<T> parent;
IObserver<T> unsubscribeTarget;
public Subscription(ReplaySubject<T> parent, IObserver<T> unsubscribeTarget)
{
this.parent = parent;
this.unsubscribeTarget = unsubscribeTarget;
}
public void Dispose()
{
lock (gate)
{
if (parent != null)
{
lock (parent.observerLock)
{
var listObserver = parent.outObserver as ListObserver<T>;
if (listObserver != null)
{
parent.outObserver = listObserver.Remove(unsubscribeTarget);
}
else
{
parent.outObserver = EmptyObserver<T>.Instance;
}
unsubscribeTarget = null;
parent = null;
}
}
}
}
}
}
}