上海虹口龙之梦项目
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

209 lines
6.1 KiB

using System;
using System.Collections.Generic;
using UniRx.InternalUtil;
namespace UniRx
{
/// <summary>
/// Sending side of a type-filtered message channel.
/// </summary>
public interface IMessagePublisher
{
    /// <summary>
    /// Sends <paramref name="message"/> to all receivers.
    /// </summary>
    /// <typeparam name="T">Type of the message being sent.</typeparam>
    /// <param name="message">The message instance to send.</param>
    void Publish<T>(T message);
}
/// <summary>
/// Receiving side of a type-filtered message channel.
/// </summary>
public interface IMessageReceiver
{
    /// <summary>
    /// Subscribes to messages of a given type.
    /// </summary>
    /// <typeparam name="T">Type of message to receive.</typeparam>
    /// <returns>An observable sequence of the typed messages.</returns>
    IObservable<T> Receive<T>();
}
/// <summary>
/// Combines <see cref="IMessagePublisher"/> and <see cref="IMessageReceiver"/>
/// into a single publish/receive contract. Declares no members of its own.
/// </summary>
public interface IMessageBroker
    : IMessagePublisher, IMessageReceiver
{
}
/// <summary>
/// Sending side of a type-filtered channel whose receivers do asynchronous work.
/// </summary>
public interface IAsyncMessagePublisher
{
    /// <summary>
    /// Sends <paramref name="message"/> to all receivers and lets the caller
    /// await their completion.
    /// </summary>
    /// <typeparam name="T">Type of the message being sent.</typeparam>
    /// <param name="message">The message instance to send.</param>
    /// <returns>An observable used to await completion of the receivers.</returns>
    IObservable<Unit> PublishAsync<T>(T message);
}
/// <summary>
/// Receiving side of a type-filtered channel whose receivers do asynchronous work.
/// </summary>
public interface IAsyncMessageReceiver
{
    /// <summary>
    /// Subscribes an asynchronous handler to messages of a given type.
    /// </summary>
    /// <typeparam name="T">Type of message the handler accepts.</typeparam>
    /// <param name="asyncMessageReceiver">
    /// Handler invoked with each message; it returns an observable representing its work.
    /// </param>
    /// <returns>
    /// A handle for the subscription. In <see cref="AsyncMessageBroker"/>, disposing it
    /// unregisters the handler.
    /// </returns>
    IDisposable Subscribe<T>(Func<T, IObservable<Unit>> asyncMessageReceiver);
}
/// <summary>
/// Combines <see cref="IAsyncMessagePublisher"/> and <see cref="IAsyncMessageReceiver"/>
/// into a single asynchronous publish/subscribe contract. Declares no members of its own.
/// </summary>
public interface IAsyncMessageBroker
    : IAsyncMessagePublisher, IAsyncMessageReceiver
{
}
/// <summary>
/// In-memory publish/subscribe hub. Messages are routed by the exact
/// <c>typeof(T)</c> used at the call site: one subject is kept per message type.
/// </summary>
public class MessageBroker : IMessageBroker, IDisposable
{
    /// <summary>
    /// Shared broker instance for application-wide (global scope) messaging.
    /// </summary>
    public static readonly IMessageBroker Default = new MessageBroker();

    // Set once by Dispose; read and written only while holding the lock on notifiers.
    bool isDisposed = false;

    // Message type -> ISubject<T> for that type, stored as object because T varies per entry.
    // The dictionary instance also serves as the lock object.
    readonly Dictionary<Type, object> notifiers = new Dictionary<Type, object>();

    /// <summary>
    /// Pushes <paramref name="message"/> to the subject registered for <typeparamref name="T"/>.
    /// The message is dropped silently when the broker has been disposed or when
    /// <see cref="Receive{T}"/> has never been called for this type.
    /// </summary>
    /// <typeparam name="T">Type used as the routing key.</typeparam>
    /// <param name="message">The message to deliver.</param>
    public void Publish<T>(T message)
    {
        object boxedSubject = null;
        bool hasSubject;

        lock (notifiers)
        {
            hasSubject = !isDisposed && notifiers.TryGetValue(typeof(T), out boxedSubject);
        }

        // OnNext runs after the lock is released, so subscriber callbacks never
        // execute while the registry lock is held.
        if (hasSubject)
        {
            ((ISubject<T>)boxedSubject).OnNext(message);
        }
    }

    /// <summary>
    /// Returns the message stream for <typeparamref name="T"/>, creating the backing
    /// subject (<c>new Subject&lt;T&gt;().Synchronize()</c>) on first request.
    /// </summary>
    /// <typeparam name="T">Type of message to receive.</typeparam>
    /// <returns>The subject for this type, exposed through <c>AsObservable()</c>.</returns>
    /// <exception cref="ObjectDisposedException">The broker has been disposed.</exception>
    public IObservable<T> Receive<T>()
    {
        var messageType = typeof(T);
        object boxedSubject;

        lock (notifiers)
        {
            if (isDisposed)
            {
                throw new ObjectDisposedException("MessageBroker");
            }

            if (!notifiers.TryGetValue(messageType, out boxedSubject))
            {
                ISubject<T> created = new Subject<T>().Synchronize();
                boxedSubject = created;
                notifiers.Add(messageType, boxedSubject);
            }
        }

        return ((IObservable<T>)boxedSubject).AsObservable();
    }

    /// <summary>
    /// Marks the broker as disposed and clears the type registry. Calling it again
    /// is a no-op. The stored subjects are only dropped from the registry; no
    /// completion signal is sent to them here.
    /// </summary>
    public void Dispose()
    {
        lock (notifiers)
        {
            if (isDisposed)
            {
                return;
            }

            isDisposed = true;
            notifiers.Clear();
        }
    }
}
/// <summary>
/// In-memory asynchronous publish/subscribe hub. Handlers are grouped by the exact
/// <c>typeof(T)</c> they were subscribed with; publishing invokes every handler for
/// that type and returns an observable combining their results.
/// </summary>
public class AsyncMessageBroker : IAsyncMessageBroker, IDisposable
{
    /// <summary>
    /// Shared broker instance for application-wide (global scope) messaging.
    /// </summary>
    public static readonly IAsyncMessageBroker Default = new AsyncMessageBroker();

    // Set once by Dispose; read and written only while holding the lock on notifiers.
    bool isDisposed = false;

    // Message type -> ImmutableList<Func<T, IObservable<Unit>>> for that type, stored as
    // object because T varies per entry. The dictionary instance is also the lock object.
    readonly Dictionary<Type, object> notifiers = new Dictionary<Type, object>();

    /// <summary>
    /// Invokes every handler subscribed for <typeparamref name="T"/> with
    /// <paramref name="message"/> and combines the returned observables.
    /// </summary>
    /// <typeparam name="T">Type used as the routing key.</typeparam>
    /// <param name="message">The message passed to each handler.</param>
    /// <returns>
    /// <c>Observable.WhenAll</c> over the handlers' observables, or
    /// <c>Observable.ReturnUnit()</c> when no entry exists for this type.
    /// </returns>
    /// <exception cref="ObjectDisposedException">The broker has been disposed.</exception>
    public IObservable<Unit> PublishAsync<T>(T message)
    {
        ImmutableList<Func<T, IObservable<Unit>>> notifier;
        lock (notifiers)
        {
            if (isDisposed) throw new ObjectDisposedException("AsyncMessageBroker");

            object _notifier;
            if (!notifiers.TryGetValue(typeof(T), out _notifier))
            {
                return Observable.ReturnUnit();
            }
            notifier = (ImmutableList<Func<T, IObservable<Unit>>>)_notifier;
        }

        // Handlers are invoked after the lock is released, using the handler array
        // captured from the list that was registered at lookup time.
        var data = notifier.Data;
        var awaiter = new IObservable<Unit>[data.Length];
        for (int i = 0; i < data.Length; i++)
        {
            awaiter[i] = data[i].Invoke(message);
        }
        return Observable.WhenAll(awaiter);
    }

    /// <summary>
    /// Registers <paramref name="asyncMessageReceiver"/> as a handler for messages of
    /// type <typeparamref name="T"/>.
    /// </summary>
    /// <typeparam name="T">Type of message the handler accepts.</typeparam>
    /// <param name="asyncMessageReceiver">Handler to invoke on each published message.</param>
    /// <returns>A handle whose <c>Dispose</c> unregisters this handler once.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="asyncMessageReceiver"/> is null.
    /// </exception>
    /// <exception cref="ObjectDisposedException">The broker has been disposed.</exception>
    public IDisposable Subscribe<T>(Func<T, IObservable<Unit>> asyncMessageReceiver)
    {
        // Fail at registration time: a null handler would otherwise only surface
        // later as a NullReferenceException inside PublishAsync.
        if (asyncMessageReceiver == null) throw new ArgumentNullException("asyncMessageReceiver");

        lock (notifiers)
        {
            if (isDisposed) throw new ObjectDisposedException("AsyncMessageBroker");

            object _notifier;
            var notifier = notifiers.TryGetValue(typeof(T), out _notifier)
                ? (ImmutableList<Func<T, IObservable<Unit>>>)_notifier
                : ImmutableList<Func<T, IObservable<Unit>>>.Empty;

            // Add returns a list instance that is stored back under the type key.
            notifiers[typeof(T)] = notifier.Add(asyncMessageReceiver);
        }

        return new Subscription<T>(this, asyncMessageReceiver);
    }

    /// <summary>
    /// Marks the broker as disposed and clears all registered handlers. Calling it
    /// again is a no-op.
    /// </summary>
    public void Dispose()
    {
        lock (notifiers)
        {
            if (!isDisposed)
            {
                isDisposed = true;
                notifiers.Clear();
            }
        }
    }

    /// <summary>
    /// Handle returned by <see cref="Subscribe{T}"/>; disposing it removes the
    /// associated handler from the parent broker.
    /// </summary>
    class Subscription<T> : IDisposable
    {
        readonly AsyncMessageBroker parent;
        readonly Func<T, IObservable<Unit>> asyncMessageReceiver;

        // Guarded by the lock on parent.notifiers. Prevents a repeated Dispose from
        // removing a second registration of the same delegate that belongs to a
        // different Subscription.
        bool isDisposed = false;

        public Subscription(AsyncMessageBroker parent, Func<T, IObservable<Unit>> asyncMessageReceiver)
        {
            this.parent = parent;
            this.asyncMessageReceiver = asyncMessageReceiver;
        }

        /// <summary>
        /// Removes the handler from the parent broker. Only the first call has an
        /// effect; if the broker was already disposed (registry cleared) nothing
        /// is removed.
        /// </summary>
        public void Dispose()
        {
            lock (parent.notifiers)
            {
                if (isDisposed) return;
                isDisposed = true;

                object _notifier;
                if (parent.notifiers.TryGetValue(typeof(T), out _notifier))
                {
                    var notifier = (ImmutableList<Func<T, IObservable<Unit>>>)_notifier;
                    parent.notifiers[typeof(T)] = notifier.Remove(asyncMessageReceiver);
                }
            }
        }
    }
}
}