当前位置: 移动技术网 > IT编程>开发语言>.net > TaskCreationOptions.LongRunning 运行比可用线程数更多的任务

TaskCreationOptions.LongRunning 运行比可用线程数更多的任务

2019年04月10日  | 移动技术网IT编程  | 我要评论

新闻网,烽火纪元,湛江二中林涛


最近在学websocket,服务端需要监听多个websocket客户端发送的消息。

开始的解决方法是每个websocket客户端都添加一个线程进行监听,代码如下:

/// <summary>
/// 监听端口 创建websocket
/// </summary>
/// <param name="httplistener"></param>
private void createwebsocket(httplistener httplistener)
{
    if (!httplistener.islistening)
        throw new exception("httplistener未启动");
    httplistenercontext listenercontext =  httplistener.getcontextasync().result;

    if (!listenercontext.request.iswebsocketrequest)
    {
        createwebsocket(httplistener);
        return;
    }

    websocketcontext websocket = null;
    try
    {
        websocket = new websocketcontext(listenercontext, subprotocol);
    }
    catch (exception ex)
    {
        log.error(ex);
        createwebsocket(httplistener);
        return;
    }

    log.info($"成功创建websocket:{websocket.id}");

    int workerthreads = 0, completionportthreads = 0;
    threadpool.getavailablethreads(out workerthreads, out completionportthreads);
    if (workerthreads <= reservedthreadscount + 1 || completionportthreads <= reservedthreadscount + 1)
    {
        /**
         * 可用线程小于预留线程数量
         * 通知客户端关闭连接
         * */
        websocket.closeasync(websocketclosestatus.internalservererror, "可用线程不足,无法连接").wait();
    }
    else
    {
        if (onreceivemessage != null)
            websocket.onreceivemessage += onreceivemessage;
        websocket.onclosewebsocket += websocket_onclosewebsocket;
        websocketcontexts.add(websocket);

        // 在线程中监听客户端发送的消息
        threadpool.queueuserworkitem(new waitcallback(p =>
        {
            (p as websocketcontext).receivemessageasync().wait();
        }), websocket);

    }

    createwebsocket(httplistener);
}

 

但是可用线程数量是有限的,先连接的客户端一直递归接收消息,导致线程无限占用,后连接上的客户端就没有线程用于监听接受消息了。
接受消息方法如下:

/// <summary>
/// 递归 同步接收消息
/// </summary>
/// <returns></returns>
public void receivemessage()
{
    websocket websocket = httplistenerwebsocketcontext.websocket;

    if (websocket.state != websocketstate.open)
        throw new exception("http未握手成功,不能接受消息!");
    
    var bytebuffer = websocket.createserverbuffer(receivebuffersize);
    websocketreceiveresult receiveresult = null;
    try
    {
        receiveresult = websocket.receiveasync(bytebuffer, cancellationtoken).result;
    }
    catch (websocketexception ex)
    {
        if (ex.innerexception is httplistenerexception)
        {
            log.error(ex);
            closeasync(websocketclosestatus.protocolerror, "客户端断开连接" + ex.message).wait(timespan.fromseconds(20));
            return;
        }
        else
        {
            log.error(ex);
            closeasync(websocketclosestatus.protocolerror, "websocket 连接异常" + ex.message).wait(timespan.fromseconds(20));
            return;
        }
    }
    catch (exception ex)
    {
        log.error(ex);
        closeasync(websocketclosestatus.protocolerror, "客户端断开连接" + ex.message).wait(timespan.fromseconds(20));
        return;
    }
    if (receiveresult.closestatus.hasvalue)
    {
        log.info("接受到关闭消息!");
        closeasync(receiveresult.closestatus.value, receiveresult.closestatusdescription).wait(timespan.fromseconds(20));
        return;
    }

    byte[] bytes = new byte[receiveresult.count];
    array.copy(bytebuffer.array, bytes, bytes.length);

    string message = encoding.getstring(bytes);
    log.info($"{id}接收到消息:{message}");

    if (onreceivemessage != null)
        onreceivemessage.invoke(this, message);

    if (!cancellationtoken.iscancellationrequested)
        receivemessage();
}

这是不能接受的。

后来在task中看到,在创建task时可以设置taskcreationoptions参数

该枚举有个字段longrunning 

longrunning 2

指定任务将是长时间运行的、粗粒度的操作,涉及比细化的系统更少、更大的组件。 它会向 taskscheduler 提示,过度订阅可能是合理的。 可以通过过度订阅创建比可用硬件线程数更多的线程。 它还将提示任务计划程序:该任务需要附加线程,以使任务不阻塞本地线程池队列中其他线程或工作项的向前推动。

经过测试,可同时运行的任务数量的确可以超出可用线程数量。
测试如下:
没有设置 taskcreationoptions.longrunning  代码如下:

        /// <summary>
        /// 测试任务
        /// 只运行了9个任务
        /// </summary>
        [testmethod]
        public void testtask1()
        {
            var cts = new cancellationtokensource();
            int maxworkerthreads = 0, maxcompletionportthreads = 0;
            threadpool.getmaxthreads(out maxworkerthreads, out maxcompletionportthreads);
            console.writeline($"最大可用辅助线程数目为{maxcompletionportthreads},最大可用异步 i/o 线程数目为{maxcompletionportthreads}");
            maxworkerthreads = 10;
            maxcompletionportthreads = 10;
            console.writeline(@"设置线程池中辅助线程的最大数目为{0}, 线程池中异步 i/o 线程的最大数目为{1}
同时运行30个长时运行线程,每个线程中运行一个同步方法,看是否30个线程是否都能运行。", maxworkerthreads, maxcompletionportthreads);
            threadpool.setmaxthreads(10, 10);
            threadpool.getmaxthreads(out maxworkerthreads, out maxcompletionportthreads);
            console.writeline($"最大可用辅助线程数目为{maxcompletionportthreads},最大可用异步 i/o 线程数目为{maxcompletionportthreads}");
            
            int count = 0;
            while (count++ < 30)
            {
                task.factory.startnew(p =>
                {
                    int index = (int)p;
                    int runcount = 0;
                    longrunningtask($"线程{index}", runcount, cts.token);
                }, count, cts.token, taskcreationoptions.none, taskscheduler.default);
            }

            task.delay(timespan.fromseconds(10)).wait(timespan.fromseconds(20)); // 等待超时,等待任务没有执行
            cts.cancel();
        }

        /// <summary>
        /// 长时运行任务
        /// 递归运行
        /// </summary>
        /// <param name="taskname">任务名称</param>
        /// <param name="runcount">运行次数</param>
        /// <param name="token">传播有关取消操作的通知</param>
        private void longrunningtask(string taskname, int runcount, cancellationtoken token)
        {
            printtask($"任务【{taskname}】线程id【{environment.currentmanagedthreadid}】第【{++runcount}】次运行").wait();
            if (!token.iscancellationrequested)
                longrunningtask(taskname, runcount, token);
        }
        /// <summary>
        /// 异步打印任务 等待1秒后打印消息
        /// </summary>
        /// <param name="message">消息</param>
        /// <returns></returns>
        private task printtask(string message)
        {
            return task.factory.startnew(() =>
            {
                thread.sleep(1000);
                console.writeline(message);
            });
        }

 测试结果

测试用了20秒才完成

主线程创建了一个等待10秒后完成的任务,任务等待超时20秒

说明主程序创建的任务没有执行,而是等待超时了。

 

设置了 taskcreationoptions.longrunning  代码如下:

        /// <summary>
        /// 测试长时运行任务
        /// 30个任务全部都运行了
        /// </summary>
        [testmethod]
        public void testtasklongrunning()
        {
            var cts = new cancellationtokensource();
            int maxworkerthreads = 0, maxcompletionportthreads = 0;
            threadpool.getmaxthreads(out maxworkerthreads, out maxcompletionportthreads);
            maxworkerthreads = 10;
            maxcompletionportthreads = 10;
            console.writeline($"最大可用辅助线程数目为{maxcompletionportthreads},最大可用异步 i/o 线程数目为{maxcompletionportthreads}");
            console.writeline(@"设置线程池中辅助线程的最大数目为{0}, 线程池中异步 i/o 线程的最大数目为{1}
同时运行30个长时运行线程,每个线程中运行一个同步方法,看是否30个线程是否都能运行。", maxworkerthreads, maxcompletionportthreads);
            threadpool.setmaxthreads(10, 10);
            threadpool.getmaxthreads(out maxworkerthreads, out maxcompletionportthreads);
            console.writeline($"最大可用辅助线程数目为{maxcompletionportthreads},最大可用异步 i/o 线程数目为{maxcompletionportthreads}");
            
            int count = 0;
            while (count++ < 30)
            {

                task.factory.startnew(p =>
                {
                    int index = (int)p;
                    int runcount = 0;
                    longrunningtask($"线程{index}", runcount, cts.token);
                }, count, cts.token, taskcreationoptions.longrunning, taskscheduler.default);
            }

            task.delay(timespan.fromseconds(10)).wait(timespan.fromseconds(20));    // 等待没有超时,等待任务有执行
            cts.cancel();
        }

测试结果:

测试用了10秒完成

主线程创建了一个等待10秒后完成的任务,任务等待超时20秒

说明主程序创建的任务立即执行了,程序等待了10秒完成。

 使用taskcreationoptions.longrunning  需要注意的是action必须是同步方法同时运行任务书才能超出可以用线程数量,否则不能。

例如:

        /// <summary>
        /// 测试长时运行任务
        /// 只运行了前9个任务
        /// </summary>
        [testmethod]
        public void testtasklongrunning2()
        {
            var cts = new cancellationtokensource();
            int maxworkerthreads = 0, maxcompletionportthreads = 0;
            threadpool.getmaxthreads(out maxworkerthreads, out maxcompletionportthreads);
            console.writeline($"最大可用辅助线程数目为{maxcompletionportthreads},最大可用异步 i/o 线程数目为{maxcompletionportthreads}");

            maxworkerthreads = 10;
            maxcompletionportthreads = 10;
            console.writeline(@"设置线程池中辅助线程的最大数目为{0}, 线程池中异步 i/o 线程的最大数目为{1}
同时运行30个长时运行线程,每个线程中运行一个异步方法,看是否30个线程是否都能运行。", maxworkerthreads, maxcompletionportthreads);
            threadpool.setmaxthreads(10, 10);
            threadpool.getmaxthreads(out maxworkerthreads, out maxcompletionportthreads);
            console.writeline($"最大可用辅助线程数目为{maxcompletionportthreads},最大可用异步 i/o 线程数目为{maxcompletionportthreads}");
            
            int count = 0;
            while (count++ < 30)
            {

                task.factory.startnew(async p =>
                {
                    int index = (int)p;
                    int runcount = 0;
                    await longrunningtaskasync($"线程{index}", runcount, cts.token);
                }, count, cts.token, taskcreationoptions.longrunning, taskscheduler.default);
            }

            task.delay(timespan.fromseconds(10)).wait(timespan.fromseconds(20));    // 等待没有超时,等待任务有执行
            cts.cancel();
        }
        /// <summary>
        /// 异步长时运行任务
        /// </summary>
        /// <param name="taskname">任务名称</param>
        /// <param name="runcount">运行次数</param>
        /// <param name="token">传播有关取消操作的通知</param>
        /// <returns></returns>
        private async task longrunningtaskasync(string taskname, int runcount, cancellationtoken token)
        {
            await printtask($"任务【{taskname}】线程id【{environment.currentmanagedthreadid}】第【{++runcount}】次运行");
            if (!token.iscancellationrequested)
                await longrunningtaskasync(taskname, runcount, token);
        }

 

测试结果

测试用了10秒完成

主线程创建了一个等待10秒后完成的任务,任务等待超时20秒

说明主程序创建的任务立即执行了,程序等待了10秒完成。

websocket修改后的监听方法:

        /// <summary>
        /// 监听端口 创建websocket
        /// </summary>
        /// <param name="httplistener"></param>
        private void createwebsocket(httplistener httplistener)
        {
            if (!httplistener.islistening)
                throw new exception("httplistener未启动");
            httplistenercontext listenercontext = httplistener.getcontext();

            if (!listenercontext.request.iswebsocketrequest)
            {
                createwebsocket(httplistener);
                return;
            }

            websocketcontext websocket = null;
            try
            {
                websocket = new websocketcontext(listenercontext, subprotocol);
            }
            catch (exception ex)
            {
                log.error(ex);
                createwebsocket(httplistener);
                return;
            }

            log.info($"成功创建websocket:{websocket.id}");

            int workerthreads = 0, completionportthreads = 0;
            threadpool.getavailablethreads(out workerthreads, out completionportthreads);
            if (onreceivemessage != null)
                    websocket.onreceivemessage += onreceivemessage;
                websocket.onclosewebsocket += websocket_onclosewebsocket;

            task.factory.startnew(() =>
            {
                websocket.receivemessage();
            }, cancellationtoken, taskcreationoptions.longrunning, taskscheduler.default);

            createwebsocket(httplistener);
        }

 

修改后的websocket服务可以监听超过可用线程数量的客户端

 

如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复

相关文章:

验证码:
移动技术网