
作者|donatello1996
来源 | 电子发烧友
题图|飞凌嵌入式
rte_ring是一个用CAS实现的无锁FIFO环形队列,支持多消费者/生产者同时出入队列,常用于多线程/多进程之间的通信。具体原理可以查看DPDK官方文档或者阅读源码,本文采用的硬件板卡为飞凌嵌入式OKMX8MP-C开发板,系统版本Linux5.4.70+Qt5.15.0,主要介绍通过编译DPDK源码实现rte_ring无锁环队列进程间通信。
下面就跟着小编一起了解下。
一、内核编译
下载并解压飞凌厂商提供的iMX8MP内核源码压缩包分卷:

在虚拟机中合并压缩分卷并解压得出内核源码包文件夹OK8MP-linux-kernel,将文件夹使用tar打包并复制到TF卡文件系统中解压:

找到内核源码中的配置文件OK8MP-C_defconfig:

这个就是make选项,使用
make OK8MP-C_defconfig
指令即可配置编译选项:
make -j4
开始编译:
注意开始编译前需要安装常用软件:
apt install bison bc flex


增量编译完毕:

二、DPDK编译
接下来就可以下载DPDK并运行rte_ring无锁环队列Demo应用,需要从
https://www.dpdk.org/
官网中下载DPDK 19.11.10 (LTS)长期支持版本:

在根目录下的mk/文件夹下找到名为rte_vars.mk设置文件,找到环境变量RTE_KERNELDIR,修改为上述的内核源码路径:


RTE_KERNELDIR ?= /home/OK8MP-linux-kernel/
进入usertools文件夹,找到dpdk-setup.sh脚本并运行

选择8,ARM64-armv8a-linuxapp-gcc,

这个选项会使dpdk的gcc交叉编译链生成适用于armv8a处理器的外部库,外部库中有kmod和lib等ko文件和so文件,是用于第三方程序开发和运行的:


使用指令
insmod /home/dpdk-stable-19.11.10/arm64-armv8a-linuxapp-gcc/kmod/igb_uio.ko
左右滑动查看完整代码加载igb_uio.ko驱动文件,这是进行dpdk开发必备的步骤:

然后是使用dpdk-devbind.py脚本手动进行hugepage大页内存绑定,此处为numa方式:

此举会将/mnt/huge文件mount成hugepage映射文件,并实实在在地占用内存空间:


三、rte_ring无锁环队列
准备工作完成,我们接下来可以进行rte_ring无锁环队列Demo代码的编写,但是在编写之前,需要对无锁环队列有一个基本的认识:https://blog.csdn.net/chen98765432101/article/details/69367633
无论是dpdk第三方开发的rte_ring还是Linux内核中本就存在的无锁环队列,其基本原理类似,在一条分配好的队列型内存空间中,读写方式为FIFO(先进先出),读和写的动作分别有两个进程或两个线程进行,写进程不断往地址自增的内存位置写入数据,读进程不断读取地址自增的内存位置的数据,当写位置的内存地址已为队列中内存的最高值时,需要释放队列中内存地址最低值的空间供写进程继续写,方式仍与上一周期相同(不断往地址自增的内存位置写入数据),释放过程需要保证对末尾内存地址空间的锁定与解锁,避免读写过程出错。而不同的是,Linux内核中的无锁环队列,地址管理和读写控制均由内核进行,而dpdk的rte_ring则由dpdk内部的控制器进行,因为dpdk这一模块完整地接管了所分配内存空间的管理权,是直接绕过Linux内核进行管理的,内核也无权访问dpdk控制器的具体管理细节。




编写无锁环队列两个进程的Demo,先写Primary进程:
左右滑动查看完整代码static const char *_MSG_POOL = "MSG_POOL";static const char *_SEC_2_PRI = "SEC_2_PRI";static const char *_PRI_2_SEC = "PRI_2_SEC";static const char *_PRI_2_THI = "PRI_2_THI";struct rte_ring *send_ring, *recv_ring , *send_ring_third;struct rte_mempool *message_pool;volatile int quit = 0;static void * lcore_recv(void *arg){unsigned lcore_id = rte_lcore_id();printf("Starting core %u\n", lcore_id);while (!quit){void *msg;if (rte_ring_dequeue(recv_ring, &msg) < 0){usleep(5);continue;}printf("lcore_id = %d Received '%s'\n" , lcore_id , (char *)msg);rte_mempool_put(message_pool , msg);}return 0;}int string_size = 100;int elt_size = 128;pthread_t id1;int main(int argc, char **argv){const unsigned flags = 0;const unsigned ring_size = 64;const unsigned pool_size = 1024;const unsigned pool_cache = 32;const unsigned priv_data_sz = 0;int ret;unsigned lcore_id;ret = rte_eal_init(argc, argv);if (ret < 0)rte_exit(EXIT_FAILURE, "Cannot init EAL\n");send_ring = rte_ring_create(_PRI_2_SEC, ring_size, rte_socket_id(), flags);recv_ring = rte_ring_create(_SEC_2_PRI, ring_size, rte_socket_id(), flags);send_ring_third = rte_ring_create(_PRI_2_THI, ring_size, rte_socket_id(), flags);message_pool = rte_mempool_create(_MSG_POOL, pool_size,elt_size, pool_cache, priv_data_sz,NULL, NULL, NULL, NULL,rte_socket_id(), flags);if (send_ring == NULL)rte_exit(EXIT_FAILURE, "Problem getting sending ring\n");if (recv_ring == NULL)rte_exit(EXIT_FAILURE, "Problem getting receiving ring\n");if (send_ring_third == NULL)rte_exit(EXIT_FAILURE, "Problem getting send_ring_third\n");if (message_pool == NULL)rte_exit(EXIT_FAILURE, "Problem getting message pool\n");pthread_create(&id1 , NULL , lcore_recv , NULL);while(1){void *msg = NULL;if (rte_mempool_get(message_pool, &msg) < 0)continue;snprintf((char *)msg, string_size, "%s", "primary to secondary");if (rte_ring_enqueue(send_ring , msg) < 0){rte_mempool_put(message_pool, msg);}if (rte_mempool_get(message_pool, &msg) < 0)continue;snprintf((char *)msg, string_size, "%s", "primary to third");if (rte_ring_enqueue(send_ring_third , msg) < 0){rte_mempool_put(message_pool, msg);}sleep(1);}return 0;}
注意在Makefile文件里面要关闭WERROR相关编译选项:
左右滑动查看完整代码# BSD LICENSE## Copyright(c) 2010-2014 Intel Corporation. All rights reserved.# All rights reserved.## Redistribution and use in source and binary forms, with or without# modification, are permitted provided that the following conditions# are met:## * Redistributions of source code must retain the above copyright# notice, this list of conditions and the following disclaimer.# * Redistributions in binary form must reproduce the above copyright# notice, this list of conditions and the following disclaimer in# the documentation and/or other materials provided with the# distribution.# * Neither the name of Intel Corporation nor the names of its# contributors may be used to endorse or promote products derived# from this software without specific prior written permission.## THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.ifeq ($(RTE_SDK),)$(error "Please define RTE_SDK environment variable")endif# Default target, can be overridden by command line or environmentRTE_TARGET ?= arm64-armv8a-linuxapp-gccinclude $(RTE_SDK)/mk/rte.vars.mk# binary nameAPP = rte_ring_primary# all source are stored in SRCS-ySRCS-y := main.cCFLAGS += -O0CFLAGS +=include $(RTE_SDK)/mk/rte.extapp.mk
Secondary进程:
static const char *_MSG_POOL = "MSG_POOL";static const char *_SEC_2_PRI = "SEC_2_PRI";static const char *_PRI_2_SEC = "PRI_2_SEC";struct rte_ring *send_ring, *recv_ring;struct rte_mempool *message_pool;volatile int quit = 0;int string_size = 100;static int lcore_send(__attribute__((unused)) void *arg){unsigned lcore_id = rte_lcore_id();while(1){void *msg = NULL;if (rte_mempool_get(message_pool, &msg) < 0)continue;snprintf((char *)msg , string_size , "%s", "secondary to primary");if (rte_ring_enqueue(send_ring , msg) < 0){rte_mempool_put(message_pool, msg);}sleep(1);}return 0;}pthread_t id1;int main(int argc, char **argv){const unsigned flags = 0;const unsigned ring_size = 64;const unsigned pool_size = 1024;const unsigned pool_cache = 32;const unsigned priv_data_sz = 0;int ret;unsigned lcore_id;ret = rte_eal_init(argc, argv);if (ret < 0)rte_exit(EXIT_FAILURE, "Cannot init EAL\n");recv_ring = rte_ring_lookup(_PRI_2_SEC);send_ring = rte_ring_lookup(_SEC_2_PRI);message_pool = rte_mempool_lookup(_MSG_POOL);if (send_ring == NULL)rte_exit(EXIT_FAILURE, "Problem getting sending ring\n");if (recv_ring == NULL)rte_exit(EXIT_FAILURE, "Problem getting receiving ring\n");if (message_pool == NULL)rte_exit(EXIT_FAILURE, "Problem getting message pool\n");pthread_create(&id1 , NULL , lcore_send , NULL);while (1){lcore_id = rte_lcore_id();void * msg = NULL;if (rte_ring_dequeue(recv_ring, &msg) < 0){usleep(5);continue;}printf("lcore_id = %d Received: %s\n" , lcore_id , (char *)msg);rte_mempool_put(message_pool, msg);}return 0;}
左右滑动查看完整代码
同样在Makefile文件里面要关闭WERROR相关编译选项:
左右滑动查看完整代码# BSD LICENSE## Copyright(c) 2010-2014 Intel Corporation. All rights reserved.# All rights reserved.## Redistribution and use in source and binary forms, with or without# modification, are permitted provided that the following conditions# are met:## * Redistributions of source code must retain the above copyright# notice, this list of conditions and the following disclaimer.# * Redistributions in binary form must reproduce the above copyright# notice, this list of conditions and the following disclaimer in# the documentation and/or other materials provided with the# distribution.# * Neither the name of Intel Corporation nor the names of its# contributors may be used to endorse or promote products derived# from this software without specific prior written permission.## THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.ifeq ($(RTE_SDK),)$(error "Please define RTE_SDK environment variable")endif# Default target, can be overridden by command line or environmentRTE_TARGET ?= arm64-armv8a-linuxapp-gccinclude $(RTE_SDK)/mk/rte.vars.mk# binary nameAPP = rte_ring_secondary# all source are stored in SRCS-ySRCS-y := main.cCFLAGS += -O3CFLAGS += $()include $(RTE_SDK)/mk/rte.extapp.mk
运行,这里说一下,基于rte_ring的进程间通信,Secondary进程最好是使用auto类型:
./rte_ring_primary --proc-type primary./rte_ring_secondary --proc-type auto
运行效果:


作者简介
donatello1996,某大型企业资深嵌入式工程师,电子发烧友论坛技术大牛,同时也是飞凌嵌入式多年铁粉,曾基于飞凌多款板卡产出过优质测评文章或使用心得。本期三篇文章为donatello1996在使用OKMX8MP-C开发板过程中精心产出的干货,在此对donatello1996表示感谢。
全部0条评论
快来发表一下你的评论吧 !