生产者消费者问题是线程同步的经典问题,也称为有界缓冲区问题,问题描述大致如下:一个或多个生产者线程不断生成数据并放入共享缓冲区,一个或多个消费者线程不断从缓冲区中取出数据;缓冲区满时生产者必须等待,缓冲区空时消费者必须等待。
生产者消费者问题中的缓冲区,包括队列缓冲区和环形缓冲区,它们都按照先进先出的顺序处理数据,我们现在只考虑队列缓冲区:
缓冲区有两个极端状态:缓冲区空,缓冲区满。链表队列和数组队列缓冲区空的含义相同,都是队列中没有一个元素的情形,但两者缓冲区满的含义不同:链表队列本身没有容量上限,"满"只能是人为规定的最大元素个数(本文示例代码中为max_buffer);数组队列的"满"则是数组的所有位置都已被占用。
此外,POSIX消息队列也可以作为队列缓冲区:当所有消息都使用相同的优先级(即不使用消息优先级)时,POSIX消息队列也是按照先进先出的顺序处理消息的。
本文只讨论第一种,即用数据结构实现的队列缓冲区;基于POSIX消息队列缓冲区的生产者消费者问题,会在后续的POSIX消息队列一文中单独讲解。
生产者消费者个数的多少、缓冲区的类型都会影响生产者消费者问题模型的复杂度,本文选取两种常见典型模型进行分析。
模型一:单生产者、单消费者。该模型只需要处理生产者和消费者之间的同步问题,在实际工程中很常见,具体的同步详情为:
模型二:多生产者、单消费者。与模型一相比,模型二既需要处理生产者之间的同步问题,又需要处理生产者和消费者之间的同步问题,在实际工程中也比较常见,具体的同步详情为:
模型一和模型二所列均为必须处理的同步问题。此外,还有一个视实际情况而定、可能存在的同步需求:如果生产者和消费者在操作缓冲区时会修改同一份内部共享状态,那么生产者和消费者之间也必须互斥地访问缓冲区,即同一时刻只能有一个线程访问缓冲区。
队列缓冲区,不管是数组实现还是链表实现,其内部都符合上述条件,都需要处理该可选同步需求。例如本文使用的链表队列中,入队在队列为空时会修改front指针,出队在队列变空时会修改rear指针,两者还会同时修改元素计数。
网上找了一份用C语言实现的数据结构队列代码,稍作修改后可正常使用,本节后续的生产者消费者问题示例代码都用它作为缓冲区。
/*
 * linkqueue.h - singly linked FIFO queue used as the producer/consumer buffer.
 *
 * The queue does no locking of its own: concurrent callers must serialize
 * every call externally.
 */
#ifndef LINK_QUEUE_H
#define LINK_QUEUE_H

/* Standard bool/true/false instead of a home-made enum, so the header does
   not clash with other code that includes <stdbool.h>. */
#include <stdbool.h>

/* Element type stored in the queue. */
typedef int data_t;

/* One queue node; `next` is NULL for the last node. */
typedef struct linknode {
    data_t data;
    struct linknode *next;
} linknode, *linkqueue;

/* Queue handle: elements are removed at `front` and appended at `rear`;
   both are NULL when the queue is empty. */
typedef struct {
    linkqueue front;
    linkqueue rear;
} headqueue;

/* Allocate an empty queue. */
headqueue *createemptyqueue(void);

/* Return true if the queue holds no elements. */
bool emptylinkqueue(headqueue *queue);

/* Append `value` at the rear of the queue. */
void enqueue(headqueue *queue, data_t value);

/* Remove the front element and store it in *value (0 if nothing was removed). */
void dequeue(headqueue *queue, data_t *value);

/* Print the queue contents to stdout. */
void printqueue(headqueue *queue);

/* Free every node, leaving an empty queue. */
bool clearlinkqueue(headqueue *queue);

/* Free every node and the queue handle itself. */
bool destroylinkqueue(headqueue *queue);

/* Return the current number of elements. */
int getcuritemsnum(headqueue *queue);

#endif /* LINK_QUEUE_H */
/*
 * linkqueue.c - singly linked FIFO queue implementation.
 *
 * No locking is done here: concurrent callers must serialize all calls.
 */
#include "linkqueue.h"
#include <stdio.h>
#include <stdlib.h>

/*
 * Element count.
 * NOTE: this is a single file-level counter shared by every queue (headqueue
 * has no count field). getcuritemsnum() is therefore only meaningful while
 * exactly one queue exists.
 */
static int nitems;

/* Create an empty queue; terminates the process if allocation fails. */
headqueue *createemptyqueue(void)
{
    headqueue *queue = malloc(sizeof *queue);

    if (queue == NULL) {
        perror("create empty queue failed");
        exit(EXIT_FAILURE);
    }
    queue->front = NULL;
    queue->rear = NULL;
    nitems = 0;
    return queue;
}

/* Return true if the queue is empty; terminates the process on a NULL queue. */
bool emptylinkqueue(headqueue *queue)
{
    if (queue == NULL) {
        printf("empty link queue error!\n");
        exit(EXIT_FAILURE);
    }
    return queue->front == NULL;
}

/*
 * Append `value` at the rear.
 * On a NULL queue or allocation failure an error is reported and the queue is
 * left unchanged.
 */
void enqueue(headqueue *queue, data_t value)
{
    linkqueue node;

    if (queue == NULL) {
        printf("enqueue error!\n");
        return;
    }
    node = malloc(sizeof *node);
    if (node == NULL) {
        perror("insert value failed");
        return;
    }
    node->data = value;
    node->next = NULL;

    if (emptylinkqueue(queue)) {
        queue->front = node;
    } else {
        queue->rear->next = node;
    }
    queue->rear = node;
    nitems++;
}

/*
 * Remove the front element and store it in *value.
 * If the queue is NULL or empty, an error is printed and *value is set to 0.
 * A NULL `value` pointer is reported and nothing is removed.
 */
void dequeue(headqueue *queue, data_t *value)
{
    linkqueue head;

    if (value == NULL) {
        printf("dequeue error!\n");
        return;
    }
    *value = 0;
    if (queue == NULL) {
        printf("dequeue error!\n");
        return;
    }
    if (emptylinkqueue(queue)) {
        printf("queue is empty!\n");
        return;
    }

    head = queue->front;
    queue->front = head->next;
    if (queue->front == NULL) {
        /* Queue became empty: rear must not keep pointing at the freed node. */
        queue->rear = NULL;
    }
    *value = head->data;
    free(head);
    nitems--;
}

/* Print the queue as "queue = {a,b,c}". */
void printqueue(headqueue *queue)
{
    linkqueue node;
    const char *sep = "";

    if (queue == NULL) {
        printf("printqueue error!\n");
        return;
    }
    printf("queue = {");
    for (node = queue->front; node != NULL; node = node->next) {
        /* Emit the separator before each element after the first, instead of
           erasing a trailing comma with '\b' (which corrupts redirected output). */
        printf("%s%d", sep, node->data);
        sep = ",";
    }
    printf("}\n");
}

/* Free every node, leaving an empty queue. Returns false for a NULL queue. */
bool clearlinkqueue(headqueue *queue)
{
    linkqueue node;
    linkqueue next;

    if (queue == NULL) {
        printf("clearlinkqueue error!\n");
        return false;
    }
    for (node = queue->front; node != NULL; node = next) {
        next = node->next;
        free(node);
    }
    queue->front = NULL;
    queue->rear = NULL;
    nitems = 0;
    return true;
}

/* Free every node and the queue handle. Returns false for a NULL queue. */
bool destroylinkqueue(headqueue *queue)
{
    if (queue == NULL) {
        printf("destroylinkqueue error!\n");
        return false;
    }
    clearlinkqueue(queue);
    free(queue);
    nitems = 0;
    return true;
}

/* Return the element count (file-level counter; see the note on nitems). */
int getcuritemsnum(headqueue *queue)
{
    (void)queue;
    return nitems;
}
/*
 * Producer/consumer example: a linked-list queue used as a bounded buffer,
 * synchronized with a mutex and two condition variables.
 *
 * Usage: <program> <number of producers>
 *   1 producer   -> model 1 (one producer, one consumer)
 *   >1 producers -> model 2 (several producers, one consumer); producers then
 *                   also serialize among themselves on shared.mutex.
 * The consumer checks that it receives 0, 1, 2, ..., MAX_ITEMS-1 in order.
 */
#include "linkqueue.h"
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <pthread.h>

#define MAX_THREADS 10      /* upper bound on the number of producers */
#define MAX_ITEMS   1000000 /* total number of items produced and consumed */
#define MAX_BUFFER  10      /* queue length treated as "buffer full" */

/*
 * Programming tip: keep shared data together with the synchronization objects
 * (mutexes, condition variables, semaphores) that protect it.
 */
struct shared {
    pthread_cond_t cond_nempty;  /* condition: buffer is not full */
    pthread_cond_t cond_nstored; /* condition: buffer is not empty */
    pthread_mutex_t cond_mutex;  /* guards queue, nput and nval: one thread in the buffer at a time */
    pthread_mutex_t mutex;       /* serializes producers; used only with more than one producer */
    headqueue *queue;            /* the queue buffer */
    int nput;                    /* number of items produced so far */
    int nval;                    /* next value to produce */
};

struct shared shared;

void shared_init(void)
{
    shared.queue = createemptyqueue();
    pthread_mutex_init(&shared.mutex, NULL);
    pthread_mutex_init(&shared.cond_mutex, NULL);
    pthread_cond_init(&shared.cond_nempty, NULL);
    pthread_cond_init(&shared.cond_nstored, NULL);
}

void shared_destroy(void)
{
    destroylinkqueue(shared.queue);
    shared.queue = NULL;
    pthread_mutex_destroy(&shared.mutex);
    pthread_mutex_destroy(&shared.cond_mutex);
    pthread_cond_destroy(&shared.cond_nempty);
    pthread_cond_destroy(&shared.cond_nstored);
}

/*
 * Producer thread. `arg` points to the number of producer threads.
 * Keeps enqueueing 0, 1, 2, ... until MAX_ITEMS items exist in total.
 */
void *produce(void *arg)
{
    int multi = *(int *)arg > 1;

    for (;;) {
        if (multi) {
            pthread_mutex_lock(&shared.mutex);
        }
        pthread_mutex_lock(&shared.cond_mutex);

        /* Wait for space only while there is still work left to do. */
        while (shared.nput < MAX_ITEMS &&
               getcuritemsnum(shared.queue) == MAX_BUFFER) {
            pthread_cond_wait(&shared.cond_nempty, &shared.cond_mutex);
        }

        /*
         * nput is tested only while holding the lock. An unlocked test is a
         * data race: with several producers, more than one can see
         * nput == MAX_ITEMS - 1 and each then enqueues an extra item.
         */
        if (shared.nput >= MAX_ITEMS) {
            pthread_mutex_unlock(&shared.cond_mutex);
            if (multi) {
                pthread_mutex_unlock(&shared.mutex);
            }
            return NULL;
        }

        enqueue(shared.queue, shared.nval);
        shared.nput++;
        shared.nval++;
        pthread_cond_signal(&shared.cond_nstored);

        if (multi) {
            pthread_mutex_unlock(&shared.mutex);
        }
        pthread_mutex_unlock(&shared.cond_mutex);
    }
}

/* Consumer thread: dequeues MAX_ITEMS items and checks they arrive in order. */
void *consume(void *arg)
{
    data_t nval;
    int i;

    (void)arg;
    for (i = 0; i < MAX_ITEMS; i++) {
        pthread_mutex_lock(&shared.cond_mutex);
        while (getcuritemsnum(shared.queue) == 0) {
            pthread_cond_wait(&shared.cond_nstored, &shared.cond_mutex);
        }
        dequeue(shared.queue, &nval);
        pthread_cond_signal(&shared.cond_nempty);
        pthread_mutex_unlock(&shared.cond_mutex);

        if (nval != i) {
            printf("error: buff[%d] = %d\n", i, nval);
        }
    }
    return NULL;
}

/*
 * Parse the producer count from argv[1], capped at MAX_THREADS.
 * Exits with a message if the argument is missing or not a positive integer.
 * Zero producers would leave the consumer waiting forever.
 */
static int parse_nthreads(int argc, char **argv)
{
    char *end;
    long n;

    if (argc < 2) {
        fprintf(stderr, "usage: %s <number of producers>\n",
                argc > 0 ? argv[0] : "prog");
        exit(EXIT_FAILURE);
    }
    errno = 0;
    n = strtol(argv[1], &end, 10);
    if (end == argv[1] || *end != '\0' || errno == ERANGE || n < 1) {
        fprintf(stderr, "invalid number of producers: %s\n", argv[1]);
        exit(EXIT_FAILURE);
    }
    return n > MAX_THREADS ? MAX_THREADS : (int)n;
}

/* Create a thread, or terminate the program with the pthread error message. */
static void start_thread(pthread_t *tid, void *(*fn)(void *), void *arg)
{
    int err = pthread_create(tid, NULL, fn, arg);

    if (err != 0) {
        fprintf(stderr, "pthread_create: %s\n", strerror(err));
        exit(EXIT_FAILURE);
    }
}

int main(int argc, char **argv)
{
    pthread_t tid_produce[MAX_THREADS];
    pthread_t tid_consume;
    struct timeval start_time;
    struct timeval end_time;
    double time_sec;
    int nthreads;
    int i;

    nthreads = parse_nthreads(argc, argv);
    shared_init();
    gettimeofday(&start_time, NULL);

    /* &nthreads stays valid: main joins every producer before returning. */
    for (i = 0; i < nthreads; i++) {
        start_thread(&tid_produce[i], produce, &nthreads);
    }
    start_thread(&tid_consume, consume, NULL);

    for (i = 0; i < nthreads; i++) {
        pthread_join(tid_produce[i], NULL);
    }
    pthread_join(tid_consume, NULL);

    gettimeofday(&end_time, NULL);
    time_sec = (end_time.tv_sec - start_time.tv_sec)
             + (end_time.tv_usec - start_time.tv_usec) / 1000000.0;
    printf("%d produce and %d consume total spend %.2f second\n", nthreads, 1, time_sec);

    shared_destroy();
    return 0;
}
运行时通过命令行参数指定生产者个数,来选择模型一或模型二。其中,在produce()中,由 `if (nthreads > 1)` 包裹的加锁和解锁代码会根据生产者个数,选择是否使用第二把锁 shared.mutex。
下面分别是模型一和模型二的运行结果。
/*
 * Producer/consumer example: a linked-list queue used as a bounded buffer,
 * synchronized with a mutex and two POSIX named semaphores.
 *
 * Usage: <program> <number of producers>
 *   1 producer -> model 1, >1 producers -> model 2 (always one consumer).
 * The consumer checks that it receives 0, 1, 2, ..., MAX_ITEMS-1 in order.
 */
#include "linkqueue.h"
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <sys/time.h>
#include <pthread.h>
#include <semaphore.h>

#define SEM_NEMPTY  "/sem_nempty"
#define SEM_NSTORED "/sem_nstored"
#define MAX_THREADS 10      /* upper bound on the number of producers */
#define MAX_ITEMS   1000000 /* total number of items produced and consumed */
#define MAX_BUFFER  10      /* buffer capacity (initial value of nempty) */

/*
 * Programming tip: keep shared data together with the synchronization objects
 * (mutexes, condition variables, semaphores) that protect it.
 */
struct shared {
    pthread_mutex_t mutex; /* guards queue, nput and nval */
    sem_t *nempty;         /* free slots; starts at MAX_BUFFER */
    sem_t *nstored;        /* filled slots; starts at 0 */
    headqueue *queue;      /* the queue buffer */
    int nput;              /* number of items produced so far */
    int nval;              /* next value to produce */
};

struct shared shared;

/*
 * Create a fresh named semaphore with the given initial value, or exit.
 * Named semaphores persist until unlinked. Any semaphore left by an earlier
 * run that died before shared_destroy() is removed first; otherwise a plain
 * O_CREAT would silently reuse its stale count. ENOENT from sem_unlink is
 * expected here and ignored.
 */
static sem_t *open_fresh_sem(const char *name, unsigned int value)
{
    sem_t *sem;

    (void)sem_unlink(name);
    sem = sem_open(name, O_CREAT | O_EXCL, 0666, value);
    if (sem == SEM_FAILED) {
        perror(name);
        exit(EXIT_FAILURE);
    }
    return sem;
}

/* sem_wait() that retries when interrupted by a signal; exits on other errors. */
static void sem_wait_retry(sem_t *sem)
{
    while (sem_wait(sem) == -1) {
        if (errno != EINTR) {
            perror("sem_wait");
            exit(EXIT_FAILURE);
        }
    }
}

void shared_init(void)
{
    shared.queue = createemptyqueue();
    pthread_mutex_init(&shared.mutex, NULL);
    shared.nempty = open_fresh_sem(SEM_NEMPTY, MAX_BUFFER);
    shared.nstored = open_fresh_sem(SEM_NSTORED, 0);
}

void shared_destroy(void)
{
    destroylinkqueue(shared.queue);
    shared.queue = NULL;
    pthread_mutex_destroy(&shared.mutex);
    sem_close(shared.nempty);
    sem_close(shared.nstored);
    sem_unlink(SEM_NEMPTY);
    sem_unlink(SEM_NSTORED);
}

/* Producer thread: enqueues 0, 1, 2, ... until MAX_ITEMS items exist in total. */
void *produce(void *arg)
{
    (void)arg;
    for (;;) {
        /*
         * Both produce and consume must sem_wait BEFORE taking the mutex.
         * If a thread blocked in sem_wait while holding the mutex, the other
         * side could never take the mutex to make progress and post:
         * deadlock.
         */
        sem_wait_retry(shared.nempty);
        pthread_mutex_lock(&shared.mutex);

        /*
         * nput is tested only under the mutex. An unlocked test lets several
         * producers see nput == MAX_ITEMS - 1 and all produce an item.
         */
        if (shared.nput >= MAX_ITEMS) {
            pthread_mutex_unlock(&shared.mutex);
            /* Return the unused slot so other producers waiting on nempty
               can also wake up, see the limit and exit. */
            sem_post(shared.nempty);
            return NULL;
        }

        enqueue(shared.queue, shared.nval);
        /* Sanity check: nempty should keep the queue within MAX_BUFFER items. */
        if (getcuritemsnum(shared.queue) > MAX_BUFFER) {
            printf("notice: queue buffer capacity > %d\n", MAX_BUFFER);
        }
        shared.nput++;
        shared.nval++;

        pthread_mutex_unlock(&shared.mutex);
        sem_post(shared.nstored);
    }
}

/* Consumer thread: dequeues MAX_ITEMS items and checks they arrive in order. */
void *consume(void *arg)
{
    data_t nval;
    int i;

    (void)arg;
    for (i = 0; i < MAX_ITEMS; i++) {
        /* sem_wait before locking; see the comment in produce(). */
        sem_wait_retry(shared.nstored);
        pthread_mutex_lock(&shared.mutex);
        dequeue(shared.queue, &nval);
        if (nval != i) {
            printf("error: buff[%d] = %d\n", i, nval);
        }
        pthread_mutex_unlock(&shared.mutex);
        sem_post(shared.nempty);
    }
    return NULL;
}

/*
 * Parse the producer count from argv[1], capped at MAX_THREADS.
 * Exits with a message if the argument is missing or not a positive integer.
 */
static int parse_nthreads(int argc, char **argv)
{
    char *end;
    long n;

    if (argc < 2) {
        fprintf(stderr, "usage: %s <number of producers>\n",
                argc > 0 ? argv[0] : "prog");
        exit(EXIT_FAILURE);
    }
    errno = 0;
    n = strtol(argv[1], &end, 10);
    if (end == argv[1] || *end != '\0' || errno == ERANGE || n < 1) {
        fprintf(stderr, "invalid number of producers: %s\n", argv[1]);
        exit(EXIT_FAILURE);
    }
    return n > MAX_THREADS ? MAX_THREADS : (int)n;
}

/* Create a thread, or terminate the program with the pthread error message. */
static void start_thread(pthread_t *tid, void *(*fn)(void *), void *arg)
{
    int err = pthread_create(tid, NULL, fn, arg);

    if (err != 0) {
        fprintf(stderr, "pthread_create: %s\n", strerror(err));
        exit(EXIT_FAILURE);
    }
}

int main(int argc, char **argv)
{
    pthread_t tid_produce[MAX_THREADS];
    pthread_t tid_consume;
    struct timeval start_time;
    struct timeval end_time;
    double time_sec;
    int nthreads;
    int i;

    nthreads = parse_nthreads(argc, argv);
    shared_init();
    gettimeofday(&start_time, NULL);

    for (i = 0; i < nthreads; i++) {
        start_thread(&tid_produce[i], produce, &nthreads);
    }
    start_thread(&tid_consume, consume, NULL);

    for (i = 0; i < nthreads; i++) {
        pthread_join(tid_produce[i], NULL);
    }
    pthread_join(tid_consume, NULL);

    gettimeofday(&end_time, NULL);
    time_sec = (end_time.tv_sec - start_time.tv_sec)
             + (end_time.tv_usec - start_time.tv_usec) / 1000000.0;
    printf("%d produce and %d consume total spend %.2f second\n", nthreads, 1, time_sec);

    shared_destroy();
    return 0;
}
运行时通过命令行参数指定生产者个数,来选择模型一或模型二。
/*
 * Producer/consumer example: a linked-list queue used as a bounded buffer,
 * synchronized with a mutex and two POSIX unnamed (memory-based) semaphores.
 *
 * Usage: <program> <number of producers>
 *   1 producer -> model 1, >1 producers -> model 2 (always one consumer).
 * The consumer checks that it receives 0, 1, 2, ..., MAX_ITEMS-1 in order.
 */
#include "linkqueue.h"
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <sys/time.h>
#include <pthread.h>
#include <semaphore.h>

#define MAX_THREADS 10      /* upper bound on the number of producers */
#define MAX_ITEMS   1000000 /* total number of items produced and consumed */
#define MAX_BUFFER  10      /* buffer capacity (initial value of nempty) */

/*
 * Programming tip: keep shared data together with the synchronization objects
 * (mutexes, condition variables, semaphores) that protect it.
 */
struct shared {
    pthread_mutex_t mutex; /* guards queue, nput and nval */
    sem_t nempty;          /* free slots; starts at MAX_BUFFER */
    sem_t nstored;         /* filled slots; starts at 0 */
    headqueue *queue;      /* the queue buffer */
    int nput;              /* number of items produced so far */
    int nval;              /* next value to produce */
};

struct shared shared;

/* sem_wait() that retries when interrupted by a signal; exits on other errors. */
static void sem_wait_retry(sem_t *sem)
{
    while (sem_wait(sem) == -1) {
        if (errno != EINTR) {
            perror("sem_wait");
            exit(EXIT_FAILURE);
        }
    }
}

void shared_init(void)
{
    shared.queue = createemptyqueue();
    pthread_mutex_init(&shared.mutex, NULL);
    /* pshared = 0: shared between the threads of this process only.
       sem_init fails on platforms without unnamed-semaphore support, so its
       result is checked. */
    if (sem_init(&shared.nempty, 0, MAX_BUFFER) == -1 ||
        sem_init(&shared.nstored, 0, 0) == -1) {
        perror("sem_init");
        exit(EXIT_FAILURE);
    }
}

void shared_destroy(void)
{
    destroylinkqueue(shared.queue);
    shared.queue = NULL;
    pthread_mutex_destroy(&shared.mutex);
    sem_destroy(&shared.nempty);
    sem_destroy(&shared.nstored);
}

/* Producer thread: enqueues 0, 1, 2, ... until MAX_ITEMS items exist in total. */
void *produce(void *arg)
{
    (void)arg;
    for (;;) {
        /*
         * Both produce and consume must sem_wait BEFORE taking the mutex.
         * If a thread blocked in sem_wait while holding the mutex, the other
         * side could never take the mutex to make progress and post:
         * deadlock.
         */
        sem_wait_retry(&shared.nempty);
        pthread_mutex_lock(&shared.mutex);

        /*
         * nput is tested only under the mutex. An unlocked test lets several
         * producers see nput == MAX_ITEMS - 1 and all produce an item.
         */
        if (shared.nput >= MAX_ITEMS) {
            pthread_mutex_unlock(&shared.mutex);
            /* Return the unused slot so other producers waiting on nempty
               can also wake up, see the limit and exit. */
            sem_post(&shared.nempty);
            return NULL;
        }

        enqueue(shared.queue, shared.nval);
        /* Sanity check: nempty should keep the queue within MAX_BUFFER items. */
        if (getcuritemsnum(shared.queue) > MAX_BUFFER) {
            printf("notice: queue buffer capacity > %d\n", MAX_BUFFER);
        }
        shared.nput++;
        shared.nval++;

        pthread_mutex_unlock(&shared.mutex);
        sem_post(&shared.nstored);
    }
}

/* Consumer thread: dequeues MAX_ITEMS items and checks they arrive in order. */
void *consume(void *arg)
{
    data_t nval;
    int i;

    (void)arg;
    for (i = 0; i < MAX_ITEMS; i++) {
        /* sem_wait before locking; see the comment in produce(). */
        sem_wait_retry(&shared.nstored);
        pthread_mutex_lock(&shared.mutex);
        dequeue(shared.queue, &nval);
        if (nval != i) {
            printf("error: buff[%d] = %d\n", i, nval);
        }
        pthread_mutex_unlock(&shared.mutex);
        sem_post(&shared.nempty);
    }
    return NULL;
}

/*
 * Parse the producer count from argv[1], capped at MAX_THREADS.
 * Exits with a message if the argument is missing or not a positive integer.
 */
static int parse_nthreads(int argc, char **argv)
{
    char *end;
    long n;

    if (argc < 2) {
        fprintf(stderr, "usage: %s <number of producers>\n",
                argc > 0 ? argv[0] : "prog");
        exit(EXIT_FAILURE);
    }
    errno = 0;
    n = strtol(argv[1], &end, 10);
    if (end == argv[1] || *end != '\0' || errno == ERANGE || n < 1) {
        fprintf(stderr, "invalid number of producers: %s\n", argv[1]);
        exit(EXIT_FAILURE);
    }
    return n > MAX_THREADS ? MAX_THREADS : (int)n;
}

/* Create a thread, or terminate the program with the pthread error message. */
static void start_thread(pthread_t *tid, void *(*fn)(void *), void *arg)
{
    int err = pthread_create(tid, NULL, fn, arg);

    if (err != 0) {
        fprintf(stderr, "pthread_create: %s\n", strerror(err));
        exit(EXIT_FAILURE);
    }
}

int main(int argc, char **argv)
{
    pthread_t tid_produce[MAX_THREADS];
    pthread_t tid_consume;
    struct timeval start_time;
    struct timeval end_time;
    double time_sec;
    int nthreads;
    int i;

    nthreads = parse_nthreads(argc, argv);
    shared_init();
    gettimeofday(&start_time, NULL);

    for (i = 0; i < nthreads; i++) {
        start_thread(&tid_produce[i], produce, &nthreads);
    }
    start_thread(&tid_consume, consume, NULL);

    for (i = 0; i < nthreads; i++) {
        pthread_join(tid_produce[i], NULL);
    }
    pthread_join(tid_consume, NULL);

    gettimeofday(&end_time, NULL);
    time_sec = (end_time.tv_sec - start_time.tv_sec)
             + (end_time.tv_usec - start_time.tv_usec) / 1000000.0;
    printf("%d produce and %d consume total spend %.2f second\n", nthreads, 1, time_sec);

    shared_destroy();
    return 0;
}
运行时通过命令行参数指定生产者个数,来选择模型一或模型二。
该结论仅限于本文使用的示例代码,仅作结果陈述,不代表具有普适性,也没有进行深层原因分析。
在使用条件变量的示例代码中,当有多个生产者时用了两把锁:
实际上,只需要一个cond_mutex就够了,但经过测试发现这样运行时间会明显增加,如下图所示:
先从理论上分析下,10个生产者1个消费者:
总结起来就是:只能同时有一个线程访问缓冲区。从这个结论来看,确实不需要第二把锁,在一把锁就够用的情况下,再加第二把锁反而多了不必要的开销。
经过测试,POSIX有名信号量和无名信号量的结果都符合这一理论:额外加上第二把锁后,运行时间明显增加,有名信号量从1.9s增加到2.3s,无名信号量从1.4s增加到2.5s。
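但奇怪的是,条件变量的结果恰恰相反:一把锁时效率低,两把锁时效率反而大幅提升,暂时想不通原因。一个可能的解释(未经验证):只用cond_mutex一把锁时,10个生产者和1个消费者都在争抢同一把锁,消费者每次取数据都要与大量生产者竞争;加上第二把锁后,生产者先在shared.mutex上排队,同一时刻最多只有一个生产者与消费者争抢cond_mutex,锁竞争因此大大减少。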
如对本文有疑问, 点击进行留言回复!!
linux下文本编辑器vim的使用方法(复制、粘贴、替换、行号、撤销、多文件操作)
网友评论