You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1004 lines
32 KiB
1004 lines
32 KiB
using System; |
|
using System.Collections.Generic; |
|
|
|
namespace UniRx.Operators |
|
{ |
|
public delegate TR ZipFunc<T1, T2, T3, TR>(T1 arg1, T2 arg2, T3 arg3); |
|
public delegate TR ZipFunc<T1, T2, T3, T4, TR>(T1 arg1, T2 arg2, T3 arg3, T4 arg4); |
|
public delegate TR ZipFunc<T1, T2, T3, T4, T5, TR>(T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5); |
|
public delegate TR ZipFunc<T1, T2, T3, T4, T5, T6, TR>(T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6); |
|
public delegate TR ZipFunc<T1, T2, T3, T4, T5, T6, T7, TR>(T1 arg1, T2 arg2, T3 arg3, T4 arg4, T5 arg5, T6 arg6, T7 arg7); |
|
|
|
// binary |
|
internal class ZipObservable<TLeft, TRight, TResult> : OperatorObservableBase<TResult> |
|
{ |
|
readonly IObservable<TLeft> left; |
|
readonly IObservable<TRight> right; |
|
readonly Func<TLeft, TRight, TResult> selector; |
|
|
|
public ZipObservable(IObservable<TLeft> left, IObservable<TRight> right, Func<TLeft, TRight, TResult> selector) |
|
: base(left.IsRequiredSubscribeOnCurrentThread() || right.IsRequiredSubscribeOnCurrentThread()) |
|
{ |
|
this.left = left; |
|
this.right = right; |
|
this.selector = selector; |
|
} |
|
|
|
protected override IDisposable SubscribeCore(IObserver<TResult> observer, IDisposable cancel) |
|
{ |
|
return new Zip(this, observer, cancel).Run(); |
|
} |
|
|
|
class Zip : OperatorObserverBase<TResult, TResult> |
|
{ |
|
readonly ZipObservable<TLeft, TRight, TResult> parent; |
|
|
|
readonly object gate = new object(); |
|
readonly Queue<TLeft> leftQ = new Queue<TLeft>(); |
|
bool leftCompleted = false; |
|
readonly Queue<TRight> rightQ = new Queue<TRight>(); |
|
bool rightCompleted = false; |
|
|
|
public Zip(ZipObservable<TLeft, TRight, TResult> parent, IObserver<TResult> observer, IDisposable cancel) |
|
: base(observer, cancel) |
|
{ |
|
this.parent = parent; |
|
} |
|
|
|
public IDisposable Run() |
|
{ |
|
var l = parent.left.Subscribe(new LeftZipObserver(this)); |
|
var r = parent.right.Subscribe(new RightZipObserver(this)); |
|
|
|
return StableCompositeDisposable.Create(l, r, Disposable.Create(() => |
|
{ |
|
lock (gate) |
|
{ |
|
leftQ.Clear(); |
|
rightQ.Clear(); |
|
} |
|
})); |
|
} |
|
|
|
// dequeue is in the lock |
|
void Dequeue() |
|
{ |
|
TLeft lv; |
|
TRight rv; |
|
TResult v; |
|
|
|
if (leftQ.Count != 0 && rightQ.Count != 0) |
|
{ |
|
lv = leftQ.Dequeue(); |
|
rv = rightQ.Dequeue(); |
|
} |
|
else if (leftCompleted || rightCompleted) |
|
{ |
|
try { observer.OnCompleted(); } |
|
finally { Dispose(); } |
|
return; |
|
} |
|
else |
|
{ |
|
return; |
|
} |
|
|
|
try |
|
{ |
|
v = parent.selector(lv, rv); |
|
} |
|
catch (Exception ex) |
|
{ |
|
try { observer.OnError(ex); } |
|
finally { Dispose(); } |
|
return; |
|
} |
|
|
|
OnNext(v); |
|
} |
|
|
|
public override void OnNext(TResult value) |
|
{ |
|
base.observer.OnNext(value); |
|
} |
|
|
|
public override void OnError(Exception error) |
|
{ |
|
try { observer.OnError(error); } |
|
finally { Dispose(); } |
|
} |
|
|
|
public override void OnCompleted() |
|
{ |
|
try { observer.OnCompleted(); } |
|
finally { Dispose(); } |
|
} |
|
|
|
class LeftZipObserver : IObserver<TLeft> |
|
{ |
|
readonly Zip parent; |
|
|
|
public LeftZipObserver(Zip parent) |
|
{ |
|
this.parent = parent; |
|
} |
|
|
|
public void OnNext(TLeft value) |
|
{ |
|
lock (parent.gate) |
|
{ |
|
parent.leftQ.Enqueue(value); |
|
parent.Dequeue(); |
|
} |
|
} |
|
|
|
public void OnError(Exception ex) |
|
{ |
|
lock (parent.gate) |
|
{ |
|
parent.OnError(ex); |
|
} |
|
} |
|
|
|
public void OnCompleted() |
|
{ |
|
lock (parent.gate) |
|
{ |
|
parent.leftCompleted = true; |
|
if (parent.rightCompleted) parent.OnCompleted(); |
|
} |
|
} |
|
} |
|
|
|
class RightZipObserver : IObserver<TRight> |
|
{ |
|
readonly Zip parent; |
|
|
|
public RightZipObserver(Zip parent) |
|
{ |
|
this.parent = parent; |
|
} |
|
|
|
public void OnNext(TRight value) |
|
{ |
|
lock (parent.gate) |
|
{ |
|
parent.rightQ.Enqueue(value); |
|
parent.Dequeue(); |
|
} |
|
} |
|
|
|
public void OnError(Exception ex) |
|
{ |
|
lock (parent.gate) |
|
{ |
|
parent.OnError(ex); |
|
} |
|
} |
|
|
|
public void OnCompleted() |
|
{ |
|
lock (parent.gate) |
|
{ |
|
parent.rightCompleted = true; |
|
if (parent.leftCompleted) parent.OnCompleted(); |
|
} |
|
} |
|
} |
|
} |
|
} |
|
|
|
// array |
|
internal class ZipObservable<T> : OperatorObservableBase<IList<T>> |
|
{ |
|
readonly IObservable<T>[] sources; |
|
|
|
public ZipObservable(IObservable<T>[] sources) |
|
: base(true) |
|
{ |
|
this.sources = sources; |
|
} |
|
|
|
protected override IDisposable SubscribeCore(IObserver<IList<T>> observer, IDisposable cancel) |
|
{ |
|
return new Zip(this, observer, cancel).Run(); |
|
} |
|
|
|
class Zip : OperatorObserverBase<IList<T>, IList<T>> |
|
{ |
|
readonly ZipObservable<T> parent; |
|
readonly object gate = new object(); |
|
|
|
Queue<T>[] queues; |
|
bool[] isDone; |
|
int length; |
|
|
|
public Zip(ZipObservable<T> parent, IObserver<IList<T>> observer, IDisposable cancel) : base(observer, cancel) |
|
{ |
|
this.parent = parent; |
|
} |
|
|
|
public IDisposable Run() |
|
{ |
|
length = parent.sources.Length; |
|
queues = new Queue<T>[length]; |
|
isDone = new bool[length]; |
|
|
|
for (int i = 0; i < length; i++) |
|
{ |
|
queues[i] = new Queue<T>(); |
|
} |
|
|
|
var disposables = new IDisposable[length + 1]; |
|
for (int i = 0; i < length; i++) |
|
{ |
|
var source = parent.sources[i]; |
|
disposables[i] = source.Subscribe(new ZipObserver(this, i)); |
|
} |
|
|
|
disposables[length] = Disposable.Create(() => |
|
{ |
|
lock (gate) |
|
{ |
|
for (int i = 0; i < length; i++) |
|
{ |
|
var q = queues[i]; |
|
q.Clear(); |
|
} |
|
} |
|
}); |
|
|
|
return StableCompositeDisposable.CreateUnsafe(disposables); |
|
} |
|
|
|
// dequeue is in the lock |
|
void Dequeue(int index) |
|
{ |
|
var allQueueHasValue = true; |
|
for (int i = 0; i < length; i++) |
|
{ |
|
if (queues[i].Count == 0) |
|
{ |
|
allQueueHasValue = false; |
|
break; |
|
} |
|
} |
|
|
|
if (!allQueueHasValue) |
|
{ |
|
var allCompletedWithoutSelf = true; |
|
for (int i = 0; i < length; i++) |
|
{ |
|
if (i == index) continue; |
|
if (!isDone[i]) |
|
{ |
|
allCompletedWithoutSelf = false; |
|
break; |
|
} |
|
} |
|
|
|
if (allCompletedWithoutSelf) |
|
{ |
|
try { observer.OnCompleted(); } |
|
finally { Dispose(); } |
|
return; |
|
} |
|
else |
|
{ |
|
return; |
|
} |
|
} |
|
|
|
var array = new T[length]; |
|
for (int i = 0; i < length; i++) |
|
{ |
|
array[i] = queues[i].Dequeue(); |
|
} |
|
|
|
OnNext(array); |
|
} |
|
|
|
public override void OnNext(IList<T> value) |
|
{ |
|
base.observer.OnNext(value); |
|
} |
|
|
|
public override void OnError(Exception error) |
|
{ |
|
try { observer.OnError(error); } |
|
finally { Dispose(); } |
|
} |
|
|
|
public override void OnCompleted() |
|
{ |
|
try { observer.OnCompleted(); } |
|
finally { Dispose(); } |
|
} |
|
|
|
class ZipObserver : IObserver<T> |
|
{ |
|
readonly Zip parent; |
|
readonly int index; |
|
|
|
public ZipObserver(Zip parent, int index) |
|
{ |
|
this.parent = parent; |
|
this.index = index; |
|
} |
|
|
|
public void OnNext(T value) |
|
{ |
|
lock (parent.gate) |
|
{ |
|
parent.queues[index].Enqueue(value); |
|
parent.Dequeue(index); |
|
} |
|
} |
|
|
|
public void OnError(Exception ex) |
|
{ |
|
lock (parent.gate) |
|
{ |
|
parent.OnError(ex); |
|
} |
|
} |
|
|
|
public void OnCompleted() |
|
{ |
|
lock (parent.gate) |
|
{ |
|
parent.isDone[index] = true; |
|
var allTrue = true; |
|
for (int i = 0; i < parent.length; i++) |
|
{ |
|
if (!parent.isDone[i]) |
|
{ |
|
allTrue = false; |
|
break; |
|
} |
|
} |
|
|
|
if (allTrue) |
|
{ |
|
parent.OnCompleted(); |
|
} |
|
} |
|
} |
|
} |
|
} |
|
} |
|
|
|
// Generated from UniRx.Console.ZipGenerator.tt |
|
#region NTH |
|
|
|
internal class ZipObservable<T1, T2, T3, TR> : OperatorObservableBase<TR> |
|
{ |
|
IObservable<T1> source1; |
|
IObservable<T2> source2; |
|
IObservable<T3> source3; |
|
ZipFunc<T1, T2, T3, TR> resultSelector; |
|
|
|
public ZipObservable( |
|
IObservable<T1> source1, |
|
IObservable<T2> source2, |
|
IObservable<T3> source3, |
|
ZipFunc<T1, T2, T3, TR> resultSelector) |
|
: base( |
|
source1.IsRequiredSubscribeOnCurrentThread() || |
|
source2.IsRequiredSubscribeOnCurrentThread() || |
|
source3.IsRequiredSubscribeOnCurrentThread() || |
|
false) |
|
{ |
|
this.source1 = source1; |
|
this.source2 = source2; |
|
this.source3 = source3; |
|
this.resultSelector = resultSelector; |
|
} |
|
|
|
protected override IDisposable SubscribeCore(IObserver<TR> observer, IDisposable cancel) |
|
{ |
|
return new Zip(this, observer, cancel).Run(); |
|
} |
|
|
|
class Zip : NthZipObserverBase<TR> |
|
{ |
|
readonly ZipObservable<T1, T2, T3, TR> parent; |
|
readonly object gate = new object(); |
|
readonly Queue<T1> q1 = new Queue<T1>(); |
|
readonly Queue<T2> q2 = new Queue<T2>(); |
|
readonly Queue<T3> q3 = new Queue<T3>(); |
|
|
|
public Zip(ZipObservable<T1, T2, T3, TR> parent, IObserver<TR> observer, IDisposable cancel) |
|
: base(observer, cancel) |
|
{ |
|
this.parent = parent; |
|
} |
|
|
|
public IDisposable Run() |
|
{ |
|
base.SetQueue(new System.Collections.ICollection[] { q1, q2, q3 }); |
|
var s1 = parent.source1.Subscribe(new ZipObserver<T1>(gate, this, 0, q1)); |
|
var s2 = parent.source2.Subscribe(new ZipObserver<T2>(gate, this, 1, q2)); |
|
var s3 = parent.source3.Subscribe(new ZipObserver<T3>(gate, this, 2, q3)); |
|
|
|
return StableCompositeDisposable.Create(s1, s2, s3, Disposable.Create(() => |
|
{ |
|
lock (gate) |
|
{ |
|
q1.Clear(); q2.Clear(); q3.Clear(); |
|
} |
|
})); |
|
} |
|
|
|
public override TR GetResult() |
|
{ |
|
return parent.resultSelector(q1.Dequeue(), q2.Dequeue(), q3.Dequeue()); |
|
} |
|
|
|
public override void OnNext(TR value) |
|
{ |
|
base.observer.OnNext(value); |
|
} |
|
|
|
public override void OnError(Exception error) |
|
{ |
|
try { observer.OnError(error); } |
|
finally { Dispose(); } |
|
} |
|
|
|
public override void OnCompleted() |
|
{ |
|
try { observer.OnCompleted(); } |
|
finally { Dispose(); } |
|
} |
|
} |
|
} |
|
|
|
|
|
internal class ZipObservable<T1, T2, T3, T4, TR> : OperatorObservableBase<TR> |
|
{ |
|
IObservable<T1> source1; |
|
IObservable<T2> source2; |
|
IObservable<T3> source3; |
|
IObservable<T4> source4; |
|
ZipFunc<T1, T2, T3, T4, TR> resultSelector; |
|
|
|
public ZipObservable( |
|
IObservable<T1> source1, |
|
IObservable<T2> source2, |
|
IObservable<T3> source3, |
|
IObservable<T4> source4, |
|
ZipFunc<T1, T2, T3, T4, TR> resultSelector) |
|
: base( |
|
source1.IsRequiredSubscribeOnCurrentThread() || |
|
source2.IsRequiredSubscribeOnCurrentThread() || |
|
source3.IsRequiredSubscribeOnCurrentThread() || |
|
source4.IsRequiredSubscribeOnCurrentThread() || |
|
false) |
|
{ |
|
this.source1 = source1; |
|
this.source2 = source2; |
|
this.source3 = source3; |
|
this.source4 = source4; |
|
this.resultSelector = resultSelector; |
|
} |
|
|
|
protected override IDisposable SubscribeCore(IObserver<TR> observer, IDisposable cancel) |
|
{ |
|
return new Zip(this, observer, cancel).Run(); |
|
} |
|
|
|
class Zip : NthZipObserverBase<TR> |
|
{ |
|
readonly ZipObservable<T1, T2, T3, T4, TR> parent; |
|
readonly object gate = new object(); |
|
readonly Queue<T1> q1 = new Queue<T1>(); |
|
readonly Queue<T2> q2 = new Queue<T2>(); |
|
readonly Queue<T3> q3 = new Queue<T3>(); |
|
readonly Queue<T4> q4 = new Queue<T4>(); |
|
|
|
public Zip(ZipObservable<T1, T2, T3, T4, TR> parent, IObserver<TR> observer, IDisposable cancel) |
|
: base(observer, cancel) |
|
{ |
|
this.parent = parent; |
|
} |
|
|
|
public IDisposable Run() |
|
{ |
|
base.SetQueue(new System.Collections.ICollection[] { q1, q2, q3, q4 }); |
|
var s1 = parent.source1.Subscribe(new ZipObserver<T1>(gate, this, 0, q1)); |
|
var s2 = parent.source2.Subscribe(new ZipObserver<T2>(gate, this, 1, q2)); |
|
var s3 = parent.source3.Subscribe(new ZipObserver<T3>(gate, this, 2, q3)); |
|
var s4 = parent.source4.Subscribe(new ZipObserver<T4>(gate, this, 3, q4)); |
|
|
|
return StableCompositeDisposable.Create(s1, s2, s3, s4, Disposable.Create(() => |
|
{ |
|
lock (gate) |
|
{ |
|
q1.Clear(); q2.Clear(); q3.Clear(); q4.Clear(); |
|
} |
|
})); |
|
} |
|
|
|
public override TR GetResult() |
|
{ |
|
return parent.resultSelector(q1.Dequeue(), q2.Dequeue(), q3.Dequeue(), q4.Dequeue()); |
|
} |
|
|
|
public override void OnNext(TR value) |
|
{ |
|
base.observer.OnNext(value); |
|
} |
|
|
|
public override void OnError(Exception error) |
|
{ |
|
try { observer.OnError(error); } |
|
finally { Dispose(); } |
|
} |
|
|
|
public override void OnCompleted() |
|
{ |
|
try { observer.OnCompleted(); } |
|
finally { Dispose(); } |
|
} |
|
} |
|
} |
|
|
|
|
|
internal class ZipObservable<T1, T2, T3, T4, T5, TR> : OperatorObservableBase<TR> |
|
{ |
|
IObservable<T1> source1; |
|
IObservable<T2> source2; |
|
IObservable<T3> source3; |
|
IObservable<T4> source4; |
|
IObservable<T5> source5; |
|
ZipFunc<T1, T2, T3, T4, T5, TR> resultSelector; |
|
|
|
public ZipObservable( |
|
IObservable<T1> source1, |
|
IObservable<T2> source2, |
|
IObservable<T3> source3, |
|
IObservable<T4> source4, |
|
IObservable<T5> source5, |
|
ZipFunc<T1, T2, T3, T4, T5, TR> resultSelector) |
|
: base( |
|
source1.IsRequiredSubscribeOnCurrentThread() || |
|
source2.IsRequiredSubscribeOnCurrentThread() || |
|
source3.IsRequiredSubscribeOnCurrentThread() || |
|
source4.IsRequiredSubscribeOnCurrentThread() || |
|
source5.IsRequiredSubscribeOnCurrentThread() || |
|
false) |
|
{ |
|
this.source1 = source1; |
|
this.source2 = source2; |
|
this.source3 = source3; |
|
this.source4 = source4; |
|
this.source5 = source5; |
|
this.resultSelector = resultSelector; |
|
} |
|
|
|
protected override IDisposable SubscribeCore(IObserver<TR> observer, IDisposable cancel) |
|
{ |
|
return new Zip(this, observer, cancel).Run(); |
|
} |
|
|
|
class Zip : NthZipObserverBase<TR> |
|
{ |
|
readonly ZipObservable<T1, T2, T3, T4, T5, TR> parent; |
|
readonly object gate = new object(); |
|
readonly Queue<T1> q1 = new Queue<T1>(); |
|
readonly Queue<T2> q2 = new Queue<T2>(); |
|
readonly Queue<T3> q3 = new Queue<T3>(); |
|
readonly Queue<T4> q4 = new Queue<T4>(); |
|
readonly Queue<T5> q5 = new Queue<T5>(); |
|
|
|
public Zip(ZipObservable<T1, T2, T3, T4, T5, TR> parent, IObserver<TR> observer, IDisposable cancel) |
|
: base(observer, cancel) |
|
{ |
|
this.parent = parent; |
|
} |
|
|
|
public IDisposable Run() |
|
{ |
|
base.SetQueue(new System.Collections.ICollection[] { q1, q2, q3, q4, q5 }); |
|
var s1 = parent.source1.Subscribe(new ZipObserver<T1>(gate, this, 0, q1)); |
|
var s2 = parent.source2.Subscribe(new ZipObserver<T2>(gate, this, 1, q2)); |
|
var s3 = parent.source3.Subscribe(new ZipObserver<T3>(gate, this, 2, q3)); |
|
var s4 = parent.source4.Subscribe(new ZipObserver<T4>(gate, this, 3, q4)); |
|
var s5 = parent.source5.Subscribe(new ZipObserver<T5>(gate, this, 4, q5)); |
|
|
|
return StableCompositeDisposable.Create(s1, s2, s3, s4, s5, Disposable.Create(() => |
|
{ |
|
lock (gate) |
|
{ |
|
q1.Clear(); q2.Clear(); q3.Clear(); q4.Clear(); q5.Clear(); |
|
} |
|
})); |
|
} |
|
|
|
public override TR GetResult() |
|
{ |
|
return parent.resultSelector(q1.Dequeue(), q2.Dequeue(), q3.Dequeue(), q4.Dequeue(), q5.Dequeue()); |
|
} |
|
|
|
public override void OnNext(TR value) |
|
{ |
|
base.observer.OnNext(value); |
|
} |
|
|
|
public override void OnError(Exception error) |
|
{ |
|
try { observer.OnError(error); } |
|
finally { Dispose(); } |
|
} |
|
|
|
public override void OnCompleted() |
|
{ |
|
try { observer.OnCompleted(); } |
|
finally { Dispose(); } |
|
} |
|
} |
|
} |
|
|
|
|
|
internal class ZipObservable<T1, T2, T3, T4, T5, T6, TR> : OperatorObservableBase<TR> |
|
{ |
|
IObservable<T1> source1; |
|
IObservable<T2> source2; |
|
IObservable<T3> source3; |
|
IObservable<T4> source4; |
|
IObservable<T5> source5; |
|
IObservable<T6> source6; |
|
ZipFunc<T1, T2, T3, T4, T5, T6, TR> resultSelector; |
|
|
|
public ZipObservable( |
|
IObservable<T1> source1, |
|
IObservable<T2> source2, |
|
IObservable<T3> source3, |
|
IObservable<T4> source4, |
|
IObservable<T5> source5, |
|
IObservable<T6> source6, |
|
ZipFunc<T1, T2, T3, T4, T5, T6, TR> resultSelector) |
|
: base( |
|
source1.IsRequiredSubscribeOnCurrentThread() || |
|
source2.IsRequiredSubscribeOnCurrentThread() || |
|
source3.IsRequiredSubscribeOnCurrentThread() || |
|
source4.IsRequiredSubscribeOnCurrentThread() || |
|
source5.IsRequiredSubscribeOnCurrentThread() || |
|
source6.IsRequiredSubscribeOnCurrentThread() || |
|
false) |
|
{ |
|
this.source1 = source1; |
|
this.source2 = source2; |
|
this.source3 = source3; |
|
this.source4 = source4; |
|
this.source5 = source5; |
|
this.source6 = source6; |
|
this.resultSelector = resultSelector; |
|
} |
|
|
|
protected override IDisposable SubscribeCore(IObserver<TR> observer, IDisposable cancel) |
|
{ |
|
return new Zip(this, observer, cancel).Run(); |
|
} |
|
|
|
class Zip : NthZipObserverBase<TR> |
|
{ |
|
readonly ZipObservable<T1, T2, T3, T4, T5, T6, TR> parent; |
|
readonly object gate = new object(); |
|
readonly Queue<T1> q1 = new Queue<T1>(); |
|
readonly Queue<T2> q2 = new Queue<T2>(); |
|
readonly Queue<T3> q3 = new Queue<T3>(); |
|
readonly Queue<T4> q4 = new Queue<T4>(); |
|
readonly Queue<T5> q5 = new Queue<T5>(); |
|
readonly Queue<T6> q6 = new Queue<T6>(); |
|
|
|
public Zip(ZipObservable<T1, T2, T3, T4, T5, T6, TR> parent, IObserver<TR> observer, IDisposable cancel) |
|
: base(observer, cancel) |
|
{ |
|
this.parent = parent; |
|
} |
|
|
|
public IDisposable Run() |
|
{ |
|
base.SetQueue(new System.Collections.ICollection[] { q1, q2, q3, q4, q5, q6 }); |
|
var s1 = parent.source1.Subscribe(new ZipObserver<T1>(gate, this, 0, q1)); |
|
var s2 = parent.source2.Subscribe(new ZipObserver<T2>(gate, this, 1, q2)); |
|
var s3 = parent.source3.Subscribe(new ZipObserver<T3>(gate, this, 2, q3)); |
|
var s4 = parent.source4.Subscribe(new ZipObserver<T4>(gate, this, 3, q4)); |
|
var s5 = parent.source5.Subscribe(new ZipObserver<T5>(gate, this, 4, q5)); |
|
var s6 = parent.source6.Subscribe(new ZipObserver<T6>(gate, this, 5, q6)); |
|
|
|
return StableCompositeDisposable.Create(s1, s2, s3, s4, s5, s6, Disposable.Create(() => |
|
{ |
|
lock (gate) |
|
{ |
|
q1.Clear(); q2.Clear(); q3.Clear(); q4.Clear(); q5.Clear(); q6.Clear(); |
|
} |
|
})); |
|
} |
|
|
|
public override TR GetResult() |
|
{ |
|
return parent.resultSelector(q1.Dequeue(), q2.Dequeue(), q3.Dequeue(), q4.Dequeue(), q5.Dequeue(), q6.Dequeue()); |
|
} |
|
|
|
public override void OnNext(TR value) |
|
{ |
|
base.observer.OnNext(value); |
|
} |
|
|
|
public override void OnError(Exception error) |
|
{ |
|
try { observer.OnError(error); } |
|
finally { Dispose(); } |
|
} |
|
|
|
public override void OnCompleted() |
|
{ |
|
try { observer.OnCompleted(); } |
|
finally { Dispose(); } |
|
} |
|
} |
|
} |
|
|
|
|
|
internal class ZipObservable<T1, T2, T3, T4, T5, T6, T7, TR> : OperatorObservableBase<TR> |
|
{ |
|
IObservable<T1> source1; |
|
IObservable<T2> source2; |
|
IObservable<T3> source3; |
|
IObservable<T4> source4; |
|
IObservable<T5> source5; |
|
IObservable<T6> source6; |
|
IObservable<T7> source7; |
|
ZipFunc<T1, T2, T3, T4, T5, T6, T7, TR> resultSelector; |
|
|
|
public ZipObservable( |
|
IObservable<T1> source1, |
|
IObservable<T2> source2, |
|
IObservable<T3> source3, |
|
IObservable<T4> source4, |
|
IObservable<T5> source5, |
|
IObservable<T6> source6, |
|
IObservable<T7> source7, |
|
ZipFunc<T1, T2, T3, T4, T5, T6, T7, TR> resultSelector) |
|
: base( |
|
source1.IsRequiredSubscribeOnCurrentThread() || |
|
source2.IsRequiredSubscribeOnCurrentThread() || |
|
source3.IsRequiredSubscribeOnCurrentThread() || |
|
source4.IsRequiredSubscribeOnCurrentThread() || |
|
source5.IsRequiredSubscribeOnCurrentThread() || |
|
source6.IsRequiredSubscribeOnCurrentThread() || |
|
source7.IsRequiredSubscribeOnCurrentThread() || |
|
false) |
|
{ |
|
this.source1 = source1; |
|
this.source2 = source2; |
|
this.source3 = source3; |
|
this.source4 = source4; |
|
this.source5 = source5; |
|
this.source6 = source6; |
|
this.source7 = source7; |
|
this.resultSelector = resultSelector; |
|
} |
|
|
|
protected override IDisposable SubscribeCore(IObserver<TR> observer, IDisposable cancel) |
|
{ |
|
return new Zip(this, observer, cancel).Run(); |
|
} |
|
|
|
class Zip : NthZipObserverBase<TR> |
|
{ |
|
readonly ZipObservable<T1, T2, T3, T4, T5, T6, T7, TR> parent; |
|
readonly object gate = new object(); |
|
readonly Queue<T1> q1 = new Queue<T1>(); |
|
readonly Queue<T2> q2 = new Queue<T2>(); |
|
readonly Queue<T3> q3 = new Queue<T3>(); |
|
readonly Queue<T4> q4 = new Queue<T4>(); |
|
readonly Queue<T5> q5 = new Queue<T5>(); |
|
readonly Queue<T6> q6 = new Queue<T6>(); |
|
readonly Queue<T7> q7 = new Queue<T7>(); |
|
|
|
public Zip(ZipObservable<T1, T2, T3, T4, T5, T6, T7, TR> parent, IObserver<TR> observer, IDisposable cancel) |
|
: base(observer, cancel) |
|
{ |
|
this.parent = parent; |
|
} |
|
|
|
public IDisposable Run() |
|
{ |
|
base.SetQueue(new System.Collections.ICollection[] { q1, q2, q3, q4, q5, q6, q7 }); |
|
var s1 = parent.source1.Subscribe(new ZipObserver<T1>(gate, this, 0, q1)); |
|
var s2 = parent.source2.Subscribe(new ZipObserver<T2>(gate, this, 1, q2)); |
|
var s3 = parent.source3.Subscribe(new ZipObserver<T3>(gate, this, 2, q3)); |
|
var s4 = parent.source4.Subscribe(new ZipObserver<T4>(gate, this, 3, q4)); |
|
var s5 = parent.source5.Subscribe(new ZipObserver<T5>(gate, this, 4, q5)); |
|
var s6 = parent.source6.Subscribe(new ZipObserver<T6>(gate, this, 5, q6)); |
|
var s7 = parent.source7.Subscribe(new ZipObserver<T7>(gate, this, 6, q7)); |
|
|
|
return StableCompositeDisposable.Create(s1, s2, s3, s4, s5, s6, s7, Disposable.Create(() => |
|
{ |
|
lock (gate) |
|
{ |
|
q1.Clear(); q2.Clear(); q3.Clear(); q4.Clear(); q5.Clear(); q6.Clear(); q7.Clear(); |
|
} |
|
})); |
|
} |
|
|
|
public override TR GetResult() |
|
{ |
|
return parent.resultSelector(q1.Dequeue(), q2.Dequeue(), q3.Dequeue(), q4.Dequeue(), q5.Dequeue(), q6.Dequeue(), q7.Dequeue()); |
|
} |
|
|
|
public override void OnNext(TR value) |
|
{ |
|
base.observer.OnNext(value); |
|
} |
|
|
|
public override void OnError(Exception error) |
|
{ |
|
try { observer.OnError(error); } |
|
finally { Dispose(); } |
|
} |
|
|
|
public override void OnCompleted() |
|
{ |
|
try { observer.OnCompleted(); } |
|
finally { Dispose(); } |
|
} |
|
} |
|
} |
|
|
|
#endregion |
|
|
|
// Nth infrastructure |
|
|
|
internal interface IZipObservable |
|
{ |
|
void Dequeue(int index); |
|
void Fail(Exception error); |
|
void Done(int index); |
|
} |
|
|
|
internal abstract class NthZipObserverBase<T> : OperatorObserverBase<T, T>, IZipObservable |
|
{ |
|
System.Collections.ICollection[] queues; |
|
bool[] isDone; |
|
int length; |
|
|
|
public NthZipObserverBase(IObserver<T> observer, IDisposable cancel) : base(observer, cancel) |
|
{ |
|
} |
|
|
|
protected void SetQueue(System.Collections.ICollection[] queues) |
|
{ |
|
this.queues = queues; |
|
this.length = queues.Length; |
|
this.isDone = new bool[length]; |
|
} |
|
|
|
public abstract T GetResult(); |
|
|
|
// operators in lock |
|
public void Dequeue(int index) |
|
{ |
|
var allQueueHasValue = true; |
|
for (int i = 0; i < length; i++) |
|
{ |
|
if (queues[i].Count == 0) |
|
{ |
|
allQueueHasValue = false; |
|
break; |
|
} |
|
} |
|
|
|
if (!allQueueHasValue) |
|
{ |
|
var allCompletedWithoutSelf = true; |
|
for (int i = 0; i < length; i++) |
|
{ |
|
if (i == index) continue; |
|
if (!isDone[i]) |
|
{ |
|
allCompletedWithoutSelf = false; |
|
break; |
|
} |
|
} |
|
|
|
if (allCompletedWithoutSelf) |
|
{ |
|
try { observer.OnCompleted(); } |
|
finally { Dispose(); } |
|
return; |
|
} |
|
else |
|
{ |
|
return; |
|
} |
|
} |
|
|
|
var result = default(T); |
|
try |
|
{ |
|
result = GetResult(); |
|
} |
|
catch (Exception ex) |
|
{ |
|
try { observer.OnError(ex); } |
|
finally { Dispose(); } |
|
return; |
|
} |
|
OnNext(result); |
|
} |
|
|
|
public void Done(int index) |
|
{ |
|
isDone[index] = true; |
|
var allTrue = true; |
|
for (int i = 0; i < length; i++) |
|
{ |
|
if (!isDone[i]) |
|
{ |
|
allTrue = false; |
|
break; |
|
} |
|
} |
|
|
|
if (allTrue) |
|
{ |
|
try { observer.OnCompleted(); } |
|
finally { Dispose(); } |
|
} |
|
} |
|
|
|
public void Fail(Exception error) |
|
{ |
|
try { observer.OnError(error); } |
|
finally { Dispose(); } |
|
} |
|
} |
|
|
|
|
|
// nth |
|
internal class ZipObserver<T> : IObserver<T> |
|
{ |
|
readonly object gate; |
|
readonly IZipObservable parent; |
|
readonly int index; |
|
readonly Queue<T> queue; |
|
|
|
public ZipObserver(object gate, IZipObservable parent, int index, Queue<T> queue) |
|
{ |
|
this.gate = gate; |
|
this.parent = parent; |
|
this.index = index; |
|
this.queue = queue; |
|
} |
|
|
|
public void OnNext(T value) |
|
{ |
|
lock (gate) |
|
{ |
|
queue.Enqueue(value); |
|
parent.Dequeue(index); |
|
} |
|
} |
|
|
|
public void OnError(Exception error) |
|
{ |
|
lock (gate) |
|
{ |
|
parent.Fail(error); |
|
} |
|
} |
|
|
|
public void OnCompleted() |
|
{ |
|
lock (gate) |
|
{ |
|
parent.Done(index); |
|
} |
|
} |
|
} |
|
} |