上海虹口龙之梦项目
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

196 lines
6.2 KiB

using System;
using System.Collections.Generic;
using UniRx.Operators;
namespace UniRx.Operators
{
internal class GroupedObservable<TKey, TElement> : IGroupedObservable<TKey, TElement>
{
readonly TKey key;
readonly IObservable<TElement> subject;
readonly RefCountDisposable refCount;
public TKey Key
{
get { return key; }
}
public GroupedObservable(TKey key, ISubject<TElement> subject, RefCountDisposable refCount)
{
this.key = key;
this.subject = subject;
this.refCount = refCount;
}
public IDisposable Subscribe(IObserver<TElement> observer)
{
var release = refCount.GetDisposable();
var subscription = subject.Subscribe(observer);
return StableCompositeDisposable.Create(release, subscription);
}
}
internal class GroupByObservable<TSource, TKey, TElement> : OperatorObservableBase<IGroupedObservable<TKey, TElement>>
{
readonly IObservable<TSource> source;
readonly Func<TSource, TKey> keySelector;
readonly Func<TSource, TElement> elementSelector;
readonly int? capacity;
readonly IEqualityComparer<TKey> comparer;
public GroupByObservable(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int? capacity, IEqualityComparer<TKey> comparer)
: base(source.IsRequiredSubscribeOnCurrentThread())
{
this.source = source;
this.keySelector = keySelector;
this.elementSelector = elementSelector;
this.capacity = capacity;
this.comparer = comparer;
}
protected override IDisposable SubscribeCore(IObserver<IGroupedObservable<TKey, TElement>> observer, IDisposable cancel)
{
return new GroupBy(this, observer, cancel).Run();
}
class GroupBy : OperatorObserverBase<TSource, IGroupedObservable<TKey, TElement>>
{
readonly GroupByObservable<TSource, TKey, TElement> parent;
readonly Dictionary<TKey, ISubject<TElement>> map;
ISubject<TElement> nullKeySubject;
CompositeDisposable groupDisposable;
RefCountDisposable refCountDisposable;
public GroupBy(GroupByObservable<TSource, TKey, TElement> parent, IObserver<IGroupedObservable<TKey, TElement>> observer, IDisposable cancel)
: base(observer, cancel)
{
this.parent = parent;
if (parent.capacity.HasValue)
{
map = new Dictionary<TKey, ISubject<TElement>>(parent.capacity.Value, parent.comparer);
}
else
{
map = new Dictionary<TKey, ISubject<TElement>>(parent.comparer);
}
}
public IDisposable Run()
{
groupDisposable = new CompositeDisposable();
refCountDisposable = new RefCountDisposable(groupDisposable);
groupDisposable.Add(parent.source.Subscribe(this));
return refCountDisposable;
}
public override void OnNext(TSource value)
{
var key = default(TKey);
try
{
key = parent.keySelector(value);
}
catch (Exception exception)
{
Error(exception);
return;
}
var fireNewMapEntry = false;
var writer = default(ISubject<TElement>);
try
{
if (key == null)
{
if (nullKeySubject == null)
{
nullKeySubject = new Subject<TElement>();
fireNewMapEntry = true;
}
writer = nullKeySubject;
}
else
{
if (!map.TryGetValue(key, out writer))
{
writer = new Subject<TElement>();
map.Add(key, writer);
fireNewMapEntry = true;
}
}
}
catch (Exception exception)
{
Error(exception);
return;
}
if (fireNewMapEntry)
{
var group = new GroupedObservable<TKey, TElement>(key, writer, refCountDisposable);
observer.OnNext(group);
}
var element = default(TElement);
try
{
element = parent.elementSelector(value);
}
catch (Exception exception)
{
Error(exception);
return;
}
writer.OnNext(element);
}
public override void OnError(Exception error)
{
Error(error);
}
public override void OnCompleted()
{
try
{
if (nullKeySubject != null) nullKeySubject.OnCompleted();
foreach (var s in map.Values)
{
s.OnCompleted();
}
observer.OnCompleted();
}
finally
{
Dispose();
}
}
void Error(Exception exception)
{
try
{
if (nullKeySubject != null) nullKeySubject.OnError(exception);
foreach (var s in map.Values)
{
s.OnError(exception);
}
observer.OnError(exception);
}
finally
{
Dispose();
}
}
}
}
}