接着上篇继续讲
1.KGNetSession socket的会话管理 就是管理接收/发送 数据消息的
这里流程是StartReciveData()开启了连接异步监听了回调ReciveHeadData(IAsyncResult)这个回调是用来判断出包长然后进行实际数据消息包异步回调处理ReciveData(IAsyncResult)
这里分两个异步接收主要是处理socket的粘包和分包情况,
大概的思维图
粘包的话是当消息频繁一起时候 socket有可能把他们打包一起发了过来,
分包 是当消息过长 socket会把他们分成两或多个个包分几次发过来 就会出现包不完整情况
为啥ReciveHeadData只接收 四字节嘞,因为 定义包长的数据就站byte四个字节 所以只接收这几个 socket的话一次没接收完他会把余下的继续分包发的 所以接收完 得到这个包的长度就知道接下来要接收包数据要多长
ReciveData(IAsyncResult)接收到之后 就进行了消息处理回调函数 OnReciveData(T) 然后进行新一轮监听了,如果这时候 socket 出现粘包还没接完 就会继续分包发过来重复这一轮操作就好 就可以处理掉粘包的情况
连接关闭时候 会触发OnDisRecive
如果是强行关闭的话会触发一次监听异步的回调 如果还是调用EndReceive就会报错所以前面要加个if(mSocket.Available==0)判断是否还有数据
/// <summary>
/// 单个的网络会话管理
/// </summary>
/// <typeparam name="T"></typeparam>
public abstract class KGNetSession<T> where T:KGNetData
{
public Socket mSocket;
public event Action<T> OnReciveDataEvent;//接收到的数据委托事件
public event Action OnDisReciveEvent;//关闭会话的事件
public event Action OnStartReciveEvent;//首次开启连接的事件
public KGNetPacket mKGNetPacket=new KGNetPacket();
/// <summary>
/// 这里开始数据接收
/// </summary>
/// <param name="socket"></param>
/// <param name="close"></param>
public void StartReciveData(Socket socket,Action close=null)
{
try
{
// 初始化赋值
mSocket = socket;
OnDisReciveEvent+= close;
//回调开启连接事件
OnStartRecive();
//首先是接收头4个字节确认包长
mKGNetPacket.PacketBuff = new byte[4];
mSocket.BeginReceive(mKGNetPacket.PacketBuff, 0, 4, SocketFlags.None, ReciveHeadData, null);
}
catch (Exception e)
{
("StartReciveDataError:" + e).KLog(LogLevel.Err);
}
}
/// <summary>
/// 这里是判断接收标头的就是长度
/// </summary>
/// <param name="ar"></param>
protected void ReciveHeadData(IAsyncResult ar)
{
try
{
//如果接收数据长度等于0就是断开连接了
//为啥要加这个勒 在断开的时候 异步会回调一次 直接调用EndReceive 会报错
if (mSocket.Available == 0)
{
Clear();
return;
}
int len = mSocket.EndReceive(ar);
if (len > 0)
{
mKGNetPacket.HeadIndex += len;
//这里如果是小于4的就是凑不成 就是分包了 要继续接收
if (mKGNetPacket.HeadIndex < mKGNetPacket.HeadLength)
{
mSocket.BeginReceive(mKGNetPacket.PacketBuff, mKGNetPacket.HeadIndex, mKGNetPacket.HeadLength - mKGNetPacket.HeadIndex, SocketFlags.None, ReciveHeadData, null);
}
//这里已经取出长度了
else
{
//赋值从那四个字节获取的byte[]的长度
mKGNetPacket.SetPackLen();
//进行真正的数据接收处理
mSocket.BeginReceive(mKGNetPacket.PacketBuff, 0, mKGNetPacket.PacketBuffLength, SocketFlags.None, ReciveData, null);
}
}
else
{
Clear();
}
}
catch (Exception e)
{
("ReciveHeadDataError:" + e).KLog(LogLevel.Err);
}
}
/// <summary>
/// 这里是接收到包里面的数据异步处理
/// </summary>
/// <param name="ar"></param>
protected void ReciveData(IAsyncResult ar)
{
try
{
//结束接收获取长度
int len = mSocket.EndReceive(ar);
if (len>0)
{
mKGNetPacket.PacketIndex += len;
//这里是如果接收到的包长和原本获取到的长度小,就是分包了 需要再次进行接收剩下的
if (mKGNetPacket.PacketIndex < mKGNetPacket.PacketBuffLength)
{
mSocket.BeginReceive(mKGNetPacket.PacketBuff, mKGNetPacket.PacketIndex, mKGNetPacket.PacketBuffLength - mKGNetPacket.PacketIndex, SocketFlags.None, ReciveData, null);
}
//已经接完一组数据了
else
{
//这里就可以进行回调函数了
OnReciveData(mKGNetPacket.PacketBuff.DeSerialization<T>());
//开始新一轮的从上往下接收了
mKGNetPacket.PacketBuff = new byte[4];
mSocket.BeginReceive(mKGNetPacket.PacketBuff, 0, 4, SocketFlags.None, ReciveHeadData, null);
}
}
else
{
Clear();
}
}
catch (Exception e)
{
("ReciveDataError:" + e).KLog(LogLevel.Err);
}
}
#region Send
/// <summary>
/// 发送消息
/// </summary>
/// <param name="data"></param>
public void SendData(T data)
{
//这里转回来 byte[]
byte[] bytedata = data.PackNetData();
//创建流准备异步写入发送
NetworkStream network = null;
try
{
//指定写入的socket
network = new NetworkStream(mSocket);
//判断是否可以支持写入消息
if (network.CanWrite)
{
//开始异步写入
network.BeginWrite(bytedata,0, bytedata.Length, SendDataAsync, network);
}
}
catch (Exception e)
{
("SendDataError:" + e).KLog(LogLevel.Err);
}
}
/// <summary>
/// 这里是异步写入回调
/// </summary>
/// <param name="ar"></param>
protected void SendDataAsync(IAsyncResult ar)
{
//拿到写入时候的流
NetworkStream network = (NetworkStream)ar.AsyncState;
try
{
//结束写入 就是发送了 然后进行关闭流
network.EndWrite(ar);
network.Flush();
network.Close();
}
catch (Exception e)
{
("SendDataAsyncError:" + e).KLog(LogLevel.Err);
}
}
#endregion
/// <summary>
/// 网络关闭
/// </summary>
protected void Clear()
{
OnDisRecive();
mSocket.Close();
}
protected virtual void OnReciveData(T data)
{
OnReciveDataEvent?.Invoke(data);
("接收到了一条消息:"+data).KLog();
}
protected virtual void OnDisRecive()
{
OnDisReciveEvent?.Invoke();
("关闭了一个连接:").KLog();
}
protected virtual void OnStartRecive()
{
OnStartReciveEvent?.Invoke();
("开始了一个连接:").KLog();
}
}
2.KGBaseNet KGSocketClient/KGSocketServe的父类
就提了一下共用部分
public abstract class KGBaseNet
{
public Socket mSocket;
public KGBaseNet()
{
//这里就是new一个socket出来 指定地址类型 和套接字类型( 就是传送数据类型),还有协议
mSocket = new Socket(AddressFamily.InterNetwork,SocketType.Stream,ProtocolType.Tcp);
}
//开启建立
public abstract void StartCreate(string ip,int port);
//建立的回调
public abstract void ConnectAsync(IAsyncResult ar);
//打印的调用
public void SetLog(Action<string, LogLevel> LogEvent,bool run=true)
{
LogEvent.SetLog(run);
}
}
3.KGSocketClient 建立客户端的
好像没啥好说的 看代码吧
/// <summary>
/// 建立客户端的
/// </summary>
/// <typeparam name="T"></typeparam>
/// <typeparam name="R"></typeparam>
public class KGSocketClient<T, R> : KGBaseNet where T : KGNetSession<R>, new() where R : KGNetData
{
public T Client;
public override void StartCreate(string ip, int port)
{
try
{
Client = new T();
//异步连接服务器
mSocket.BeginConnect(IPAddress.Parse(ip), port, ConnectAsync, Client);
("正在连接服务器").KLog();
}
catch (Exception e)
{
("StartCreateError:" + e).KLog(LogLevel.Err);
}
}
public override void ConnectAsync(IAsyncResult ar)
{
try
{
mSocket.EndConnect(ar);
//连接完成开始接收数据
Client.StartReciveData(mSocket,()=> { Client = null; });
}
catch (Exception e)
{
("ConnectAsyncError:" + e).KLog(LogLevel.Err);
}
}
}
4.KGSocketClient 建立服务器端的
public class KGSocketServe<T, R> : KGBaseNet where T : KGNetSession<R>, new() where R : KGNetData
{
public List<T> SessionList=new List<T>();//储存会话管理的
public int NetListen=10;//监听数
public override void StartCreate(string ip, int port)
{
try
{
//绑定地址
mSocket.Bind(new IPEndPoint(IPAddress.Parse(ip),port));
//监听数
mSocket.Listen(NetListen);
//进行异步监听
mSocket.BeginAccept(ConnectAsync, null);
("建立服务器........").KLog();
}
catch (Exception e)
{
("StartCreateError:" + e).KLog(LogLevel.Err);
}
}
//异步回调
public override void ConnectAsync(IAsyncResult ar)
{
try
{
T Client = new T();
//这里结束接收 获取刚刚连接的socket
Socket sk= mSocket.EndAccept(ar);
//开始监听 第二个是加入结束事件
Client.StartReciveData(sk,
()=>
{
SessionList.Remove(Client);
});
//添加进SessionList储存
SessionList.Add(Client);
//开始新一轮接收连接
mSocket.BeginAccept(ConnectAsync, null);
}
catch (Exception e)
{
("ConnectAsyncError:" + e).KLog(LogLevel.Err);
}
}
}
使用方法
这里大概说一下 下一篇会出个游戏例子
1.网络消息自定义类
都必须继承KGNetData 然后打上可序列化标签[Serializable]
[Serializable]
public class NetData : KGNetData {
public string dataname;
}
2.创建客户端/服务器端
// KGSocketClient<KGNetSession<KGNetData>, KGNetData> kg = new KGSocketClient<KGNetSession<NetData>, NetData>();
KGSocketServe<KGNetSession<KGNetData>, KGNetData> kg= new KGSocketServe<KGNetSession<NetData>,NetData>();
//都是调用这个创建
kg.StartCreate("127.0.0.1", 8897);
3.发送数据
就是调用KGNetSession里面的SendData(T)
kg.Client.SendData(new KGNetData { dataname = "123456" });
4.接收网络消息
这里留了一个回调事件和回调函数OnReciveDataEvent/OnReciveData
重写OnReciveData 就好了 如果别的要加事件可以往OnReciveDataEvent加
protected override void OnReciveData(T data)
{
OnReciveDataEvent?.Invoke(data);
("接收到了一条消息:"+data).KLog();
}
5.打印数据的
在KGBaseNet里面的 这里是给 在另外一些 Console.WriteLine打印不了留出来用的
public void SetLog(Action<string, LogLevel> LogEvent,bool run=true)
{
LogEvent.SetLog(run);
}