消息队列可认为是一个消息链表,队列中的每个消息具有如下属性:
posix消息队列主要用于线程间消息的传递:
mq_open
用于创建一个新的消息队列或打开一个已存在的消息队列,编译时需指定链接-lrt,下面其他函数同理。
//成功返回消息队列描述符,失败返回-1 mqd_t mq_open(const char *name, int oflag, ... /* mode_t mode, struct mq_attr *attr */);
/dev/mqueue/
目录才可以查看mkdir /dev/mqueue mount -t mqueue none /dev/mqueue
mq_close
用于关闭已打开的消息队列,mq_unlink
用于从系统中删除消息队列。
//两个函数返回值:成功返回0,失败返回-1 int mq_close(mqd_t mqdes); int mq_unlink(const char *name);
关闭与删除机制已在posix信号量中讲过,这里不再赘述。
//成功返回0,失败返回-1 int mq_getattr(mqd_t mqdes, struct mq_attr *attr);
mq_getattr
用于获取消息队列的四个属性,这四个属性定义在struct mq_attr
结构体中。
struct mq_attr { long mq_flags; //非阻塞标志,可设0或o_nonblock,由且仅由mq_setattr设置 long mq_maxmsg; //队列中最大消息条数,由mq_open在创建新队列时设置 long mq_msgsize; //消息最大长度,由mq_open在创建新队列时设置 long mq_curmsgs; //队列中当前消息条数,只能获取不能设置 };
//成功返回0,失败返回-1 int mq_setattr(mqd_t mqdes, const struct mq_attr *attr, struct mq_attr *oldattr);
在消息队列的四个属性中:
mq_maxmsg
和mq_msgsize
只能在创建新队列时由mq_open的attr参数设置#include <mqueue.h> #include <stdio.h> int main() { struct mq_attr attr; struct mq_attr attr1; mqd_t mqdes; /* * 在我的系统上,消息队列默认属性为:mq_maxmsg = 10, mq_msgsize = 8192. * 这里显式指定attr.mq_maxmsg = 5,mq_msgsize不赋值,会导致mq_open失败. */ attr.mq_maxmsg = 5; //attr.mq_msgsize = 8192; if ((mqdes = mq_open("/mqueue1", o_rdwr | o_creat, 0666, &attr)) == -1) { printf("mq_open create new mqueue failed because attr.mq_msgsize not specified.\n"); } mq_getattr(mqdes, &attr1); printf("%ld %ld\n", attr1.mq_maxmsg, attr1.mq_msgsize); mq_close(mqdes); mq_unlink("/mqueue1"); return 0; }
mq_send
用于向队列中放入一个消息,mq_receive
用于从队列中取走一个消息。
//成功返回0,失败返回-1 int mq_send(mqd_t mqdes, const char *ptr, size_t len, unsigned int prio); //成功返回消息数据长度,失败返回-1 ssize_t mq_receive(mqd_t mqdes, char *ptr, size_t len, unsigned int *prio);
消息队列共有4个属性受到系统限制:
其中,前两个限制和应用程序的开发密切相关,既要保证队列不会被填满,又要保证消息长度不会超过允许的最大值,必要时可以修改linux内核源码来改变限制值。
查看限制的方法:
cat /proc/sys/fs/mqueue/msg_max //struct mq_attr.mq_maxmsg <= msg_max cat /proc/sys/fs/mqueue/msgsize_max //struct mq_attr.mq_msgsize <= msgsize_max cat /proc/sys/fs/mqueue/queues_max
不难看出,posix消息队列的基本使用模型就是一个典型的生产者消费者问题:
我们把前面写的生产者消费者代码拿来稍微改一下,先来看一个单生产者 + 单消费者的例子。
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/time.h> #include <pthread.h> #include <mqueue.h> #define posix_queue "/mqueue" #define max_threads 1 #define max_items 1000000 struct shared { mqd_t mqdes; int nput; int nval; }; struct shared shared; void shared_init() { shared.mqdes = mq_open(posix_queue, o_rdwr | o_creat, 0666, null); //在我的系统中,posix消息队列最大容量为10 mq_unlink(posix_queue); } void shared_destroy() { mq_close(shared.mqdes); } void *produce(void *arg) { while (1) { if (shared.nput >= max_items) { pthread_exit(null); } mq_send(shared.mqdes, (char *)&shared.nval, sizeof(shared.nval), 0); shared.nput++; shared.nval++; /* 线程tid_produce[i]每执行一次,就累加count[i]的值 */ *((int *)arg) += 1; } pthread_exit(null); } void *consume(void *arg) { struct mq_attr attr; int nval; int i; mq_getattr(shared.mqdes, &attr); printf("system defaut mq_maxmsg = %ld, mq_msgsize = %ld\n", attr.mq_maxmsg, attr.mq_msgsize); for (i = 0; i < max_items; i++) { //消费者线程按顺序取出消息,根据mq_getattr返回结果来设置mq_receive的参数len mq_receive(shared.mqdes, (char *)&nval, attr.mq_msgsize, null); if (nval != i) { printf("error: buff[%d] = %d\n", i, nval); } } pthread_exit(null); } int main() { pthread_t tid_produce[max_threads]; pthread_t tid_consume; int count[max_threads]; struct timeval start_time; struct timeval end_time; float time_sec; int i; shared_init(); gettimeofday(&start_time, null); for (i = 0; i < max_threads; i++) { count[i] = 0; pthread_create(&tid_produce[i], null, produce, &count[i]); } pthread_create(&tid_consume, null, consume, null); for (i = 0; i < max_threads; i++) { pthread_join(tid_produce[i], null); printf("count[%d] = %d\n", i, count[i]); //输出每个线程的执行次数 } pthread_join(tid_consume, null); gettimeofday(&end_time, null); time_sec = (end_time.tv_sec - start_time.tv_sec) + (end_time.tv_usec - start_time.tv_usec) / 1000000.0; printf("%d produce and %d consume total spend %.2f second\n", max_threads, 1, time_sec); shared_destroy(); return 0; }
注意观察代码和运行结果,可以发现生产者和消费者之间并没有做同步处理,但仍然得到了正确结果,这是因为当没有设置非阻塞标志时,posix消息队列自带隐式同步机制:
而这正是单生产者 + 单消费者模型唯一需要处理的同步问题,因此不需要应用程序再进行显式同步。
显式同步,指的是使用互斥锁、条件变量、信号量等方式进行的同步;posix自带的同步在内核中进行,对于应用程序来说是不可见的,因此称其为隐式同步。
当有多个生产者时,posix消息队列自带的同步机制就不够用了,需要显式处理生产者线程之间的同步问题,我们使用互斥锁实现这个功能。
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <sys/time.h> #include <pthread.h> #include <mqueue.h> #define posix_queue "/mqueue" #define max_threads 10 #define max_items 1000000 struct shared { pthread_mutex_t mutex; mqd_t mqdes; int nput; int nval; }; struct shared shared; void shared_init() { pthread_mutex_init(&shared.mutex, null); shared.mqdes = mq_open(posix_queue, o_rdwr | o_creat, 0666, null); //在我的系统中,posix消息队列最大容量为10 mq_unlink(posix_queue); } void shared_destroy() { pthread_mutex_destroy(&shared.mutex); mq_close(shared.mqdes); } void *produce(void *arg) { while (1) { pthread_mutex_lock(&shared.mutex); if (shared.nput >= max_items) { pthread_mutex_unlock(&shared.mutex); pthread_exit(null); } //生产者线程依次累加nval的值,并以无优先级消息方式放入消息队列 mq_send(shared.mqdes, (char *)&shared.nval, sizeof(shared.nval), 0); shared.nput++; shared.nval++; pthread_mutex_unlock(&shared.mutex); /* 线程tid_produce[i]每执行一次,就累加count[i]的值 */ *((int *)arg) += 1; } pthread_exit(null); } void *consume(void *arg) { struct mq_attr attr; int nval; int i; mq_getattr(shared.mqdes, &attr); printf("system defaut mq_maxmsg = %ld, mq_msgsize = %ld\n", attr.mq_maxmsg, attr.mq_msgsize); for (i = 0; i < max_items; i++) { mq_receive(shared.mqdes, (char *)&nval, attr.mq_msgsize, null); //根据mq_getattr返回结果来设置mq_receive的参数len if (nval != i) { printf("error: buff[%d] = %d\n", i, nval); } } pthread_exit(null); } int main() { pthread_t tid_produce[max_threads]; pthread_t tid_consume; int count[max_threads]; struct timeval start_time; struct timeval end_time; float time_sec; int i; shared_init(); gettimeofday(&start_time, null); for (i = 0; i < max_threads; i++) { count[i] = 0; pthread_create(&tid_produce[i], null, produce, &count[i]); } pthread_create(&tid_consume, null, consume, null); for (i = 0; i < max_threads; i++) { pthread_join(tid_produce[i], null); printf("count[%d] = %d\n", i, count[i]); //输出每个线程的执行次数 } pthread_join(tid_consume, null); gettimeofday(&end_time, null); time_sec = (end_time.tv_sec - start_time.tv_sec) + (end_time.tv_usec - start_time.tv_usec) / 1000000.0; printf("%d produce and %d consume total spend %.2f second\n", max_threads, 1, time_sec); shared_destroy(); return 0; }
遇到的问题(原因暂时未知):
和生产者消费者一节中的解决方案相比,posix消息队列的效率比信号量差,比条件变量高:
而且,同使用posix消息队列,10个生产者 + 互斥锁需要2.5s,而单生产者无锁1s以内就可以完成,可见互斥锁的开销使得多线程反而降低了效率。
如对本文有疑问, 点击进行留言回复!!
linux下文本编辑器vim的使用方法(复制、粘贴、替换、行号、撤销、多文件操作)
网友评论