当前位置: 移动技术网 > IT编程>开发语言>.net > C#线程同步--限量使用

C#线程同步--限量使用

2018年08月22日  | 移动技术网IT编程  | 我要评论

黎家大院唯美图库,殇痕,化名捐款写错字

问题抽象:当某一资源同一时刻允许一定数量的线程使用的时候,需要有个机制来阻塞多余的线程,直到资源再次变得可用。
线程同步方案:semaphore、semaphoreslim、countdownevent
方案特性:限量供应;除所有者外,其他人无条件等待;先到先得,没有先后顺序

1、semaphore类
      用于控制线程的访问数量,默认的构造函数为initialcount和maximumcount,表示默认设置的信号量个数和最大信号量个数。当你waitone的时候,信号量自减,当release的时候,信号量自增,然而当信号量为0的时候,后续的线程就不能拿到waitone了,所以必须等待先前的线程通过release来释放。

using system;
using system.threading;

namespace consoleapp1
{
    class program
    {
        static void main(string[] args)
        {
            thread t1 = new thread(run1);
            t1.start();
            thread t2 = new thread(run2);
            t2.start();
            thread t3 = new thread(run3);
            t3.start();
            console.readkey();
        }

        //初始可以授予2个线程信号,因为第3个要等待前面的release才能得到信号
        static semaphore sem = new semaphore(2, 10);

        static void run1()
        {
            sem.waitone();
            console.writeline("大家好,我是run1;" + datetime.now.tostring("mm:ss"));

            //两秒后
            thread.sleep(2000);
            sem.release();
        }

        static void run2()
        {
            sem.waitone();
            console.writeline("大家好,我是run2;" + datetime.now.tostring("mm:ss"));

            //两秒后
            thread.sleep(2000);
            sem.release();
        }

        static void run3()
        {
            sem.waitone();
            console.writeline("大家好,我是run3;" + datetime.now.tostring("mm:ss"));

            //两秒后
            thread.sleep(2000);
            sem.release();
        }
    }
}
program

在以上的方法中release()方法相当于自增一个信号量,release(5)自增5个信号量。但是,release()到构造函数的第二个参数maximumcount的值就不能再自增了。

semaphore可用于进程级交互。

using system;
using system.diagnostics;
using system.threading;

namespace consoleapp1
{
    class program
    {
        static void main(string[] args)
        {

            thread t1 = new thread(run1);
            t1.start();

            thread t2 = new thread(run2);
            t2.start();

            console.read();
        }

        //初始可以授予2个线程信号,因为第3个要等待前面的release才能得到信号
        static semaphore sem = new semaphore(3, 10, "命名semaphore");

        static void run1()
        {
            sem.waitone();

            console.writeline("进程:" + process.getcurrentprocess().id + "  我是run1" + datetime.now.timeofday);
        }

        static void run2()
        {
            sem.waitone();

            console.writeline("进程:" + process.getcurrentprocess().id + "  我是run2" + datetime.now.timeofday);
        }
    }
}
program

直接运行两次bin目录的exe文件,就能发现最多只能输出3个。

semaphore可以限制可同时访问某一资源或资源池的线程数。
        semaphore类在内部维护一个计数器,当一个线程调用semaphore对象的wait系列方法时,此计数器减一,只要计数器还是一个正数,线程就不会阻塞。当计数器减到0时,再调用semaphore对象wait系列方法的线程将被阻塞,直到有线程调用semaphore对象的release()方法增加计数器值时,才有可能解除阻塞状态。
 
示例说明:
图书馆都配备有若干台公用计算机供读者查询信息,当某日读者比较多时,必须排队等候。uselibrarycomputer实例用多线程模拟了多人使用多台计算机的过程
using system;
using system.threading;

namespace consoleapp1
{
    class program
    {
        //图书馆拥有的公用计算机  
        private const int computernum = 3;
        private static computer[] librarycomputers;
        //同步信号量  
        public static semaphore sp = new semaphore(computernum, computernum);

        static void main(string[] args)
        {
            //图书馆拥有computernum台电脑  
            librarycomputers = new computer[computernum];
            for (int i = 0; i < computernum; i++)
                librarycomputers[i] = new computer("computer" + (i + 1).tostring());
            int peoplenum = 0;
            random ran = new random();
            thread user;
            system.console.writeline("敲任意键模拟一批批的人排队使用{0}台计算机,esc键结束模拟……", computernum);
            //每次创建若干个线程,模拟人排队使用计算机  
            while (system.console.readkey().key != consolekey.escape)
            {
                peoplenum = ran.next(0, 10);
                system.console.writeline("\n有{0}人在等待使用计算机。", peoplenum);

                for (int i = 1; i <= peoplenum; i++)
                {
                    user = new thread(usecomputer);
                    user.start("user" + i.tostring());
                }
            }
        }

        //线程函数  
        static void usecomputer(object username)
        {
            sp.waitone();//等待计算机可用  

            //查找可用的计算机  
            computer cp = null;
            for (int i = 0; i < computernum; i++)
                if (librarycomputers[i].isoccupied == false)
                {
                    cp = librarycomputers[i];
                    break;
                }
            //使用计算机工作  
            cp.use(username.tostring());

            //不再使用计算机,让出来给其他人使用  
            sp.release();
        }
    }

    class computer
    {
        public readonly string computername = "";
        public computer(string name)
        {
            computername = name;
        }
        //是否被占用  
        public bool isoccupied = false;
        //人在使用计算机  
        public void use(string username)
        {
            system.console.writeline("{0}开始使用计算机{1}", username, computername);
            isoccupied = true;
            thread.sleep(new random().next(1, 2000)); //随机休眠,以模拟人使用计算机  
            system.console.writeline("{0}结束使用计算机{1}", username, computername);
            isoccupied = false;
        }
    }
}
program

 

2、semaphoreslim类
     在.net 4.0之前,framework中有一个重量级的semaphore,可以跨进程同步,semaphoreslim轻量级不行,msdn对它的解释为:限制可同时访问某一资源或资源池的线程数。
using system;
using system.threading;
using system.threading.tasks;

namespace consoleapp1
{
    class program
    {
        static semaphoreslim slim = new semaphoreslim(environment.processorcount, 12);

        static void main(string[] args)
        {
            for (int i = 0; i < 12; i++)
            {
                task.factory.startnew((obj) =>
                {
                    run(obj);
                }, i);
            }
            console.read();
        }

        static void run(object obj)
        {
            slim.wait();
            console.writeline("当前时间:{0}任务 {1}已经进入。", datetime.now, obj);
            //这里busy3s中
            thread.sleep(3000);
            slim.release();
        }
    }
}
program

同样,防止死锁的情况,我们需要知道”超时和取消标记“的解决方案,像semaphoreslim这种定死的”线程请求范围“,其实是降低了扩展性,使用需谨慎,在觉得有必要的时候使用它

注:semaphore类是semaphoreslim类的老版本,该版本使用纯粹的内核时间(kernel-time)方式。
    semaphoreslim类不使用windows内核信号量,而且也不支持进程间同步。所以在跨程序同步的场景下可以使用semaphore
 
3、countdownevent类
     这种采用信号状态的同步基元非常适合在动态的fork,join的场景,它采用“信号计数”的方式,就比如这样,一个麻将桌只能容纳4个人打麻将,如果后来的人也想搓一把碰碰运气,那么他必须等待直到麻将桌上的人走掉一位。好,这就是简单的信号计数机制,从技术角度上来说它是定义了最多能够进入关键代码的线程数。
     但是countdownevent更牛x之处在于我们可以动态的改变“信号计数”的大小,比如一会儿能够容纳8个线程,一下又4个,一下又10个,这样做有什么好处呢?比如一个任务需要加载1w条数据,那么可能出现这种情况。
例如:
加载user表:         根据user表的数据量,我们需要开5个task。
加载product表:    产品表数据相对比较多,计算之后需要开8个task。
加载order表:       由于我的网站订单丰富,计算之后需要开12个task。
using system;
using system.threading;
using system.threading.tasks;

namespace consoleapp1
{
    class program
    {
        //默认的容纳大小为“硬件线程“数
        static countdownevent cde = new countdownevent(environment.processorcount);

        static void loaduser(object obj)
        {
            try
            {
                console.writeline("threadid={0};当前任务:{1}正在加载user部分数据!", thread.currentthread.managedthreadid, obj);
            }
            finally
            {
                cde.signal();
            }
        }

        static void loadproduct(object obj)
        {
            try
            {
                console.writeline("threadid={0};当前任务:{1}正在加载product部分数据!", thread.currentthread.managedthreadid, obj);
            }
            finally
            {
                cde.signal();
            }
        }

        static void loadorder(object obj)
        {
            try
            {
                console.writeline("threadid={0};当前任务:{1}正在加载order部分数据!", thread.currentthread.managedthreadid, obj);
            }
            finally
            {
                cde.signal();
            }
        }

        static void main(string[] args)
        {
            //加载user表需要5个任务
            var usertaskcount = 5;
            //重置信号
            cde.reset(usertaskcount);
            for (int i = 0; i < usertaskcount; i++)
            {
                task.factory.startnew((obj) =>
                {
                    loaduser(obj);
                }, i);
            }
            //等待所有任务执行完毕
            cde.wait();
            console.writeline("\nuser表数据全部加载完毕!\n");

            //加载product需要8个任务
            var producttaskcount = 8;
            //重置信号
            cde.reset(producttaskcount);
            for (int i = 0; i < producttaskcount; i++)
            {
                task.factory.startnew((obj) =>
                {
                    loadproduct(obj);
                }, i);
            }
            cde.wait();
            console.writeline("\nproduct表数据全部加载完毕!\n");

            //加载order需要12个任务
            var ordertaskcount = 12;
            //重置信号
            cde.reset(ordertaskcount);
            for (int i = 0; i < ordertaskcount; i++)
            {
                task.factory.startnew((obj) =>
                {
                    loadorder(obj);
                }, i);
            }
            cde.wait();
            console.writeline("\norder表数据全部加载完毕!\n");

            console.writeline("\n(*^__^*) 嘻嘻,恭喜你,数据全部加载完毕\n");
            console.read();
        }
    }
}
program

我们看到有两个主要方法:wait和signal。每调用一次signal相当于麻将桌上走了一个人,直到所有人都搓过麻将wait才给放行,这里同样要注意也就是“超时“问题的存在性,尤其是在并行计算中,轻量级别给我们提供了”取消标记“的机制,这是在重量级别中不存在的

注:如果调用signal()没有到达指定的次数,那么wait()将一直等待,请确保使用每个线程完成后都要调用signal方法。

 

如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复

相关文章:

验证码:
移动技术网