大连中石油电子沙盘
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 

224 lines
6.8 KiB

using UnityEngine;
using System;
using System.Collections.Generic;
using AX.Network.Common;
using AX.Network.Protocols;
using IAppSession = AX.Network.Common.IAppSession<AX.Network.Protocols.BinaryProtocol, AX.Network.Protocols.BinaryMessage>;
namespace AX.NetworkSystem
{
/// <summary>
/// Static hub that queues network events (messages, session connected, session closed)
/// and replays them to registered listeners when <see cref="Update"/> is called.
/// </summary>
/// <remarks>
/// The <c>Send*</c> methods only enqueue, under <c>lockObj</c>; listeners run inside
/// <see cref="Update"/> on whichever thread calls it. Message envelopes and body buffers
/// come from private pools and are handed back right after dispatch, so a handler must not
/// keep a reference to the <see cref="BinaryMessage"/> or its <c>Body</c> after it returns.
/// The listener registration methods do not take the lock.
/// NOTE(review): presumably they are meant to be called from the same thread as
/// <see cref="Update"/> - confirm.
/// </remarks>
public static class NetworkMessageDispatcher
{
    // Message header -> combined delegate of every listener registered for that header.
    private static readonly Dictionary<string, Action<BinaryMessage>> handlers = new Dictionary<string, Action<BinaryMessage>>();

    // Pools for message envelopes and body buffers. Deliberately not readonly: their types
    // are declared elsewhere and a readonly field of a mutable struct would silently copy.
    private static ObjectPool<BinaryMessage> messagePool = new ObjectPool<BinaryMessage>(8, () => new BinaryMessage());
    private static DynamicBufferPool bufferPool = new DynamicBufferPool();

    // Pending work, filled by the Send* methods and drained by Update().
    private static readonly Queue<BinaryMessage> messages = new Queue<BinaryMessage>();
    private static readonly Queue<object> connectedMessages = new Queue<object>();
    private static readonly Queue<KeyValuePair<object, CloseReason>> closedMessages = new Queue<KeyValuePair<object, CloseReason>>();

    private static Action<IAppSession> connectedHandler;
    private static Action<IAppSession, CloseReason> closedHandler;

    // Guards the three queues and both pools.
    private static readonly object lockObj = new object();

    /// <summary>
    /// Drains the pending queues in a fixed order: messages first, then connected
    /// notifications, then closed notifications.
    /// </summary>
    /// <remarks>
    /// Listeners are invoked while <c>lockObj</c> is held. The lock is re-entrant, so a
    /// listener that calls a <c>Send*</c> method on the same thread enqueues work that is
    /// processed within this same call; other threads block until this call finishes.
    /// An exception thrown by a listener propagates to the caller; items still queued are
    /// kept for the next call.
    /// </remarks>
    public static void Update()
    {
        lock (lockObj)
        {
            while (messages.Count != 0)
            {
                var message = messages.Dequeue();
                try
                {
                    Dispatch(message);
                }
                finally
                {
                    // Hand the envelope and its buffer back even when a handler throws;
                    // otherwise they would never return to their pools.
                    Recycle(message);
                }
            }

            while (connectedMessages.Count != 0)
            {
                var session = connectedMessages.Dequeue();
                if (connectedHandler != null)
                    connectedHandler((IAppSession)session);
            }

            while (closedMessages.Count != 0)
            {
                var closed = closedMessages.Dequeue();
                if (closedHandler != null)
                    closedHandler((IAppSession)closed.Key, closed.Value);
            }
        }
    }

    /// <summary>
    /// Removes all message listeners, empties both pools and discards queued messages.
    /// Pending connected/closed notifications and their listeners are left untouched.
    /// </summary>
    public static void Clear()
    {
        // The Send* methods touch the same queue and pools from other threads under this
        // lock, so the reset has to take it as well.
        lock (lockObj)
        {
            handlers.Clear();
            messagePool.Clear();
            bufferPool.Clear();
            messages.Clear();
        }
    }

    /// <summary>Registers <paramref name="handler"/> for messages whose header equals <paramref name="messageType"/>.</summary>
    /// <param name="messageType">Header to listen for.</param>
    /// <param name="handler">Callback to add; <c>null</c> is ignored.</param>
    public static void AddListener(string messageType, Action<BinaryMessage> handler)
    {
        // A stored null delegate would make Update() fail when it invokes the entry.
        if (handler == null)
            return;

        Action<BinaryMessage> existing;
        handlers.TryGetValue(messageType, out existing);
        // 'existing' is null when the header is new; null + handler yields handler.
        handlers[messageType] = existing + handler;
    }

    /// <summary>
    /// Unregisters <paramref name="handler"/> from <paramref name="messageType"/>; the header
    /// entry is dropped once no listener remains. Unknown headers are ignored.
    /// </summary>
    /// <param name="messageType">Header the handler was registered for.</param>
    /// <param name="handler">Callback to remove.</param>
    public static void RemoveListener(string messageType, Action<BinaryMessage> handler)
    {
        Action<BinaryMessage> existing;
        if (!handlers.TryGetValue(messageType, out existing))
            return;

        existing -= handler;
        if (existing == null)
            handlers.Remove(messageType);
        else
            handlers[messageType] = existing;
    }

    /// <summary>Adds a listener for session-connected notifications.</summary>
    public static void AddConnectedListener(Action<IAppSession> action)
    {
        connectedHandler += action;
    }

    /// <summary>Removes a listener for session-connected notifications.</summary>
    public static void RemoveConnectedListener(Action<IAppSession> action)
    {
        connectedHandler -= action;
    }

    /// <summary>Adds a listener for session-closed notifications.</summary>
    public static void AddClosedListener(Action<IAppSession, CloseReason> action)
    {
        closedHandler += action;
    }

    /// <summary>Removes a listener for session-closed notifications.</summary>
    public static void RemoveClosedListener(Action<IAppSession, CloseReason> action)
    {
        closedHandler -= action;
    }

    /// <summary>
    /// Queues a copy of <paramref name="message"/> (header plus raw body) for the next
    /// <see cref="Update"/>. A <c>null</c> message is ignored.
    /// </summary>
    /// <param name="message">Source message; its body is copied into a pooled buffer.</param>
    /// <param name="tag">Optional value stored in the queued message's <c>Tag</c>.</param>
    public static void SendMessage(BinaryMessage message, object tag = null)
    {
        if (message == null)
            return;

        lock (lockObj)
        {
            var queued = messagePool.Acquire();
            queued.Header = message.Header;
            if (!message.RawBody.IsEmpty())
            {
                queued.Body = bufferPool.Acquire(message.RawBody.Count);
                message.RawBody.CopyTo(queued.Body);
            }
            queued.Tag = tag;
            messages.Enqueue(queued);
        }
    }

    /// <summary>Queues a body-less message with the given header.</summary>
    public static void SendMessage(string header)
    {
        Enqueue(header, null, 0, 0, null);
    }

    /// <summary>Queues a body-less message with the given header and tag.</summary>
    public static void SendMessage(string header, object tag)
    {
        Enqueue(header, null, 0, 0, tag);
    }

    /// <summary>
    /// Queues a message whose body is a copy of <paramref name="body"/>.
    /// </summary>
    /// <remarks>
    /// The bytes are copied into a pooled buffer so the caller keeps ownership of its array.
    /// Storing the caller's array directly would let <see cref="Update"/> zero it and push
    /// it into the private buffer pool after dispatch. A <c>null</c> or empty array yields
    /// a message without a body, matching the <see cref="ArraySegment{T}"/> overload.
    /// NOTE(review): assumes handlers can treat a buffer from <c>bufferPool.Acquire(n)</c>
    /// as an n-byte body, as the ArraySegment overload already does - confirm.
    /// </remarks>
    /// <param name="header">Message type header.</param>
    /// <param name="body">Payload bytes; may be <c>null</c>.</param>
    /// <param name="tag">Optional value stored in the queued message's <c>Tag</c>.</param>
    public static void SendMessage(string header, byte[] body, object tag = null)
    {
        Enqueue(header, body, 0, body == null ? 0 : body.Length, tag);
    }

    /// <summary>
    /// Queues a message whose body is a copy of the bytes addressed by <paramref name="body"/>.
    /// A segment with no array or zero count yields a message without a body.
    /// </summary>
    /// <param name="header">Message type header.</param>
    /// <param name="body">Slice of the payload to copy.</param>
    /// <param name="tag">Optional value stored in the queued message's <c>Tag</c>.</param>
    public static void SendMessage(string header, ArraySegment<byte> body, object tag = null)
    {
        Enqueue(header, body.Array, body.Offset, body.Count, tag);
    }

    /// <summary>
    /// Queues a session-connected notification; a <c>null</c> session is ignored.
    /// The object is cast to <c>IAppSession</c> when it is delivered in <see cref="Update"/>.
    /// </summary>
    public static void SendConnectedMessage(object session)
    {
        if (session == null)
            return;

        lock (lockObj)
        {
            connectedMessages.Enqueue(session);
        }
    }

    /// <summary>
    /// Queues a session-closed notification (session object plus close reason);
    /// a pair whose key is <c>null</c> is ignored.
    /// </summary>
    public static void SendClosedMessage(KeyValuePair<object, CloseReason> pair)
    {
        if (pair.Key == null)
            return;

        lock (lockObj)
        {
            closedMessages.Enqueue(pair);
        }
    }

    // Builds a pooled message from header/tag and an optional byte range, then queues it.
    private static void Enqueue(string header, byte[] source, int offset, int count, object tag)
    {
        lock (lockObj)
        {
            var queued = messagePool.Acquire();
            queued.Header = header;
            if (source != null && count != 0)
            {
                queued.Body = bufferPool.Acquire(count);
                Buffer.BlockCopy(source, offset, queued.Body, 0, count);
            }
            queued.Tag = tag;
            messages.Enqueue(queued);
        }
    }

    // Invokes the listeners registered for the message header, or logs in the editor when none exist.
    private static void Dispatch(BinaryMessage message)
    {
        Action<BinaryMessage> action;
        // A null header cannot be used as a dictionary key, so treat it as "no listener".
        if (message.Header != null && handlers.TryGetValue(message.Header, out action))
        {
            action(message);
        }
        else if (Application.isEditor)
        {
            // Log text (Chinese): "no receiver handles this message type".
            Debug.LogError("[NetworkSystem] 该消息类型 { " + message.Header + " } 没有对应的接收方处理。");
        }
    }

    // Zeroes and releases the body buffer, resets the envelope and returns it to its pool.
    private static void Recycle(BinaryMessage message)
    {
        if (message.Body != null)
        {
            Array.Clear(message.Body, 0, message.Body.Length);
            bufferPool.Release(message.Body);
        }
        message.Header = null;
        message.Body = null;
        message.Tag = null;
        messagePool.Release(message);
    }
}
}