// Needed for NET40

#if !NET_4_6

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using LinqInternal.Collections.ThreadSafe;
using LinqInternal.Core;
using LinqInternal.Threading;

namespace LinqInternal.Collections
{
    /// <summary>
    /// A pull-based source of items: consumers call <see cref="TryTake"/> (or enumerate
    /// <see cref="AsEnumerable"/>) to obtain the next item, and every item that is handed out is
    /// also pushed to the observers registered through <see cref="Subscribe"/>.
    /// </summary>
    /// <typeparam name="T">The type of the items produced.</typeparam>
    [Serializable]
    internal sealed class Progressor<T> : IObservable<T>
    {
        // Receives every item handed out by _tryTake; set to null by Close().
        private ProxyObservable<T> _proxy;

        // The current strategy used to obtain the next item; set to null by Close().
        private TryTake<T> _tryTake;

        // When true, the next failed TryTake closes this instance.
        private bool _done;

        /// <summary>
        /// Creates a progressor that yields the items of another progressor.
        /// </summary>
        /// <param name="wrapped">The progressor to take items from.</param>
        /// <exception cref="ArgumentNullException"><paramref name="wrapped"/> is null.</exception>
        public Progressor(Progressor<T> wrapped)
        {
            if (wrapped == null)
            {
                throw new ArgumentNullException("wrapped");
            }

            // Number of takes currently in progress on this instance. Items announced by wrapped
            // are only buffered while this is zero, so an item this instance is taking itself is
            // not also queued.
            var control = 0;

            Predicate<T> newFilter = item => Volatile.Read(ref control) == 0;
            var buffer = new SafeQueue<T>();
            wrapped.SubscribeAction
            (
                item =>
                {
                    if (newFilter(item))
                    {
                        buffer.Add(item);
                    }
                }
            );
            _proxy = new ProxyObservable<T>();

            _tryTake = (out T value) =>
            {
                Interlocked.Increment(ref control);
                try
                {
                    // Buffered items first, then pull directly from wrapped.
                    if (buffer.TryTake(out value) || wrapped.TryTake(out value))
                    {
                        _proxy.OnNext(value);
                        return true;
                    }
                    else
                    {
                        _done = wrapped._done;
                        return false;
                    }
                }
                finally
                {
                    Interlocked.Decrement(ref control);
                }
            };
        }

        /// <summary>
        /// Creates a progressor that first yields the items of an enumerable and then continues
        /// with the items of another progressor.
        /// </summary>
        /// <param name="preface">The items to yield first.</param>
        /// <param name="wrapped">The progressor to take items from once the preface is exhausted.</param>
        /// <exception cref="ArgumentNullException"><paramref name="wrapped"/> or <paramref name="preface"/> is null.</exception>
        /// <exception cref="ArgumentException"><paramref name="preface"/> returned a null enumerator.</exception>
        public Progressor(IEnumerable<T> preface, Progressor<T> wrapped)
        {
            if (wrapped == null)
            {
                throw new ArgumentNullException("wrapped");
            }
            if (preface == null)
            {
                throw new ArgumentNullException("preface");
            }
            var enumerator = preface.GetEnumerator();
            if (enumerator == null)
            {
                throw new ArgumentException("preface.GetEnumerator()");
            }

            // Number of takes currently in progress against wrapped (see newFilter).
            var control = 0;

            // 0: preface still being consumed, 1: preface exhausted, 2: one caller is swapping the
            // strategy, 3: the replacement strategy has been published.
            var guard = 0;

            Predicate<T> newFilter = item => Volatile.Read(ref control) == 0;
            var buffer = new SafeQueue<T>();
            wrapped.SubscribeAction
            (
                item =>
                {
                    if (newFilter(item))
                    {
                        buffer.Add(item);
                    }
                }
            );
            _proxy = new ProxyObservable<T>();

            // Strategy used after the preface has been exhausted.
            TryTake<T> tryTakeReplacement = (out T value) =>
            {
                Interlocked.Increment(ref control);
                try
                {
                    if (buffer.TryTake(out value) || wrapped.TryTake(out value))
                    {
                        _proxy.OnNext(value);
                        return true;
                    }
                    else
                    {
                        _done = wrapped._done;
                        return false;
                    }
                }
                finally
                {
                    Interlocked.Decrement(ref control);
                }
            };

            _tryTake = (out T value) =>
            {
                value = default(T);
                if (Volatile.Read(ref guard) == 0)
                {
                    bool result;
                    // We need a lock, there is no way around it. IEnumerator is just awful. Use another overload if possible.
                    lock (enumerator)
                    {
                        result = enumerator.MoveNext();
                        if (result)
                        {
                            value = enumerator.Current;
                        }
                    }
                    if (result)
                    {
                        _proxy.OnNext(value);
                        return true;
                    }
                    enumerator.Dispose();
                    Interlocked.CompareExchange(ref guard, 1, 0);
                }
                if (Interlocked.CompareExchange(ref guard, 2, 1) == 1)
                {
                    // This caller won the right to publish the replacement strategy.
                    _tryTake = tryTakeReplacement;
                    Volatile.Write(ref guard, 3);
                }
                else
                {
                    // Another caller is publishing (or has published) the replacement.
                    ThreadingHelper.SpinWaitUntil(ref guard, 3);
                }
                var tryTake = _tryTake;
                return tryTake(out value);
            };
        }

        /// <summary>
        /// Creates a progressor that yields the items of an array, in index order.
        /// </summary>
        /// <param name="wrapped">The array to take items from.</param>
        /// <exception cref="ArgumentNullException"><paramref name="wrapped"/> is null.</exception>
        public Progressor(T[] wrapped)
        {
            if (wrapped == null)
            {
                throw new ArgumentNullException("wrapped");
            }

            // 0: array still being consumed, 1: array exhausted, 2: strategy replaced.
            var guard = 0;

            // Index of the last item handed out; incremented atomically per take.
            var index = -1;

            _proxy = new ProxyObservable<T>();

            // Strategy used once the array is exhausted: always fails.
            TryTake<T> tryTakeReplacement = (out T value) =>
            {
                value = default(T);
                return false;
            };

            _tryTake = (out T value) =>
            {
                value = default(T);
                if (Volatile.Read(ref guard) == 0)
                {
                    var currentIndex = Interlocked.Increment(ref index);
                    if (currentIndex < wrapped.Length)
                    {
                        value = wrapped[currentIndex];
                        _proxy.OnNext(value);
                        return true;
                    }
                    Interlocked.CompareExchange(ref guard, 1, 0);
                }
                if (Interlocked.CompareExchange(ref guard, 2, 1) == 1)
                {
                    _tryTake = tryTakeReplacement;
                }
                return false;
            };
        }

        /// <summary>
        /// Creates a progressor that first yields the items of an array and then continues with
        /// the items of another progressor.
        /// </summary>
        /// <param name="preface">The items to yield first.</param>
        /// <param name="wrapped">The progressor to take items from once the preface is exhausted.</param>
        /// <exception cref="ArgumentNullException"><paramref name="wrapped"/> or <paramref name="preface"/> is null.</exception>
        public Progressor(T[] preface, Progressor<T> wrapped)
        {
            if (wrapped == null)
            {
                throw new ArgumentNullException("wrapped");
            }
            if (preface == null)
            {
                throw new ArgumentNullException("preface");
            }

            // Number of takes currently in progress against wrapped (see newFilter).
            var control = 0;

            // 0: preface still being consumed, 1: preface exhausted, 2: one caller is swapping the
            // strategy, 3: the replacement strategy has been published.
            var guard = 0;

            // Index of the last preface item handed out; incremented atomically per take.
            var index = -1;

            Predicate<T> newFilter = item => Volatile.Read(ref control) == 0;
            var buffer = new SafeQueue<T>();
            wrapped.SubscribeAction
            (
                item =>
                {
                    if (newFilter(item))
                    {
                        buffer.Add(item);
                    }
                }
            );
            _proxy = new ProxyObservable<T>();

            // Strategy used after the preface has been exhausted.
            TryTake<T> tryTakeReplacement = (out T value) =>
            {
                Interlocked.Increment(ref control);
                try
                {
                    if (buffer.TryTake(out value) || wrapped.TryTake(out value))
                    {
                        _proxy.OnNext(value);
                        return true;
                    }
                    else
                    {
                        _done = wrapped._done;
                        return false;
                    }
                }
                finally
                {
                    Interlocked.Decrement(ref control);
                }
            };

            _tryTake = (out T value) =>
            {
                if (Volatile.Read(ref guard) == 0)
                {
                    var currentIndex = Interlocked.Increment(ref index);
                    if (currentIndex < preface.Length)
                    {
                        value = preface[currentIndex];
                        _proxy.OnNext(value);
                        return true;
                    }
                    Interlocked.CompareExchange(ref guard, 1, 0);
                }
                if (Interlocked.CompareExchange(ref guard, 2, 1) == 1)
                {
                    // This caller won the right to publish the replacement strategy.
                    _tryTake = tryTakeReplacement;
                    Volatile.Write(ref guard, 3);
                }
                else
                {
                    // Another caller is publishing (or has published) the replacement.
                    ThreadingHelper.SpinWaitUntil(ref guard, 3);
                }
                var tryTake = _tryTake;
                return tryTake(out value);
            };
        }

        /// <summary>
        /// Creates a progressor that yields the items of an enumerable.
        /// </summary>
        /// <param name="wrapped">The enumerable to take items from.</param>
        /// <exception cref="ArgumentNullException"><paramref name="wrapped"/> is null.</exception>
        /// <exception cref="ArgumentException"><paramref name="wrapped"/> returned a null enumerator.</exception>
        public Progressor(IEnumerable<T> wrapped)
        {
            if (wrapped == null)
            {
                throw new ArgumentNullException("wrapped");
            }
            var enumerator = wrapped.GetEnumerator();
            if (enumerator == null)
            {
                throw new ArgumentException("wrapped.GetEnumerator()");
            }

            // 0: enumerator still being consumed, 1: enumerator exhausted, 2: strategy replaced.
            var guard = 0;

            _proxy = new ProxyObservable<T>();

            // Strategy used once the enumerator is exhausted: always fails.
            TryTake<T> tryTakeReplacement = (out T value) =>
            {
                value = default(T);
                return false;
            };

            _tryTake = (out T value) =>
            {
                value = default(T);
                if (Volatile.Read(ref guard) == 0)
                {
                    bool result;
                    // We need a lock, there is no way around it. IEnumerator is just awful. Use another overload if possible.
                    lock (enumerator)
                    {
                        result = enumerator.MoveNext();
                        if (result)
                        {
                            value = enumerator.Current;
                        }
                    }
                    if (result)
                    {
                        _proxy.OnNext(value);
                        return true;
                    }
                    enumerator.Dispose();
                    Interlocked.CompareExchange(ref guard, 1, 0);
                }
                if (Interlocked.CompareExchange(ref guard, 2, 1) == 1)
                {
                    _tryTake = tryTakeReplacement;
                }
                return false;
            };
        }

        /// <summary>
        /// Creates a progressor on top of a take delegate.
        /// </summary>
        /// <param name="tryTake">The delegate used to obtain the next item.</param>
        /// <param name="doneOnFalse">The value stored in the done flag each time <paramref name="tryTake"/> fails.</param>
        /// <exception cref="ArgumentNullException"><paramref name="tryTake"/> is null.</exception>
        public Progressor(TryTake<T> tryTake, bool doneOnFalse)
        {
            if (tryTake == null)
            {
                throw new ArgumentNullException("tryTake");
            }
            var tryTakeCopy = tryTake;
            _proxy = new ProxyObservable<T>();
            _tryTake = (out T value) =>
            {
                // This is not an overridable method, and it is not being called on the constructor.
                if (tryTakeCopy(out value))
                {
                    _proxy.OnNext(value);
                    return true;
                }
                _done = doneOnFalse;
                return false;
            };
        }

        /// <summary>
        /// Creates a progressor on top of a take delegate, with a callback that decides whether
        /// the source is exhausted.
        /// </summary>
        /// <param name="tryTake">The delegate used to obtain the next item.</param>
        /// <param name="isDone">Invoked each time <paramref name="tryTake"/> fails; its result is stored in the done flag.</param>
        /// <exception cref="ArgumentNullException"><paramref name="tryTake"/> or <paramref name="isDone"/> is null.</exception>
        public Progressor(TryTake<T> tryTake, Func<bool> isDone)
        {
            if (tryTake == null)
            {
                throw new ArgumentNullException("tryTake");
            }
            if (isDone == null)
            {
                throw new ArgumentNullException("isDone");
            }
            var tryTakeCopy = tryTake;
            _proxy = new ProxyObservable<T>();
            _tryTake = (out T value) =>
            {
                // This is not an overridable method, and it is not being called on the constructor.
                if (tryTakeCopy(out value))
                {
                    _proxy.OnNext(value);
                    return true;
                }
                _done = new ValueFuncClosure<bool>(isDone).InvokeReturn();
                return false;
            };
        }

        /// <summary>
        /// Creates a progressor that yields the items pushed by an observable.
        /// </summary>
        /// <param name="wrapped">The observable whose items are buffered until taken.</param>
        /// <exception cref="ArgumentNullException"><paramref name="wrapped"/> is null.</exception>
        public Progressor(IObservable<T> wrapped)
        {
            // Fail with the same exception type as every other constructor instead of a
            // NullReferenceException on the Subscribe call below.
            if (wrapped == null)
            {
                throw new ArgumentNullException("wrapped");
            }
            var buffer = new SafeQueue<T>();
            wrapped.Subscribe
            (
                new CustomObserver<T>
                (
                    () => _done = true,
                    exception => _done = true,
                    buffer.Add
                )
            );
            _proxy = new ProxyObservable<T>();
            _tryTake = (out T value) =>
            {
                if (buffer.TryTake(out value))
                {
                    _proxy.OnNext(value);
                    return true;
                }
                value = default(T);
                return false;
            };
        }

        // Used by the static factories, which build the take delegate and the proxy themselves.
        private Progressor(TryTake<T> tryTake, ProxyObservable<T> proxy)
        {
            _proxy = proxy;
            _tryTake = tryTake;
        }

        /// <summary>
        /// Gets a value indicating whether <see cref="Close"/> has released the take delegate.
        /// </summary>
        public bool IsClosed
        {
            get
            {
                return _tryTake == null;
            }
        }

        /// <summary>
        /// Creates a progressor that yields the items of another progressor after passing them
        /// through a converter.
        /// </summary>
        /// <typeparam name="TInput">The item type of the wrapped progressor.</typeparam>
        /// <param name="wrapped">The progressor to take items from.</param>
        /// <param name="converter">Converts each input item to an output item.</param>
        /// <returns>The new progressor.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="wrapped"/> or <paramref name="converter"/> is null.</exception>
        public static Progressor<T> CreateConverted<TInput>(Progressor<TInput> wrapped, Func<TInput, T> converter)
        {
            if (wrapped == null)
            {
                throw new ArgumentNullException("wrapped");
            }
            if (converter == null)
            {
                throw new ArgumentNullException("converter");
            }

            // Number of takes currently in progress on the result (see newFilter).
            var control = 0;

            Predicate<TInput> newFilter = item => Volatile.Read(ref control) == 0;
            var buffer = new SafeQueue<T>();
            var proxy = new ProxyObservable<T>();

            var result = new Progressor<T>(
                (out T value) =>
                {
                    Interlocked.Increment(ref control);
                    try
                    {
                        TInput item;
                        if (buffer.TryTake(out value))
                        {
                            proxy.OnNext(value);
                            return true;
                        }
                        else if (wrapped.TryTake(out item))
                        {
                            value = converter(item);
                            proxy.OnNext(value);
                            return true;
                        }
                        value = default(T);
                        return false;
                    }
                    finally
                    {
                        Interlocked.Decrement(ref control);
                    }
                },
                proxy
            );
            wrapped.Subscribe
            (
                new CustomObserver<TInput>
                (
                    () => result._done = true,
                    exception => result._done = true,
                    item =>
                    {
                        if (newFilter(item))
                        {
                            buffer.Add(converter(item));
                        }
                    }
                )
            );
            return result;
        }

        /// <summary>
        /// Creates a progressor that yields only the items of another progressor that satisfy a
        /// filter.
        /// </summary>
        /// <param name="wrapped">The progressor to take items from.</param>
        /// <param name="filter">Returns true for the items to keep.</param>
        /// <returns>The new progressor.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="wrapped"/> or <paramref name="filter"/> is null.</exception>
        public static Progressor<T> CreatedFiltered(Progressor<T> wrapped, Predicate<T> filter)
        {
            if (wrapped == null)
            {
                throw new ArgumentNullException("wrapped");
            }
            if (filter == null)
            {
                throw new ArgumentNullException("filter");
            }

            // Number of takes currently in progress on the result (see newFilter).
            var control = 0;

            Predicate<T> newFilter = item => Volatile.Read(ref control) == 0 && filter(item);
            var buffer = new SafeQueue<T>();
            var proxy = new ProxyObservable<T>();

            var result = new Progressor<T>(
                (out T value) =>
                {
                    // Must be an increment, not a plain write of 1: the finally block decrements,
                    // and with overlapping takes a plain write would let the counter drop below
                    // zero and never return to it, which would stop newFilter from ever buffering
                    // again.
                    Interlocked.Increment(ref control);
                    try
                    {
                    again:
                        if (buffer.TryTake(out value))
                        {
                            proxy.OnNext(value);
                            return true;
                        }
                        else if (wrapped.TryTake(out value))
                        {
                            if (filter(value))
                            {
                                proxy.OnNext(value);
                                return true;
                            }
                            else
                            {
                                // Rejected item: try the next one.
                                goto again;
                            }
                        }
                        value = default(T);
                        return false;
                    }
                    finally
                    {
                        Interlocked.Decrement(ref control);
                    }
                },
                proxy
            );
            wrapped.Subscribe
            (
                new CustomObserver<T>
                (
                    () => result._done = true,
                    exception => result._done = true,
                    item =>
                    {
                        if (newFilter(item))
                        {
                            buffer.Add(item);
                        }
                    }
                )
            );
            return result;
        }

        /// <summary>
        /// Creates a progressor that yields the converted form of the items of another progressor
        /// that satisfy a filter.
        /// </summary>
        /// <typeparam name="TInput">The item type of the wrapped progressor.</typeparam>
        /// <param name="wrapped">The progressor to take items from.</param>
        /// <param name="filter">Returns true for the input items to keep.</param>
        /// <param name="converter">Converts each kept input item to an output item.</param>
        /// <returns>The new progressor.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="wrapped"/>, <paramref name="filter"/> or <paramref name="converter"/> is null.</exception>
        public static Progressor<T> CreatedFilteredConverted<TInput>(Progressor<TInput> wrapped, Predicate<TInput> filter, Func<TInput, T> converter)
        {
            if (wrapped == null)
            {
                throw new ArgumentNullException("wrapped");
            }
            if (filter == null)
            {
                throw new ArgumentNullException("filter");
            }
            if (converter == null)
            {
                throw new ArgumentNullException("converter");
            }

            // Number of takes currently in progress on the result (see newFilter).
            var control = 0;

            Predicate<TInput> newFilter = item => Volatile.Read(ref control) == 0 && filter(item);
            var buffer = new SafeQueue<T>();
            var proxy = new ProxyObservable<T>();

            var result = new Progressor<T>(
                (out T value) =>
                {
                    Interlocked.Increment(ref control);
                    try
                    {
                        TInput item;
                    again:
                        if (buffer.TryTake(out value))
                        {
                            proxy.OnNext(value);
                            return true;
                        }
                        else if (wrapped.TryTake(out item))
                        {
                            if (filter(item))
                            {
                                value = converter(item);
                                proxy.OnNext(value);
                                return true;
                            }
                            else
                            {
                                // Rejected item: try the next one.
                                goto again;
                            }
                        }
                        value = default(T);
                        return false;
                    }
                    finally
                    {
                        Interlocked.Decrement(ref control);
                    }
                },
                proxy
            );
            wrapped.Subscribe
            (
                new CustomObserver<TInput>
                (
                    () => result._done = true,
                    exception => result._done = true,
                    item =>
                    {
                        if (newFilter(item))
                        {
                            buffer.Add(converter(item));
                        }
                    }
                )
            );
            return result;
        }

        /// <summary>
        /// Creates a progressor that yields each distinct item of another progressor once.
        /// </summary>
        /// <param name="wrapped">The progressor to take items from.</param>
        /// <returns>The new progressor.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="wrapped"/> is null.</exception>
        public static Progressor<T> CreateDistinct(Progressor<T> wrapped)
        {
            if (wrapped == null)
            {
                throw new ArgumentNullException("wrapped");
            }

            // Number of takes currently in progress on the result (see newFilter).
            var control = 0;

            // Maps each known item to whether it has already been handed out.
            var buffer = new SafeDictionary<T, bool>();

            Predicate<T> newFilter = item => Volatile.Read(ref control) == 0;
            var proxy = new ProxyObservable<T>();

            var result = new Progressor<T>(
                (out T value) =>
                {
                    Interlocked.Increment(ref control);
                    try
                    {
                    again:
                        // Hand out the first recorded item that has not been yielded yet, if any.
                        foreach (var item in buffer.Where(pair => !pair.Value))
                        {
                            value = item.Key;
                            buffer.Set(value, true);
                            proxy.OnNext(value);
                            return true;
                        }
                        if (wrapped.TryTake(out value))
                        {
                            bool seen;
                            if (!buffer.TryGetValue(value, out seen) || !seen)
                            {
                                buffer.Set(value, true);
                                proxy.OnNext(value);
                                return true;
                            }
                            else
                            {
                                // Already handed out: try the next one.
                                goto again;
                            }
                        }
                        else
                        {
                            return false;
                        }
                    }
                    finally
                    {
                        Interlocked.Decrement(ref control);
                    }
                },
                proxy
            );
            wrapped.Subscribe
            (
                new CustomObserver<T>
                (
                    () => result._done = true,
                    exception => result._done = true,
                    item =>
                    {
                        if (newFilter(item))
                        {
                            buffer.TryAdd(item, false);
                        }
                    }
                )
            );
            return result;
        }

        /// <summary>
        /// Lazily yields items until a take fails or this instance is closed.
        /// </summary>
        /// <returns>A sequence that takes one item per iteration step.</returns>
        public IEnumerable<T> AsEnumerable()
        {
            // After enumerating - the consumer of this method must check if the Progressor is closed.
            while (true)
            {
                T item;
                var tryTake = _tryTake;
                // A closed progressor has no take delegate: end the sequence instead of throwing.
                if (tryTake != null && tryTake(out item))
                {
                    yield return item;
                }
                else
                {
                    break;
                }
            }
        }

        /// <summary>
        /// Releases the take delegate and signals completion to the observers. Calling it again
        /// has no further effect.
        /// </summary>
        public void Close()
        {
            _tryTake = null;
            var proxy = _proxy;
            // The proxy is null once a previous Close has run; do not dereference it again.
            if (proxy != null)
            {
                proxy.OnCompleted();
                _proxy = null;
            }
        }

        /// <summary>
        /// Registers an observer that is notified of every item handed out from now on.
        /// </summary>
        /// <param name="observer">The observer to register.</param>
        /// <returns>
        /// The subscription returned by the internal proxy, or a disposable wrapping a no-op
        /// action when this instance is already closed.
        /// </returns>
        public IDisposable Subscribe(IObserver<T> observer)
        {
            // Read the field once so the null check and the call use the same instance.
            var proxy = _proxy;
            if (proxy != null)
            {
                return proxy.Subscribe(observer);
            }
            return Disposable.Create(ActionHelper.GetNoopAction());
        }

        /// <summary>
        /// Attempts to obtain the next item. When the attempt fails and the done flag is set,
        /// this instance is closed.
        /// </summary>
        /// <param name="item">The item taken, or the default value when the method returns false on a closed instance.</param>
        /// <returns>true if an item was taken; otherwise false.</returns>
        public bool TryTake(out T item)
        {
            // Read the field once so the null check and the call use the same delegate.
            var tryTake = _tryTake;
            if (tryTake != null)
            {
                if (tryTake(out item))
                {
                    return true;
                }
                if (_done)
                {
                    Close();
                }
                return false;
            }
            item = default(T);
            return false;
        }

        /// <summary>
        /// Lazily yields items while takes succeed and each taken item satisfies a condition.
        /// The first item that fails the condition is taken but not yielded.
        /// </summary>
        /// <param name="condition">Evaluated against each taken item.</param>
        /// <returns>A sequence that takes one item per iteration step.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="condition"/> is null (raised when enumeration starts).</exception>
        public IEnumerable<T> While(Predicate<T> condition)
        {
            if (condition == null)
            {
                throw new ArgumentNullException("condition");
            }
            while (true)
            {
                T item;
                var tryTake = _tryTake;
                // A closed progressor has no take delegate: end the sequence instead of throwing.
                if (tryTake != null && tryTake(out item) && condition(item))
                {
                    yield return item;
                }
                else
                {
                    break;
                }
            }
        }

        /// <summary>
        /// Lazily yields items while takes succeed and a condition holds. The condition is
        /// evaluated after each successful take; an item taken when it fails is not yielded.
        /// </summary>
        /// <param name="condition">Evaluated after each successful take.</param>
        /// <returns>A sequence that takes one item per iteration step.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="condition"/> is null (raised when enumeration starts).</exception>
        public IEnumerable<T> While(Func<bool> condition)
        {
            if (condition == null)
            {
                throw new ArgumentNullException("condition");
            }
            while (true)
            {
                T item;
                var tryTake = _tryTake;
                // A closed progressor has no take delegate: end the sequence instead of throwing.
                if (tryTake != null && tryTake(out item) && condition())
                {
                    yield return item;
                }
                else
                {
                    break;
                }
            }
        }
    }
}

#endif