You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
196 lines
6.2 KiB
196 lines
6.2 KiB
1 year ago
|
using System;
|
||
|
using System.Collections.Generic;
|
||
|
using UniRx.Operators;
|
||
|
|
||
|
namespace UniRx.Operators
|
||
|
{
|
||
|
internal class GroupedObservable<TKey, TElement> : IGroupedObservable<TKey, TElement>
|
||
|
{
|
||
|
readonly TKey key;
|
||
|
readonly IObservable<TElement> subject;
|
||
|
readonly RefCountDisposable refCount;
|
||
|
|
||
|
public TKey Key
|
||
|
{
|
||
|
get { return key; }
|
||
|
}
|
||
|
|
||
|
public GroupedObservable(TKey key, ISubject<TElement> subject, RefCountDisposable refCount)
|
||
|
{
|
||
|
this.key = key;
|
||
|
this.subject = subject;
|
||
|
this.refCount = refCount;
|
||
|
}
|
||
|
|
||
|
public IDisposable Subscribe(IObserver<TElement> observer)
|
||
|
{
|
||
|
var release = refCount.GetDisposable();
|
||
|
var subscription = subject.Subscribe(observer);
|
||
|
return StableCompositeDisposable.Create(release, subscription);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
internal class GroupByObservable<TSource, TKey, TElement> : OperatorObservableBase<IGroupedObservable<TKey, TElement>>
|
||
|
{
|
||
|
readonly IObservable<TSource> source;
|
||
|
readonly Func<TSource, TKey> keySelector;
|
||
|
readonly Func<TSource, TElement> elementSelector;
|
||
|
readonly int? capacity;
|
||
|
readonly IEqualityComparer<TKey> comparer;
|
||
|
|
||
|
public GroupByObservable(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int? capacity, IEqualityComparer<TKey> comparer)
|
||
|
: base(source.IsRequiredSubscribeOnCurrentThread())
|
||
|
{
|
||
|
this.source = source;
|
||
|
this.keySelector = keySelector;
|
||
|
this.elementSelector = elementSelector;
|
||
|
this.capacity = capacity;
|
||
|
this.comparer = comparer;
|
||
|
}
|
||
|
|
||
|
protected override IDisposable SubscribeCore(IObserver<IGroupedObservable<TKey, TElement>> observer, IDisposable cancel)
|
||
|
{
|
||
|
return new GroupBy(this, observer, cancel).Run();
|
||
|
}
|
||
|
|
||
|
class GroupBy : OperatorObserverBase<TSource, IGroupedObservable<TKey, TElement>>
|
||
|
{
|
||
|
readonly GroupByObservable<TSource, TKey, TElement> parent;
|
||
|
readonly Dictionary<TKey, ISubject<TElement>> map;
|
||
|
ISubject<TElement> nullKeySubject;
|
||
|
|
||
|
CompositeDisposable groupDisposable;
|
||
|
RefCountDisposable refCountDisposable;
|
||
|
|
||
|
public GroupBy(GroupByObservable<TSource, TKey, TElement> parent, IObserver<IGroupedObservable<TKey, TElement>> observer, IDisposable cancel)
|
||
|
: base(observer, cancel)
|
||
|
{
|
||
|
this.parent = parent;
|
||
|
if (parent.capacity.HasValue)
|
||
|
{
|
||
|
map = new Dictionary<TKey, ISubject<TElement>>(parent.capacity.Value, parent.comparer);
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
map = new Dictionary<TKey, ISubject<TElement>>(parent.comparer);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
public IDisposable Run()
|
||
|
{
|
||
|
groupDisposable = new CompositeDisposable();
|
||
|
refCountDisposable = new RefCountDisposable(groupDisposable);
|
||
|
|
||
|
groupDisposable.Add(parent.source.Subscribe(this));
|
||
|
|
||
|
return refCountDisposable;
|
||
|
}
|
||
|
|
||
|
public override void OnNext(TSource value)
|
||
|
{
|
||
|
var key = default(TKey);
|
||
|
try
|
||
|
{
|
||
|
key = parent.keySelector(value);
|
||
|
}
|
||
|
catch (Exception exception)
|
||
|
{
|
||
|
Error(exception);
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
var fireNewMapEntry = false;
|
||
|
var writer = default(ISubject<TElement>);
|
||
|
try
|
||
|
{
|
||
|
if (key == null)
|
||
|
{
|
||
|
if (nullKeySubject == null)
|
||
|
{
|
||
|
nullKeySubject = new Subject<TElement>();
|
||
|
fireNewMapEntry = true;
|
||
|
}
|
||
|
|
||
|
writer = nullKeySubject;
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
if (!map.TryGetValue(key, out writer))
|
||
|
{
|
||
|
writer = new Subject<TElement>();
|
||
|
map.Add(key, writer);
|
||
|
fireNewMapEntry = true;
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
catch (Exception exception)
|
||
|
{
|
||
|
Error(exception);
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
if (fireNewMapEntry)
|
||
|
{
|
||
|
var group = new GroupedObservable<TKey, TElement>(key, writer, refCountDisposable);
|
||
|
observer.OnNext(group);
|
||
|
}
|
||
|
|
||
|
var element = default(TElement);
|
||
|
try
|
||
|
{
|
||
|
element = parent.elementSelector(value);
|
||
|
}
|
||
|
catch (Exception exception)
|
||
|
{
|
||
|
Error(exception);
|
||
|
return;
|
||
|
}
|
||
|
|
||
|
writer.OnNext(element);
|
||
|
}
|
||
|
|
||
|
public override void OnError(Exception error)
|
||
|
{
|
||
|
Error(error);
|
||
|
}
|
||
|
|
||
|
public override void OnCompleted()
|
||
|
{
|
||
|
try
|
||
|
{
|
||
|
if (nullKeySubject != null) nullKeySubject.OnCompleted();
|
||
|
|
||
|
foreach (var s in map.Values)
|
||
|
{
|
||
|
s.OnCompleted();
|
||
|
}
|
||
|
|
||
|
observer.OnCompleted();
|
||
|
}
|
||
|
finally
|
||
|
{
|
||
|
Dispose();
|
||
|
}
|
||
|
}
|
||
|
|
||
|
void Error(Exception exception)
|
||
|
{
|
||
|
try
|
||
|
{
|
||
|
if (nullKeySubject != null) nullKeySubject.OnError(exception);
|
||
|
|
||
|
foreach (var s in map.Values)
|
||
|
{
|
||
|
s.OnError(exception);
|
||
|
}
|
||
|
|
||
|
observer.OnError(exception);
|
||
|
}
|
||
|
finally
|
||
|
{
|
||
|
Dispose();
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|