You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
251 lines
7.1 KiB
251 lines
7.1 KiB
using System; |
|
using System.Collections.Generic; |
|
using UniRx.InternalUtil; |
|
|
|
namespace UniRx
{
    /// <summary>
    /// Subject that buffers the values pushed into it and replays the retained
    /// values to each observer at the moment it subscribes, before that observer
    /// starts receiving live notifications.
    /// Retention is limited by a maximum item count (<c>bufferSize</c>) and by a
    /// maximum age (<c>window</c>) measured with the supplied scheduler's clock.
    /// </summary>
    /// <typeparam name="T">Type of the values carried by the subject.</typeparam>
    public sealed class ReplaySubject<T> : ISubject<T>, IOptimizedObservable<T>, IDisposable
    {
        // Guards every mutable field below; also taken by the nested Subscription.
        readonly object observerLock = new object();

        bool isStopped;
        bool isDisposed;
        Exception lastError;
        IObserver<T> outObserver = EmptyObserver<T>.Instance;

        readonly int bufferSize;
        readonly TimeSpan window;
        readonly DateTimeOffset startTime;
        readonly IScheduler scheduler;

        // Each entry stores the value together with its offset from startTime.
        // Set to null by Dispose().
        Queue<TimeInterval<T>> queue = new Queue<TimeInterval<T>>();

        /// <summary>
        /// Creates a subject with no count limit and no age limit, using
        /// <c>Scheduler.DefaultSchedulers.Iteration</c> as the clock.
        /// </summary>
        public ReplaySubject()
            : this(int.MaxValue, TimeSpan.MaxValue, Scheduler.DefaultSchedulers.Iteration)
        {
        }

        /// <summary>
        /// Creates a subject with no count limit and no age limit, using the given scheduler as the clock.
        /// </summary>
        /// <param name="scheduler">Scheduler whose <c>Now</c> is used to timestamp values.</param>
        public ReplaySubject(IScheduler scheduler)
            : this(int.MaxValue, TimeSpan.MaxValue, scheduler)
        {
        }

        /// <summary>
        /// Creates a subject that retains at most <paramref name="bufferSize"/> values, with no age limit.
        /// </summary>
        /// <param name="bufferSize">Maximum number of values retained for replay.</param>
        public ReplaySubject(int bufferSize)
            : this(bufferSize, TimeSpan.MaxValue, Scheduler.DefaultSchedulers.Iteration)
        {
        }

        /// <summary>
        /// Creates a subject that retains at most <paramref name="bufferSize"/> values, with no age limit,
        /// using the given scheduler as the clock.
        /// </summary>
        /// <param name="bufferSize">Maximum number of values retained for replay.</param>
        /// <param name="scheduler">Scheduler whose <c>Now</c> is used to timestamp values.</param>
        public ReplaySubject(int bufferSize, IScheduler scheduler)
            : this(bufferSize, TimeSpan.MaxValue, scheduler)
        {
        }

        /// <summary>
        /// Creates a subject that retains values no older than <paramref name="window"/>, with no count limit.
        /// </summary>
        /// <param name="window">Maximum age of a value retained for replay.</param>
        public ReplaySubject(TimeSpan window)
            : this(int.MaxValue, window, Scheduler.DefaultSchedulers.Iteration)
        {
        }

        /// <summary>
        /// Creates a subject that retains values no older than <paramref name="window"/>, with no count limit,
        /// using the given scheduler as the clock.
        /// </summary>
        /// <param name="window">Maximum age of a value retained for replay.</param>
        /// <param name="scheduler">Scheduler whose <c>Now</c> is used to timestamp values.</param>
        public ReplaySubject(TimeSpan window, IScheduler scheduler)
            : this(int.MaxValue, window, scheduler)
        {
        }

        /// <summary>
        /// Full constructor; every other overload delegates here.
        /// </summary>
        /// <param name="bufferSize">Maximum number of values retained for replay.</param>
        /// <param name="window">Maximum age of a value retained for replay.</param>
        /// <param name="scheduler">Scheduler whose <c>Now</c> is used to timestamp values.</param>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="bufferSize"/> is negative, or <paramref name="window"/> is negative.
        /// </exception>
        /// <exception cref="ArgumentNullException"><paramref name="scheduler"/> is null.</exception>
        public ReplaySubject(int bufferSize, TimeSpan window, IScheduler scheduler)
        {
            if (bufferSize < 0) throw new ArgumentOutOfRangeException("bufferSize");
            if (window < TimeSpan.Zero) throw new ArgumentOutOfRangeException("window");
            if (scheduler == null) throw new ArgumentNullException("scheduler");

            this.bufferSize = bufferSize;
            this.window = window;
            this.scheduler = scheduler;
            startTime = scheduler.Now;
        }

        /// <summary>
        /// Evicts buffered values that exceed the retention limits: first the oldest
        /// entries until at most <c>bufferSize</c> remain, then entries from the head
        /// whose age is greater than <c>window</c>.
        /// Callers must hold <c>observerLock</c> and must have checked that the subject is not disposed.
        /// </summary>
        void Trim()
        {
            // NOTE(review): Scheduler.Normalize is defined elsewhere; presumably it clamps
            // negative spans to zero - confirm against its implementation.
            var elapsedTime = Scheduler.Normalize(scheduler.Now - startTime);

            while (queue.Count > bufferSize)
            {
                queue.Dequeue();
            }

            // Age of an entry = time elapsed since construction minus the offset recorded at enqueue.
            while (queue.Count > 0 && elapsedTime.Subtract(queue.Peek().Interval).CompareTo(window) > 0)
            {
                queue.Dequeue();
            }
        }

        /// <summary>
        /// Marks the subject as completed and forwards the completion to the current observers.
        /// Does nothing if the subject has already been stopped.
        /// </summary>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public void OnCompleted()
        {
            IObserver<T> old;
            lock (observerLock)
            {
                ThrowIfDisposed();
                if (isStopped) return;

                old = outObserver;
                outObserver = EmptyObserver<T>.Instance;
                isStopped = true;
                Trim();
            }

            // Notify outside the lock.
            old.OnCompleted();
        }

        /// <summary>
        /// Marks the subject as faulted, remembers the error for later subscribers and
        /// forwards it to the current observers.
        /// Does nothing if the subject has already been stopped.
        /// </summary>
        /// <param name="error">The error to propagate.</param>
        /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public void OnError(Exception error)
        {
            if (error == null) throw new ArgumentNullException("error");

            IObserver<T> old;
            lock (observerLock)
            {
                ThrowIfDisposed();
                if (isStopped) return;

                old = outObserver;
                outObserver = EmptyObserver<T>.Instance;
                isStopped = true;
                lastError = error;
                Trim();
            }

            // Notify outside the lock.
            old.OnError(error);
        }

        /// <summary>
        /// Records <paramref name="value"/> in the replay buffer and forwards it to the current observers.
        /// Does nothing if the subject has already been stopped.
        /// </summary>
        /// <param name="value">The value to buffer and publish.</param>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public void OnNext(T value)
        {
            IObserver<T> current;
            lock (observerLock)
            {
                ThrowIfDisposed();
                if (isStopped) return;

                // Store the value with its offset from the construction time, then enforce the limits.
                queue.Enqueue(new TimeInterval<T>(value, scheduler.Now - startTime));
                Trim();

                current = outObserver;
            }

            // Notify outside the lock.
            current.OnNext(value);
        }

        /// <summary>
        /// Subscribes <paramref name="observer"/>: the retained values are replayed to it
        /// immediately, after which it receives live notifications. If the subject has
        /// already stopped, the retained values are replayed followed by the stored error
        /// or a completion, and an empty disposable is returned.
        /// </summary>
        /// <param name="observer">The observer to register.</param>
        /// <returns>A disposable that removes the observer from the subject.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
        /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
        public IDisposable Subscribe(IObserver<T> observer)
        {
            if (observer == null) throw new ArgumentNullException("observer");

            var ex = default(Exception);
            var subscription = default(Subscription);

            lock (observerLock)
            {
                ThrowIfDisposed();
                if (!isStopped)
                {
                    var listObserver = outObserver as ListObserver<T>;
                    if (listObserver != null)
                    {
                        outObserver = listObserver.Add(observer);
                    }
                    else
                    {
                        var current = outObserver;
                        if (current is EmptyObserver<T>)
                        {
                            outObserver = observer;
                        }
                        else
                        {
                            outObserver = new ListObserver<T>(new ImmutableList<IObserver<T>>(new[] { current, observer }));
                        }
                    }

                    subscription = new Subscription(this, observer);
                }

                ex = lastError;
                Trim();

                // Replay from a snapshot rather than enumerating the live queue: the lock is
                // reentrant, so an observer that calls OnNext on this subject from inside its
                // own OnNext would enqueue/trim during enumeration and make a foreach over
                // the queue throw InvalidOperationException.
                var snapshot = queue.ToArray();
                for (int i = 0; i < snapshot.Length; i++)
                {
                    observer.OnNext(snapshot[i].Value);
                }
            }

            if (subscription != null)
            {
                return subscription;
            }
            else if (ex != null)
            {
                observer.OnError(ex);
            }
            else
            {
                observer.OnCompleted();
            }

            return Disposable.Empty;
        }

        /// <summary>
        /// Disposes the subject: drops the observers, the stored error and the replay buffer.
        /// Subsequent calls to OnNext, OnError, OnCompleted or Subscribe throw
        /// <see cref="ObjectDisposedException"/>.
        /// </summary>
        public void Dispose()
        {
            lock (observerLock)
            {
                isDisposed = true;
                outObserver = DisposedObserver<T>.Instance;
                lastError = null;
                queue = null;
            }
        }

        /// <summary>
        /// Throws <see cref="ObjectDisposedException"/> when the subject has been disposed.
        /// </summary>
        void ThrowIfDisposed()
        {
            if (isDisposed) throw new ObjectDisposedException("");
        }

        /// <summary>
        /// Always returns false.
        /// </summary>
        public bool IsRequiredSubscribeOnCurrentThread()
        {
            return false;
        }

        /// <summary>
        /// Handle returned by <see cref="ReplaySubject{T}.Subscribe"/>; disposing it removes
        /// the associated observer from the parent subject. Disposing more than once is a no-op.
        /// </summary>
        class Subscription : IDisposable
        {
            readonly object gate = new object();
            ReplaySubject<T> parent;
            IObserver<T> unsubscribeTarget;

            public Subscription(ReplaySubject<T> parent, IObserver<T> unsubscribeTarget)
            {
                this.parent = parent;
                this.unsubscribeTarget = unsubscribeTarget;
            }

            public void Dispose()
            {
                lock (gate)
                {
                    if (parent != null)
                    {
                        lock (parent.observerLock)
                        {
                            var listObserver = parent.outObserver as ListObserver<T>;
                            if (listObserver != null)
                            {
                                parent.outObserver = listObserver.Remove(unsubscribeTarget);
                            }
                            else if (object.ReferenceEquals(parent.outObserver, unsubscribeTarget))
                            {
                                // Only clear the slot when it still holds our observer, so that a
                                // sentinel installed by the parent (for example the disposed
                                // observer set in ReplaySubject.Dispose) is not overwritten.
                                parent.outObserver = EmptyObserver<T>.Instance;
                            }

                            // Drop references so a second Dispose is a no-op.
                            unsubscribeTarget = null;
                            parent = null;
                        }
                    }
                }
            }
        }
    }
}