基于RT-Thread的RPMsg-Lite异构多核通信原理分析 | 技术集结

描述

目录


 

说明


 

初识rpmsg-lite


 

rpmsg-lite组件优势


 

rpmsg-lite工程架构


 

rpmsg-lite通信流程(RTOS)


 

rpmsg-lite通信流程(MCMGR)

1 说明

本文档旨在说明如何在RT-Thread(运行于Cortex-M85核)和裸机程序(运行于Cortex-M33核)之间使用rpmsg-lite进行通信,并采用MCMGR组件替代rpmsg-lite原生的队列机制实现双核间同步管理。

硬件平台:Renesas RA8P1( ARM Cortex-M85 & ARM Cortex-M33)

RT-Thread

或复制链接购买:https://item.taobao.com/item.htm?ft=t&id=987791181903


 

软件环境
 

M85核:RT-Thread操作系统

M33核:裸机程序

通信组件:rpmsg-lite + MCMGR

2 初识rpmsg-lite

RPMsg-Lite 是一款轻量级开源异构处理器通信框架,适用于多核系统中的小型微控制器(MCU)。

它支持在非对称多处理(AMP)配置中,例如运行 Linux 的主处理器与运行实时操作系统(如 RT-Thread)的协处理器之间进行高效的消息传递。

RPMsg-Lite 针对资源受限环境设计,相比 OpenAMP 提供更简化的 API,支持零拷贝和静态内存选项,并确保与 RPMsg 协议兼容。通过使用共享内存进行数据交换,要求配置非缓存内存,非常适合卸载 CPU 密集型任务。

3 rpmsg-lite组件优势

1)静态API创建

相比较RPMsg,RPMsg-Lite采用静态API创建方式,相比较动态分配,静态API创建的方式至少减少了5KB的代码体量,也避免了动态内存管理开销。

RT-Thread

2)基于共享内存的数据零拷贝

相比较传统驱动外设来说,都需要经历由应用层->驱动层->硬件缓冲区这么多次拷贝的过程,相对应的也会带来一定的延迟;而RPMsg-Lite则是数据零拷贝的这么一种实现方式,也就是应用层->共享内存方式,仅一次的内存拷贝,将数据交换周期能够最小降低至μs微秒级

(当然这部分依赖于双核硬件平台的IPC机制,即处理器间通信)

RT-Thread

3)硬中断的高效触发

对于传统共享中断来说,都是需要轮询所有关联外设的状态寄存器;而RPMsg-Lite直接使用 platform_notify 来主动触发核间中断通知,同时这个触发时机可以由用户决定,比如说累计n个消息后触发中断,以此来批量处理核间通信。

RT-Thread

4 rpmsg-lite工程架构

rpmsg-lite工程分为两个版本:

RT-Thread + Freertos + RPMsg-Lite:该示例统一使用RPMsg-Lite结合RTOS的消息队列来实现子组件queue

RT-Thread

RT-Thread + Bera metal + RPMsg-Lite + MCMGR:为了方便裸机系统能够实时同步主次核之间的状态,移植了官方的MCMGR组件(multiple core manager)

RT-Thread

5 rpmsg-lite通信流程(RTOS)

需要说明的是,裸机rpmsg-lite与RTOS版的rpmsg-lite的收发逻辑是不一样的,这里分开说明

RT-Thread

这里绘制了一张图简单说明下,首先明确上层留给用户的rpmsg-lite api,最为关键的就是rpmsg-dev的初始化、消息的发送与接收:

1) 初始化

  •  
  •  
  •  
  •  
  •  
  •  
  •  

# masterstruct rpmsg_lite_instance *rpmsg_lite_master_init(void *shmem_addr,                                                   size_t shmem_length,                                                   uint32_t link_id,                                                   uint32_t init_flags)# remotestruct rpmsg_lite_instance *rpmsg_lite_remote_init(void *shmem_addr, uint32_t link_id, uint32_t init_flags)

这一步会分别在master和remote初始化一个 rpmsg-dev 实体,并且声明共享内存起始地址,这个过程中会绑定两个virtualqueue的回调函数(收发关键)

callback[0] = rpmsg_lite_rx_callback;

callback[1] = rpmsg_lite_tx_callback;

  •  
  •  
  •  
  •  

struct rpmsg_lite_endpoint *rpmsg_lite_create_ept(struct rpmsg_lite_instance *rpmsg_lite_dev,                                                  uint32_t addr,                                                  rl_ept_rx_cb_t rx_cb,                                                  void *rx_cb_data);

这里最终会传递给 virtqueue 的初始化创建,并作为 callback_fc 参数进行绑定:

  •  
  •  
  •  
  •  
  •  
  •  

int32_t virtqueue_create(uint16_t id,                         constchar *name,                         struct vring_alloc_info *ring,                         void (*callback_fc)(struct virtqueue *vq),                         void (*notify_fc)(struct virtqueue *vq),                         struct virtqueue **v_queue)

与此同时,virtqueue_notify() 函数会作为第五个参数绑定为通知函数,这一步对接到移植层的子 layer:enviroment layer,也就是 platform_notify(),用于触发通知另外一个核心,一般使用核间中断,基于此,就可以触发另一个核心来处理接收数据的逻辑了:

  •  
  •  
  •  
  •  
  •  
  •  

void platform_notify(uint32_t vector_id){    env_lock_mutex(platform_lock);    R_IPC_MessageSend(&g_ipc1_ctrl, (uint32_t)(RL_GET_Q_ID(vector_id)));    env_unlock_mutex(platform_lock);}

注意:

补充一点,为了方便多核心间的通信,这里的virtualqueue默认都是配置为两个,并且双核间的收发virtualqueue都是分别一一对应的:

  •  
  •  
  •  

       Master            Remotevqx[0]:tx_vq     ->        rx_vqvqx[0]:rx_vq     ->        tx_vq

2) 发送数据

首先来看下 rpmsg-lite 的发送数据API:

  •  
  •  
  •  
  •  
  •  
  •  

int32_t rpmsg_lite_send(struct rpmsg_lite_instance *rpmsg_lite_dev,                        struct rpmsg_lite_endpoint *ept,                        uint32_t dst,                        char *data,                        uint32_t size,                        uintptr_t timeout)

这里的data是我们传输过来的数据,那么后面就是这么几个步骤:

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  

# 1.从共享内存申请buffer(vq_tx_master)buffer = rpmsg_lite_dev->vq_ops->vq_tx_alloc(rpmsg_lite_dev->tvq, &buff_len, &idx);# 2.将共享地址传递给rpmsg_msg,并对该变量传递实际数据rpmsg_msg = (struct rpmsg_std_msg *)buffer;env_memcpy(rpmsg_msg->data, data, size);# 3.将前面申请的这段buffer标记为可用模式,并加入到vq_ring.avail中,一般这里应该是用于缓存下的多核通信/* Enqueue buffer on virtqueue. */rpmsg_lite_dev->vq_ops->vq_tx(rpmsg_lite_dev->tvq, buffer, buff_len, idx);

那么到这里,我们想要传递的数据就已经放进去 virtualqueue了,那么接下来就是再发送一个通知给另外一个核心,通知它从共享队列中读取数据。对应的流程如下:

  •  
  •  
  •  
  •  
  •  
  •  

>>> rpmsg_lite_send    -> rpmsg_lite_format_message        -> virtqueue_kick            -> vq_ring_notify_host                -> virtqueue_notify                    -> platform_notify

3) 接收数据

首先明确接收数据的api,在RTOS下我们使用的是 rpmsg_queue_recv():

  •  
  •  
  •  
  •  
  •  
  •  
  •  

int32_t rpmsg_queue_recv(struct rpmsg_lite_instance *rpmsg_lite_dev,                         rpmsg_queue_handle q,                         uint32_t *src,                         char *data,                         uint32_t maxlen,                         uint32_t *len,                         uintptr_t timeout)

深入进去就是需要从消息队列中拿出数据

可以观察上图中红色箭头所指示的流程,在接收到来自另一个核心的IPC通知后,会触发进入 env_isr:

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  

void g_ipc0_callback(ipc_callback_args_t *p_args){    rt_interrupt_enter();    /* Check for message received event */    if (IPC_EVENT_MESSAGE_RECEIVED  & p_args->event)    {        env_isr(p_args->message);    }    rt_interrupt_leave();}

此处会告知本核心的virtualqueue,并触发对应的回调函数(接收为rpmsg_lite_rx_callback),在这个回调函数中,会执行到一个 ept->rx_cb,这里也就是我们在初始化rpmsg-lite时绑定的endpoint回调函数,即 rpmsg_queue_rx_cb:

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  

my_ept    = rpmsg_lite_create_ept(my_rpmsg, LOCAL_EPT_ADDR, rpmsg_queue_rx_cb, my_queue);int32_trpmsg_queue_rx_cb(void *payload, uint32_t payload_len, uint32_t src, void *priv){    rpmsg_queue_rx_cb_data_t msg;    RL_ASSERT(priv != RL_NULL);    msg.data = payload;    msg.len  = payload_len;    msg.src  = src;    /* if message is successfully added into queue then hold rpmsg buffer */    if (0 != env_put_queue(priv, &msg, 0))    {        /* hold the rx buffer */        return RL_HOLD;    }    return RL_RELEASE;}

接着就是激活 rpmsg-lite 的消息队列线程,进而去处理消息。

6 rpmsg-lite通信流程(MCMGR)

MCMGR组件开源地址:https://github.com/nxp-mcuxpresso/mcux-mcmgr

1)mcmgr双核间同步

  •  
  •  
  •  
  •  
  •  

# M85# 首先注册一个event回调函数,实时检测来自次核的通知,也就是 APP_RPMSG_READY_EVENT_DATA 和 APP_RPMSG_EP_READY_EVENT_DATA事件static void RPMsgRemoteReadyEventHandler(uint16_t eventData, void *context){    uint16_t *data = (uint16_t *)context;    *data = eventData;}/* Register the application event before starting the secondary core */    (void)MCMGR_RegisterEvent(kMCMGR_RemoteApplicationEvent, RPMsgRemoteReadyEventHandler,                              (void *)&RPMsgRemoteReadyEventData);    while (APP_RPMSG_READY_EVENT_DATA != RPMsgRemoteReadyEventData)    {    };    /* Wait until the secondary core application signals the rpmsg remote endpoint has been created. */    while (APP_RPMSG_EP_READY_EVENT_DATA != RPMsgRemoteReadyEventData)    {    }--------------------------------------------------------------------------------------------------------------------------------# M33# 通过MCMGR_TriggerEvent触发IPC中断,发送通知给主核    /* Signal the other core we are ready by triggering the event and passing the APP_RPMSG_READY_EVENT_DATA */    (void)MCMGR_TriggerEvent(kMCMGR_RemoteApplicationEvent, APP_RPMSG_READY_EVENT_DATA);    /* Signal the other core the endpoint has been created by triggering the event and passing the     * APP_RPMSG_READY_EP_EVENT_DATA */    (void)MCMGR_TriggerEvent(kMCMGR_RemoteApplicationEvent, APP_RPMSG_EP_READY_EVENT_DATA);

2)mcmgr双核间通信

<1>接收

接收逻辑关键取决于下面这个API:

  •  
  •  
  •  
  •  
  •  

struct rpmsg_lite_endpoint *rpmsg_lite_create_ept(struct rpmsg_lite_instance *rpmsg_lite_dev,                                                  uint32_t addr,                                                  rl_ept_rx_cb_t rx_cb,                                                  void *rx_cb_data,                                                  struct rpmsg_lite_ept_static_context *ept_context)

通过绑定的回调函数rx_cb,再由mcmgr多核管理器统一注册事件,并根据平台层提供的platform_notify机制触发env_isr,并触发到mcmgr_ipc_callback

来执行接收回调rx_cb,而这个触发由rpmsg_lite_send 提供触发动作(virtqueue_kick)

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  

# 示例代码static int32_t my_ept_read_cb(void *payload, uint32_t payload_len, uint32_t src, void *priv){    int32_t *has_received = priv;    if (payload_len <= sizeof(THE_MESSAGE))    {        (void)rt_memcpy((void *)&msg, payload, payload_len);        *has_received = 1;    }    (void)rt_kprintf("Primary core received a msg\r\n");    (void)rt_kprintf("Message: Size=%x, DATA = %i\r\n", payload_len, msg.DATA);    return RL_RELEASE;}my_ept = rpmsg_lite_create_ept(my_rpmsg, LOCAL_EPT_ADDR, my_ept_read_cb, (void *)&has_received, &my_ept_context);

所以流程为:

  •  
  •  
  •  
  •  
  •  
  •  

mcmgr_ipc_callback    ->mcmgr_event_handler        ->env_isr            ->virtqueue_notification                ->rpmsg_lite_rx_callback(vq->callback_fc)                    ->my_ept_read_cb(ept->rx_cb)

<2>发送

发送逻辑关键取决于下面这个API:

  •  
  •  
  •  
  •  
  •  
  •  

int32_t rpmsg_lite_send(struct rpmsg_lite_instance *rpmsg_lite_dev,                        struct rpmsg_lite_endpoint *ept,                        uint32_t dst,                        char *data,                        uint32_t size,                        uintptr_t timeout)

流程为

  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  
  •  

(void)rpmsg_lite_send(my_rpmsg, my_ept, REMOTE_EPT_ADDR, (char *)&msg, sizeof(THE_MESSAGE), RL_DONT_BLOCK);return rpmsg_lite_format_message(rpmsg_lite_dev, ept->addr, dst, data, size, RL_NO_FLAGS, timeout);# 这里的data是我们传输过来的数据,那么后面就是这么几个步骤:# 1.从共享内存申请buffer(vq_tx_master)buffer = rpmsg_lite_dev->vq_ops->vq_tx_alloc(rpmsg_lite_dev->tvq, &buff_len, &idx);# 2.将共享地址传递给rpmsg_msg,并对该变量传递实际数据rpmsg_msg = (struct rpmsg_std_msg *)buffer;env_memcpy(rpmsg_msg->data, data, size);# 3.将前面申请的这段buffer标记为可用模式,并加入到vq_ring.avail中,一般这里应该是用于缓存下的多核通信/* Enqueue buffer on virtqueue. */rpmsg_lite_dev->vq_ops->vq_tx(rpmsg_lite_dev->tvq, buffer, buff_len, idx);# 最后执行通知机制virtqueue_kick    ->vq_ring_notify_host(vq->notify_fc(vq))        ->platform_nofity            -> (void)MCMGR_TriggerEventForce(kMCMGR_RemoteRPMsgEvent, (uint16_t)RL_GET_Q_ID(vector_id));                ->trig ipc callback                    ->mcmgr_ipc_callback

 

RT-Thread Github 开源仓库,欢迎撒个星(Star)支持,更期待你的代码贡献: https://github.com/RT-Thread/rt-thread

获取硬件

 

RT-Thread 与瑞萨电子联合推出 RA8P1 Titan Board,基于 1GHz Arm Cortex-M85 + 250MHz Cortex-M33 双核架构,集成 Ethos-U55 NPU ,实现 256 GOPS 的 AI 性能、超过 7300 CoreMarks 的突破性 CPU 性能和先进的人工智能 (AI) 功能,可支持语音、视觉和实时分析 AI 场景!

 

 

打开APP阅读更多精彩内容
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉

全部0条评论

快来发表一下你的评论吧 !

×
20
完善资料,
赚取积分