消息队列应用于线程间通信的简单例子

描述

大家好,我是LinuxZn。

在应用开发中,生产者,消费者的模型非常常见,一方产生数据并把数据放入队列中,而另一方从队列中取数据,先进先出。

应用:线程间通信/进程间通信。Hello系列 | 多线程编程基础!

Linux系统中提供了两种不同接口的消息队列:

POSIX消息队列。POSIX为可移植的操作系统接口。

System V消息队列。System V 是 AT&T 的第一个商业UNIX版本(UNIX System III)的加强。

其中,POSIX消息队列可移植性较强,使用较广。

Linux系统中提供的消息队列一般应用于进行间通信,但也可以用于线程间通信。

本文介绍POSIX消息队列应用于线程间通信。

头文件:

 

#include            /* For O_* constants */
#include         /* For mode constants */
#include 

 

编译链接需要加上 -lr 链接。

Linux内核提供了一系列函数来使用消息队列:

 

/**
 * @brief 创建消息队列实例
 *
 * Detailed function description
 *
 * @param[in] name: 消息队列名称
 * @param[in] oflag:根据传入标识来创建或者打开一个已创建的消息队列
                    - O_CREAT: 创建一个消息队列
                    - O_EXCL: 检查消息队列是否存在,一般与O_CREAT一起使用
                    - O_CREAT|O_EXCL: 消息队列不存在则创建,已存在返回NULL
                    - O_NONBLOCK: 非阻塞模式打开,消息队列不存在返回NULL
                    - O_RDONLY: 只读模式打开
                    - O_WRONLY: 只写模式打开
                    - O_RDWR: 读写模式打开
 * @param[in] mode:访问权限
 * @param[in] attr:消息队列属性地址
 *
 * @return 成功返回消息队列描述符,失败返回-1,错误码存于error中
 */
mqd_t mq_open(const char *name, int oflag,  mode_t mode, struct mq_attr *attr);

/**
 * @brief 无限阻塞方式接收消息
 *
 * Detailed function description
 *
 * @param[in] mqdes: 消息队列描述符
 * @param[in] msg_ptr:消息体缓冲区地址
 * @param[in] msg_len:消息体长度,长度必须大于等于消息属性设定的最大值
 * @param[in] msg_prio:消息优先级
 *
 * @return 成功返回消息长度,失败返回-1,错误码存于error中
 */
mqd_t mq_receive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio);

/**
 * @brief 指定超时时间阻塞方式接收消息
 *
 * Detailed function description
 *
 * @param[in] mqdes: 消息队列描述符
 * @param[in] msg_ptr:消息体缓冲区地址
 * @param[in] msg_len:消息体长度,长度必须大于等于消息属性设定的最大值
 * @param[in] msg_prio:消息优先级
 * @param[in] abs_timeout:超时时间
 *
 * @return 成功返回消息长度,失败返回-1,错误码存于error中
 */
mqd_t mq_timedreceive(mqd_t mqdes, char *msg_ptr, size_t msg_len, unsigned *msg_prio, const struct timespec *abs_timeout);

/**
 * @brief 无限阻塞方式发送消息
 *
 * Detailed function description
 *
 * @param[in] mqdes: 消息队列描述符
 * @param[in] msg_ptr:待发送消息体缓冲区地址
 * @param[in] msg_len:消息体长度
 * @param[in] msg_prio:消息优先级
 *
 * @return 成功返回0,失败返回-1
 */
mqd_t mq_send(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio);

/**
 * @brief 指定超时时间阻塞方式发送消息
 *
 * Detailed function description
 *
 * @param[in] mqdes: 消息队列描述符
 * @param[in] msg_ptr:待发送消息体缓冲区地址
 * @param[in] msg_len:消息体长度
 * @param[in] msg_prio:消息优先级
 * @param[in] abs_timeout:超时时间
 *
 * @return 成功返回0,失败返回-1
 */
mqd_t mq_timedsend(mqd_t mqdes, const char *msg_ptr, size_t msg_len, unsigned msg_prio, const struct timespec *abs_timeout);

/**
 * @brief 关闭消息队列
 *
 * Detailed function description
 *
 * @param[in] mqdes: 消息队列描述符
 *
 * @return 成功返回0,失败返回-1
 */
mqd_t mq_close(mqd_t mqdes);

/**
 * @brief 分离消息队列
 *
 * Detailed function description
 *
 * @param[in] name: 消息队列名称
 *
 * @return 成功返回0,失败返回-1
 */
mqd_t mq_unlink(const char *name);

 

例子:线程1不断给线程2发送字符串数据。

 

#include 
#include 
#include 
#include 
#include 
#include            /* For O_* constants */
#include         /* For mode constants */
#include 

#define MQ_MSG_MAX_SIZE    512  ///< 最大消息长度 
#define MQ_MSG_MAX_ITEM    5  ///< 最大消息数目

static pthread_t s_thread1_id;
static pthread_t s_thread2_id;
static unsigned char s_thread1_running = 0;
static unsigned char s_thread2_running = 0;

static mqd_t s_mq;
static char send_msg[10] = "hello";

void *thread1_fun(void *arg)
{
    int ret = 0;

    s_thread1_running = 1;
    while (s_thread1_running)  
    {
  ret = mq_send(s_mq, send_msg, sizeof(send_msg), 0);
  if (ret < 0)
  {
         perror("mq_send error");
  }
        printf("send msg = %s
", send_msg);
        usleep(100 * 1000);
    }
    
    pthread_exit(NULL);
}

void *thread2_fun(void *arg)
{
 char  buf[MQ_MSG_MAX_SIZE];
 int recv_size = 0;

    s_thread2_running = 1;
    while (s_thread2_running)
    {
  recv_size = mq_receive(s_mq, &buf[0], sizeof(buf), NULL);
  if (-1 != recv_size)
  {
   printf("receive msg = %s
", buf);
  }
  else
  {
   perror("mq_receive error");
   break;
  }

        usleep(100 * 1000);
    }
    
    pthread_exit(NULL);
}

int main(void)
{
    int ret = 0;
    struct mq_attr attr;

    ///< 创建消息队列
    memset(&attr, 0, sizeof(attr));
    attr.mq_maxmsg = MQ_MSG_MAX_ITEM;
    attr.mq_msgsize = MQ_MSG_MAX_SIZE;
    attr.mq_flags = 0;
    s_mq = mq_open("/mq", O_CREAT|O_RDWR, 0777, &attr);
 if(-1 == s_mq)
    {
        perror("mq_open error");
        return -1;
    }

    ///< 创建线程1
    ret = pthread_create(&s_thread1_id, NULL, thread1_fun, NULL);
    if (ret != 0)
    {
        printf("thread1_create error!
");
        exit(EXIT_FAILURE);
    }
    ret = pthread_detach(s_thread1_id);
    if (ret != 0)
    {
        printf("s_thread1_id error!
");
        exit(EXIT_FAILURE);
    }

    ///< 创建线程2
    ret = pthread_create(&s_thread2_id, NULL, thread2_fun, NULL);
    if (ret != 0)
    {
        printf("thread2_create error!
");
        exit(EXIT_FAILURE);
    }
    ret = pthread_detach(s_thread2_id);
    if (ret != 0)
    {
        printf("s_thread2_id error!
");
        exit(EXIT_FAILURE);
    }

    while (1)
    {
        sleep(1);
    }

    return 0;
}

 

编译、运行:

通信

以上就是本次的分享,如果文章有帮助,麻烦帮忙转发,谢谢!

  审核编辑:汤梓红

打开APP阅读更多精彩内容
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉

全部0条评论

快来发表一下你的评论吧 !

×
20
完善资料,
赚取积分