当前位置: 移动技术网 > IT编程>开发语言>Java > 基于Java 生产者消费者模式(详细分析)

基于Java 生产者消费者模式(详细分析)

2018年12月04日  | 移动技术网IT编程  | 我要评论

生产者消费者模式是多线程中最为常见的模式:生产者线程(一个或多个)生成面包放进篮子里(集合或数组),同时,消费者线程(一个或多个)从篮子里(集合或数组)取出面包消耗。虽然它们任务不同,但处理的资源是相同的,这体现的是一种线程间通信方式。

本文将先说明单生产者单消费者的情况,之后再说明多生产者多消费者模式的情况。还会分别使用wait()/nofity()/nofityall()机制、lock()/unlock()机制实现这两种模式。

在开始介绍模式之前,先解释下wait()、notify()和notifyall()方法的用法细节以及改进的lock()/unlock()、await()/signal()/signalall()的用法。

1.等待、唤醒机制的原理

wait()、notify()和notifyall()分别表示让线程进入睡眠、唤醒睡眠线程以及唤醒所有睡眠的线程。但是,对象是哪个线程呢?另外,在api文档中描述这三个方法都必须在有效监视器(可理解为持有锁)的前提下使用。这三个方法和锁有什么关系呢?

以同步代码块synchronized(obj){}或同步函数为例,在它们的代码结构中可以使用wait()、notify()以及notifyall(),因为它们都持有锁。

对于下面的两个同步代码块来说,分别使用的是锁obj1和锁obj2,其中线程1、线程2执行的是obj1对应的同步代码,线程3、线程4执行的是obj2对应的同步代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class mylock implements runnable {
 public int flag = 0;
 object obj1 = new object();
 object obj2 = new object();
 public void run(){
 while(true){
  if(flag%2=0){
  synchronized(obj1){ //线程t1和t2执行此同步任务
   //try{obj1.wait();}catch(interruptedexception i){}
   //obj1.notify()
   //obj1.notifyall()
  }
  } else {
  synchronized(obj2){ //线程t3和t4执行此同步任务
   //try{obj2.wait();}catch(interruptedexception i){}
   //obj2.notify()
   //obj2.notifyall()
  }
  }
 }
 }
}
class demo {
 public static void main(string[] args){
 mylock ml = new mylock();
 thread t1 = new thread(ml);
 thread t2 = new thread(ml);
 thread t3 = new thread(ml);
 thread t4 = new thread(ml);
 t1.start();
 t2.start();
 try{thread.sleep(1)}catch(interruptedexception i){};
 ml.flag++;
 t3.start();
 t4.start();
 }
}

当t1开始执行到wait()时,它将进入睡眠状态,但却不是一般的睡眠,而是在一个被obj1标识的线程池中睡眠(实际上是监视器对应线程池,只不过此时的监视器和锁是绑定在一起的)。当t2开始执行,它发现锁obj1被其他线程持有,它将进入睡眠态,这次睡眠是因为锁资源等待而非wait()进入的睡眠。因为t2已经判断过它要申请的是obj1锁,因此它也会进入obj1这个线程池睡眠,而不是普通的睡眠。同理t3和t4,这两个线程会进入obj2线程池睡眠。

当某个线程执行到notify()时,这个notify()将 随机 唤醒它 所属锁对应线程池 中的 任意一个 线程。例如,obj1.notify()将唤醒obj1线程池中任意一个睡眠的线程(当然,如果没有睡眠线程则什么也不做)。同理notifyall()则是唤醒所属锁对应线程池中所有睡眠的线程。

必须要搞清楚的是"对应锁",因为在调用wait()、notify()和notifyall()时都必须明确指定锁。例如,obj1.wait()。如果省略了所属锁,则表示的是this这个对象,也就是说,只有在非静态的同步函数中才能省略这三个方法的前缀。

简而言之,当使用了同步,就使用了锁,线程也就有了归属,它的所有依据都由所属锁来决定。例如,线程同步时,判断锁是否空闲以决定是否执行后面的代码,亦决定是否去特定的线程池中睡眠,当唤醒时也只会唤醒所属锁对应线程池中的线程。

这几个方法在应用上,一般在一次任务中,wait()和notify()/notifyall()是成对出现且择一执行的。换句话说,就是这一轮原子性同步执行过程中,要么执行wait()进入睡眠,要么执行notify()唤醒线程池中的睡眠线程。要如何实现择一执行,可以考虑使用标记的方式来作为判断依据。参考后文的例子。

2.lock和condition

wait()系列的三个方法局限性很大,因为无论是睡眠还是唤醒的动作,都完全和锁耦合在一起了。例如,锁obj1关联的线程只能唤醒obj1线程池中的线程,而无法唤醒锁obj2关联的线程;再例如,在原来synchronized同步时,锁是在开始同步时隐式地自动获取的,且是在执行完一整个任务后,又隐式地自动释放锁,也就是说获取锁和释放锁的动作无法人为控制。

从jdk 1.5开始,java提供了java.util.concurrent.locks包,这个包中提供了lock接口、condition接口和readwritelock接口,前两个接口将锁和监视器方法(睡眠、唤醒操作)解耦了。其中lock接口只提供锁,通过锁方法newconditon()可以生成一个或多个与该锁关联的监视器,每个监视器都有自己的睡眠、唤醒方法。也就是说lock替代了synchronized方法和同步代码块的使用,condition替代了object监视器方法的使用。

如下图:

当某线程执行condition1.await()时,该线程将进入condition1监视器对应的线程池睡眠,当执行condition1.signal()时,将随机唤醒condition1线程池中的任意一个线程,当执行condition1.signalall()时,将唤醒condition1线程池中的所有线程。同理,对于condition2监视器也是一样的。

即使有多个监视器,但只要它们关联的是同一个锁对象,就可以跨监视器操作对方线程。例如condition1中的线程可以执行condition2.signal()来唤醒condition2线程池中的某个线程。

要使用这种锁、监视器的关联方式,参考如下步骤:

1
2
3
4
5
6
7
8
9
10
import java.util.concurrent.locks.*;
lock l = new reentrantlock();
condition con1 = l.newcondition();
condition con2 = l.newcondition();
l.lock();
try{
 //包含await()、signal()或signalall()的代码段...
} finally {
 l.unlock(); //由于代码段可能异常,但unlock()是必须执行的,所以必须使用try,且将unlock()放进finally段
}

具体用法见后文关于lock、condition的示例代码。

3.单生产者单消费者模式

一个生产者线程,一个消费者线程,生产者每生产一个面包放进盘子里,消费者从盘子里取出面包进行消费。其中生产者判断是否继续生产的依据是盘子里没有面包,而消费者判断是否消费的依据是盘子里有面包。由于这个模式中,盘子一直只放一个面包,因此可以把盘子省略掉,生产者和消费者直接手把手地交递面包即可。

首先需要描述这三个类,一是多线程共同操作的资源(此处即面包),二是生产者,三是消费者。在下面的例子中,我把生产面包和消费面包的方法分别封装到了生产者和消费者类中,如果把它们封装在面包类中则更容易理解。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
//描述资源:面包的名称和编号,由编号决定面包的号码
class bread {
 public string name;
 public int count = 1;
 public boolean flag = false; //该标记为wait()和notify()提供判断标记
}
//生产者和消费者先后处理的面包资源是同一个,要确保这一点,
//可以按单例模式来设计面包类,也可以将同一个面包对象通过构造方法传递给生产者和消费者,此处使用后一种方式。
//描述生产者
class producer implements runnable {
 private bread b; //生产者的成员:它要处理的资源
 producer(bread b){
 this.b = b;
 }
 //提供生产面包的方法
 public void produce(string name){
 b.name = name + b.count;
 b.count++;
 }
 public void run(){
 while(true){
  synchronized(bread.class){ //使用bread.class作为锁标识,使得生产者和消费者的同步代码块可以使用同一个锁
  if(b.flag){  //wait()必须在同步代码块内部,不仅因为必须持有锁才能睡眠,而且对锁这个资源的判断会出现混乱
   try{bread.class.wait();}catch(interruptedexception i){}
  }
  produce("面包");
  system.out.println(thread.currentthread().getname()+"----生产者------"+b.name);
  try{thread.sleep(10);}catch(interruptedexception i){}
  b.flag = true;  //标记的切换也必须在保持同步
  bread.class.notify(); //notify()也必须同步,否则锁都已经释放了,就无法做唤醒动作
  //ps:一次同步任务中,wait()和notify()应当只能其中一个执行,否则对方线程会混乱
  }
 }
 }
}
//描述消费者
class consumer implements runnable {
 private bread b; //消费者的成员:它要处理的资源
 consumer(bread b){
 this.b = b;
 }
 //提供消费面包的方法
 public string consume(){
 return b.name;
 }
 public void run(){
 while(true){
  synchronized(bread.class){
  if(!b.flag){
   try{bread.class.wait();}catch(interruptedexception i){}
  }
  system.out.println(thread.currentthread().getname()+"----消费者-------------"+consume());
  try{thread.sleep(10);}catch(interruptedexception i){}
  b.flag = false;
  bread.class.notify();
  }
 }
 }
}
public class produceconsume_1{
 public static void main(string[] args) {
 //1.创建资源对象
 bread b = new bread();
 //2.创建生产者和消费者对象,将同一个面包对象传递给生产者和消费者
 producer pro = new producer(b);
 consumer con = new consumer(b);
 //3.创建线程对象
 thread pro_t = new thread(pro);
 thread con_t = new thread(con);
 pro_t.start();
 con_t.start();
 }
}

最后的执行结果应当生产一个、消费一个,如此不断循环。如下:

1
2
3
4
5
6
7
8
9
10
11
12
thread-0----生产者------面包1
thread-1----消费者-------------面包1
thread-0----生产者------面包2
thread-1----消费者-------------面包2
thread-0----生产者------面包3
thread-1----消费者-------------面包3
thread-0----生产者------面包4
thread-1----消费者-------------面包4
thread-0----生产者------面包5
thread-1----消费者-------------面包5
thread-0----生产者------面包6
thread-1----消费者-------------面包6

4.使用lock和condition实现单生产单消费模式

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import java.util.concurrent.locks.*;
class bread {
 public string name;
 public int count = 1;
 public boolean flag = false;
 //为生产者和消费者提供同一个锁对象以及同一个condition对象
 public static lock lock = new reentrantlock();
 public static condition condition = lock.newcondition();
}
class producer implements runnable {
 private bread b;
 producer(bread b){
 this.b = b;
 }
 public void produce(string name){
 b.name = name + b.count;
 b.count++;
 }
 public void run(){
 while(true){
  //使用bread.lock来锁住资源
  bread.lock.lock();
  try{
  if(b.flag){
   try{bread.condition.await();}catch(interruptedexception i){}
  }
  produce("面包");
  system.out.println(thread.currentthread().getname()+"----生产者------"+b.name);
  try{thread.sleep(10);}catch(interruptedexception i){}
  b.flag = true;
  bread.condition.signal();
  } finally {
  bread.lock.unlock();
  }
 }
 }
}
class consumer implements runnable {
 private bread b;
 consumer(bread b){
 this.b = b;
 }
 public string consume(){
 return b.name;
 }
 public void run(){
 while(true){
  //使用bread.lock来锁住资源
  bread.lock.lock();
  try{
  if(!b.flag){
   try{bread.condition.await();}catch(interruptedexception i){}
  }
  system.out.println(thread.currentthread().getname()+"----消费者-------------"+consume());
  try{thread.sleep(10);}catch(interruptedexception i){}
  b.flag = false;
  bread.condition.signal();
  } finally {
  bread.lock.unlock();
  }
 }
 }
}
public class produceconsume_1{
 public static void main(string[] args) {
 //1.创建资源对象
 bread b = new bread();
 //2.创建生产者和消费者对象,将同一个面包对象传递给生产者和消费者
 producer pro = new producer(b);
 consumer con = new consumer(b);
 //3.创建线程对象
 thread pro_t = new thread(pro);
 thread con_t = new thread(con);
 pro_t.start();
 con_t.start();
 }
}

5.多生产多消费模式(单面包)

这里先说明多生产者多消费者,但同一个时刻最多只能有一个面包的模式,这个模式在实际中可能是不理想的,但为了引出后面真实的多生产多消费模式,我觉得有必要在这里解释这种模式,并且分析这种模式以及如何从单生产单消费的代码演变而来。

如下图:

从单生产单消费到多生产多消费,因为多线程安全问题和死锁问题,所以有两个方面的问题需要考虑:

对于某一方来说,如何让多线程达到和单线程同样的生产或消费能力?也就是说,如何让多线程看上去就是单线程。多线程和单线程最大的区别在于多线程安全问题,因此,只要保证多线程执行的任务能够同步即可。

第1个问题考虑的是某一方多线程的问题,第2个问题考虑的是两方如何能和谐配合完成生产消费问题。也就是如何保证生产方和消费方一方活动的同时另一方睡眠。只需在某一方执行完同步任务时,唤醒另一方即可。

其实从单线程到多线程,就两个问题需要考虑:不同步和死锁。(1)当生产方和消费方都出现了多线程,可以将生产方的多线程看成一个线程整体、消费方的多线程也看成一个整体,这解决的是线程安全问题。(2)再将生产方整体和消费方整体两方结合起来看成多线程,来解决死锁问题,而java中解决死锁的方式就是唤醒对方或唤醒所有。

问题是如何保证某一方的多线程之间同步?以多线程执行单消费方的代码为例进行分析。

1
2
3
4
5
6
7
8
9
10
11
while(true){
 synchronized(bread.class){
 if(!b.flag){
  try{bread.class.wait();}catch(interruptedexception i){}
 }
 system.out.println(thread.currentthread().getname()+"----消费者-------------"+consume());
 try{thread.sleep(10);}catch(interruptedexception i){}
 b.flag = false;
 bread.class.notify();
 }
}

假设消费线程1消费完一个面包后唤醒了消费线程2,并继续循环,判断if(!flag),它将wait,于是锁被释放。假设cpu正好选中了消费线程2,那么消费线程2也将进入wait。当生产方生产了一个面包后,假设唤醒了消费线程1,它将从wait语句处继续向下消费刚生产完的面包,假设正好再次唤醒了消费线程2,当消费线程2被cpu选中后,消费线程2也将从wait语句处向下消费,消费的也是刚才生产的面包,问题再此出现了,连续唤醒的消费线程1和2消费的是同一个面包,也就是说面包被重复消费了。这又是多线程不同步问题。

说了一大段,其实将视线放大后分析就很简单了,只要某一方的2个或多个线程都因为判断b.flag而wait,那么这两个或多个线程有可能会被连续唤醒而继续向下生产或消费。这造成了多线程不同步问题。

不安全的问题就出在同一方的多个线程在连续唤醒后继续向下生产或消费。这是if语句引起的,如果能够让wait的线程在唤醒后还回头判断b.flag是否为true,就能让其决定是否继续wait还是向下生产或消费。

可以将if语句替换为while语句来满足要求。这样一来,无论某一方的多个线程是否被连续唤醒,它们都将回头判断b.flag。

1
2
3
4
5
6
7
8
9
10
11
while(true){
 synchronized(bread.class){
 while(!b.flag){
  try{bread.class.wait();}catch(interruptedexception i){}
 }
 system.out.println(thread.currentthread().getname()+"----消费者-------------"+consume());
 try{thread.sleep(10);}catch(interruptedexception i){}
 b.flag = false;
 bread.class.notify();
 }
}

解决了第一个多线程安全的问题,但会出现死锁问题。这很容易分析,将生产方看作一个整体,将消费方也看作一个整体,当生产方线程都wait了(生产方的线程被连续唤醒时会出现该方线程全部wait),消费方也都wait了,死锁就出现了。其实放大了看,将生产方、消费方分别看作一个线程,这两个线程组成多线程,当某一方wait后无法唤醒另一方,另一方也一定会wait,于是就死锁了。

对于双方死锁的问题,只要保证能唤醒对方,而非本方连续唤醒就能解决。使用notifyall()或signalall()即可,也可以通过signal()唤醒对方线程解决,见下面的第二段代码。

根据上面的分析,将单生产、单消费模式的代码改进一下,就可以变为多生产多消费单面包模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
//代码段1
class bread {
 public string name;
 public int count = 1;
 public boolean flag = false;
}
//描述生产者
class producer implements runnable {
 private bread b;
 producer(bread b){
  this.b = b;
 }
 public void produce(string name){
  b.name = name + b.count;
  b.count++;
 }
 public void run(){
  while(true){
    synchronized(bread.class){
     while(b.flag){
      try{bread.class.wait();}catch(interruptedexception i){}
     }
     produce("面包");
     system.out.println(thread.currentthread().getname()+"----生产者------"+b.name);
     try{thread.sleep(10);}catch(interruptedexception i){}
     b.flag = true;
     bread.class.notifyall();
   }
  }
 }
}
//描述消费者
class consumer implements runnable {
 private bread b;
 consumer(bread b){
  this.b = b;
 }
 public string consume(){
  return b.name;
 }
 public void run(){
  while(true){

相关文章:

验证码:
移动技术网