大家好,我是LinuxZn。
在应用开发中,生产者,消费者的模型非常常见,一方产生数据并把数据放入队列中,而另一方从队列中取数据,先进先出。
应用:线程间通信/进程间通信。Hello系列 | 多线程编程基础!
Linux系统中提供了两种不同接口的消息队列:
POSIX消息队列。POSIX为可移植的操作系统接口。
System V消息队列。System V 是 AT&T 的第一个商业UNIX版本(UNIX System III)的加强。
其中,POSIX消息队列可移植性较强,使用较广。
Linux系统中提供的消息队列一般应用于进行间通信,但也可以用于线程间通信。
本文介绍POSIX消息队列应用于线程间通信。
头文件:
#include/* For O_* constants */ #include /* For mode constants */ #include
编译链接需要加上 -lr 链接。
Linux内核提供了一系列函数来使用消息队列:
/** * @brief 创建消息队列实例 * * Detailed function description * * @param[in] name: 消息队列名称 * @param[in] oflag:根据传入标识来创建或者打开一个已创建的消息队列 - O_CREAT: 创建一个消息队列 - O_EXCL: 检查消息队列是否存在,一般与O_CREAT一起使用 - O_CREAT|O_EXCL: 消息队列不存在则创建,已存在返回NULL - O_NONBLOCK: 非阻塞模式打开,消息队列不存在返回NULL - O_RDONLY: 只读模式打开 - O_WRONLY: 只写模式打开 - O_RDWR: 读写模式打开 * @param[in] mode:访问权限 * @param[in] attr:消息队列属性地址 * * @return 成功返回消息队列描述符,失败返回-1,错误码存于error中 */ mqd_t mq_open(const char *name, int oflag, mode_t mode, struct mq_attr *attr); /** * @brief 无限阻塞方式接收消息 * * Detailed function description * * @param[in] mqdes: 消息队列描述符 * @param[in] msg_ptr:消息体缓冲区地址 * @param[in] msg_len:消息体长度,长度必须大于等于消息属性设定的最大值 * @param[in] msg_prio:消息优先级 * * @return 成功返回消息长度,失败返回-1,错误码存于error中 */ mqd_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio); /** * @brief 指定超时时间阻塞方式接收消息 * * Detailed function description * * @param[in] mqdes: 消息队列描述符 * @param[in] msg_ptr:消息体缓冲区地址 * @param[in] msg_len:消息体长度,长度必须大于等于消息属性设定的最大值 * @param[in] msg_prio:消息优先级 * @param[in] abs_timeout:超时时间 * * @return 成功返回消息长度,失败返回-1,错误码存于error中 */ mqd_t mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio, const struct timespec *abs_timeout); /** * @brief 无限阻塞方式发送消息 * * Detailed function description * * @param[in] mqdes: 消息队列描述符 * @param[in] msg_ptr:待发送消息体缓冲区地址 * @param[in] msg_len:消息体长度 * @param[in] msg_prio:消息优先级 * * @return 成功返回0,失败返回-1 */ mqd_t mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio); /** * @brief 指定超时时间阻塞方式发送消息 * * Detailed function description * * @param[in] mqdes: 消息队列描述符 * @param[in] msg_ptr:待发送消息体缓冲区地址 * @param[in] msg_len:消息体长度 * @param[in] msg_prio:消息优先级 * @param[in] abs_timeout:超时时间 * * @return 成功返回0,失败返回-1 */ mqd_t mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio, const struct timespec *abs_timeout); /** * @brief 关闭消息队列 * * Detailed function description * * @param[in] mqdes: 消息队列描述符 * * @return 成功返回0,失败返回-1 */ mqd_t mq_close(mqd_t mqdes); /** * @brief 分离消息队列 * * Detailed function description * * @param[in] name: 消息队列名称 * * @return 成功返回0,失败返回-1 */ mqd_t mq_unlink(const char *name);
例子:线程1不断给线程2发送字符串数据。
#include#include #include #include #include #include /* For O_* constants */ #include /* For mode constants */ #include #define MQ_MSG_MAX_SIZE 512 ///< 最大消息长度 #define MQ_MSG_MAX_ITEM 5 ///< 最大消息数目 static pthread_t s_thread1_id; static pthread_t s_thread2_id; static unsigned char s_thread1_running = 0; static unsigned char s_thread2_running = 0; static mqd_t s_mq; static char send_msg[10] = "hello"; void *thread1_fun(void *arg) { int ret = 0; s_thread1_running = 1; while (s_thread1_running) { ret = mq_send(s_mq, send_msg, sizeof(send_msg), 0); if (ret < 0) { perror("mq_send error"); } printf("send msg = %s ", send_msg); usleep(100 * 1000); } pthread_exit(NULL); } void *thread2_fun(void *arg) { char buf[MQ_MSG_MAX_SIZE]; int recv_size = 0; s_thread2_running = 1; while (s_thread2_running) { recv_size = mq_receive(s_mq, &buf[0], sizeof(buf), NULL); if (-1 != recv_size) { printf("receive msg = %s ", buf); } else { perror("mq_receive error"); break; } usleep(100 * 1000); } pthread_exit(NULL); } int main(void) { int ret = 0; struct mq_attr attr; ///< 创建消息队列 memset(&attr, 0, sizeof(attr)); attr.mq_maxmsg = MQ_MSG_MAX_ITEM; attr.mq_msgsize = MQ_MSG_MAX_SIZE; attr.mq_flags = 0; s_mq = mq_open("/mq", O_CREAT|O_RDWR, 0777, &attr); if(-1 == s_mq) { perror("mq_open error"); return -1; } ///< 创建线程1 ret = pthread_create(&s_thread1_id, NULL, thread1_fun, NULL); if (ret != 0) { printf("thread1_create error! "); exit(EXIT_FAILURE); } ret = pthread_detach(s_thread1_id); if (ret != 0) { printf("s_thread1_id error! "); exit(EXIT_FAILURE); } ///< 创建线程2 ret = pthread_create(&s_thread2_id, NULL, thread2_fun, NULL); if (ret != 0) { printf("thread2_create error! "); exit(EXIT_FAILURE); } ret = pthread_detach(s_thread2_id); if (ret != 0) { printf("s_thread2_id error! "); exit(EXIT_FAILURE); } while (1) { sleep(1); } return 0; }
编译、运行:
以上就是本次的分享,如果文章有帮助,麻烦帮忙转发,谢谢!
审核编辑:汤梓红
全部0条评论
快来发表一下你的评论吧 !