当前位置: 移动技术网 > IT编程>开发语言>c# > 基于TCP异步Socket模型的介绍

基于TCP异步Socket模型的介绍

2019年07月18日  | 移动技术网IT编程  | 我要评论

tcp异步socket模型
c#的tcp异步socket模型是通过begin-end模式实现的。例如提供beginconnect、beginaccept、 beginsend 和 beginreceive等。

复制代码 代码如下:

iasyncresult beginaccept(asynccallback callback, object state);

asynccallback回调在函数执行完毕后执行。state对象被用于在执行函数和回调函数间传输信息。
复制代码 代码如下:

socket socket = new socket(
                  addressfamily.internetwork,
                  sockettype.stream,
                  protocoltype.tcp);
ipendpoint iep = new ipendpoint(ipaddress.any, 8888);
socket.bind(iep);
socket.listen(5);
socket.beginaccept (new asynccallback(callbackaccept), socket);

private void callbackaccept(iasyncresult iar)
{
  socket server = (socket)iar.asyncstate;
  socket client = server.endaccept(iar);
}


则在accept一个tcpclient,需要维护tcpclient列表。
复制代码 代码如下:

private list<tcpclientstate> clients;

异步tcp服务器完整实现
复制代码 代码如下:

/// <summary>
   /// 异步tcp服务器
   /// </summary>
   public class asynctcpserver : idisposable
   {
     #region fields

     private tcplistener listener;
     private list<tcpclientstate> clients;
     private bool disposed = false;

     #endregion

     #region ctors

     /// <summary>
     /// 异步tcp服务器
     /// </summary>
     /// <param name="listenport">监听的端口</param>
     public asynctcpserver(int listenport)
       : this(ipaddress.any, listenport)
     {
     }

     /// <summary>
     /// 异步tcp服务器
     /// </summary>
     /// <param name="localep">监听的终结点</param>
     public asynctcpserver(ipendpoint localep)
       : this(localep.address, localep.port)
     {
     }

     /// <summary>
     /// 异步tcp服务器
     /// </summary>
     /// <param name="localipaddress">监听的ip地址</param>
     /// <param name="listenport">监听的端口</param>
     public asynctcpserver(ipaddress localipaddress, int listenport)
     {
       address = localipaddress;
       port = listenport;
       this.encoding = encoding.default;

       clients = new list<tcpclientstate>();

       listener = new tcplistener(address, port);
       listener.allownattraversal(true);
     }

     #endregion

     #region properties

     /// <summary>
     /// 服务器是否正在运行
     /// </summary>
     public bool isrunning { get; private set; }
     /// <summary>
     /// 监听的ip地址
     /// </summary>
     public ipaddress address { get; private set; }
     /// <summary>
     /// 监听的端口
     /// </summary>
     public int port { get; private set; }
     /// <summary>
     /// 通信使用的编码
     /// </summary>
     public encoding encoding { get; set; }

     #endregion

     #region server

     /// <summary>
     /// 启动服务器
     /// </summary>
     /// <returns>异步tcp服务器</returns>
     public asynctcpserver start()
     {
       if (!isrunning)
       {
         isrunning = true;
         listener.start();
         listener.beginaccepttcpclient(
           new asynccallback(handletcpclientaccepted), listener);
       }
       return this;
     }

     /// <summary>
     /// 启动服务器
     /// </summary>
     /// <param name="backlog">
     /// 服务器所允许的挂起连接序列的最大长度
     /// </param>
     /// <returns>异步tcp服务器</returns>
     public asynctcpserver start(int backlog)
     {
       if (!isrunning)
       {
         isrunning = true;
         listener.start(backlog);
         listener.beginaccepttcpclient(
           new asynccallback(handletcpclientaccepted), listener);
       }
       return this;
     }

     /// <summary>
     /// 停止服务器
     /// </summary>
     /// <returns>异步tcp服务器</returns>
     public asynctcpserver stop()
     {
       if (isrunning)
       {
         isrunning = false;
         listener.stop();

         lock (this.clients)
         {
           for (int i = 0; i < this.clients.count; i++)
           {
             this.clients[i].tcpclient.client.disconnect(false);
           }
           this.clients.clear();
         }

       }
       return this;
     }

     #endregion

     #region receive

     private void handletcpclientaccepted(iasyncresult ar)
     {
       if (isrunning)
       {
         tcplistener tcplistener = (tcplistener)ar.asyncstate;

         tcpclient tcpclient = tcplistener.endaccepttcpclient(ar);
         byte[] buffer = new byte[tcpclient.receivebuffersize];

         tcpclientstate internalclient
           = new tcpclientstate(tcpclient, buffer);
         lock (this.clients)
         {
           this.clients.add(internalclient);
           raiseclientconnected(tcpclient);
         }

         networkstream networkstream = internalclient.networkstream;
         networkstream.beginread(
           internalclient.buffer,
           0,
           internalclient.buffer.length,
           handledatagramreceived,
           internalclient);

         tcplistener.beginaccepttcpclient(
           new asynccallback(handletcpclientaccepted), ar.asyncstate);
       }
     }

     private void handledatagramreceived(iasyncresult ar)
     {
       if (isrunning)
       {
         tcpclientstate internalclient = (tcpclientstate)ar.asyncstate;
         networkstream networkstream = internalclient.networkstream;

         int numberofreadbytes = 0;
         try
         {
           numberofreadbytes = networkstream.endread(ar);
         }
         catch
         {
           numberofreadbytes = 0;
         }

         if (numberofreadbytes == 0)
         {
           // connection has been closed
           lock (this.clients)
           {
             this.clients.remove(internalclient);
             raiseclientdisconnected(internalclient.tcpclient);
             return;
           }
         }

         // received byte and trigger event notification
         byte[] receivedbytes = new byte[numberofreadbytes];
         buffer.blockcopy(
           internalclient.buffer, 0,
           receivedbytes, 0, numberofreadbytes);
         raisedatagramreceived(internalclient.tcpclient, receivedbytes);
         raiseplaintextreceived(internalclient.tcpclient, receivedbytes);

         // continue listening for tcp datagram packets
         networkstream.beginread(
           internalclient.buffer,
           0,
           internalclient.buffer.length,
           handledatagramreceived,
           internalclient);
       }
     }

     #endregion

     #region events

     /// <summary>
     /// 接收到数据报文事件
     /// </summary>
     public event eventhandler<tcpdatagramreceivedeventargs<byte[]>> datagramreceived;
     /// <summary>
     /// 接收到数据报文明文事件
     /// </summary>
     public event eventhandler<tcpdatagramreceivedeventargs<string>> plaintextreceived;

     private void raisedatagramreceived(tcpclient sender, byte[] datagram)
     {
       if (datagramreceived != null)
       {
         datagramreceived(this, new tcpdatagramreceivedeventargs<byte[]>(sender, datagram));
       }
     }

     private void raiseplaintextreceived(tcpclient sender, byte[] datagram)
     {
       if (plaintextreceived != null)
       {
         plaintextreceived(this, new tcpdatagramreceivedeventargs<string>(
           sender, this.encoding.getstring(datagram, 0, datagram.length)));
       }
     }

     /// <summary>
     /// 与客户端的连接已建立事件
     /// </summary>
     public event eventhandler<tcpclientconnectedeventargs> clientconnected;
     /// <summary>
     /// 与客户端的连接已断开事件
     /// </summary>
     public event eventhandler<tcpclientdisconnectedeventargs> clientdisconnected;

     private void raiseclientconnected(tcpclient tcpclient)
     {
       if (clientconnected != null)
       {
         clientconnected(this, new tcpclientconnectedeventargs(tcpclient));
       }
     }

     private void raiseclientdisconnected(tcpclient tcpclient)
     {
       if (clientdisconnected != null)
       {
         clientdisconnected(this, new tcpclientdisconnectedeventargs(tcpclient));
       }
     }

     #endregion

     #region send

     /// <summary>
     /// 发送报文至指定的客户端
     /// </summary>
     /// <param name="tcpclient">客户端</param>
     /// <param name="datagram">报文</param>
     public void send(tcpclient tcpclient, byte[] datagram)
     {
       if (!isrunning)
         throw new invalidprogramexception("this tcp server has not been started.");

       if (tcpclient == null)
         throw new argumentnullexception("tcpclient");

       if (datagram == null)
         throw new argumentnullexception("datagram");

       tcpclient.getstream().beginwrite(
         datagram, 0, datagram.length, handledatagramwritten, tcpclient);
     }

     private void handledatagramwritten(iasyncresult ar)
     {
       ((tcpclient)ar.asyncstate).getstream().endwrite(ar);
     }

     /// <summary>
     /// 发送报文至指定的客户端
     /// </summary>
     /// <param name="tcpclient">客户端</param>
     /// <param name="datagram">报文</param>
     public void send(tcpclient tcpclient, string datagram)
     {
       send(tcpclient, this.encoding.getbytes(datagram));
     }

     /// <summary>
     /// 发送报文至所有客户端
     /// </summary>
     /// <param name="datagram">报文</param>
     public void sendall(byte[] datagram)
     {
       if (!isrunning)
         throw new invalidprogramexception("this tcp server has not been started.");

       for (int i = 0; i < this.clients.count; i++)
       {
         send(this.clients[i].tcpclient, datagram);
       }
     }

     /// <summary>
     /// 发送报文至所有客户端
     /// </summary>
     /// <param name="datagram">报文</param>
     public void sendall(string datagram)
     {
       if (!isrunning)
         throw new invalidprogramexception("this tcp server has not been started.");

       sendall(this.encoding.getbytes(datagram));
     }

     #endregion

     #region idisposable members

     /// <summary>
     /// performs application-defined tasks associated with freeing,
     /// releasing, or resetting unmanaged resources.
     /// </summary>
     public void dispose()
     {
       dispose(true);
       gc.suppressfinalize(this);
     }

     /// <summary>
     /// releases unmanaged and - optionally - managed resources
     /// </summary>
     /// <param name="disposing"><c>true</c> to release
     /// both managed and unmanaged resources; <c>false</c>
     /// to release only unmanaged resources.</param>
     protected virtual void dispose(bool disposing)
     {
       if (!this.disposed)
       {
         if (disposing)
         {
           try
           {
             stop();

             if (listener != null)
             {
               listener = null;
             }
           }
           catch (socketexception ex)
           {
             exceptionhandler.handle(ex);
           }
         }

         disposed = true;
       }
     }

     #endregion
   }

使用举例
复制代码 代码如下:

class program
   {
     static asynctcpserver server;

     static void main(string[] args)
     {
       logfactory.assign(new consolelogfactory());

       server = new asynctcpserver(9999);
       server.encoding = encoding.utf8;
       server.clientconnected +=
         new eventhandler<tcpclientconnectedeventargs>(server_clientconnected);
       server.clientdisconnected +=
         new eventhandler<tcpclientdisconnectedeventargs>(server_clientdisconnected);
       server.plaintextreceived +=
         new eventhandler<tcpdatagramreceivedeventargs<string>>(server_plaintextreceived);
       server.start();

       console.writeline("tcp server has been started.");
       console.writeline("type something to send to client...");
       while (true)
       {
         string text = console.readline();
         server.sendall(text);
       }
     }

     static void server_clientconnected(object sender, tcpclientconnectedeventargs e)
     {
       logger.debug(string.format(cultureinfo.invariantculture,
         "tcp client {0} has connected.",
         e.tcpclient.client.remoteendpoint.tostring()));
     }

     static void server_clientdisconnected(object sender, tcpclientdisconnectedeventargs e)
     {
       logger.debug(string.format(cultureinfo.invariantculture,
         "tcp client {0} has disconnected.",
         e.tcpclient.client.remoteendpoint.tostring()));
     }

     static void server_plaintextreceived(object sender, tcpdatagramreceivedeventargs<string> e)
     {
       if (e.datagram != "received")
       {
         console.write(string.format("client : {0} --> ",
           e.tcpclient.client.remoteendpoint.tostring()));
         console.writeline(string.format("{0}", e.datagram));
         server.send(e.tcpclient, "server has received you text : " + e.datagram);
       }
     }
   }

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网