当前位置: 移动技术网 > IT编程>开发语言>c# > C#中一个高性能异步socket封装库的实现思路分享

C#中一个高性能异步socket封装库的实现思路分享

2019年07月18日  | 移动技术网IT编程  | 我要评论
前言 socket是软件之间通讯最常用的一种方式。c#实现socket通讯有很多中方法,其中效率最高就是异步通讯。 异步通讯实际是利用windows完成端口(iocp)

前言

socket是软件之间通讯最常用的一种方式。c#实现socket通讯有很多中方法,其中效率最高就是异步通讯。

异步通讯实际是利用windows完成端口(iocp)来处理的,关于完成端口实现原理,大家可以参考网上文章。

我这里想强调的是采用完成端口机制的异步通讯是windows下效率最高的通讯方式,没有之一!

异步通讯比同步通讯处理要难很多,代码编写中会遇到许多“坑“。如果没有经验,很难完成。

我搜集了大量资料,完成了对异步socket的封装。此库已用稳定高效的运行几个月。

纵观网上的资料,我还没有遇到一个满意的封装库。许多文章把数据收发和协议处理杂糅在一块,代码非常难懂,也无法扩展。

在编写该库时,避免以上缺陷。将逻辑处理层次化,模块化!同时实现了高可用性与高性能。

为了使大家对通讯效率有初步了解,先看测试图。

主机配置情况

百兆带宽基本占满,cpu占用40%,我的电脑在空闲时,cpu占用大概20%,也就是说程序占用cpu 20%左右。

这个库是可扩展的,就是说即使10万个连接,收发同样的数据,cpu占用基本相同。

库的结构图

目标

即可作为服务端(监听)也可以作为客户端(主动连接)使用。

可以适应任何网络协议。收发的数据针对字节流或一个完整的包。对协议内容不做处理。

高可用性。将复杂的底层处理封装,对外接口非常友好。

高性能。最大限度优化处理。单机可支持数万连接,收发速度可达几百兆bit。

实现思路

网络处理逻辑可以分为以下几个部分:

网络监听 可以在多个端口实现监听。负责生成socket,生成的socket供后续处理。监听模块功能比较单一,如有必要,可对监听模块做进一步优化。

主动连接 可以异步或同步的连接对方。连接成功后,对socket的后续处理,与监听得到的socket完全一样。注:无论是监听得到的socket,还是连接得到的socket,后续处理完全一样。

socket收发处理 每个socket对应一个收发实例,socket收发只针对字节流处理。收发时,做了优化。比如发送时,对数据做了沾包,提高发送性能;接收时,一次投递1k的数据。

组包处理 一般数据包都有包长度指示;比如 报头的前俩个字节表示长度,根据这个值就可以组成一个完整的包。

netlistener 监听

using system;
using system.net;
using system.net.sockets;
using system.threading;
 
namespace iocpcore
{
 class netlistener
 {
  private socket listensocket;
  public listenparam _listenparam { get; set; }
  public event action<listenparam, asyncsocketclient> onacceptsocket;
 
  bool start;
 
  netserver _netserver;
  public netlistener(netserver netserver)
  {
   _netserver = netserver;
  }
 
  public int _acceptasynccount = 0;
  public bool startlisten()
  {
   try
   {
    start = true;
    ipendpoint listenpoint = new ipendpoint(ipaddress.parse("0.0.0.0"), _listenparam._port);
    listensocket = new socket(listenpoint.addressfamily, sockettype.stream, protocoltype.tcp);
    listensocket.bind(listenpoint);
    listensocket.listen(200);
 
    thread thread1 = new thread(new threadstart(netprocess));
    thread1.start();
    
    startaccept();
    return true;
   }
   catch (exception ex)
   {
    netlogger.log(string.format("**监听异常!{0}", ex.message));
    return false;
   }
  }
 
  autoresetevent _acceptevent = new autoresetevent(false);
  private void netprocess()
  {
   while (start)
   {
    dealnewaccept();
    _acceptevent.waitone(1000 * 10);
   }
  }
 
  private void dealnewaccept()
  {
   try
   {
    if(_acceptasynccount <= 10)
    {
     startaccept();
    }
 
    while (true)
    {
     asyncsocketclient client = _newsocketclientlist.getobj();
     if (client == null)
      break;
 
     dealnewaccept(client);
    }
   }
   catch (exception ex)
   {
    netlogger.log(string.format("dealnewaccept 异常 {0}***{1}", ex.message, ex.stacktrace));
   }
  }
 
  private void dealnewaccept(asyncsocketclient client)
  {
   client.sendbufferbytecount = _netserver.sendbufferbyteperclient;
   onacceptsocket?.invoke(_listenparam, client);
  }
 
  private void accepteventarg_completed(object sender, socketasynceventargs accepteventargs)
  {
   try
   {
    interlocked.decrement(ref _acceptasynccount);
    _acceptevent.set();
    accepteventargs.completed -= accepteventarg_completed;
    processaccept(accepteventargs);
   }
   catch (exception ex)
   {
    netlogger.log(string.format("accepteventarg_completed {0}***{1}", ex.message, ex.stacktrace));
   }
  }
 
  public bool startaccept()
  {
   socketasynceventargs accepteventargs = new socketasynceventargs();
   accepteventargs.completed += accepteventarg_completed;
 
   bool willraiseevent = listensocket.acceptasync(accepteventargs);
   interlocked.increment(ref _acceptasynccount);
 
   if (!willraiseevent)
   {
    interlocked.decrement(ref _acceptasynccount);
    _acceptevent.set();
    accepteventargs.completed -= accepteventarg_completed;
    processaccept(accepteventargs);
   }
   return true;
  }
 
  objectpool<asyncsocketclient> _newsocketclientlist = new objectpool<asyncsocketclient>();
  private void processaccept(socketasynceventargs accepteventargs)
  {
   try
   {
    using (accepteventargs)
    {
     if (accepteventargs.acceptsocket != null)
     {
      asyncsocketclient client = new asyncsocketclient(accepteventargs.acceptsocket);
      client.createclientinfo(this);
 
      _newsocketclientlist.putobj(client);
      _acceptevent.set();
     }
    }
   }
   catch (exception ex)
   {
    netlogger.log(string.format("processaccept {0}***{1}", ex.message, ex.stacktrace));
   }
  }
 }
}

netconnectmanage连接处理

using system;
using system.net;
using system.net.sockets;

namespace iocpcore
{
 class netconnectmanage
 {
  public event action<socketeventparam, asyncsocketclient> onsocketconnectevent;

  public bool connectasyn(string peerip, int peerport, object tag)
  {
   try
   {
    socket socket = new socket(sockettype.stream, protocoltype.tcp);
    socketasynceventargs socketeventargs = new socketasynceventargs();
    socketeventargs.remoteendpoint = new ipendpoint(ipaddress.parse(peerip), peerport);
    socketeventargs.completed += socketconnect_completed;

    socketclientinfo clientinfo = new socketclientinfo();
    socketeventargs.usertoken = clientinfo;
    clientinfo.peerip = peerip;
    clientinfo.peerport = peerport;
    clientinfo.tag = tag;

    bool willraiseevent = socket.connectasync(socketeventargs);
    if (!willraiseevent)
    {
     processconnect(socketeventargs);
     socketeventargs.completed -= socketconnect_completed;
     socketeventargs.dispose();
    }
    return true;
   }
   catch (exception ex)
   {
    netlogger.log("connectasyn",ex);
    return false;
   }
  }

  private void socketconnect_completed(object sender, socketasynceventargs socketeventargs)
  {
   processconnect(socketeventargs);
   socketeventargs.completed -= socketconnect_completed;
   socketeventargs.dispose();
  }

  private void processconnect(socketasynceventargs socketeventargs)
  {
   socketclientinfo clientinfo = socketeventargs.usertoken as socketclientinfo;
   if (socketeventargs.socketerror == socketerror.success)
   {
    dealconnectsocket(socketeventargs.connectsocket, clientinfo);
   }
   else
   {
    socketeventparam socketparam = new socketeventparam(en_socketevent.connect, null);
    socketparam.clientinfo = clientinfo;
    onsocketconnectevent?.invoke(socketparam, null);
   }
  }


  void dealconnectsocket(socket socket, socketclientinfo clientinfo)
  {
   clientinfo.setclientinfo(socket);

   asyncsocketclient client = new asyncsocketclient(socket);
   client.setclientinfo(clientinfo);

   //触发事件
   socketeventparam socketparam = new socketeventparam(en_socketevent.connect, socket);
   socketparam.clientinfo = clientinfo;
   onsocketconnectevent?.invoke(socketparam, client);
  }

  public bool connect(string peerip, int peerport, object tag, out socket socket)
  {
   socket = null;
   try
   {
    socket sockettmp = new socket(sockettype.stream, protocoltype.tcp);

    socketclientinfo clientinfo = new socketclientinfo();
    clientinfo.peerip = peerip;
    clientinfo.peerport = peerport;
    clientinfo.tag = tag;

    endpoint remoteep = new ipendpoint(ipaddress.parse(peerip), peerport);
    sockettmp.connect(remoteep);
    if (!sockettmp.connected)
     return false;

    dealconnectsocket(sockettmp, clientinfo);
    socket = sockettmp;
    return true;
   }
   catch (exception ex)
   {
    netlogger.log(string.format("连接对方:({0}:{1})出错!", peerip, peerport), ex);
    return false;
   }
  }
 }
}

asyncsocketclient socket收发处理

using system;
using system.collections.generic;
using system.diagnostics;
using system.net;
using system.net.sockets;

namespace iocpcore
{
 public class asyncsocketclient
 {
  public static int iocpreadlen = 1024;

  public readonly socket connectsocket;

  protected socketasynceventargs m_receiveeventargs;
  public socketasynceventargs receiveeventargs { get { return m_receiveeventargs; } set { m_receiveeventargs = value; } }
  protected byte[] m_asyncreceivebuffer;

  protected socketasynceventargs m_sendeventargs;
  public socketasynceventargs sendeventargs { get { return m_sendeventargs; } set { m_sendeventargs = value; } }
  protected byte[] m_asyncsendbuffer;

  public event action<asyncsocketclient, byte[]> onreaddata;
  public event action<asyncsocketclient, int> onsenddata;
  public event action<asyncsocketclient> onsocketclose;

  static object releaselock = new object();
  public static int createcount = 0;
  public static int releasecount = 0;

  ~asyncsocketclient()
  {
   lock (releaselock)
   {
    releasecount++;
   }
  }

  public asyncsocketclient(socket socket)
  {
   lock (releaselock)
   {
    createcount++;
   }

   connectsocket = socket;

   m_receiveeventargs = new socketasynceventargs();
   m_asyncreceivebuffer = new byte[iocpreadlen];
   m_receiveeventargs.acceptsocket = connectsocket;
   m_receiveeventargs.completed += receiveeventargs_completed;

   m_sendeventargs = new socketasynceventargs();
   m_asyncsendbuffer = new byte[iocpreadlen * 2];
   m_sendeventargs.acceptsocket = connectsocket;
   m_sendeventargs.completed += sendeventargs_completed;
  }

  socketclientinfo _clientinfo;

  public socketclientinfo clientinfo
  {
   get
   {
    return _clientinfo;
   }
  }

  internal void createclientinfo(netlistener netlistener)
  {
   _clientinfo = new socketclientinfo();
   try
   {
    _clientinfo.tag = netlistener._listenparam._tag;
    ipendpoint ip = connectsocket.localendpoint as ipendpoint;
    debug.assert(netlistener._listenparam._port == ip.port);

    _clientinfo.localip = ip.address.tostring();
    _clientinfo.localport = netlistener._listenparam._port;

    ip = connectsocket.remoteendpoint as ipendpoint;
    _clientinfo.peerip = ip.address.tostring();
    _clientinfo.peerport = ip.port;
   }
   catch (exception ex)
   {
    netlogger.log("createclientinfo", ex);
   }
  }
  internal void setclientinfo(socketclientinfo clientinfo)
  {
   _clientinfo = clientinfo;
  }

  #region read process
  bool _inreadpending = false;
  public en_socketreadresult readnextdata()
  {
   lock (this)
   {
    if (_socketerror)
     return en_socketreadresult.readerror;
    if (_inreadpending)
     return en_socketreadresult.inasyn;
    if(!connectsocket.connected)
    {
     onreaderror();
     return en_socketreadresult.readerror;
    }

    try
    {
     m_receiveeventargs.setbuffer(m_asyncreceivebuffer, 0, m_asyncreceivebuffer.length);
     _inreadpending = true;
     bool willraiseevent = connectsocket.receiveasync(receiveeventargs); //投递接收请求
     if (!willraiseevent)
     {
      _inreadpending = false;
      processreceive();
      if (_socketerror)
      {
       onreaderror();
       return en_socketreadresult.readerror;
      }
      return en_socketreadresult.haveread;
     }
     else
     {
      return en_socketreadresult.inasyn;
     }
    }
    catch (exception ex)
    {
     netlogger.log("readnextdata", ex);
     _inreadpending = false;
     onreaderror();
     return en_socketreadresult.readerror;
    }
   }
  }

  private void processreceive()
  {
   if (receiveeventargs.bytestransferred > 0
    && receiveeventargs.socketerror == socketerror.success)
   {
    int offset = receiveeventargs.offset;
    int count = receiveeventargs.bytestransferred;

    byte[] readdata = new byte[count];
    array.copy(m_asyncreceivebuffer, offset, readdata, 0, count);

    _inreadpending = false;
    if (!_socketerror)
     onreaddata?.invoke(this, readdata);
   }
   else
   {
    _inreadpending = false;
    onreaderror();
   }
  }

  private void receiveeventargs_completed(object sender, socketasynceventargs e)
  {
   lock (this)
   {
    _inreadpending = false;
    processreceive();
    if (_socketerror)
    {
     onreaderror();
    }
   }
  }

  bool _socketerror = false;
  private void onreaderror()
  {
   lock (this)
   {
    if (_socketerror == false)
    {
     _socketerror = true;
     onsocketclose?.invoke(this);
    }
    closeclient();
   }
  }
  #endregion

  #region send process
  int _sendbufferbytecount = 102400;
  public int sendbufferbytecount
  {
   get
   {
    return _sendbufferbytecount;
   }
   set
   {
    if (value < 1024)
    {
     _sendbufferbytecount = 1024;
    }
    else
    {
     _sendbufferbytecount = value;
    }
   }
  }

  sendbufferpool _senddatapool = new sendbufferpool();
  internal en_senddataresult putsenddata(byte[] data)
  {
   if (_socketerror)
    return en_senddataresult.no_client;

   if (_senddatapool._bufferbytecount >= _sendbufferbytecount)
   {
    return en_senddataresult.buffer_overflow;
   }

   if (data.length <= iocpreadlen)
   {
    _senddatapool.putobj(data);
   }
   else
   {
    list<byte[]> dataitems = splitdata(data, iocpreadlen);
    foreach (byte[] item in dataitems)
    {
     _senddatapool.putobj(item);
    }
   }

   return en_senddataresult.ok;
  }

  bool _insendpending = false;
  public en_socketsendresult sendnextdata()
  {
   lock (this)
   {
    if (_socketerror)
    {
     return en_socketsendresult.senderror;
    }

    if (_insendpending)
    {
     return en_socketsendresult.inasyn;
    }

    int sendbytecount = getsenddata();
    if (sendbytecount == 0)
    {
     return en_socketsendresult.nosenddata;
    }

    //防止抛出异常,否则影响性能
    if (!connectsocket.connected)
    {
     onsenderror();
     return en_socketsendresult.senderror;
    }

    try
    {
     m_sendeventargs.setbuffer(m_asyncsendbuffer, 0, sendbytecount);
     _insendpending = true;
     bool willraiseevent = connectsocket.sendasync(m_sendeventargs);
     if (!willraiseevent)
     {
      _insendpending = false;
      processsend(m_sendeventargs);
      if (_socketerror)
      {
       onsenderror();
       return en_socketsendresult.senderror;
      }
      else
      {
       onsenddata?.invoke(this, sendbytecount);
       //继续发下一条
       return en_socketsendresult.havesend;
      }
     }
     else
     {
      return en_socketsendresult.inasyn;
     }
    }
    catch (exception ex)
    {
     netlogger.log("sendnextdata", ex);
     _insendpending = false;
     onsenderror();
     return en_socketsendresult.senderror;
    }
   }
  }

  private void sendeventargs_completed(object sender, socketasynceventargs sendeventargs)
  {
   lock (this)
   {
    try
    {
     _insendpending = false;
     processsend(m_sendeventargs);

     int sendcount = 0;
     if (sendeventargs.socketerror == socketerror.success)
     {
      sendcount = sendeventargs.bytestransferred;
     }
     onsenddata?.invoke(this, sendcount);

     if (_socketerror)
     {
      onsenderror();
     }
    }
    catch (exception ex)
    {
     netlogger.log("sendeventargs_completed", ex);
    }
   }
  }

  private bool processsend(socketasynceventargs sendeventargs)
  {
   if (sendeventargs.socketerror == socketerror.success)
   {
    return true;
   }
   else
   {
    onsenderror();
    return false;
   }
  }

  private int getsenddata()
  {
   int datalen = 0;
   while (true)
   {
    byte[] data = _senddatapool.getobj();
    if (data == null)
     return datalen;
    array.copy(data, 0, m_asyncsendbuffer, datalen, data.length);
    datalen += data.length;
    if (datalen > iocpreadlen)
     break;
   }
   return datalen;
  }
  private void onsenderror()
  {
   lock (this)
   {
    if (_socketerror == false)
    {
     _socketerror = true;
     onsocketclose?.invoke(this);
    }
    closeclient();
   }
  }
  #endregion

  internal void closesocket()
  {
   try
   {
    connectsocket.close();
   }
   catch (exception ex)
   {
    netlogger.log("closesocket", ex);
   }
  }

  static object socketcloselock = new object();
  public static int closesendcount = 0;
  public static int closereadcount = 0;

  bool _disposesend = false;
  void closesend()
  {
   if (!_disposesend && !_insendpending)
   {
    lock (socketcloselock)
     closesendcount++;

    _disposesend = true;
    m_sendeventargs.setbuffer(null, 0, 0);
    m_sendeventargs.completed -= sendeventargs_completed;
    m_sendeventargs.dispose();
   }
  }

  bool _disposeread = false;
  void closeread()
  {
   if (!_disposeread && !_inreadpending)
   {
    lock (socketcloselock)
     closereadcount++;

    _disposeread = true;
    m_receiveeventargs.setbuffer(null, 0, 0);
    m_receiveeventargs.completed -= receiveeventargs_completed;
    m_receiveeventargs.dispose();
   }
  }
  private void closeclient()
  {
   try
   {
    closesend();
    closeread();
    connectsocket.close();
   }
   catch (exception ex)
   {
    netlogger.log("closeclient", ex);
   }
  }

  //发送缓冲大小
  private list<byte[]> splitdata(byte[] data, int maxlen)
  {
   list<byte[]> items = new list<byte[]>();

   int start = 0;
   while (true)
   {
    int itemlen = math.min(maxlen, data.length - start);
    if (itemlen == 0)
     break;
    byte[] item = new byte[itemlen];
    array.copy(data, start, item, 0, itemlen);
    items.add(item);

    start += itemlen;
   }
   return items;
  }
 }

 public enum en_socketreadresult
 {
  inasyn,
  haveread,
  readerror
 }

 public enum en_socketsendresult
 {
  inasyn,
  havesend,
  nosenddata,
  senderror
 }

 class sendbufferpool
 {
  objectpool<byte[]> _bufferpool = new objectpool<byte[]>();

  public int64 _bufferbytecount = 0;
  public bool putobj(byte[] obj)
  {
   if (_bufferpool.putobj(obj))
   {
    lock (this)
    {
     _bufferbytecount += obj.length;
    }
    return true;
   }
   else
   {
    return false;
   }
  }

  public byte[] getobj()
  {
   byte[] result = _bufferpool.getobj();
   if (result != null)
   {
    lock (this)
    {
     _bufferbytecount -= result.length;
    }
   }
   return result;
  }
 }
}

netserver 聚合其他类

using system;
using system.collections.generic;
using system.diagnostics;
using system.linq;
using system.net.sockets;
using system.threading;

namespace iocpcore
{
 public class netserver
 {
  public action<socketeventparam> onsocketpacketevent;

  //每个连接发送缓冲大小
  public int sendbufferbyteperclient { get; set; } = 1024 * 100;

  bool _serverstart = false;
  list<netlistener> _listlistener = new list<netlistener>();

  //负责对收到的字节流 组成完成的包
  clientpacketmanage _clientpacketmanage;

  public int64 sendbytecount { get; set; }
  public int64 readbytecount { get; set; }

  list<listenparam> _listlistenport = new list<listenparam>();
  public void addlistenport(int port, object tag)
  {
   _listlistenport.add(new listenparam(port, tag));
  }
  /// <summary>
  /// 
  /// </summary>
  /// <param name="listenfault">监听失败的端口</param>
  /// <returns></returns>
  public bool startlisten(out list<int> listenfault)
  {
   _serverstart = true;

   _clientpacketmanage = new clientpacketmanage(this);
   _clientpacketmanage.onsocketpacketevent += putclientpacket;

   _netconnectmanage.onsocketconnectevent += socketconnectevent;

   _listlistener.clear();
   thread thread1 = new thread(new threadstart(netpacketprocess));
   thread1.start();

   thread thread2 = new thread(new threadstart(netsendprocess));
   thread2.start();

   thread thread3 = new thread(new threadstart(netreadprocess));
   thread3.start();

   listenfault = new list<int>();
   foreach (listenparam param in _listlistenport)
   {
    netlistener listener = new netlistener(this);
    listener._listenparam = param;
    listener.onacceptsocket += listener_onacceptsocket;
    if (!listener.startlisten())
    {
     listenfault.add(param._port);
    }
    else
    {
     _listlistener.add(listener);
     netlogger.log(string.format("监听成功!端口:{0}", param._port));
    }
   }

   return listenfault.count == 0;
  }

  public void putclientpacket(socketeventparam param)
  {
   onsocketpacketevent?.invoke(param);
  }

  //获取包的最小长度
  int _packetminlen;
  int _packetmaxlen;
  public int packetminlen
  {
   get { return _packetminlen; }
  }
  public int packetmaxlen
  {
   get { return _packetmaxlen; }
  }

  /// <summary>
  /// 设置包的最小和最大长度
  /// 当minlen=0时,认为是接收字节流
  /// </summary>
  /// <param name="minlen"></param>
  /// <param name="maxlen"></param>
  public void setpacketparam(int minlen, int maxlen)
  {
   debug.assert(minlen >= 0);
   debug.assert(maxlen > minlen);
   _packetminlen = minlen;
   _packetmaxlen = maxlen;
  }

  //获取包的总长度
  public delegate int delegate_getpackettotallen(byte[] data, int offset);
  public delegate_getpackettotallen getpackettotallen_callback;

  objectpoolwithevent<socketeventparam> _socketeventpool = new objectpoolwithevent<socketeventparam>();
  private void netpacketprocess()
  {
   while (_serverstart)
   {
    try
    {
     dealeventpool();
    }
    catch (exception ex)
    {
     netlogger.log(string.format("dealeventpool 异常 {0}***{1}", ex.message, ex.stacktrace));
    }
    _socketeventpool.waitone(1000);
   }
  }

  dictionary<socket, asyncsocketclient> _clientgroup = new dictionary<socket, asyncsocketclient>();
  public int clientcount
  {
   get
   {
    lock (_clientgroup)
    {
     return _clientgroup.count;
    }
   }
  }
  public list<socket> clientlist
  {
   get
   {
    lock (_clientgroup)
    {
     return _clientgroup.keys.tolist();
    }
   }
  }

  private void dealeventpool()
  {
   while (true)
   {
    socketeventparam param = _socketeventpool.getobj();
    if (param == null)
     return;

    if (param.socketevent == en_socketevent.close)
    {
     lock (_clientgroup)
     {
      _clientgroup.remove(param.socket);
     }
    }

    if (_packetminlen == 0)//字节流处理
    {
     onsocketpacketevent?.invoke(param);
    }
    else
    {
     //组成一个完整的包 逻辑
     _clientpacketmanage.putsocketparam(param);
    }
   }
  }

  private void socketconnectevent(socketeventparam param, asyncsocketclient client)
  {
   try
   {
    if (param.socket == null || client == null) //连接失败
    {
     
    }
    else
    {
     lock (_clientgroup)
     {
      bool remove = _clientgroup.remove(client.connectsocket);
      debug.assert(!remove);
      _clientgroup.add(client.connectsocket, client);
     }

     client.onsocketclose += client_onsocketclose;
     client.onreaddata += client_onreaddata;
     client.onsenddata += client_onsenddata;

     _listreadevent.putobj(new socketeventdeal(client, en_socketdealevent.read));
    }
    _socketeventpool.putobj(param);
   }
   catch (exception ex)
   {
    netlogger.log(string.format("socketconnectevent 异常 {0}***{1}", ex.message, ex.stacktrace));
   }
  }

  internal void onrcvpacketlenerror(socket socket, byte[] buffer, int offset, int packetlen)
  {
   try
   {
    lock (_clientgroup)
    {
     if (!_clientgroup.containskey(socket))
     {
      debug.assert(false);
      return;
     }

     netlogger.log(string.format("报长度异常!包长:{0}", packetlen));
     asyncsocketclient client = _clientgroup[socket];
     client.closesocket();
    }
   }
   catch (exception ex)
   {
    netlogger.log(string.format("onrcvpacketlenerror 异常 {0}***{1}", ex.message, ex.stacktrace));
   }
  }

  #region listen port
  private void listener_onacceptsocket(listenparam listenpatam, asyncsocketclient client)
  {
   try
   {
    lock (_clientgroup)
    {
     bool remove = _clientgroup.remove(client.connectsocket);
     debug.assert(!remove);
     _clientgroup.add(client.connectsocket, client);
    }

    client.onsocketclose += client_onsocketclose;
    client.onreaddata += client_onreaddata;
    client.onsenddata += client_onsenddata;

    _listreadevent.putobj(new socketeventdeal(client, en_socketdealevent.read));

    socketeventparam param = new socketeventparam(en_socketevent.accept, client.connectsocket);
    param.clientinfo = client.clientinfo;

    _socketeventpool.putobj(param);
   }
   catch (exception ex)
   {
    netlogger.log(string.format("listener_onacceptsocket 异常 {0}***{1}", ex.message, ex.stacktrace));
   }
  }


  objectpoolwithevent<socketeventdeal> _listsendevent = new objectpoolwithevent<socketeventdeal>();
  private void netsendprocess()
  {
   while (true)
   {
    dealsendevent();
    _listsendevent.waitone(1000);
   }
  }

  objectpoolwithevent<socketeventdeal> _listreadevent = new objectpoolwithevent<socketeventdeal>();
  private void netreadprocess()
  {
   while (true)
   {
    dealreadevent();
    _listreadevent.waitone(1000);
   }
  }

  
  private void dealsendevent()
  {
   while (true)
   {
    socketeventdeal item = _listsendevent.getobj();
    if (item == null)
     break;
    switch (item.socketevent)
    {
     case en_socketdealevent.send:
      {
       while (true)
       {
        en_socketsendresult result = item.client.sendnextdata();
        if (result == en_socketsendresult.havesend)
         continue;
        else
         break;
       }
      }
      break;
     case en_socketdealevent.read:
      {
       debug.assert(false);
      }
      break;     
    }
   }
  }

  private void dealreadevent()
  {
   while (true)
   {
    socketeventdeal item = _listreadevent.getobj();
    if (item == null)
     break;
    switch (item.socketevent)
    {
     case en_socketdealevent.read:
      {
       while (true)
       {
        en_socketreadresult result = item.client.readnextdata();
        if (result == en_socketreadresult.haveread)
         continue;
        else
         break;
       }
      }
      break;
     case en_socketdealevent.send:
      {
       debug.assert(false);
      }
      break;
    }
   }
  }

  private void client_onreaddata(asyncsocketclient client, byte[] readdata)
  {
   //读下一条
   _listreadevent.putobj(new socketeventdeal(client, en_socketdealevent.read));

   try
   {
    socketeventparam param = new socketeventparam(en_socketevent.read, client.connectsocket);
    param.clientinfo = client.clientinfo;
    param.data = readdata;
    _socketeventpool.putobj(param);

    lock (this)
    {
     readbytecount += readdata.length;
    }
   }
   catch (exception ex)
   {
    netlogger.log(string.format("client_onreaddata 异常 {0}***{1}", ex.message, ex.stacktrace));
   }
  }
#endregion

  private void client_onsenddata(asyncsocketclient client, int sendcount)
  {
   //发送下一条
   _listsendevent.putobj(new socketeventdeal(client, en_socketdealevent.send));
   lock (this)
   {
    sendbytecount += sendcount;
   }
  }

  private void client_onsocketclose(asyncsocketclient client)
  {
   try
   {
    socketeventparam param = new socketeventparam(en_socketevent.close, client.connectsocket);
    param.clientinfo = client.clientinfo;
    _socketeventpool.putobj(param);
   }
   catch (exception ex)
   {
    netlogger.log(string.format("client_onsocketclose 异常 {0}***{1}", ex.message, ex.stacktrace));
   }
  }

  /// <summary>
  /// 放到发送缓冲
  /// </summary>
  /// <param name="socket"></param>
  /// <param name="data"></param>
  /// <returns></returns>
  public en_senddataresult senddata(socket socket, byte[] data)
  {
   if (socket == null)
    return en_senddataresult.no_client;
   lock (_clientgroup)
   {
    if (!_clientgroup.containskey(socket))
     return en_senddataresult.no_client;
    asyncsocketclient client = _clientgroup[socket];
    en_senddataresult result = client.putsenddata(data);
    if (result == en_senddataresult.ok)
    {
     //发送下一条
     _listsendevent.putobj(new socketeventdeal(client, en_socketdealevent.send));     
    }
    return result;
   }
  }

  /// <summary>
  /// 设置某个连接的发送缓冲大小
  /// </summary>
  /// <param name="socket"></param>
  /// <param name="bytecount"></param>
  /// <returns></returns>
  public bool setclientsendbuffer(socket socket, int bytecount)
  {
   lock (_clientgroup)
   {
    if (!_clientgroup.containskey(socket))
     return false;
    asyncsocketclient client = _clientgroup[socket];
    client.sendbufferbytecount = bytecount;
    return true;
   }
  }


  #region connect process
  netconnectmanage _netconnectmanage = new netconnectmanage();
  /// <summary>
  /// 异步连接一个客户端
  /// </summary>
  /// <param name="peerip"></param>
  /// <param name="peerport"></param>
  /// <param name="tag"></param>
  /// <returns></returns>
  public bool connectasyn(string peerip, int peerport, object tag)
  {
   return _netconnectmanage.connectasyn(peerip, peerport, tag);
  }

  /// <summary>
  /// 同步连接一个客户端
  /// </summary>
  /// <param name="peerip"></param>
  /// <param name="peerport"></param>
  /// <param name="tag"></param>
  /// <param name="socket"></param>
  /// <returns></returns>
  public bool connect(string peerip, int peerport, object tag, out socket socket)
  {
   return _netconnectmanage.connect(peerip, peerport, tag, out socket);
  }
  #endregion
 }

 enum en_socketdealevent
 {
  read,
  send,
 }
 class socketeventdeal
 {
  public asyncsocketclient client { get; set; }
  public en_socketdealevent socketevent { get; set; }
  public socketeventdeal(asyncsocketclient client, en_socketdealevent socketevent)
  {
   client = client;
   socketevent = socketevent;
  }
 }
}

库的使用

使用起来非常简单,示例如下

using iocpcore;
using system;
using system.collections.generic;
using system.linq;
using system.net.sockets;
using system.text;
using system.threading.tasks;
using system.windows;

namespace warningclient
{
 public class socketserver
 {
  public action<socketeventparam> onsocketevent;

  public int64 sendbytecount
  {
   get
   {
    if (_netserver == null)
     return 0;
    return _netserver.sendbytecount;
   }
  }
  public int64 readbytecount
  {
   get
   {
    if (_netserver == null)
     return 0;
    return _netserver.readbytecount;
   }
  }

  netserver _netserver;
  en_packettype _packettype = en_packettype.bytestream;
  public void setpackttype(en_packettype packettype)
  {
   _packettype = packettype;
   if (_netserver == null)
    return;
   if (packettype == en_packettype.bytestream)
   {
    _netserver.setpacketparam(0, 1024);
   }
   else
   {
    _netserver.setpacketparam(9, 1024);
   }
  }

  public bool init(list<int> listenport)
  {
   netlogger.onlogevent += netlogger_onlogevent;
   _netserver = new netserver();
   setpackttype(_packettype);
   _netserver.getpackettotallen_callback += getpackettotallen;
   _netserver.onsocketpacketevent += socketpacketdeal;

   foreach (int n in listenport)
   {
    _netserver.addlistenport(n, n);
   }

   list<int> listenfault;
   bool start = _netserver.startlisten(out listenfault);
   return start;
  }

  int getpackettotallen(byte[] data, int offset)
  {
   if (mainwindow._packettype == en_packettype.znss)
    return getpacketznss(data, offset);
   else
    return getpacketanzhiyuan(data, offset);
  }

  int getpacketanzhiyuan(byte[] data, int offset)
  {
   int n = data[offset + 5] + 6;
   return n;
  }

  int getpacketznss(byte[] data, int offset)
  {
   int packetlen = (int)(data[4]) + 5;
   return packetlen;
  }


  public bool connectasyn(string peerip, int peerport, object tag)
  {
   return _netserver.connectasyn(peerip, peerport, tag);
  }

  public bool connect(string peerip, int peerport, object tag, out socket socket)
  {
   return _netserver.connect(peerip, peerport, tag, out socket);
  }

  private void netlogger_onlogevent(string message)
  {
   applog.log(message);
  }

  dictionary<socket, socketeventparam> _clientgroup = new dictionary<socket, socketeventparam>();

  public int clientcount
  {
   get
   {
    lock (_clientgroup)
    {
     return _clientgroup.count;
    }
   }
  }
  public list<socket> clientlist
  {
   get
   {
    if (_netserver != null)
     return _netserver.clientlist;
    return new list<socket>();
   }
  }
  void addclient(socketeventparam socketparam)
  {
   lock (_clientgroup)
   {
    _clientgroup.remove(socketparam.socket);
    _clientgroup.add(socketparam.socket, socketparam);
   }
  }

  void removeclient(socketeventparam socketparam)
  {
   lock (_clientgroup)
   {
    _clientgroup.remove(socketparam.socket);
   }
  }

  objectpool<socketeventparam> _readdatapool = new objectpool<socketeventparam>();

  public objectpool<socketeventparam> readdatapool
  {
   get
   {
    return _readdatapool;
   }
  }

  private void socketpacketdeal(socketeventparam socketparam)
  {
   onsocketevent?.invoke(socketparam);
   if (socketparam.socketevent == en_socketevent.read)
   {
    if (mainwindow._isshowreadpacket)
     _readdatapool.putobj(socketparam);
   }
   else if (socketparam.socketevent == en_socketevent.accept)
   {
    addclient(socketparam);
    string peerip = socketparam.clientinfo.peeripport;
    applog.log(string.format("客户端链接!本地端口:{0},对端:{1}",
     socketparam.clientinfo.localport, peerip));
   }
   else if (socketparam.socketevent == en_socketevent.connect)
   {
    string peerip = socketparam.clientinfo.peeripport;
    if (socketparam.socket != null)
    {
     addclient(socketparam);

     applog.log(string.format("连接对端成功!本地端口:{0},对端:{1}",
      socketparam.clientinfo.localport, peerip));
    }
    else
    {
     applog.log(string.format("连接对端失败!本地端口:{0},对端:{1}",
      socketparam.clientinfo.localport, peerip));
    }
   }
   else if (socketparam.socketevent == en_socketevent.close)
   {
    mainwindow.mainwnd.onsocketdisconnect(socketparam.socket);
    removeclient(socketparam);
    string peerip = socketparam.clientinfo.peeripport;
    applog.log(string.format("客户端断开!本地端口:{0},对端:{1},",
     socketparam.clientinfo.localport, peerip));
   }
  }

  public en_senddataresult senddata(socket socket, byte[] data)
  {
   if(socket == null)
   {
    messagebox.show("还没连接!");
    return en_senddataresult.no_client;
   }
   return _netserver.senddata(socket, data);
  }

  internal void sendtoall(byte[] data)
  {
   lock (_clientgroup)
   {
    foreach (socket socket in _clientgroup.keys)
    {
     senddata(socket, data);
    }
   }
  }
 }
}

以上这篇c#中一个高性能异步socket封装库的实现思路分享就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持移动技术网。

如您对本文有疑问或者有任何想说的,请点击进行留言回复,万千网友为您解惑!

相关文章:

验证码:
移动技术网