You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
209 lines
6.1 KiB
209 lines
6.1 KiB
using System; |
|
using System.Collections.Generic; |
|
using UniRx.InternalUtil; |
|
|
|
namespace UniRx |
|
{ |
|
public interface IMessagePublisher |
|
{ |
|
/// <summary> |
|
/// Send Message to all receiver. |
|
/// </summary> |
|
void Publish<T>(T message); |
|
} |
|
|
|
public interface IMessageReceiver |
|
{ |
|
/// <summary> |
|
/// Subscribe typed message. |
|
/// </summary> |
|
IObservable<T> Receive<T>(); |
|
} |
|
|
|
public interface IMessageBroker : IMessagePublisher, IMessageReceiver |
|
{ |
|
} |
|
|
|
public interface IAsyncMessagePublisher |
|
{ |
|
/// <summary> |
|
/// Send Message to all receiver and await complete. |
|
/// </summary> |
|
IObservable<Unit> PublishAsync<T>(T message); |
|
} |
|
|
|
public interface IAsyncMessageReceiver |
|
{ |
|
/// <summary> |
|
/// Subscribe typed message. |
|
/// </summary> |
|
IDisposable Subscribe<T>(Func<T, IObservable<Unit>> asyncMessageReceiver); |
|
} |
|
|
|
public interface IAsyncMessageBroker : IAsyncMessagePublisher, IAsyncMessageReceiver |
|
{ |
|
} |
|
|
|
/// <summary> |
|
/// In-Memory PubSub filtered by Type. |
|
/// </summary> |
|
public class MessageBroker : IMessageBroker, IDisposable |
|
{ |
|
/// <summary> |
|
/// MessageBroker in Global scope. |
|
/// </summary> |
|
public static readonly IMessageBroker Default = new MessageBroker(); |
|
|
|
bool isDisposed = false; |
|
readonly Dictionary<Type, object> notifiers = new Dictionary<Type, object>(); |
|
|
|
public void Publish<T>(T message) |
|
{ |
|
object notifier; |
|
lock (notifiers) |
|
{ |
|
if (isDisposed) return; |
|
|
|
if (!notifiers.TryGetValue(typeof(T), out notifier)) |
|
{ |
|
return; |
|
} |
|
} |
|
((ISubject<T>)notifier).OnNext(message); |
|
} |
|
|
|
public IObservable<T> Receive<T>() |
|
{ |
|
object notifier; |
|
lock (notifiers) |
|
{ |
|
if (isDisposed) throw new ObjectDisposedException("MessageBroker"); |
|
|
|
if (!notifiers.TryGetValue(typeof(T), out notifier)) |
|
{ |
|
ISubject<T> n = new Subject<T>().Synchronize(); |
|
notifier = n; |
|
notifiers.Add(typeof(T), notifier); |
|
} |
|
} |
|
|
|
return ((IObservable<T>)notifier).AsObservable(); |
|
} |
|
|
|
public void Dispose() |
|
{ |
|
lock (notifiers) |
|
{ |
|
if (!isDisposed) |
|
{ |
|
isDisposed = true; |
|
notifiers.Clear(); |
|
} |
|
} |
|
} |
|
} |
|
|
|
/// <summary> |
|
/// In-Memory PubSub filtered by Type. |
|
/// </summary> |
|
public class AsyncMessageBroker : IAsyncMessageBroker, IDisposable |
|
{ |
|
/// <summary> |
|
/// AsyncMessageBroker in Global scope. |
|
/// </summary> |
|
public static readonly IAsyncMessageBroker Default = new AsyncMessageBroker(); |
|
|
|
bool isDisposed = false; |
|
readonly Dictionary<Type, object> notifiers = new Dictionary<Type, object>(); |
|
|
|
public IObservable<Unit> PublishAsync<T>(T message) |
|
{ |
|
UniRx.InternalUtil.ImmutableList<Func<T, IObservable<Unit>>> notifier; |
|
lock (notifiers) |
|
{ |
|
if (isDisposed) throw new ObjectDisposedException("AsyncMessageBroker"); |
|
|
|
object _notifier; |
|
if (notifiers.TryGetValue(typeof(T), out _notifier)) |
|
{ |
|
notifier = (UniRx.InternalUtil.ImmutableList<Func<T, IObservable<Unit>>>)_notifier; |
|
} |
|
else |
|
{ |
|
return Observable.ReturnUnit(); |
|
} |
|
} |
|
|
|
var data = notifier.Data; |
|
var awaiter = new IObservable<Unit>[data.Length]; |
|
for (int i = 0; i < data.Length; i++) |
|
{ |
|
awaiter[i] = data[i].Invoke(message); |
|
} |
|
return Observable.WhenAll(awaiter); |
|
} |
|
|
|
public IDisposable Subscribe<T>(Func<T, IObservable<Unit>> asyncMessageReceiver) |
|
{ |
|
lock (notifiers) |
|
{ |
|
if (isDisposed) throw new ObjectDisposedException("AsyncMessageBroker"); |
|
|
|
object _notifier; |
|
if (!notifiers.TryGetValue(typeof(T), out _notifier)) |
|
{ |
|
var notifier = UniRx.InternalUtil.ImmutableList<Func<T, IObservable<Unit>>>.Empty; |
|
notifier = notifier.Add(asyncMessageReceiver); |
|
notifiers.Add(typeof(T), notifier); |
|
} |
|
else |
|
{ |
|
var notifier = (ImmutableList<Func<T, IObservable<Unit>>>)_notifier; |
|
notifier = notifier.Add(asyncMessageReceiver); |
|
notifiers[typeof(T)] = notifier; |
|
} |
|
} |
|
|
|
return new Subscription<T>(this, asyncMessageReceiver); |
|
} |
|
|
|
public void Dispose() |
|
{ |
|
lock (notifiers) |
|
{ |
|
if (!isDisposed) |
|
{ |
|
isDisposed = true; |
|
notifiers.Clear(); |
|
} |
|
} |
|
} |
|
|
|
class Subscription<T> : IDisposable |
|
{ |
|
readonly AsyncMessageBroker parent; |
|
readonly Func<T, IObservable<Unit>> asyncMessageReceiver; |
|
|
|
public Subscription(AsyncMessageBroker parent, Func<T, IObservable<Unit>> asyncMessageReceiver) |
|
{ |
|
this.parent = parent; |
|
this.asyncMessageReceiver = asyncMessageReceiver; |
|
} |
|
|
|
public void Dispose() |
|
{ |
|
lock (parent.notifiers) |
|
{ |
|
object _notifier; |
|
if (parent.notifiers.TryGetValue(typeof(T), out _notifier)) |
|
{ |
|
var notifier = (ImmutableList<Func<T, IObservable<Unit>>>)_notifier; |
|
notifier = notifier.Remove(asyncMessageReceiver); |
|
|
|
parent.notifiers[typeof(T)] = notifier; |
|
} |
|
} |
|
} |
|
} |
|
} |
|
} |