1、开始正常监听以后,就要开始接受数据了,整体流程图如下:
2、上一节看到我们在程序初始化的时候,初始化了很多个SocketConnection,用于管理客户端的链接,那应用层如何来操作,又什么时候来接受数据?于是我们便有了SocketSession,用于给应用层来管理整个会话过程,代码如下:
public class SocketSession : IDisposable { public string SessionId { get; private set; } private System.Net.Sockets.Socket _connectSocket; private IProtocol _protocol; private SocketConnection _connect; public SocketConnection Connection { get { return _connect; } } private MemoryStream _memStream; private delegate void ReceiveDataHandler(SocketAsyncEventArgs e); private ReceiveDataHandler ReceiveHandler; private delegate void ReceiveReadPackageHandler(byte[] b, int offset, SocketAsyncEventArgs e); private ReceiveReadPackageHandler ReadPackageHandler; public System.Net.Sockets.Socket ConnectSocket { get { return _connectSocket; } private set { } } public SocketSession(string sessionId) { this.SessionId = sessionId; } public SocketSession(System.Net.Sockets.Socket client, SocketConnection connect) : this(Guid.NewGuid().ToString()) { this._connectSocket = client; this._connect = connect; this._protocol = connect.Pool.AppServer.AppProtocol; _memStream = new MemoryStream(); ReceiveHandler = ReceiveData; ReadPackageHandler = this.ReadPackage; } internal void ReceiveData(SocketAsyncEventArgs e) { if (e.SocketError != SocketError.Success) { this.Close(); return; } if (e.BytesTransferred <= 0) { this.Close(); return; } try { if (this.Connection.Flag == SocketFlag.Busy) { byte[] buffer = new byte[e.BytesTransferred]; Array.Copy(e.Buffer, 0, buffer, 0, e.BytesTransferred); ReadPackage(buffer, 0, e); buffer = null; } } catch (Exception ex) { this.Close(); return; } } internal void ReceiveAsync(SocketAsyncEventArgs e) { if (e == null) { return; } bool isCompleted = true; try { isCompleted = this._connectSocket.ReceiveAsync(e); } catch (Exception ex) { LogHelper.Debug(this.SessionId + ex.ToString()); this.Close(); } if (!isCompleted) { this.ReceiveHandler.BeginInvoke(e, ReceiveHandlerCallBack, ReceiveHandler); } } void ReceiveHandlerCallBack(IAsyncResult result) { try { (result.AsyncState as ReceiveDataHandler).EndInvoke(result); } catch (Exception e) { LogHelper.Debug(e.Message); } } internal void OnDataRecevied(SessionEventArgs arg) { if (DataRecevied != null) { this._memStream.SetLength(0); DataRecevied.Invoke(this, arg); } } internal void Close() { try { this._connectSocket.Close(); } catch (Exception ex) { LogHelper.Debug("关闭socket异常" + ex.ToString()); } if (this.Closed != null) { this.Closed(); } } internal Action Closed; internal Action<SocketSession, SessionEventArgs> DataRecevied; public void Dispose() { if (_memStream != null) { _memStream.Close(); _memStream.Dispose(); _memStream = null; } } public void Send(byte[] data) { try { if (this.Connection.Flag == SocketFlag.Busy) { this._connectSocket.Send(data); } } catch (Exception ex) { this.Close(); } } private void ReadPackage(byte[] data, int offset, SocketAsyncEventArgs e) { if (data == null || data.Length == 0) { return; } if (offset >= data.Length) { return; } if (offset == 0) { if (_memStream.Length > 0) { _memStream.Write(data, 0, data.Length); data = _memStream.ToArray(); } } //粘包处理 OnReceivedCallBack(data, offset, e); data = null; } private void OnReceivedCallBack(byte[] buffer, int offset, SocketAsyncEventArgs e) { byte[] data = this._protocol.OnDataReceivedCallBack(buffer, ref offset); if (offset == -1) { this.Close(); return; } if (data == null || data.Length == 0) { this._memStream.Write(buffer, offset, buffer.Length - offset); this.ReceiveAsync(e); return; } SessionEventArgs session_args = new SessionEventArgs(); session_args.Data = data; this.OnDataRecevied(session_args); if (offset < buffer.Length) { this.ReadPackageHandler.BeginInvoke(buffer, offset, e, ReadPackageCallBack, ReadPackageHandler); } else { this.ReceiveAsync(e); } data = null; } void ReadPackageCallBack(IAsyncResult result) { try { (result.AsyncState as ReceiveReadPackageHandler).EndInvoke(result); } catch (Exception ex) { LogHelper.Debug(ex.Message); } } }
细心的童鞋可以发现,在ReceiveAsync方法里面,接收数据的地方,当同步接收完成的时候,我们调用了一个异步委托ReceiveHandler.BeginInvoke。
在解析出一个独立的包,并且缓冲区的数据里面还有多余的包的时候,我们也调用了一个异步的委托ReadPackageHandler.BeginInvoke。
如果缓冲区比较大,比如我现在是8K,而单个包很小,客户端又发送比较频繁的时候。会导致在解析包的时候,形成一个短暂的递归。递归就会不停的压堆,资源得不到释放。
运行一段时间后,有可能导致OutOfMemoryException,如果一直是同步接收数据,在Receive的地方,也有可能形成一个递归。于是便采用了异步调用的方式。
3、因为socket属于无边界的,代码层面的每一次Send,并不是真正意义上的直接发送给服务器,而只是写到了缓冲区,由系统来决定什么时候发。如果客户 端发送非常频繁的情况下,就可能导致服务器从缓冲区取出来的包,是由多个包一起组成的。从缓冲区取出来的包,并不能保证是一个独立的应用层的包,需要按既定的协议来解析包。
我们先假定一个简单的协议,一个包的前4个字节,表明这个包内容的长度。代码如下:
public class DefaultProtocol : IProtocol { public byte[] OnDataReceivedCallBack(byte[] data, ref int offset) { int length = BitConverter.ToInt32(data, offset); int package_head = 4; int package_length = length + package_head; byte[] buffer = null; if (length > 0) { if (offset + package_length <= data.Length) { buffer = new byte[length]; Array.Copy(data, offset + package_head, buffer, 0, length); offset += package_length; } } else { offset = -1; } return buffer; } }
如果协议无法正常解析,则offset=-1,并关闭掉该链接。如果在解析完一个包以后,还有剩余的包, 于是在抛给应用层以后,便继续解析。如果单个包比较大,缓冲区一次放不下的时候,我们将数据暂时写入到内存流里面,然后将下一次接收到的数据,一并拿出来解析。
4、接收数据已经准备完毕以后,就需要将SocketConnection和SocketSession关联起来,代码如下:
public class AppServer : IAppServer { public delegate void DataRecevieHandler(SocketSession o, SessionEventArgs e); public delegate void NewConnectionHandler(SocketSession o, EventArgs e); public delegate void OnErrorHandler(Exception e); public event DataRecevieHandler DataRecevied; public event NewConnectionHandler NewConnected; public event OnErrorHandler OnError; private ISocketListener _listener; private SocketConnectionPool _connectPool; public AppServer(ServerConfig serverConfig) { this.AppConfig = serverConfig; if (this.AppProtocol == null) { this.AppProtocol = new DefaultProtocol(); } _connectPool = new SocketConnectionPool(this); _connectPool.Connected = OnConnected; _listener = new SocketListener(this.AppConfig); _listener.NewClientAccepted += new NewClientAcceptHandler(listener_NewClientAccepted); _listener.Error += new ErrorHandler(_listener_Error); } void OnDataRecevied(SocketSession session, SessionEventArgs e) { if (this.DataRecevied != null) { DataRecevied.BeginInvoke(session, e, DataReceviedCallBack, DataRecevied); } } public bool Start() { _connectPool.Init(); return _listener.Start(); } public void Stop() { _listener.Stop(); } void _listener_Error(ISocketListener listener, Exception e) { if (this.OnError != null) { this.OnError.Invoke(e); } } void listener_NewClientAccepted(ISocketListener listener, System.Net.Sockets.Socket client, object state) { _connectPool.Push(client); } public void OnConnected(System.Net.Sockets.Socket client, SocketConnection connect) { var session = new SocketSession(client, connect); session.DataRecevied = OnDataRecevied; connect.Initialise(session); if (NewConnected != null) { NewConnected.BeginInvoke(session, EventArgs.Empty, NewConnectedCallBack, NewConnected); } if (connect.RecevieEventArgs != null) { session.ReceiveAsync(connect.RecevieEventArgs); } } void DataReceviedCallBack(IAsyncResult result) { try { (result.AsyncState as DataRecevieHandler).EndInvoke(result); } catch (Exception e) { LogHelper.Debug(e.Message); } } void NewConnectedCallBack(IAsyncResult result) { try { (result.AsyncState as NewConnectionHandler).EndInvoke(result); } catch (Exception e) { LogHelper.Debug(e.Message); } } public ServerConfig AppConfig { get; set; } public IProtocol AppProtocol { get; set; } }
到这里,整个接收包的流程就结束了,但是发送的地方,我们发现是同步在发送,如果有特别需要的可以考虑写成异步方式,但我个人更倾向于,这一块留给应用层处理,在应用层写一个发送队列,然后有独立的线程来管理这个发送队列。
写自己的socket框架(二),布布扣,bubuko.com
原文:http://www.cnblogs.com/selfteam/p/3877172.html