You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
42 lines
1.4 KiB
42 lines
1.4 KiB
1 year ago
|
using System;
|
||
|
|
||
|
namespace UniRx.Operators
|
||
|
{
|
||
|
// implements note : all field must be readonly.
|
||
|
public abstract class OperatorObservableBase<T> : IObservable<T>, IOptimizedObservable<T>
|
||
|
{
|
||
|
readonly bool isRequiredSubscribeOnCurrentThread;
|
||
|
|
||
|
public OperatorObservableBase(bool isRequiredSubscribeOnCurrentThread)
|
||
|
{
|
||
|
this.isRequiredSubscribeOnCurrentThread = isRequiredSubscribeOnCurrentThread;
|
||
|
}
|
||
|
|
||
|
public bool IsRequiredSubscribeOnCurrentThread()
|
||
|
{
|
||
|
return isRequiredSubscribeOnCurrentThread;
|
||
|
}
|
||
|
|
||
|
public IDisposable Subscribe(IObserver<T> observer)
|
||
|
{
|
||
|
var subscription = new SingleAssignmentDisposable();
|
||
|
|
||
|
// note:
|
||
|
// does not make the safe observer, it breaks exception durability.
|
||
|
// var safeObserver = Observer.CreateAutoDetachObserver<T>(observer, subscription);
|
||
|
|
||
|
if (isRequiredSubscribeOnCurrentThread && Scheduler.IsCurrentThreadSchedulerScheduleRequired)
|
||
|
{
|
||
|
Scheduler.CurrentThread.Schedule(() => subscription.Disposable = SubscribeCore(observer, subscription));
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
subscription.Disposable = SubscribeCore(observer, subscription);
|
||
|
}
|
||
|
|
||
|
return subscription;
|
||
|
}
|
||
|
|
||
|
protected abstract IDisposable SubscribeCore(IObserver<T> observer, IDisposable cancel);
|
||
|
}
|
||
|
}
|