服务端应用层管理数据包
1、一个完整的数据包接收完毕后悔去掉head部分交由应用层,应用层可根据自己的需要制定应用层协议,这里是从body部分读取第一个字符串作为处理方法。
服务端和客户端通用,Update需要在一个线程内执行
namespace SelfFramework.Network
{
public class NetManager
{
private INetEventListener eventListener;
private ThreadSynchronizationContext threadSynchronizationContext = ThreadSynchronizationContext.Instance;
private bool isService;
private TService service;
private TChannel channel;
private Stopwatch stopwatch;
private readonly int updateTime;
public NetManager(INetEventListener eventListener,int updateTime)
{
this.eventListener = eventListener;
this.updateTime = updateTime;
this.stopwatch = new Stopwatch();
}
public TService StartServer(IPEndPoint ipEndPoint)
{
this.service=new TService(this.threadSynchronizationContext, ipEndPoint, eventListener);
this.isService = true;
return this.service;
}
public TChannel StartConnect(IPEndPoint ipEndPoint)
{
if (this.channel == null || this.channel.IsUsing == false)
{
this.channel = new TChannel(this.threadSynchronizationContext, ipEndPoint, this.eventListener);
}
else
{
this.channel.ConnectAsync();
}
this.isService = false;
return this.channel;
}
public void Update()
{
stopwatch.Start();
threadSynchronizationContext.Update();
if (this.isService)
{
this.service.Update();
}
else
{
this.channel.Update();
}
stopwatch.Stop();
int sleepTime = Math.Max(0, updateTime - (int)stopwatch.ElapsedMilliseconds);
Thread.Sleep(sleepTime);
stopwatch.Reset();
}
}
}
服务端应用层接口
namespace SelfFramework.Manager
{
public class ServerListener:BaseBehaviour,INetEventListener
{
private NetManager mServer;
private TService service;
public void StartUp(IPEndPoint ipEndPoint, bool startHeartBeat)
{
this.mServer = new NetManager(this, 15);
service = this.mServer.StartServer(ipEndPoint);
if (startHeartBeat)
{
service.StartCheckHeartBeat();
}
}
public void OnNetworkError(TChannel channel, SocketError socketError, SocketAsyncOperation lastOperation)
{
Console.WriteLine("错误:" + socketError + "最后的操作:>" + lastOperation);
}
public void OnNetworkReceive(TChannel channel, byte[] data)
{
networkManager.EnPacket(new NetMsg(channel, data));
}
public void OnPeerConnected(TChannel channel)
{
clientPeerManager.AddClientPeer(channel);
}
public void OnPeerDisconnected(TChannel channel, DisconnectInfo disconnectInfo)
{
clientPeerManager.RemoveClientPeer(channel.channelId);
}
public void StopServer()
{
//mServer?.Stop();
//mServer = null;
}
public void Update()
{
mServer.Update();
}
}
}
应用层网络管理器
namespace SelfFramework.Manager
{
public struct NetMsg
{
public TChannel channel;
public byte[] data;
public NetMsg(TChannel channel,byte[] data)
{
this.channel = channel;
this.data = data;
}
}
public class NetworkManager : BaseManager
{
//分发器
private ConcurrentBag<IDispatcher> dispatcherBag;
private ServerListener serverListener;
private ConcurrentQueue<NetMsg> taskQueue = new ConcurrentQueue<NetMsg>();
private static readonly object lockObj = new object();
private readonly static log4net.ILog Log = LogHelper.GetLogger();
public override void InitManager()
{
dispatcherBag = new ConcurrentBag<IDispatcher>()
{
new NetworkDispatcher()
};
this.IsUpdate = true;
serverListener = new ServerListener();
serverListener.StartUp(new IPEndPoint(IPAddress.Parse("127.0.0.1"), 17000), true);
Task.Run(() =>
{
while (true)
{
serverListener.Update(); //socket线程
}
});
}
public override void OnDispose()
{
taskQueue?.Clear();
}
public void EnPacket(NetMsg msg)
{
lock (lockObj)
{
taskQueue.Enqueue(msg);
}
}
private NetMsg DePacket()
{
lock (lockObj)
{
taskQueue.TryDequeue(out NetMsg msg);
return msg;
}
}
public override void OnUpdate()
{
if (taskQueue.IsEmpty)
{
return;
}
try
{
NetMsg msg = DePacket(); //数据包出列
foreach (IDispatcher dispatcher in dispatcherBag)
{
dispatcher.OnMessage(msg.channel, msg.data);
}
}
catch (Exception e)
{
Log.ErrorFormat("数据包处理出错:>{0}", e.Message);
}
}
}
}
服务端协议分发器,这里是读取一个字符串,剩余部分作为数据,可自行制定应用层协议
namespace SelfFramework.Manager
{
public sealed class NetworkDispatcher : BaseBehaviour,IDispatcher
{
private ConcurrentDictionary<string, HandlerDelegate> handleTable;
public NetworkDispatcher()
{
handleTable = new ConcurrentDictionary<string, HandlerDelegate>();
RegistHandlers();
}
public void OnMessage(TChannel channel, byte[] body)
{
NetDataReader reader = new NetDataReader(body, 0, body.Length);
string protocol = reader.GetString();
byte[] data = null;
if (reader.ReadPos < reader.Capacity)
{
data = reader.GetBytes();
}
if (handleTable.TryGetValue(protocol, out HandlerDelegate handler))
{
ClientPeer clientPeer = clientPeerManager.GetClientPeer(channel.channelId);
if (clientPeer!=null)
{
handler.Invoke(clientPeer, data);
}
}
}
private void RegistHandlers()
{
Type parentType = typeof(IHandler);
Type[] assemblyTypes = parentType.Assembly.GetTypes();
for (int i = 0; i < assemblyTypes.Length; i++)
{
Type type = assemblyTypes[i];
if (type.IsAbstract || type.IsInterface)
{
continue;
}
if (parentType.IsAssignableFrom(type) == false)
{
continue;
}
IHandler handler = Activator.CreateInstance(type) as IHandler;
MethodInfo[] methodInfos = handler.GetType().GetMethods();
for (int j = 0; j < methodInfos.Length; j++)
{
MethodInfo methodInfo = methodInfos[j];
HandlerAttribute attribute = methodInfo.GetCustomAttribute(typeof(HandlerAttribute)) as HandlerAttribute;
if (attribute != null)
{
HandlerDelegate handlerDelegate = methodInfo.CreateDelegate(typeof(HandlerDelegate), handler) as HandlerDelegate;
RegistHandler(attribute.protocol, handlerDelegate);
}
}
}
}
private void RegistHandler(string cmd, HandlerDelegate handler)
{
if (handleTable.ContainsKey(cmd))
{
handleTable[cmd] = handler;
}
else
{
handleTable.TryAdd(cmd, handler);
}
}
private void UnregistHandler(string protocol)
{
handleTable.TryRemove(protocol, out HandlerDelegate handler);
}
}
}
一个测试例子
namespace SelfFramework.Handler
{
public sealed class TestHandler : IHandler
{
[Handler(Protocol.TestFrame)]
public void ResponseTestFrame(ClientPeer clientPeer, byte[] data)
{
}
[Handler(Protocol.Test)]
public void ResponseTest(ClientPeer clientPeer, byte[] data)
{
NetDataReader dataReader = new NetDataReader(data, 0, data.Length);
Console.WriteLine(dataReader.GetInt32());
clientPeer.Send(Protocol.Test, new byte[] { 123, 0, 0, 0 });
}
}
}