You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
209 lines
6.1 KiB
209 lines
6.1 KiB
1 year ago
|
using System;
|
||
|
using System.Collections.Generic;
|
||
|
using UniRx.InternalUtil;
|
||
|
|
||
|
namespace UniRx
|
||
|
{
|
||
|
public interface IMessagePublisher
|
||
|
{
|
||
|
/// <summary>
|
||
|
/// Send Message to all receiver.
|
||
|
/// </summary>
|
||
|
void Publish<T>(T message);
|
||
|
}
|
||
|
|
||
|
public interface IMessageReceiver
|
||
|
{
|
||
|
/// <summary>
|
||
|
/// Subscribe typed message.
|
||
|
/// </summary>
|
||
|
IObservable<T> Receive<T>();
|
||
|
}
|
||
|
|
||
|
public interface IMessageBroker : IMessagePublisher, IMessageReceiver
|
||
|
{
|
||
|
}
|
||
|
|
||
|
public interface IAsyncMessagePublisher
|
||
|
{
|
||
|
/// <summary>
|
||
|
/// Send Message to all receiver and await complete.
|
||
|
/// </summary>
|
||
|
IObservable<Unit> PublishAsync<T>(T message);
|
||
|
}
|
||
|
|
||
|
public interface IAsyncMessageReceiver
|
||
|
{
|
||
|
/// <summary>
|
||
|
/// Subscribe typed message.
|
||
|
/// </summary>
|
||
|
IDisposable Subscribe<T>(Func<T, IObservable<Unit>> asyncMessageReceiver);
|
||
|
}
|
||
|
|
||
|
public interface IAsyncMessageBroker : IAsyncMessagePublisher, IAsyncMessageReceiver
|
||
|
{
|
||
|
}
|
||
|
|
||
|
/// <summary>
|
||
|
/// In-Memory PubSub filtered by Type.
|
||
|
/// </summary>
|
||
|
public class MessageBroker : IMessageBroker, IDisposable
|
||
|
{
|
||
|
/// <summary>
|
||
|
/// MessageBroker in Global scope.
|
||
|
/// </summary>
|
||
|
public static readonly IMessageBroker Default = new MessageBroker();
|
||
|
|
||
|
bool isDisposed = false;
|
||
|
readonly Dictionary<Type, object> notifiers = new Dictionary<Type, object>();
|
||
|
|
||
|
public void Publish<T>(T message)
|
||
|
{
|
||
|
object notifier;
|
||
|
lock (notifiers)
|
||
|
{
|
||
|
if (isDisposed) return;
|
||
|
|
||
|
if (!notifiers.TryGetValue(typeof(T), out notifier))
|
||
|
{
|
||
|
return;
|
||
|
}
|
||
|
}
|
||
|
((ISubject<T>)notifier).OnNext(message);
|
||
|
}
|
||
|
|
||
|
public IObservable<T> Receive<T>()
|
||
|
{
|
||
|
object notifier;
|
||
|
lock (notifiers)
|
||
|
{
|
||
|
if (isDisposed) throw new ObjectDisposedException("MessageBroker");
|
||
|
|
||
|
if (!notifiers.TryGetValue(typeof(T), out notifier))
|
||
|
{
|
||
|
ISubject<T> n = new Subject<T>().Synchronize();
|
||
|
notifier = n;
|
||
|
notifiers.Add(typeof(T), notifier);
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return ((IObservable<T>)notifier).AsObservable();
|
||
|
}
|
||
|
|
||
|
public void Dispose()
|
||
|
{
|
||
|
lock (notifiers)
|
||
|
{
|
||
|
if (!isDisposed)
|
||
|
{
|
||
|
isDisposed = true;
|
||
|
notifiers.Clear();
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/// <summary>
|
||
|
/// In-Memory PubSub filtered by Type.
|
||
|
/// </summary>
|
||
|
public class AsyncMessageBroker : IAsyncMessageBroker, IDisposable
|
||
|
{
|
||
|
/// <summary>
|
||
|
/// AsyncMessageBroker in Global scope.
|
||
|
/// </summary>
|
||
|
public static readonly IAsyncMessageBroker Default = new AsyncMessageBroker();
|
||
|
|
||
|
bool isDisposed = false;
|
||
|
readonly Dictionary<Type, object> notifiers = new Dictionary<Type, object>();
|
||
|
|
||
|
public IObservable<Unit> PublishAsync<T>(T message)
|
||
|
{
|
||
|
UniRx.InternalUtil.ImmutableList<Func<T, IObservable<Unit>>> notifier;
|
||
|
lock (notifiers)
|
||
|
{
|
||
|
if (isDisposed) throw new ObjectDisposedException("AsyncMessageBroker");
|
||
|
|
||
|
object _notifier;
|
||
|
if (notifiers.TryGetValue(typeof(T), out _notifier))
|
||
|
{
|
||
|
notifier = (UniRx.InternalUtil.ImmutableList<Func<T, IObservable<Unit>>>)_notifier;
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
return Observable.ReturnUnit();
|
||
|
}
|
||
|
}
|
||
|
|
||
|
var data = notifier.Data;
|
||
|
var awaiter = new IObservable<Unit>[data.Length];
|
||
|
for (int i = 0; i < data.Length; i++)
|
||
|
{
|
||
|
awaiter[i] = data[i].Invoke(message);
|
||
|
}
|
||
|
return Observable.WhenAll(awaiter);
|
||
|
}
|
||
|
|
||
|
public IDisposable Subscribe<T>(Func<T, IObservable<Unit>> asyncMessageReceiver)
|
||
|
{
|
||
|
lock (notifiers)
|
||
|
{
|
||
|
if (isDisposed) throw new ObjectDisposedException("AsyncMessageBroker");
|
||
|
|
||
|
object _notifier;
|
||
|
if (!notifiers.TryGetValue(typeof(T), out _notifier))
|
||
|
{
|
||
|
var notifier = UniRx.InternalUtil.ImmutableList<Func<T, IObservable<Unit>>>.Empty;
|
||
|
notifier = notifier.Add(asyncMessageReceiver);
|
||
|
notifiers.Add(typeof(T), notifier);
|
||
|
}
|
||
|
else
|
||
|
{
|
||
|
var notifier = (ImmutableList<Func<T, IObservable<Unit>>>)_notifier;
|
||
|
notifier = notifier.Add(asyncMessageReceiver);
|
||
|
notifiers[typeof(T)] = notifier;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return new Subscription<T>(this, asyncMessageReceiver);
|
||
|
}
|
||
|
|
||
|
public void Dispose()
|
||
|
{
|
||
|
lock (notifiers)
|
||
|
{
|
||
|
if (!isDisposed)
|
||
|
{
|
||
|
isDisposed = true;
|
||
|
notifiers.Clear();
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
class Subscription<T> : IDisposable
|
||
|
{
|
||
|
readonly AsyncMessageBroker parent;
|
||
|
readonly Func<T, IObservable<Unit>> asyncMessageReceiver;
|
||
|
|
||
|
public Subscription(AsyncMessageBroker parent, Func<T, IObservable<Unit>> asyncMessageReceiver)
|
||
|
{
|
||
|
this.parent = parent;
|
||
|
this.asyncMessageReceiver = asyncMessageReceiver;
|
||
|
}
|
||
|
|
||
|
public void Dispose()
|
||
|
{
|
||
|
lock (parent.notifiers)
|
||
|
{
|
||
|
object _notifier;
|
||
|
if (parent.notifiers.TryGetValue(typeof(T), out _notifier))
|
||
|
{
|
||
|
var notifier = (ImmutableList<Func<T, IObservable<Unit>>>)_notifier;
|
||
|
notifier = notifier.Remove(asyncMessageReceiver);
|
||
|
|
||
|
parent.notifiers[typeof(T)] = notifier;
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|