You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
210 lines
6.7 KiB
210 lines
6.7 KiB
using System; |
|
using System.Collections.Generic; |
|
|
|
namespace UniRx.Operators |
|
{ |
|
internal class ObserveOnObservable<T> : OperatorObservableBase<T> |
|
{ |
|
readonly IObservable<T> source; |
|
readonly IScheduler scheduler; |
|
|
|
public ObserveOnObservable(IObservable<T> source, IScheduler scheduler) |
|
: base(source.IsRequiredSubscribeOnCurrentThread()) |
|
{ |
|
this.source = source; |
|
this.scheduler = scheduler; |
|
} |
|
|
|
protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel) |
|
{ |
|
var queueing = scheduler as ISchedulerQueueing; |
|
if (queueing == null) |
|
{ |
|
return new ObserveOn(this, observer, cancel).Run(); |
|
} |
|
else |
|
{ |
|
return new ObserveOn_(this, queueing, observer, cancel).Run(); |
|
} |
|
} |
|
|
|
class ObserveOn : OperatorObserverBase<T, T> |
|
{ |
|
class SchedulableAction : IDisposable |
|
{ |
|
public Notification<T> data; |
|
public LinkedListNode<SchedulableAction> node; |
|
public IDisposable schedule; |
|
|
|
public void Dispose() |
|
{ |
|
if (schedule != null) |
|
schedule.Dispose(); |
|
schedule = null; |
|
|
|
if (node.List != null) |
|
{ |
|
node.List.Remove(node); |
|
} |
|
} |
|
|
|
public bool IsScheduled { get { return schedule != null; } } |
|
} |
|
|
|
readonly ObserveOnObservable<T> parent; |
|
readonly LinkedList<SchedulableAction> actions = new LinkedList<SchedulableAction>(); |
|
bool isDisposed; |
|
|
|
public ObserveOn(ObserveOnObservable<T> parent, IObserver<T> observer, IDisposable cancel) : base(observer, cancel) |
|
{ |
|
this.parent = parent; |
|
} |
|
|
|
public IDisposable Run() |
|
{ |
|
isDisposed = false; |
|
|
|
var sourceDisposable = parent.source.Subscribe(this); |
|
|
|
return StableCompositeDisposable.Create(sourceDisposable, Disposable.Create(() => |
|
{ |
|
lock (actions) |
|
{ |
|
isDisposed = true; |
|
|
|
while (actions.Count > 0) |
|
{ |
|
// Dispose will both cancel the action (if not already running) |
|
// and remove it from 'actions' |
|
actions.First.Value.Dispose(); |
|
} |
|
} |
|
})); |
|
} |
|
|
|
public override void OnNext(T value) |
|
{ |
|
QueueAction(new Notification<T>.OnNextNotification(value)); |
|
} |
|
|
|
public override void OnError(Exception error) |
|
{ |
|
QueueAction(new Notification<T>.OnErrorNotification(error)); |
|
} |
|
|
|
public override void OnCompleted() |
|
{ |
|
QueueAction(new Notification<T>.OnCompletedNotification()); |
|
} |
|
|
|
private void QueueAction(Notification<T> data) |
|
{ |
|
var action = new SchedulableAction { data = data }; |
|
lock (actions) |
|
{ |
|
if (isDisposed) return; |
|
|
|
action.node = actions.AddLast(action); |
|
ProcessNext(); |
|
} |
|
} |
|
|
|
private void ProcessNext() |
|
{ |
|
lock (actions) |
|
{ |
|
if (actions.Count == 0 || isDisposed) |
|
return; |
|
|
|
var action = actions.First.Value; |
|
|
|
if (action.IsScheduled) |
|
return; |
|
|
|
action.schedule = parent.scheduler.Schedule(() => |
|
{ |
|
try |
|
{ |
|
switch (action.data.Kind) |
|
{ |
|
case NotificationKind.OnNext: |
|
observer.OnNext(action.data.Value); |
|
break; |
|
case NotificationKind.OnError: |
|
observer.OnError(action.data.Exception); |
|
break; |
|
case NotificationKind.OnCompleted: |
|
observer.OnCompleted(); |
|
break; |
|
} |
|
} |
|
finally |
|
{ |
|
lock (actions) |
|
{ |
|
action.Dispose(); |
|
} |
|
|
|
if (action.data.Kind == NotificationKind.OnNext) |
|
ProcessNext(); |
|
else |
|
Dispose(); |
|
} |
|
}); |
|
} |
|
} |
|
} |
|
|
|
class ObserveOn_ : OperatorObserverBase<T, T> |
|
{ |
|
readonly ObserveOnObservable<T> parent; |
|
readonly ISchedulerQueueing scheduler; |
|
readonly BooleanDisposable isDisposed; |
|
readonly Action<T> onNext; |
|
|
|
public ObserveOn_(ObserveOnObservable<T> parent, ISchedulerQueueing scheduler, IObserver<T> observer, IDisposable cancel) : base(observer, cancel) |
|
{ |
|
this.parent = parent; |
|
this.scheduler = scheduler; |
|
this.isDisposed = new BooleanDisposable(); |
|
this.onNext = new Action<T>(OnNext_); // cache delegate |
|
} |
|
|
|
public IDisposable Run() |
|
{ |
|
var sourceDisposable = parent.source.Subscribe(this); |
|
return StableCompositeDisposable.Create(sourceDisposable, isDisposed); |
|
} |
|
|
|
void OnNext_(T value) |
|
{ |
|
base.observer.OnNext(value); |
|
} |
|
|
|
void OnError_(Exception error) |
|
{ |
|
try { observer.OnError(error); } finally { Dispose(); }; |
|
} |
|
|
|
void OnCompleted_(Unit _) |
|
{ |
|
try { observer.OnCompleted(); } finally { Dispose(); }; |
|
} |
|
|
|
public override void OnNext(T value) |
|
{ |
|
scheduler.ScheduleQueueing(isDisposed, value, onNext); |
|
} |
|
|
|
public override void OnError(Exception error) |
|
{ |
|
scheduler.ScheduleQueueing(isDisposed, error, OnError_); |
|
} |
|
|
|
public override void OnCompleted() |
|
{ |
|
scheduler.ScheduleQueueing(isDisposed, Unit.Default, OnCompleted_); |
|
} |
|
} |
|
} |
|
} |