Air780EPM通过MQTT上传温湿度数据

描述

重要提醒:

1、当你安装配置好 智能体 、规则和技能后,最终一定要参考验证智能体、规则和技能是否安装成功来验证安装配置是否正确;

2、Trae中内置的免费大 模型 ,会经常排队,并且性能不可控,容易出问题,所以推荐参考:001 发送会话请求时,提示排队,如何解决?的方法,订阅收费的大模型(目前每月40元);这样可以大大提高AI性能;如果不想使用收费模型,在非正常工作时间段内,使用内置的免费模型也能勉强凑合;

3、选择收费大模型时,经过我们的实际测试,根据工作任务的不同,可以按照如下建议选择(仅供参考,具体情况还需要根据你自己的实际使用情况来定):

代码开发任务,优先选择GLM(可能是使用的人数太多,有时候处理较慢);如果GLM处理太慢,再考虑切换到MiniMax;

其他任务,可以首先选择ark-code-latest,其次选择MiniMax,最后选择GLM(可能是使用的人数太多,有时候处理较慢);

一、概述

在本文中,我们来演示如何使用 luatos-docs-code 协助开发 MQTT上传温湿度数据 项目脚本代码;

鉴于我们第一次

二、项目功能需求

帮我生成一个 LuatOS 项目代码,功能需求如下:

硬件模组:Air780EPM

软件功能需求:

支持 MQTT 长连接,MQTT 服务器的地址为"lbsmqtt.airm2m.com",端口为 1234

MQTT 连接出现异常后,支持重连

每隔一分钟读取 AirSHT30_1000 的温湿度数据,将读取到的温湿度数据上传到 MQTT 服务器

软件代码设计要求:

项目脚本要包含 main.lua 和一个或者多个其他具体功能应用的 lua 文件,main.lua 文件、其他 lua 文件功能都要模块化解耦设计

main.lua 中,要有最基本的项目名,版本号,fota 升级使用说明,errDump 使用说明,require 其他功能应用的 lua 文件,sys.run()代码

代码存储路径:新建一个 mqtt_temprature_humdity 文件夹 ,将项目的脚本文件和资源文件都保存到这个文件夹

三、使用 luatos-docs-code 开发以及调试过程

3.1 输入项目需求,luatos-docs-code 第一次生成代码

MQTT

 

 

 

添加图片注释,不超过 140 字(可选)

3.2 生成的代码存在的问题

刚才生成的代码存在以下问题:

3.2.1 main.lua 编程规范需要改进

要遵循 Air780EPM demo 代码的以下编程规范

main.lua 中,要有最基本的项目名,版本号,fota 升级使用说明,errDump 使用说明,require 其他功能应用的 lua 文件,sys.run()代码,不需要 require “sys”,请你找一个 demo 目录下的 main.lua 文件学习一下,然后完全遵守 demo 中 main.lua 的格式和规范,注释也要详细

3.2.2 获取 4G 模组信号强度的函数好像不对

请你再修复一下

3.3 告知 luatos-docs-code 存在的问题,让它重新生成代码

3.3.1 修复 main.lua

将第一个问题告知 luatos-docs-code 后,让它重新生成,交互过程如下

MQTT

 

 

 

添加图片注释,不超过 140 字(可选)

这次生成的 main.lua 代码质量就很高了,给他提的这个问题已经解决!

3.3.2 修复获取 4G 模组信号强度的函数使用

MQTT

 

 

 

添加图片注释,不超过 140 字(可选)

3.4 最终生成的项目脚本代码

3.4.1 项目代码目录结构

 

mqtt\_temprature\_humdity/ ├── main.lua # 项目主入口文件 ├── config.lua # 配置文件 ├── app\_mqtt\_main.lua # MQTT 客户端主应用模块 ├── app\_mqtt\_sender.lua # MQTT 数据发送模块 ├── app\_mqtt\_receiver.lua # MQTT 数据接收模块 ├── app\_sht30.lua # 温湿度传感器应用模块 └── AirSHT30\_1000.lua # 温湿度传感器驱动文件

 

3.4.2 main.lua

 

--[[ @module main @summary LuatOS用户应用脚本文件入口,总体调度应用逻辑 @version 1.0 @date 2026.03.15 @author LuatOS @usage 本demo演示的核心功能为: 1. 支持MQTT长连接,MQTT服务器地址为"lbsmqtt.airm2m.com",端口为1234 2. MQTT连接出现异常后,支持自动重连 3. 每隔一分钟读取AirSHT30_1000的温湿度数据 4. 将读取到的温湿度数据以及4G模组的信号强度上传到MQTT服务器 更多说明参考本目录下的readme.md文件 ]] --[[ 必须定义PROJECT和VERSION变量,Luatools工具会用到这两个变量,远程升级功能也会用到这两个变量 PROJECT:项目名,ascii string类型 可以随便定义,只要不使用,就行 VERSION:项目版本号,ascii string类型 如果使用合宙iot.openluat.com进行远程升级,必须按照"XXX.YYY.ZZZ"三段格式定义: X、Y、Z各表示1位数字,三个X表示的数字可以相同,也可以不同,同理三个Y和三个Z表示的数字也是可以相同,可以不同 因为历史原因,YYY这三位数字必须存在,但是没有任何用处,可以一直写为999 如果不使用合宙iot.openluat.com进行远程升级,根据自己项目的需求,自定义格式即可 ]] PROJECT = "MQTT_TEMP_HUMIDITY" VERSION = "001.999.000" log.info("main", "project name is ", PROJECT, "version is ", VERSION) -- 如果内核固件支持errDump功能,此处进行配置,【强烈建议打开此处的注释】 -- 因为此功能模块可以记录并且上传脚本在运行过程中出现的语法错误或者其他自定义的错误信息,可以初步分析一些设备运行异常的问题 -- 以下代码是最基本的用法,更复杂的用法可以详细阅读API说明文档 -- 启动errDump日志存储并且上传功能,600秒上传一次 -- if errDump then -- errDump.config(true, 600) -- end -- 使用LuatOS开发的任何一个项目,都强烈建议使用远程升级FOTA功能 -- 可以使用合宙的iot.openluat.com平台进行远程升级 -- 也可以使用客户自己搭建的平台进行远程升级 -- 远程升级的详细用法,可以参考fota的demo进行使用 -- 启动一个循环定时器 -- 每隔3秒钟打印一次总内存,实时的已使用内存,历史最高的已使用内存情况 -- 方便分析内存使用是否有异常 -- sys.timerLoopStart(function() -- log.info("mem.lua", rtos.meminfo()) -- log.info("mem.sys", rtos.meminfo("sys")) -- end, 3000) -- 加载配置文件 require "config" -- 加载MQTT客户端主应用模块 require "app_mqtt_main" -- 加载MQTT客户端数据发送模块 require "app_mqtt_sender" -- 加载MQTT客户端数据接收模块 require "app_mqtt_receiver" -- 加载温湿度传感器应用模块 require "app_sht30" -- 用户代码已结束--------------------------------------------- -- 结尾总是这一句 sys.run() -- sys.run()之后后面不要加任何语句!!!!!

 

3.4.3 config.lua

 

--[[ @module config @summary 配置文件 @version 1.0 @date 2026.03.15 @author LuatOS @usage 本文件包含项目的配置信息 ]] local config = {} -- MQTT服务器配置 config.mqtt = { server_addr = "lbsmqtt.airm2m.com", -- MQTT服务器地址 server_port = 1234, -- MQTT服务器端口 client_id = "", -- 客户端ID,将在运行时设置为IMEI username = "", -- MQTT用户名 password = "", -- MQTT密码 keepalive = 120, -- 心跳间隔(秒) qos = 0 -- QoS等级 } -- 温湿度传感器配置 config.sht30 = { i2c_id = 0, -- I2C总线ID slave_addr = 0x44 -- 传感器从机地址 } -- 数据上传配置 config.data = { upload_interval = 60 * 1000, -- 上传间隔(毫秒) topic = "/up" -- MQTT上传主题后缀 } return config

 

3.4.4 AirSHT30_1000.lua

 

--[[ @module AirSHT30_1000 @summary AirSHT30_1000应用功能模块 @version 1.0 @date 2026.03.15 @author LuatOS @usage 本文件为AirSHT30_1000驱动配置文件,核心业务逻辑为: 1、打开AirSHT30_1000; 2、读取温湿度数据; 本文件没有对外接口,直接require "AirSHT30_1000"就可以加载运行; ]] -- 本文件中的主机是指I2C主机,具体指Air780EXX系列每个模组 -- 本文件中的从机是指I2C从机,具体指AirSHT30_1000配件板上的sht30温湿度传感器芯片 local AirSHT30_1000 ={ -- i2c_id:主机的i2c id; } -- 从机地址为0x44 local slave_addr = 0x44 -- 计算数据表data中所有数据元素的crc8校验值 local function crc8(data) local crc = 0xFF for i = 1, #data do crc = bit.bxor(crc, data[i]) for j = 1, 8 do crc = crc * 2 if crc >= 0x100 then crc = bit.band(bit.bxor(crc, 0x31), 0xff) end end end return crc end -- 打开AirSHT30_1000; --i2c_id:number类型; -- 主机使用的I2C ID,用来控制AirSHT30_1000; -- 取值范围:仅支持0和1; -- 如果没有传入此参数,则默认为0; --返回值:成功返回true,失败返回false function AirSHT30_1000.open(i2c_id) --如果i2c_id为nil,则赋值为默认值0 if i2c_id==nil then i2c_id=0 end --检查参数的合法性 if not (i2c_id == 0 or i2c_id == 1) then log.error("AirSHT30_1000.open", "invalid i2c_id", i2c_id) return false end AirSHT30_1000.i2c_id = i2c_id --初始化I2C if i2c.setup(i2c_id, i2c.FAST) ~= 1 then log.error("AirSHT30_1000.open", "i2c.setup error", i2c_id) return false end return true end -- 读取温湿度数据; -- 返回值:失败返回false; -- 成功返回两个值,第一个为摄氏温度值(number类型,例如23.6表示23.6摄氏度),第二个为百分比湿度值(number类型,例如67表示67%的湿度) function AirSHT30_1000.read() -- 发送启动测量命令(高精度) i2c.send(AirSHT30_1000.i2c_id, slave_addr, {0x24, 0x00}) -- 等待测量完成(SHT30高精度测量需~15ms) sys.wait(20) -- 读取6字节数据(温度高/低 + CRC,湿度高/低 + CRC) local data = i2c.recv(AirSHT30_1000.i2c_id, slave_addr, 6) -- 如果没有读取到6字节数据 if type(data)~="string" or data:len()~=6 then log.error("AirSHT30_1000.read", "i2c.recv error") return false end -- log.info("AirSHT30_1000.read", data:toHex()) --如果校验值正确 if crc8({data:byte(1), data:byte(2)}) == data:byte(3) and crc8({data:byte(4), data:byte(5)}) == data:byte(6) then -- 提取原始温度值 local temp_raw = (data:byte(1) << 8) | data:byte(2) -- 提取原始湿度值 local hum_raw = (data:byte(4) << 8) | data:byte(5) -- 转换为实际值(根据SHT30数据手册公式) local temprature = (-45 + 175 * temp_raw / 65535.0) local humidity = (100 * hum_raw / 65535.0) -- 打印输出结果(保留2位小数) -- log.info("AirSHT30_1000.read", "temprature", string.format("%.2f ℃", temprature)) -- log.info("AirSHT30_1000.read", "temprature", string.format("%.2f %%RH", humidity)) return temprature, humidity else log.error("AirSHT30_1000.read", "crc error", i2c_id) return false end end -- 关闭AirSHT30_1000; -- 返回值:成功返回true,失败返回false function AirSHT30_1000.close() --close接口没有返回值,理论上不会关闭失败 i2c.close(AirSHT30_1000.i2c_id) return true end return AirSHT30_1000

 

3.4.5 app_sht30.lua

 

--[[ @module app_sht30 @summary 温湿度传感器应用模块 @version 1.0 @date 2026.03.15 @author LuatOS @usage 本文件为温湿度传感器应用模块,核心业务逻辑为: 1、初始化AirSHT30_1000传感器; 2、每分钟读取一次温湿度数据; 3、读取4G模组的信号强度; 4、将数据打包成JSON格式并上传到MQTT服务器; 本文件没有对外接口,直接在main.lua中require "app_sht30"就可以加载运行; ]] -- 加载配置文件 local config = require "config" -- 加载AirSHT30_1000驱动 local AirSHT30_1000 = require "AirSHT30_1000" -- 主题前缀:IMEI号 local TOPIC_PREFIX = mobile.imei() -- 数据发送结果回调函数 local function send_data_cbfunc(result) log.info("app_sht30", "send data result", result) end -- 读取并上传温湿度数据的任务函数 local function sht30_task() -- 打开AirSHT30_1000传感器 local open_result = AirSHT30_1000.open(config.sht30.i2c_id) if not open_result then log.error("app_sht30", "AirSHT30_1000 open failed") return end -- 启动MQTT长连接 sys.publish("MQTT_CLIENT_RUN_REQ", true) while true do -- 读取温湿度数据 local temperature, humidity = AirSHT30_1000.read() if temperature and humidity then -- 读取4G信号强度 local csq = mobile.csq() -- 构建JSON数据 local data = { temperature = temperature, humidity = humidity, signal = csq, timestamp = os.time() } local json_data = json.encode(data) log.info("app_sht30", "read data", json_data) -- 发送数据到MQTT服务器 sys.publish("SEND_DATA_REQ", "sht30", TOPIC_PREFIX .. config.data.topic, json_data, config.mqtt.qos, {func = send_data_cbfunc}) else log.error("app_sht30", "read data failed") end -- 等待指定的上传间隔 sys.wait(config.data.upload_interval) end end -- 启动温湿度传感器任务 sys.taskInit(sht30_task)

 

3.4.6 app_mqtt_main.lua

 

--[[ @module app_mqtt_main @summary MQTT客户端主应用模块 @version 1.0 @date 2026.03.15 @author LuatOS @usage 本文件为MQTT客户端主应用模块,核心业务逻辑为: 1、创建MQTT客户端,连接服务器; 2、处理连接/订阅/取消订阅/异常逻辑,出现异常后执行重连动作; 3、调用mqtt_receiver的外部接口mqtt_receiver.proc,对接收到的publish数据进行处理; 4、调用sys.sendMsg接口,发送"CONNECT OK"、"PUBLISH OK"和"DISCONNECTED"三种类型的"MQTT_EVENT"消息到mqtt_sender的task,控制publish数据发送逻辑; 5、收到MQTT心跳应答后,执行sys.publish("FEED_NETWORK_WATCHDOG") 对网络环境检测看门狗功能模块进行喂狗; 本文件没有对外接口,直接在main.lua中require "app_mqtt_main"就可以加载运行; ]] -- 加载配置文件 local config = require "config" -- 加载mqtt client数据接收功能模块 local mqtt_receiver = require "app_mqtt_receiver" -- 加载mqtt client数据发送功能模块 local mqtt_sender = require "app_mqtt_sender" -- mqtt服务器地址和端口 local SERVER_ADDR = config.mqtt.server_addr local SERVER_PORT = config.mqtt.server_port -- mqtt_main的任务名 local TASK_NAME = mqtt_sender.TASK_NAME_PREFIX.."main" -- mqtt主题的前缀:IMEI号 local TOPIC_PREFIX = mobile.imei() -- mqtt client的事件回调函数 local function mqtt_client_event_cbfunc(mqtt_client, event, data, payload, metas) log.info("mqtt_client_event_cbfunc", mqtt_client, event, data, payload, json.encode(metas)) -- mqtt连接成功 if event == "conack" then sys.sendMsg(TASK_NAME, "MQTT_EVENT", "CONNECT", true) -- 订阅单主题 -- 第二个参数表示qos,取值范围为0,1,2,如果不设置,默认为0 -- if not mqtt_client:subscribe(TOPIC_PREFIX .. "/down") then -- sys.sendMsg(TASK_NAME, "MQTT_EVENT", "SUBSCRIBE", false, -1) -- end -- 订阅多主题,如果有需要,打开注释 -- 表中的每一个订阅主题的格式为[topic]=qos -- if not mqtt_client:subscribe( -- { -- [(TOPIC_PREFIX .. "/data"]=0, -- [(TOPIC_PREFIX .. "/cmd"]=1 -- } -- ) then -- sys.sendMsg(TASK_NAME, "MQTT_EVENT", "SUBSCRIBE", false, -1) -- end -- 订阅结果 -- data:订阅应答结果,true为成功,false为失败 -- payload:number类型;成功时表示qos,取值范围为0,1,2;失败时表示失败码,一般是0x80 elseif event == "suback" then -- 发送消息通知 mqtt main task sys.sendMsg(TASK_NAME, "MQTT_EVENT", "SUBSCRIBE", data, payload) -- 取消订阅成功 elseif event == "unsuback" then -- 发送消息通知 mqtt main task sys.sendMsg(TASK_NAME, "MQTT_EVENT", "UNSUBSCRIBE", true) -- 接收到服务器下发的publish数据 -- data:string类型,表示topic -- payload:string类型,表示payload -- metas:table类型,数据内容如下 -- { -- qos: number类型,取值范围0,1,2 -- retain:number类型,取值范围0,1 -- dup:number类型,取值范围0,1 -- message_id: number类型 -- } elseif event == "recv" then -- 对接收到的publish数据处理 mqtt_receiver.proc(data, payload, metas) -- 发送成功publish数据 -- data:number类型,表示message id elseif event == "sent" then -- 发送消息通知 mqtt sender task sys.sendMsg(mqtt_sender.TASK_NAME, "MQTT_EVENT", "PUBLISH_OK", data) -- 服务器断开mqtt连接 elseif event == "disconnect" then -- 发送消息通知 mqtt main task sys.sendMsg(TASK_NAME, "MQTT_EVENT", "DISCONNECTED", false) -- 收到服务器的心跳应答 elseif event == "pong" then -- 接收到数据,通知网络环境检测看门狗功能模块进行喂狗 sys.publish("FEED_NETWORK_WATCHDOG") -- 严重异常,本地会主动断开连接 -- data:string类型,表示具体的异常,有以下几种: -- "connect":tcp连接失败 -- "tx":数据发送失败 -- "conack":mqtt connect后,服务器应答CONNACK鉴权失败,失败码为payload(number类型) -- "other":其他异常 elseif event == "error" then if data == "connect" or data == "conack" then -- 发送消息通知 mqtt main task,连接失败 sys.sendMsg(TASK_NAME, "MQTT_EVENT", "CONNECT", false) elseif data == "other" or data == "tx" then -- 发送消息通知 mqtt main task,出现异常 sys.sendMsg(TASK_NAME, "MQTT_EVENT", "ERROR") end end end -- mqtt main task 的任务处理函数 -- auto_reconnect:短连接还是长连接;长连接为true,表示断开连接后会自动重连;短连接为false,表示断开连接后,不会自动重连 local function mqtt_client_main_task_func(auto_reconnect) local mqtt_client local result, msg while true do -- 如果当前时间点设置的默认网卡还没有连接成功,一直在这里循环等待 while not socket.adapter(socket.dft()) do log.warn("mqtt_client_main_task_func", "wait IP_READY", socket.dft()) -- 在此处阻塞等待默认网卡连接成功的消息"IP_READY" -- 或者等待1秒超时退出阻塞等待状态; -- 注意:此处的1000毫秒超时不要修改的更长; -- 因为当使用exnetif.set_priority_order配置多个网卡连接外网的优先级时,会隐式的修改默认使用的网卡 -- 当exnetif.set_priority_order的调用时序和此处的socket.adapter(socket.dft())判断时序有可能不匹配 -- 此处的1秒,能够保证,即使时序不匹配,也能1秒钟退出阻塞状态,再去判断socket.adapter(socket.dft()) sys.waitUntil("IP_READY", 1000) end -- 检测到了IP_READY消息 log.info("mqtt_client_main_task_func", "recv IP_READY", socket.dft()) -- 清空此task绑定的消息队列中的未处理的消息 sys.cleanMsg(TASK_NAME) -- 创建mqtt client对象 mqtt_client = mqtt.create(nil, SERVER_ADDR, SERVER_PORT) -- 如果创建mqtt client对象失败 if not mqtt_client then log.error("mqtt_client_main_task_func", "mqtt.create error") goto EXCEPTION_PROC end -- 配置mqtt client对象的client id,username,password和clean session标志 result = mqtt_client:auth(TASK_NAME..mobile.imei(), config.mqtt.username, config.mqtt.password, true) -- 如果配置失败 if not result then log.error("mqtt_client_main_task_func", "mqtt_client:auth error") goto EXCEPTION_PROC end -- 注册mqtt client对象的事件回调函数 mqtt_client:on(mqtt_client_event_cbfunc) -- 设置mqtt keepalive时间 mqtt_client:keepalive(config.mqtt.keepalive) -- 设置遗嘱消息,有需要的话,可以打开注释 -- mqtt_client:will(TOPIC_PREFIX .. "/status", "offline") -- 连接server result = mqtt_client:connect() -- 如果连接server失败 if not result then log.error("mqtt_client_main_task_func", "mqtt_client:connect error") goto EXCEPTION_PROC end -- 连接、断开连接、订阅、取消订阅、异常等各种事件的处理调度逻辑 while true do -- 等待"MQTT_EVENT"消息 msg = sys.waitMsg(TASK_NAME, "MQTT_EVENT") log.info("mqtt_client_main_task_func waitMsg", msg[2], msg[3], msg[4]) -- connect连接结果 -- msg[3]表示连接结果,true为连接成功,false为连接失败 if msg[2] == "CONNECT" then -- mqtt连接成功 if msg[3] then log.info("mqtt_client_main_task_func", "connect success") -- 通知mqtt sender数据发送应用模块的task,MQTT连接成功 sys.sendMsg(mqtt_sender.TASK_NAME, "MQTT_EVENT", "CONNECT_OK", mqtt_client) -- mqtt连接失败 else log.info("mqtt_client_main_task_func", "connect error") -- 退出循环,发起重连 break end -- subscribe订阅结果 -- msg[3]表示订阅结果,true为订阅成功,false为订阅失败 elseif msg[2] == "SUBSCRIBE" then -- 订阅成功 if msg[3] then log.info("mqtt_client_main_task_func", "subscribe success", "qos: "..(msg[4] or "nil")) -- 订阅失败 else log.error("mqtt_client_main_task_func", "subscribe error", "code", msg[4]) -- 主动断开mqtt client连接 mqtt_client:disconnect() -- 发送disconnect之后,此处延时1秒,给数据发送预留一点儿时间,发送到服务器; -- 即使1秒的时间不足以发送给服务器也没关系;对服务器来说,mqtt客户端只是没有优雅的断开,不影响什么实质功能; sys.wait(1000) break end -- unsubscribe取消订阅成功 elseif msg[2] == "UNSUBSCRIBE" then log.info("mqtt_client_main_task_func", "unsubscribe success") -- 需要主动关闭mqtt连接 -- 用户需要主动关闭mqtt连接时,可以调用sys.sendMsg(TASK_NAME, "MQTT_EVENT", "CLOSE") elseif msg[2] == "CLOSE" then -- 主动断开mqtt client连接 mqtt_client:disconnect() -- 发送disconnect之后,此处延时1秒,给数据发送预留一点儿时间,发送到服务器; -- 即使1秒的时间不足以发送给服务器也没关系;对服务器来说,mqtt客户端只是没有优雅的断开,不影响什么实质功能; -- sys.wait(1000) break -- 被动关闭了mqtt连接 -- 被网络或者服务器断开了连接 elseif msg[2] == "DISCONNECTED" then break -- 出现了其他异常 elseif msg[2] == "ERROR" then break end end -- 出现异常 ::EXCEPTION_PROC:: -- 清空此task绑定的消息队列中的未处理的消息 sys.cleanMsg(TASK_NAME) -- 通知mqtt sender数据发送应用模块的task,MQTT连接已经断开 sys.sendMsg(mqtt_sender.TASK_NAME, "MQTT_EVENT", "DISCONNECTED") -- 如果存在mqtt client对象 if mqtt_client then -- 关闭mqtt client,并且释放mqtt client对象 mqtt_client:close() mqtt_client = nil end -- 根据是否需要自动重连标志,决定后续执行的动作 if auto_reconnect then -- 5秒后跳转到循环体开始位置,自动发起重连 sys.wait(5000) else sys.publish("MQTT_CLIENT_CLOSE_RSP") log.warn("mqtt_client_main_task_func", "exit") break end end -- 清除此task对应的管理表资源 sys.taskDel(TASK_NAME) end local function mqtt_client_run_req(connection_type) --创建并且启动一个task --运行这个task的处理函数mqtt_client_main_task_func sys.taskInitEx(mqtt_client_main_task_func, TASK_NAME, nil, connection_type) end local function mqtt_client_close_req() sys.sendMsg(TASK_NAME, "MQTT_EVENT", "CLOSE") end sys.subscribe("MQTT_CLIENT_RUN_REQ", mqtt_client_run_req) sys.subscribe("MQTT_CLIENT_CLOSE_REQ", mqtt_client_close_req)

 

3.4.7 app_mqtt_sender.lua

 

--[[ @module app_mqtt_sender @summary MQTT客户端数据发送应用模块 @version 1.0 @date 2026.03.15 @author LuatOS @usage 本文件为MQTT客户端数据发送应用模块,核心业务逻辑为: 1、sys.subscribe("SEND_DATA_REQ", send_data_req_proc_func)订阅"SEND_DATA_REQ"消息,将其他应用模块需要发送的数据存储到队列send_queue中; 2、mqtt sender task接收"CONNECT OK"、"PUBLISH_REQ"、"PUBLISH OK"三种类型的"MQTT_EVENT"消息,遍历队列send_queue,逐条发送数据到server; 3、mqtt sender task接收"DISCONNECTED"类型的"MQTT_EVENT"消息,丢弃掉队列send_queue中未发送的数据; 4、任何一条数据无论发送成功还是失败,只要这条数据有回调函数,都会通过回调函数通知数据发送方; 本文件的对外接口有1个: 1、sys.subscribe("SEND_DATA_REQ", send_data_req_proc_func):订阅"SEND_DATA_REQ"消息; 其他应用模块如果需要发送数据,直接sys.publish这个消息即可,将需要发送的topic,payload和qos以及回调函数和回调参数一起publish出去; ]] local mqtt_sender = {} --[[ 数据发送队列,数据结构为: { [1] = {topic="topic1", payload="payload1", qos=0, cb={func=callback_function1, para=callback_para1}}, [2] = {topic="topic2", payload="payload2", qos=1, cb={func=callback_function2, para=callback_para2}}, [3] = {topic="topic3", payload="payload3", qos=2, cb={func=callback_function3, para=callback_para3}}, } topic的内容为publish的主题,string类型,必须存在; payload的内容为publish的负载数据,string类型,必须存在; qos的内容为publish的质量等级,number类型,取值范围0,1,2,可选,如果用户没有指定,默认为0; cb.func的内容为数据发送结果的用户回调函数,可以不存在; cb.para的内容为数据发送结果的用户回调函数的回调参数,可以不存在; ]] local send_queue = {} -- mqtt client的任务名前缀 mqtt_sender.TASK_NAME_PREFIX = "mqtt_" -- mqtt_client_sender的任务名 mqtt_sender.TASK_NAME = mqtt_sender.TASK_NAME_PREFIX.."sender" -- "SEND_DATA_REQ"消息的处理函数 local function send_data_req_proc_func(tag, topic, payload, qos, cb) -- 将原始数据增加前缀,然后插入到发送队列send_queue中 table.insert(send_queue, {topic=topic, payload=payload, qos=qos or 0, cb=cb}) -- 发送消息通知 mqtt sender task,有新数据等待发送 sys.sendMsg(mqtt_sender.TASK_NAME, "MQTT_EVENT", "PUBLISH_REQ") end -- 按照顺序发送send_queue中的数据 -- 如果调用publish接口成功,则返回当前正在发送的数据项 -- 如果调用publish接口失败,通知回调函数发送失败后,继续发送下一条数据 local function publish_item(mqtt_client) local item -- 如果发送队列中有数据等待发送 while #send_queue>0 do -- 取出来第一条数据赋值给item -- 同时从队列send_queue中删除这一条数据 item = table.remove(send_queue, 1) -- publish数据 -- result表示调用publish接口的同步结果,返回值有以下几种: -- 如果失败,返回nil -- 如果成功,number类型,qos为0时直接返回0;qos为1或者2时返回publish报文的message id result = mqtt_client:publish(item.topic, item.payload, item.qos) -- publish接口调用成功 if result then return item -- publish接口调用失败 else -- 如果当前发送的数据有用户回调函数,则执行用户回调函数 if item.cb and item.cb.func then item.cb.func(false, item.cb.para) end end end end local function publish_item_cbfunc(item, result) if item then -- 如果当前发送的数据有用户回调函数,则执行用户回调函数 if item.cb and item.cb.func then item.cb.func(result, item.cb.para) end end end -- mqtt client sender的任务处理函数 local function mqtt_client_sender_task_func() local mqtt_client local send_item local result, msg while true do -- 等待"MQTT_EVENT"消息 msg = sys.waitMsg(mqtt_sender.TASK_NAME, "MQTT_EVENT") -- mqtt连接成功 -- msg[3]表示mqtt client对象 if msg[2] == "CONNECT_OK" then mqtt_client = msg[3] -- 发送send_queue中的数据 send_item = publish_item(mqtt_client) -- mqtt publish数据请求 elseif msg[2] == "PUBLISH_REQ" then -- 如果mqtt client对象存在,并且没有正在等待发送结果的发送数据项 if mqtt_client and not send_item then -- 发送send_queue中的数据 send_item = publish_item(mqtt_client) end -- mqtt publish数据成功 elseif msg[2] == "PUBLISH_OK" then -- publish成功,执行回调函数通知发送方 publish_item_cbfunc(send_item, true) -- publish成功,通知网络环境检测看门狗功能模块进行喂狗 sys.publish("FEED_NETWORK_WATCHDOG") -- 发送send_queue中的数据 send_item = publish_item(mqtt_client) -- mqtt断开连接 elseif msg[2] == "DISCONNECTED" then -- 清空mqtt client对象 mqtt_client = nil -- 如果存在正在等待发送结果的发送项,执行回调函数通知发送方失败 publish_item_cbfunc(send_item, false) -- 如果发送队列中有数据等待发送 while #send_queue>0 do -- 取出来第一条数据赋值给send_item -- 同时从队列send_queue中删除这一条数据 send_item = table.remove(send_queue,1) -- 执行回调函数通知发送方失败 publish_item_cbfunc(send_item, false) end -- 当前没有正在等待发送结果的发送项 send_item = nil end end end -- 订阅"SEND_DATA_REQ"消息; -- 其他应用模块如果需要发送数据,直接sys.publish这个消息即可,将需要发送的数据以及回调函数和回调参数一起publish出去; sys.subscribe("SEND_DATA_REQ", send_data_req_proc_func) --创建并且启动一个task --运行这个task的处理函数mqtt_client_sender_task_func sys.taskInitEx(mqtt_client_sender_task_func, mqtt_sender.TASK_NAME) return mqtt_sender

 

3.4.8 app_mqtt_receiver.lua

 

--[[ @module app_mqtt_receiver @summary MQTT客户端数据接收应用模块 @version 1.0 @date 2026.03.15 @author LuatOS @usage 本文件为MQTT客户端数据接收应用模块,核心业务逻辑为: 1、提供mqtt_receiver.proc接口,对接收到的publish数据进行处理; 本文件的对外接口有1个: 1、mqtt_receiver.proc(data, payload, metas):处理接收到的publish数据; data:string类型,表示topic; payload:string类型,表示payload; metas:table类型,表示消息的元数据; ]] local mqtt_receiver = {} -- 处理接收到的publish数据 -- data:string类型,表示topic -- payload:string类型,表示payload -- metas:table类型,数据内容如下 -- { -- qos: number类型,取值范围0,1,2 -- retain:number类型,取值范围0,1 -- dup:number类型,取值范围0,1 -- message_id: number类型 -- } function mqtt_receiver.proc(data, payload, metas) -- 打印接收到的数据 log.info("mqtt_receiver.proc", "topic:", data, "payload:", payload, "metas:", json.encode(metas)) -- 这里可以根据实际需求处理接收到的数据 -- 例如:解析命令、控制设备等 end return mqtt_receiver

 

打开APP阅读更多精彩内容
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉

全部0条评论

快来发表一下你的评论吧 !

×
20
完善资料,
赚取积分