上海虹口龙之梦项目
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

182 lines
5.9 KiB

using System;
using System.Collections.Generic;
namespace UniRx.Operators
{
internal class AmbObservable<T> : OperatorObservableBase<T>
{
readonly IObservable<T> source;
readonly IObservable<T> second;
public AmbObservable(IObservable<T> source, IObservable<T> second)
: base(source.IsRequiredSubscribeOnCurrentThread() || second.IsRequiredSubscribeOnCurrentThread())
{
this.source = source;
this.second = second;
}
protected override IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel)
{
return new AmbOuterObserver(this, observer, cancel).Run();
}
class AmbOuterObserver : OperatorObserverBase<T, T>
{
enum AmbState
{
Left, Right, Neither
}
readonly AmbObservable<T> parent;
readonly object gate = new object();
SingleAssignmentDisposable leftSubscription;
SingleAssignmentDisposable rightSubscription;
AmbState choice = AmbState.Neither;
public AmbOuterObserver(AmbObservable<T> parent, IObserver<T> observer, IDisposable cancel)
: base(observer, cancel)
{
this.parent = parent;
}
public IDisposable Run()
{
leftSubscription = new SingleAssignmentDisposable();
rightSubscription = new SingleAssignmentDisposable();
var d = StableCompositeDisposable.Create(leftSubscription, rightSubscription);
var left = new Amb();
left.targetDisposable = d;
left.targetObserver = new AmbDecisionObserver(this, AmbState.Left, rightSubscription, left);
var right = new Amb();
right.targetDisposable = d;
right.targetObserver = new AmbDecisionObserver(this, AmbState.Right, leftSubscription, right);
leftSubscription.Disposable = parent.source.Subscribe(left);
rightSubscription.Disposable = parent.second.Subscribe(right);
return d;
}
public override void OnNext(T value)
{
// no use
}
public override void OnError(Exception error)
{
// no use
}
public override void OnCompleted()
{
// no use
}
class Amb : IObserver<T>
{
public IObserver<T> targetObserver;
public IDisposable targetDisposable;
public void OnNext(T value)
{
targetObserver.OnNext(value);
}
public void OnError(Exception error)
{
try
{
targetObserver.OnError(error);
}
finally
{
targetObserver = UniRx.InternalUtil.EmptyObserver<T>.Instance;
targetDisposable.Dispose();
}
}
public void OnCompleted()
{
try
{
targetObserver.OnCompleted();
}
finally
{
targetObserver = UniRx.InternalUtil.EmptyObserver<T>.Instance;
targetDisposable.Dispose();
}
}
}
class AmbDecisionObserver : IObserver<T>
{
readonly AmbOuterObserver parent;
readonly AmbState me;
readonly IDisposable otherSubscription;
readonly Amb self;
public AmbDecisionObserver(AmbOuterObserver parent, AmbState me, IDisposable otherSubscription, Amb self)
{
this.parent = parent;
this.me = me;
this.otherSubscription = otherSubscription;
this.self = self;
}
public void OnNext(T value)
{
lock (parent.gate)
{
if (parent.choice == AmbState.Neither)
{
parent.choice = me;
otherSubscription.Dispose();
self.targetObserver = parent.observer;
}
if (parent.choice == me) self.targetObserver.OnNext(value);
}
}
public void OnError(Exception error)
{
lock (parent.gate)
{
if (parent.choice == AmbState.Neither)
{
parent.choice = me;
otherSubscription.Dispose();
self.targetObserver = parent.observer;
}
if (parent.choice == me)
{
self.targetObserver.OnError(error);
}
}
}
public void OnCompleted()
{
lock (parent.gate)
{
if (parent.choice == AmbState.Neither)
{
parent.choice = me;
otherSubscription.Dispose();
self.targetObserver = parent.observer;
}
if (parent.choice == me)
{
self.targetObserver.OnCompleted();
}
}
}
}
}
}
}