目录
说明
初识rpmsg-lite
rpmsg-lite组件优势
rpmsg-lite工程架构
rpmsg-lite通信流程(RTOS)
rpmsg-lite通信流程(MCMGR)
1 说明
本文档旨在说明如何在RT-Thread(运行于Cortex-M85核)和裸机程序(运行于Cortex-M33核)之间使用rpmsg-lite进行通信,并采用MCMGR组件替代rpmsg-lite原生的队列机制实现双核间同步管理。
硬件平台:Renesas RA8P1( ARM Cortex-M85 & ARM Cortex-M33)

或复制链接购买:https://item.taobao.com/item.htm?ft=t&id=987791181903
软件环境
M85核:RT-Thread操作系统
M33核:裸机程序
通信组件:rpmsg-lite + MCMGR
2 初识rpmsg-lite
RPMsg-Lite 是一款轻量级开源异构处理器通信框架,适用于多核系统中的小型微控制器(MCU)。
它支持在非对称多处理(AMP)配置中,例如运行 Linux 的主处理器与运行实时操作系统(如 RT-Thread)的协处理器之间进行高效的消息传递。
RPMsg-Lite 针对资源受限环境设计,相比 OpenAMP 提供更简化的 API,支持零拷贝和静态内存选项,并确保与 RPMsg 协议兼容。通过使用共享内存进行数据交换,要求配置非缓存内存,非常适合卸载 CPU 密集型任务。
3 rpmsg-lite组件优势
1)静态API创建
相比较RPMsg,RPMsg-Lite采用静态API创建方式,相比较动态分配,静态API创建的方式至少减少了5KB的代码体量,也避免了动态内存管理开销。

2)基于共享内存的数据零拷贝
相比较传统驱动外设来说,都需要经历由应用层->驱动层->硬件缓冲区这么多次拷贝的过程,相对应的也会带来一定的延迟;而RPMsg-Lite则是数据零拷贝的这么一种实现方式,也就是应用层->共享内存方式,仅一次的内存拷贝,将数据交换周期能够最小降低至μs微秒级
(当然这部分依赖于双核硬件平台的IPC机制,即处理器间通信)

3)硬中断的高效触发
对于传统共享中断来说,都是需要轮询所有关联外设的状态寄存器;而RPMsg-Lite直接使用 platform_notify 来主动触发核间中断通知,同时这个触发时机可以由用户决定,比如说累计n个消息后触发中断,以此来批量处理核间通信。

4 rpmsg-lite工程架构
rpmsg-lite工程分为两个版本:
RT-Thread + Freertos + RPMsg-Lite:该示例统一使用RPMsg-Lite结合RTOS的消息队列来实现子组件queue

RT-Thread + Bera metal + RPMsg-Lite + MCMGR:为了方便裸机系统能够实时同步主次核之间的状态,移植了官方的MCMGR组件(multiple core manager)

5 rpmsg-lite通信流程(RTOS)
需要说明的是,裸机rpmsg-lite与RTOS版的rpmsg-lite的收发逻辑是不一样的,这里分开说明

这里绘制了一张图简单说明下,首先明确上层留给用户的rpmsg-lite api,最为关键的就是rpmsg-dev的初始化、消息的发送与接收:
1) 初始化
# masterstruct rpmsg_lite_instance *rpmsg_lite_master_init(void *shmem_addr, size_t shmem_length, uint32_t link_id, uint32_t init_flags)# remotestruct rpmsg_lite_instance *rpmsg_lite_remote_init(void *shmem_addr, uint32_t link_id, uint32_t init_flags)
这一步会分别在master和remote初始化一个 rpmsg-dev 实体,并且声明共享内存起始地址,这个过程中会绑定两个virtualqueue的回调函数(收发关键)
callback[0] = rpmsg_lite_rx_callback;
callback[1] = rpmsg_lite_tx_callback;
struct rpmsg_lite_endpoint *rpmsg_lite_create_ept(struct rpmsg_lite_instance *rpmsg_lite_dev, uint32_t addr, rl_ept_rx_cb_t rx_cb, void *rx_cb_data);
这里最终会传递给 virtqueue 的初始化创建,并作为 callback_fc 参数进行绑定:
int32_t virtqueue_create(uint16_t id, constchar *name, struct vring_alloc_info *ring, void (*callback_fc)(struct virtqueue *vq), void (*notify_fc)(struct virtqueue *vq), struct virtqueue **v_queue)
与此同时,virtqueue_notify() 函数会作为第五个参数绑定为通知函数,这一步对接到移植层的子 layer:enviroment layer,也就是 platform_notify(),用于触发通知另外一个核心,一般使用核间中断,基于此,就可以触发另一个核心来处理接收数据的逻辑了:
void platform_notify(uint32_t vector_id){ env_lock_mutex(platform_lock); R_IPC_MessageSend(&g_ipc1_ctrl, (uint32_t)(RL_GET_Q_ID(vector_id))); env_unlock_mutex(platform_lock);}
注意:
补充一点,为了方便多核心间的通信,这里的virtualqueue默认都是配置为两个,并且双核间的收发virtualqueue都是分别一一对应的:
Master Remotevqx[0]:tx_vq -> rx_vqvqx[0]:rx_vq -> tx_vq
2) 发送数据
首先来看下 rpmsg-lite 的发送数据API:
int32_t rpmsg_lite_send(struct rpmsg_lite_instance *rpmsg_lite_dev, struct rpmsg_lite_endpoint *ept, uint32_t dst, char *data, uint32_t size, uintptr_t timeout)
这里的data是我们传输过来的数据,那么后面就是这么几个步骤:
# 1.从共享内存申请buffer(vq_tx_master)buffer = rpmsg_lite_dev->vq_ops->vq_tx_alloc(rpmsg_lite_dev->tvq, &buff_len, &idx);# 2.将共享地址传递给rpmsg_msg,并对该变量传递实际数据rpmsg_msg = (struct rpmsg_std_msg *)buffer;env_memcpy(rpmsg_msg->data, data, size);# 3.将前面申请的这段buffer标记为可用模式,并加入到vq_ring.avail中,一般这里应该是用于缓存下的多核通信/* Enqueue buffer on virtqueue. */rpmsg_lite_dev->vq_ops->vq_tx(rpmsg_lite_dev->tvq, buffer, buff_len, idx);
那么到这里,我们想要传递的数据就已经放进去 virtualqueue了,那么接下来就是再发送一个通知给另外一个核心,通知它从共享队列中读取数据。对应的流程如下:
>>> rpmsg_lite_send -> rpmsg_lite_format_message -> virtqueue_kick -> vq_ring_notify_host -> virtqueue_notify -> platform_notify
3) 接收数据
首先明确接收数据的api,在RTOS下我们使用的是 rpmsg_queue_recv():
int32_t rpmsg_queue_recv(struct rpmsg_lite_instance *rpmsg_lite_dev, rpmsg_queue_handle q, uint32_t *src, char *data, uint32_t maxlen, uint32_t *len, uintptr_t timeout)
深入进去就是需要从消息队列中拿出数据
可以观察上图中红色箭头所指示的流程,在接收到来自另一个核心的IPC通知后,会触发进入 env_isr:
void g_ipc0_callback(ipc_callback_args_t *p_args){ rt_interrupt_enter(); /* Check for message received event */ if (IPC_EVENT_MESSAGE_RECEIVED & p_args->event) { env_isr(p_args->message); } rt_interrupt_leave();}
此处会告知本核心的virtualqueue,并触发对应的回调函数(接收为rpmsg_lite_rx_callback),在这个回调函数中,会执行到一个 ept->rx_cb,这里也就是我们在初始化rpmsg-lite时绑定的endpoint回调函数,即 rpmsg_queue_rx_cb:
my_ept = rpmsg_lite_create_ept(my_rpmsg, LOCAL_EPT_ADDR, rpmsg_queue_rx_cb, my_queue);int32_trpmsg_queue_rx_cb(void *payload, uint32_t payload_len, uint32_t src, void *priv){ rpmsg_queue_rx_cb_data_t msg; RL_ASSERT(priv != RL_NULL); msg.data = payload; msg.len = payload_len; msg.src = src; /* if message is successfully added into queue then hold rpmsg buffer */ if (0 != env_put_queue(priv, &msg, 0)) { /* hold the rx buffer */ return RL_HOLD; } return RL_RELEASE;}
接着就是激活 rpmsg-lite 的消息队列线程,进而去处理消息。
6 rpmsg-lite通信流程(MCMGR)
MCMGR组件开源地址:https://github.com/nxp-mcuxpresso/mcux-mcmgr
1)mcmgr双核间同步
# M85# 首先注册一个event回调函数,实时检测来自次核的通知,也就是 APP_RPMSG_READY_EVENT_DATA 和 APP_RPMSG_EP_READY_EVENT_DATA事件static void RPMsgRemoteReadyEventHandler(uint16_t eventData, void *context){ uint16_t *data = (uint16_t *)context; *data = eventData;}/* Register the application event before starting the secondary core */ (void)MCMGR_RegisterEvent(kMCMGR_RemoteApplicationEvent, RPMsgRemoteReadyEventHandler, (void *)&RPMsgRemoteReadyEventData); while (APP_RPMSG_READY_EVENT_DATA != RPMsgRemoteReadyEventData) { }; /* Wait until the secondary core application signals the rpmsg remote endpoint has been created. */ while (APP_RPMSG_EP_READY_EVENT_DATA != RPMsgRemoteReadyEventData) { }--------------------------------------------------------------------------------------------------------------------------------# M33# 通过MCMGR_TriggerEvent触发IPC中断,发送通知给主核 /* Signal the other core we are ready by triggering the event and passing the APP_RPMSG_READY_EVENT_DATA */ (void)MCMGR_TriggerEvent(kMCMGR_RemoteApplicationEvent, APP_RPMSG_READY_EVENT_DATA); /* Signal the other core the endpoint has been created by triggering the event and passing the * APP_RPMSG_READY_EP_EVENT_DATA */ (void)MCMGR_TriggerEvent(kMCMGR_RemoteApplicationEvent, APP_RPMSG_EP_READY_EVENT_DATA);
2)mcmgr双核间通信
<1>接收
接收逻辑关键取决于下面这个API:
struct rpmsg_lite_endpoint *rpmsg_lite_create_ept(struct rpmsg_lite_instance *rpmsg_lite_dev, uint32_t addr, rl_ept_rx_cb_t rx_cb, void *rx_cb_data, struct rpmsg_lite_ept_static_context *ept_context)
通过绑定的回调函数rx_cb,再由mcmgr多核管理器统一注册事件,并根据平台层提供的platform_notify机制触发env_isr,并触发到mcmgr_ipc_callback
来执行接收回调rx_cb,而这个触发由rpmsg_lite_send 提供触发动作(virtqueue_kick)
# 示例代码static int32_t my_ept_read_cb(void *payload, uint32_t payload_len, uint32_t src, void *priv){ int32_t *has_received = priv; if (payload_len <= sizeof(THE_MESSAGE)) { (void)rt_memcpy((void *)&msg, payload, payload_len); *has_received = 1; } (void)rt_kprintf("Primary core received a msg\r\n"); (void)rt_kprintf("Message: Size=%x, DATA = %i\r\n", payload_len, msg.DATA); return RL_RELEASE;}my_ept = rpmsg_lite_create_ept(my_rpmsg, LOCAL_EPT_ADDR, my_ept_read_cb, (void *)&has_received, &my_ept_context);
所以流程为:
mcmgr_ipc_callback ->mcmgr_event_handler ->env_isr ->virtqueue_notification ->rpmsg_lite_rx_callback(vq->callback_fc) ->my_ept_read_cb(ept->rx_cb)
<2>发送
发送逻辑关键取决于下面这个API:
int32_t rpmsg_lite_send(struct rpmsg_lite_instance *rpmsg_lite_dev, struct rpmsg_lite_endpoint *ept, uint32_t dst, char *data, uint32_t size, uintptr_t timeout)
流程为
(void)rpmsg_lite_send(my_rpmsg, my_ept, REMOTE_EPT_ADDR, (char *)&msg, sizeof(THE_MESSAGE), RL_DONT_BLOCK);return rpmsg_lite_format_message(rpmsg_lite_dev, ept->addr, dst, data, size, RL_NO_FLAGS, timeout);# 这里的data是我们传输过来的数据,那么后面就是这么几个步骤:# 1.从共享内存申请buffer(vq_tx_master)buffer = rpmsg_lite_dev->vq_ops->vq_tx_alloc(rpmsg_lite_dev->tvq, &buff_len, &idx);# 2.将共享地址传递给rpmsg_msg,并对该变量传递实际数据rpmsg_msg = (struct rpmsg_std_msg *)buffer;env_memcpy(rpmsg_msg->data, data, size);# 3.将前面申请的这段buffer标记为可用模式,并加入到vq_ring.avail中,一般这里应该是用于缓存下的多核通信/* Enqueue buffer on virtqueue. */rpmsg_lite_dev->vq_ops->vq_tx(rpmsg_lite_dev->tvq, buffer, buff_len, idx);# 最后执行通知机制virtqueue_kick ->vq_ring_notify_host(vq->notify_fc(vq)) ->platform_nofity -> (void)MCMGR_TriggerEventForce(kMCMGR_RemoteRPMsgEvent, (uint16_t)RL_GET_Q_ID(vector_id)); ->trig ipc callback ->mcmgr_ipc_callback
RT-Thread Github 开源仓库,欢迎撒个星(Star)支持,更期待你的代码贡献: https://github.com/RT-Thread/rt-thread
获取硬件
RT-Thread 与瑞萨电子联合推出 RA8P1 Titan Board,基于 1GHz Arm Cortex-M85 + 250MHz Cortex-M33 双核架构,集成 Ethos-U55 NPU ,实现 256 GOPS 的 AI 性能、超过 7300 CoreMarks 的突破性 CPU 性能和先进的人工智能 (AI) 功能,可支持语音、视觉和实时分析 AI 场景!
全部0条评论
快来发表一下你的评论吧 !