当前位置: 移动技术网 > IT编程>开发语言>.net > C#完成端口(IOCP)

C#完成端口(IOCP)

2020年05月09日  | 移动技术网IT编程  | 我要评论

余朕哲,雷豹子激活码,六一儿童网

pool

 /// <summary>
    /// 与每个客户socket相关联,进行send和receive投递时所需要的参数
    /// </summary>
   public class iocontextpool
    {
        list<socketasynceventargs> pool;        //为每一个socket客户端分配一个socketasynceventargs,用一个list管理,在程序启动时建立。
        int32 capacity;                         //pool对象池的容量
        int32 boundary;                         //已分配和未分配对象的边界,大的是已经分配的,小的是未分配的
        
        public iocontextpool(int32 capacity)
        {
            this.pool = new list<socketasynceventargs>(capacity);
            this.boundary = 0;
            this.capacity = capacity;
        }

        /// <summary>
        /// 往pool对象池中增加新建立的对象,因为这个程序在启动时会建立好所有对象,
        /// 故这个方法只在初始化时会被调用,因此,没有加锁。
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        public bool add(socketasynceventargs arg)
        {
            if (arg != null && pool.count < capacity)
            {
                pool.add(arg);
                boundary++;
                return true;
            }
            else
                return false;
        }

        /// <summary>
        /// 取出集合中指定对象,内部使用
        /// </summary>
        /// <param name="index"></param>
        /// <returns></returns>
        //internal socketasynceventargs get(int index)
        //{
        //    if (index >= 0 && index < capacity)
        //        return pool[index];
        //    else
        //        return null;
        //}

        /// <summary>
        /// 从对象池中取出一个对象,交给一个socket来进行投递请求操作
        /// </summary>
        /// <returns></returns>
        public socketasynceventargs pop()
        {
            lock (this.pool)
            {
                if (boundary > 0)
                {
                    --boundary;
                    return pool[boundary];
                }
                else
                    return null;
            }
        }

        /// <summary>
        /// 一个socket客户断开,与其相关的iocontext被释放,重新投入pool中,备用。
        /// </summary>
        /// <param name="arg"></param>
        /// <returns></returns>
        public bool push(socketasynceventargs arg)
        {
            if (arg != null)
            {
                lock (this.pool)
                {
                    int index = this.pool.indexof(arg, boundary);  //找出被断开的客户,此处一定能查到,因此index不可能为-1,必定要大于0。
                    if (index == boundary)         //正好是边界元素
                        boundary++;
                    else
                    {
                        this.pool[index] = this.pool[boundary];     //将断开客户移到边界上,边界右移
                        this.pool[boundary++] = arg;
                    }
                }
                return true;
            }
            else
                return false;
        }
    }

 

 server

public partial class iocpserver : form
    {
        private delegate void setrichtextboxcallback(string str);
        private setrichtextboxcallback setrichtextboxcallback;
        public iocpserver()
        {
            setrichtextboxcallback = new setrichtextboxcallback(setrichtextboxreceive);
            initializecomponent();
        }
        /// <summary>
        /// // 监听socket,用于接受客户端的连接请求
        /// </summary>
        socket socketlistener;
        /// <summary>
        /// // 用于服务器执行的互斥同步对象
        /// </summary>
        private static mutex mutex = new mutex();
        //完成端口上进行投递所用的iocontext对象池
        //private iocontextpool iocontextpool;
        // 
        /// <summary>
        /// 服务器上连接的客户端总数
        /// </summary>
        private int32 numconnectedsockets;
       /// <summary>
        /// 服务器能接受的最大连接数量
       /// </summary>
        private int32 numconnections = 8192;
        /// <summary>
        /// 用于每个i/o socket操作的缓冲区大小
        /// </summary>
        private int32 buffersize = 4028;
        /// <summary>
        /// 端口
        /// </summary>
        private int32 bufferport = convert.toint32(configurationmanager.appsettings["serviceport"]); 
        //ip
        private string _getaddress = configurationmanager.appsettings["serviceaddress"];
        //所有设备用户信息
        //private list<equipment> listinfo = new list<equipment>();
        /// <summary>
        /// 所有设备用户信息
        /// </summary>
        private list<clientinformation> listinfo = new list<clientinformation>();
        /// <summary>
        /// 输出实体类
        /// </summary>
        httpdate hdate = new httpdate();
        //完成端口上进行投递所用的iocontext对象池
        private iocontextpool iocontextpool;
        datetime getdate;
        timespan udptime;
        string filetxt = application.startuppath + @"\filetxt";
        string filename ;
        private void iocpserver_load(object sender, eventargs e)
        {
            //获取所有设备用户信息
            //listinfo = adogetinfo.getequipmentuser();
            getdate = datetime.now.addhours(-1);
            filesave();


            this.numconnectedsockets = 0;

            this.iocontextpool = new iocontextpool(numconnections);

            // 为iocontextpool预分配socketasynceventargs对象
            for (int32 i = 0; i < this.numconnections; i++)
            {
                socketasynceventargs iocontext = new socketasynceventargs();
                iocontext.completed += new eventhandler<socketasynceventargs>(oniocompleted);
                iocontext.setbuffer(new byte[this.buffersize], 0, this.buffersize);
              //   将预分配的对象加入socketasynceventargs对象池中
                this.iocontextpool.add(iocontext);
            }
            // 获得主机相关信息
            ipaddress[] addresslist = dns.gethostentry(environment.machinename).addresslist;

            ipendpoint localendpoint = new ipendpoint(addresslist[addresslist.length - 1], bufferport);
                
            // 创建监听socket
            this.socketlistener = new socket(localendpoint.addressfamily, sockettype.stream, protocoltype.tcp);

            this.socketlistener.receivebuffersize = this.buffersize;

            this.socketlistener.sendbuffersize = this.buffersize;

            if (localendpoint.addressfamily == addressfamily.internetworkv6)
            {
                // 配置监听socket为 dual-mode (ipv4 & ipv6) 
                // 27 is equivalent to ipv6_v6only socket option in the winsock snippet below,
                this.socketlistener.setsocketoption(socketoptionlevel.ipv6, (socketoptionname)27, false);
                this.socketlistener.bind(new ipendpoint(ipaddress.ipv6any, localendpoint.port));
            }
            else
            {
                this.socketlistener.bind(localendpoint);
            }

            // 开始监听
            this.socketlistener.listen(this.numconnections);

            // 在监听socket上投递一个接受请求。
            this.startaccept(null);

            // blocks the current thread to receive incoming messages.
            mutex.waitone();

            rtboxinformation.invoke(setrichtextboxcallback, "服务器开始监听");
        }
        /// <summary>
        /// 监听socket接受处理
        /// </summary>
        /// <param name="e">socketasynceventarg associated with the completed accept operation.</param>
        private void processaccept(socketasynceventargs e)
        {
            socket s = e.acceptsocket;
            if (s.connected)
            {
                try
                {
                    socketasynceventargs iocontext = this.iocontextpool.pop();

                    if (iocontext != null)
                    {
                        // 从接受的客户端连接中取数据配置iocontext
                        //iocontext.completed += new eventhandler<socketasynceventargs>(oniocompleted);
                        //byte [] by=new byte[]{};
                        //iocontext.setbuffer(by, 0, by.length);
                        //iocontext.usertoken = s;
                        // 从接受的客户端连接中取数据配置iocontext

                        iocontext.usertoken = s;

                      
                        interlocked.increment(ref this.numconnectedsockets);

                        string outstr = string.format("客户 {0} 连入, 共有 {1} 个连接。", s.remoteendpoint.tostring(), this.numconnectedsockets);

                        rtboxinformation.invoke(setrichtextboxcallback, outstr);

                        if (!s.receiveasync(iocontext))
                        {
                            this.processreceive(iocontext);
                        }
                    }
                    else        //已经达到最大客户连接数量,在这接受连接,发送“连接已经达到最大数”,然后断开连接
                    {
                        s.send(encoding.default.getbytes("连接已经达到最大数!"));
                        string outstr = string.format("连接已满,拒绝 {0} 的连接。", s.remoteendpoint);
                        rtboxinformation.invoke(setrichtextboxcallback, outstr);
                        s.close();
                    }
                }
                catch (socketexception ex)
                {
                    socket token = e.usertoken as socket;
                    string outstr = string.format("接收客户 {0} 数据出错, 异常信息: {1} 。", token.remoteendpoint, ex.tostring());
                    adoinserttemp.addservererrorlog("接收客户数据出错:[icopserver代码行号177]" + ex.message);
                    rtboxinformation.invoke(setrichtextboxcallback, outstr);
                }
                catch (exception ex)
                {
                    rtboxinformation.invoke(setrichtextboxcallback, ex.message);
                    adoinserttemp.addservererrorlog("监听socket接受处理:[icopserver代码行号182]" + ex.message);
                }
                // 投递下一个接受请求
                this.startaccept(e);
            }
        }
        /// <summary>
        /// 从客户端开始接受一个连接操作
        /// </summary>
        /// <param name="accepteventarg">the context object to use when issuing 
        /// the accept operation on the server's listening socket.</param>
        private void startaccept(socketasynceventargs accepteventarg)
        {
            if (accepteventarg == null)
            {
                accepteventarg = new socketasynceventargs();
                accepteventarg.completed += new eventhandler<socketasynceventargs>(onacceptcompleted);
            }
            else
            {
                // 重用前进行对象清理
                accepteventarg.acceptsocket = null;
            }

            if (!this.socketlistener.acceptasync(accepteventarg))
            {
                this.processaccept(accepteventarg);
            }
        }
        /// <summary>
        ///接收完成时处理函数
        /// </summary>
        /// <param name="e">与接收完成操作相关联的socketasynceventarg对象</param>
        private void processreceive(socketasynceventargs e)
        {
            // 检查远程主机是否关闭连接
            if (e.bytestransferred > 0)
            {
                if (e.socketerror == socketerror.success)
                {

                    socket s = (socket)e.usertoken;

                    clientinformation client = hdate.addclient(listinfo, s.remoteendpoint.tostring(), e.bytestransferred, e.buffer);

                    //判断所有需接收的数据是否已经完成
                    if (s.available == 0)
                    {
                        ipendpoint localep = s.remoteendpoint as ipendpoint;
                        // 设置发送数据
                        byte[] _endread = new byte[client.transferred];
                        bool isclose = false;
                        client = hdate.getclient(listinfo,client);
                        string strtext = client.rend;  //encoding.utf8.getstring(e.buffer, 0, client.transferred);
                        requesttype requesttype = hdate.request_type(strtext);//数据类型
                        requestdeal requestdeal = hdate.request_deal(strtext);//命令方式
                        byte[] data = new byte[4028];
                        //初始化
                        if (requesttype == requesttype.typeget && requestdeal == requestdeal.getconfiguration)
                        {
                            rtboxinformation.invoke(setrichtextboxcallback, string.format("[来自{0}]{1}", localep, strtext));
                            data = commonmethod.getsend(buffersize, encoding.ascii.getbytes(hdate.rtrunhttpnew(outputprint.responsegetfromnew.replace("[getsn]", client.devicessn))));
                            e.setbuffer(data, e.offset, data.length);
                            rtboxinformation.invoke(setrichtextboxcallback, string.format("向{0}发送:{1}", localep, encoding.utf8.getstring(data)));
                        }
                        else if (requesttype == requesttype.typeget && requestdeal == requestdeal.getinfo)
                        {
                            rtboxinformation.invoke(setrichtextboxcallback, string.format("[来自{0}]{1}", localep, strtext));
                            data = commonmethod.getsend(buffersize, encoding.ascii.getbytes(hdate.rtrunhttpnew(outputprint.capsok)));
                            e.setbuffer(data, e.offset, data.length);
                            rtboxinformation.invoke(setrichtextboxcallback, string.format("向{0}发送:{1}", localep, encoding.utf8.getstring(data)));

                        }
                        //是否有命令发送
                        else if (requesttype == requesttype.typeget && requestdeal == requestdeal.getorders)
                        {
                            rtboxinformation.invoke(setrichtextboxcallback, string.format("[来自{0}]{1}", localep, strtext));
                            if (client.waitingname != null)
                            {
                                if (client.waitingname.count > 0)
                                {
                                    data = commonmethod.getsend(buffersize, encoding.ascii.getbytes(hdate.rtrunhttpnew(client.waitingname[0])));
                                }
                                else
                                {
                                    data = commonmethod.getsend(buffersize, encoding.ascii.getbytes(hdate.rtrunhttpnew(outputprint.capsok)));
                                }
                            }
                            else
                            {
                                data = commonmethod.getsend(buffersize, encoding.ascii.getbytes(hdate.rtrunhttpnew(outputprint.capsok)));
                            }
                            e.setbuffer(data, e.offset, data.length);
                            rtboxinformation.invoke(setrichtextboxcallback, string.format("向{0}发送:{1}", localep, encoding.utf8.getstring(data)));
                        }
                        //返回值说明:0 命令执行成功-1 参数错误-3 存取错误
                        else if (requesttype == requesttype.typepost && requestdeal == requestdeal.postinfo)
                        {
                            hdate.removelength(client, strtext);
                            rtboxinformation.invoke(setrichtextboxcallback, string.format("[来自{0}]{1}", localep, strtext));
                            data = commonmethod.getsend(buffersize, encoding.ascii.getbytes(hdate.rtrunhttpnew(outputprint.capsok)));
                            e.setbuffer(data, e.offset, data.length);
                            rtboxinformation.invoke(setrichtextboxcallback, string.format("向{0}发送:{1}", localep, encoding.utf8.getstring(data)));

                        }
                        //post发送数据命令
                        else if (requesttype == requesttype.typepost && requestdeal == requestdeal.postatttable)
                        {
                            hdate.gettable(strtext, client);
                            rtboxinformation.invoke(setrichtextboxcallback, string.format("[来自{0}]{1}", localep, strtext));
                            data = commonmethod.getsend(buffersize, encoding.ascii.getbytes(hdate.rtrunhttpnew(outputprint.capsok)));
                            e.setbuffer(data, e.offset, data.length);
                            rtboxinformation.invoke(setrichtextboxcallback, string.format("向{0}发送:{1}", localep, encoding.utf8.getstring(data)));

                        }
                        else
                        {

                            if (!client.isdata)
                            {
                                hdate.removelength(client, strtext);
                                rtboxinformation.invoke(setrichtextboxcallback, string.format("[来自{0}]{1}", localep, strtext));
                                data = commonmethod.getsend(buffersize, encoding.ascii.getbytes(hdate.rtrunhttpnew(outputprint.capsok)));
                                e.setbuffer(data, e.offset, data.length);
                                rtboxinformation.invoke(setrichtextboxcallback, string.format("向{0}发送:{1}", localep, encoding.utf8.getstring(data)));
                            }
                            else
                            {
                                hdate.gettable(strtext, client);
                            }
                        }
                        try
                        {
                            if (!s.sendasync(e))        //投递发送请求,这个函数有可能同步发送出去,这时返回false,并且不会引发socketasynceventargs.completed事件
                            {
                                // 同步发送时处理发送完成事件
                                this.processsend(e, isclose);
                            }

                            if (client != null && !client.isdata)
                            {
                                hdate.addatt(client, listinfo);
                                thread.sleep(6000);
                                this.closeclientsocket(s, e);
                            }
                        }
                        catch (exception ex) 
                        {
                            adoinserttemp.addservererrorlog("接收完成时处理函数:[icopserver代码行号330]" + ex.message);
                        }

                    }
                    else if (!s.receiveasync(e))    //为接收下一段数据,投递接收请求,这个函数有可能同步完成,这时返回false,并且不会引发socketasynceventargs.completed事件
                    {
                        // 同步接收时处理接收完成事件
                        this.processreceive(e);
                    }
                }
                else
                {
                    this.processerror(e);
                }
            }
            else
            {
                this.closeclientsocket(e);
            }
        }
        /// <summary>
        /// 发送完成时处理函数
        /// </summary>
        /// <param name="e">与发送完成操作相关联的socketasynceventarg对象</param>
        private void processsend(socketasynceventargs e, bool isreceive)
        {
            try
            {
                if (e.socketerror == socketerror.success)
                {


                    socket s = (socket)e.usertoken;
                    if (s != null)
                    {
                        //this.closeclientsocket(s, e);
                        //接收时根据接收的字节数收缩了缓冲区的大小,因此投递接收请求时,恢复缓冲区大小
                        
                        //e.setbuffer(new byte[buffer_size], 0, buffer_size);

                        

                        e.setbuffer(0, buffersize);
                        
                            if (!s.receiveasync(e))     //投递接收请求
                            {
                                // 同步接收时处理接收完成事件
                                this.processreceive(e);
                            }
                        
                    }
                }
                else
                {
                    this.processerror(e);
                }
            }
            catch (exception ex) 
            { 
                rtboxinformation.invoke(setrichtextboxcallback, ex.message);
                adoinserttemp.addservererrorlog("发送完成时处理函数:[icopserver代码行号390]" + ex.message); 
                this.processerror(e); 
            }
        } 
        /// <summary>
        /// 当socket上的发送或接收请求被完成时,调用此函数
        /// </summary>
        /// <param name="sender">激发事件的对象</param>
        /// <param name="e">与发送或接收完成操作相关联的socketasynceventarg对象</param>
        private void oniocompleted(object sender, socketasynceventargs e)
        {
            // determine which type of operation just completed and call the associated handler.
            switch (e.lastoperation)
            {
                case socketasyncoperation.receive:
                    this.processreceive(e);
                    break;
                case socketasyncoperation.send:
                    this.processsend(e,true);
                    break;
                default:
                    throw new argumentexception("the last operation completed on the socket was not a receive or send");
            }
        }
        /// <summary>
        /// 处理socket错误
        /// </summary>
        /// <param name="e"></param>
        private void processerror(socketasynceventargs e)
        {
            try
            {
                socket s = e.usertoken as socket;
                ipendpoint localep = s.localendpoint as ipendpoint;

                this.closeclientsocket(s, e);

                string outstr = string.format("套接字错误 {0}, ip {1}, 操作 {2}。", (int32)e.socketerror, localep, e.lastoperation);

                rtboxinformation.invoke(setrichtextboxcallback, outstr);
            }
            catch (exception ex) { adoinserttemp.addservererrorlog("处理socket错误:[icopserver代码行号431]" + ex.message); }
        }
        /// <summary>
        /// 关闭socket连接
        /// </summary>
        /// <param name="e">socketasynceventarg associated with the completed send/receive operation.</param>
        private void closeclientsocket(socketasynceventargs e)
        {
            socket s = e.usertoken as socket;
            this.closeclientsocket(s, e);
        }
        /// <summary>
        /// accept 操作完成时回调函数
        /// </summary>
        /// <param name="sender">object who raised the event.</param>
        /// <param name="e">socketasynceventarg associated with the completed accept operation.</param>
        private void onacceptcompleted(object sender, socketasynceventargs e)
        {
            this.processaccept(e);
        }
        private void closeclientsocket(socket s, socketasynceventargs e)
        {
            try
            {

                if (s != null && this.numconnectedsockets > 0)
                {
                    interlocked.decrement(ref this.numconnectedsockets);

                    // socketasynceventarg 对象被释放,压入可重用队列。

                    

                    this.iocontextpool.push(e);

                    string outstr = string.format("客户 {0} 断开, 共有 {1} 个连接。", s.remoteendpoint.tostring(), this.numconnectedsockets);

                    rtboxinformation.invoke(setrichtextboxcallback, outstr);
                    try
                    {
                        s.shutdown(socketshutdown.send);
                        s.disconnect(true);
                    }
                    catch (exception ex)
                    {
                        rtboxinformation.invoke(setrichtextboxcallback, ex.message);
                        adoinserttemp.addservererrorlog("sokect关闭:[icopserver代码行号477]" + ex.message);
                    }
                    finally
                    {
                        s.close();
                    }
                }
            }
            catch (exception ex) { adoinserttemp.addservererrorlog("sokect关闭:[icopserver代码行号467]" + ex.message); }
        }
        private void setrichtextboxreceive(string str)
        {
            //show txt
            rtboxinformation.appendtext(str);
            //do right
            rtboxinformation.select(this.rtboxinformation.textlength, 0);
            //do down
            rtboxinformation.scrolltocaret();
            //new row
            rtboxinformation.appendtext("\r\n");

            filesave();
        }

        private void filesave()
        { 
            timespan udptime=datetime.now-getdate;

            if(udptime.hours>=1)
            {
                

                filestream fs = null;
                streamwriter sw = null;

                filename = filetxt + datetime.now.tostring("yyyymmddhh");
               if(!file.exists(filename))
               {
                   directory.createdirectory(filename);
               }

                

               fs = new filestream(filename + @"\log_"+datetime.now.tostring("yyyymmddhhmmssfff") + ".txt", filemode.create);
               sw = new streamwriter(fs);
               
               sw.write(rtboxinformation.text);
            
               sw.close();
               fs.close();
              
               rtboxinformation.clear();

               getdate = datetime.now;
            }
        
        }




        private void iocpserver_formclosing(object sender, formclosingeventargs e)
        {
                e.cancel = true;
                this.hide();
            
        }

        private void notifyicon_mousedoubleclick(object sender, mouseeventargs e)
        {

            this.show();
            windowstate = formwindowstate.normal;
        }
    }

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

更新中....

如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复

相关文章:

验证码:
移动技术网