浅析ESP32运行MQTT客户端进行主题的发布和订阅的方法

电子说

1.3w人已加入

描述

ESP32 MQTT的库有很多,凌顺实验室(lingshunlab.com)这次主要使用AsyncMQTT_ESP32,以后有机会再更多的MQTT其他库的使用方法。

前提条件

树莓派部署本地的MQTT服务端,具体安装请查看以下连接:

https://lingshunlab.com/book/raspberry-pi/raspberry-pi-install-mosquitto-mqtt-server-and-test-mqtt

ESP32和树莓派在同一WIFI网络里面

效果实现

凌顺实验室(lingshunlab.com)在本示例展示了使用两个ESP32,分别实现发布MQTT的主题消息和订阅并输出MQTT的主题内容。当然,可能会问能不能一个ESP32同时又是发布者,又是订阅者?答案是可以的,因为作为客户端,都是对中间商做信息交换。

BOM

需要准备2个ESP32,
一个ESP32用于发布,
一个ESP32用于订阅。

库的安装

可以在Arduino IDE的库管理里搜索并安装:

点击菜单栏的「工具」---> 「库管理」,然后在搜索框中输入“AsyncMQTT_ESP32”,点击安装即可

下图在我本地已经安装好了:

回调函数

又或者在Github中下载,并安装到Arduino的 "libraries"文件夹里

Github 地址:

https://github.com/khoih-prog/AsyncMQTT_ESP32

程序提点

1, 首先,需要加载AsyncMQTT_ESP32的库

 

#include 

 

2,配置MQTT的服务器信息,可以是IP或者域名的方式

 

//#define MQTT_HOST         IPAddress(192, 168, 100, 100)
#define MQTT_HOST         "broker.emqx.io"        // Broker address
#define MQTT_PORT         1883

 

3,设置主题,发布需要主题,订阅也需要主题

 

const char *Topic  = "lingshunlab/ESP32";               // 主题

 

4,创建MQTT客户端的实例

 

// 创建MQTT客户端的实例,名为mqttClient
AsyncMqttClient mqttClient;

 

5,认识mqttClient的可用的回调函数

当MQTT触发特定事件的时候,可以配置自定义的函数

 

  mqttClient.onConnect(onMqttConnect);  // 设置 当MQTT连接时的回调函数
  mqttClient.onDisconnect(onMqttDisconnect); // 设置 当MQTT断开连接时的回调函数
  mqttClient.onSubscribe(onMqttSubscribe); // 设置 当MQTT订阅主题时的回调函数
  mqttClient.onUnsubscribe(onMqttUnsubscribe); // 设置 当MQTT取消订阅主题时的回调函数
  mqttClient.onMessage(onMqttMessage); // 设置 当MQTT订阅主题时的回调函数
  mqttClient.onPublish(onMqttPublish); // 设置 当取消MQTT订阅主题时的回调函数
  mqttClient.setServer(MQTT_HOST, MQTT_PORT); // 设置 MQTT服务器信息

 

6, 连接MQTT服务器

 

  mqttClient.setServer(MQTT_HOST, MQTT_PORT); //连接MQTT服务器

 

7,发布主题

通过以下代码,可以对配置好的主题发布消息

 

// 发布主题消息
  uint16_t packetIdPub = mqttClient.publish(PubTopic, 2, true, "welcome to Lingshunlab.com");
  Serial.print("Publisshing at QoS 2, packetId: ");
  Serial.println(packetIdPub);
  delay(2000);

 

8,订阅主题

通过以下代码,可以订阅配置好的主题

 

// 订阅MQTT主题,并QoS设置为2
  uint16_t packetIdSub = mqttClient.subscribe(SubTopic, 2);
  Serial.print("Subscribing at QoS 2, packetId: ");
  Serial.println(packetIdSub);

 

9,当发生主题消息变化的时候的回调函数

mqttClient的回调函数有很多种,请仔细学习查看例子中其他的回调函数。在这里,特别说明一下onMessage的回调函数onMqttMessage(这个函数名称你可以自己定义,随喜),里面有不少参数,例如topic,payload等,其中payload即是消息的内容,可以通过输出显示。

 

void onMqttMessage(char* topic, char* payload, const AsyncMqttClientMessageProperties& properties,
                   const size_t& len, const size_t& index, const size_t& total)
{
  (void) payload;
  Serial.println("=====On MQTT Message=====");
  Serial.println("Publish received.");
  Serial.print("  topic: ");
  Serial.println(topic);
  Serial.print("  qos: ");
  Serial.println(properties.qos);
  Serial.print("  dup: ");
  Serial.println(properties.dup);
  Serial.print("  retain: ");
  Serial.println(properties.retain);
  Serial.print("  len: ");
  Serial.println(len);
  Serial.print("  index: ");
  Serial.println(index);
  Serial.print("  total: ");
  Serial.println(total);
  Serial.print("payload: ");
  Serial.println(payload);   // 输出消息内容
}

 

10,请查看AsyncMQTT_ESP32的官方例子

可以学习到FreeRTOS的多线程如何应用。

程序代码

发布主题de完整代码

 

#include 

// 配置 WIFI 
#define WIFI_SSID         "***your wifi***"
#define WIFI_PASSWORD     "***your wifi password***"

// 加载AsyncMQTT_ESP32库
#include  

// 配置MQTT服务器地址和端口
#define MQTT_HOST         IPAddress(192,168,100,100)    // Broker IP
// #define MQTT_HOST         "broker.emqx.io"        // Broker address
#define MQTT_PORT         1883

const char *PubTopic  = "lingshunlab/ESP32";               // 发布消息的主题

AsyncMqttClient mqttClient; // 创建 MQTT客户端实例

void onMqttConnect(bool sessionPresent)   // 编写对应的回调函数
{
  Serial.println("=====On MQTT Connect=====");
  Serial.print("Connected to MQTT broker: ");
  Serial.print(MQTT_HOST);
  Serial.print(", port: ");
  Serial.println(MQTT_PORT);
  Serial.print("PubTopic: ");
  Serial.println(PubTopic);
}

void onMqttDisconnect(AsyncMqttClientDisconnectReason reason)
{
  (void) reason;

  Serial.println("Disconnected from MQTT.");
}

void onMqttSubscribe(const uint16_t& packetId, const uint8_t& qos)
{
  Serial.println("Subscribe acknowledged.");
  Serial.print("  packetId: ");
  Serial.println(packetId);
  Serial.print("  qos: ");
  Serial.println(qos);
}

void onMqttUnsubscribe(const uint16_t& packetId)
{
  Serial.println("Unsubscribe acknowledged.");
  Serial.print("  packetId: ");
  Serial.println(packetId);
}

void onMqttMessage(char* topic, char* payload, const AsyncMqttClientMessageProperties& properties,
                   const size_t& len, const size_t& index, const size_t& total)
{
  (void) payload;
  Serial.println("=====On MQTT Message=====");
  Serial.println("Publish received.");
  Serial.print("  topic: ");
  Serial.println(topic);
  Serial.print("  qos: ");
  Serial.println(properties.qos);
  Serial.print("  dup: ");
  Serial.println(properties.dup);
  Serial.print("  retain: ");
  Serial.println(properties.retain);
  Serial.print("  len: ");
  Serial.println(len);
  Serial.print("  index: ");
  Serial.println(index);
  Serial.print("  total: ");
  Serial.println(total);
  Serial.print("payload: ");
  Serial.println(payload);   // 输出消息内容
}

void onMqttPublish(const uint16_t& packetId)
{
  Serial.println("Publish acknowledged.");
  Serial.print("  packetId: ");
  Serial.println(packetId);
}

void setup()
{
  Serial.begin(115200);
  while (!Serial && millis() < 5000);
  delay(500);

  // 连接WIFI
  WiFi.begin(WIFI_SSID, WIFI_PASSWORD);
  while (WiFi.waitForConnectResult() != WL_CONNECTED) {
        Serial.println("Connection Failed! Rebooting...");
        delay(5000);
        ESP.restart(); // 重启esp32
      }
  delay(500);

  mqttClient.onConnect(onMqttConnect);  // 设置 当MQTT连接时的回调函数
  mqttClient.onDisconnect(onMqttDisconnect); // 设置 当MQTT断开连接时的回调函数
  mqttClient.onMessage(onMqttMessage); // 设置 当MQTT订阅主题时的回调函数
  mqttClient.onPublish(onMqttPublish); // 设置 当取消MQTT订阅主题时的回调函数
  mqttClient.setServer(MQTT_HOST, MQTT_PORT); // 设置 MQTT服务器信息
  mqttClient.connect(); // 连接 MQTT

  delay(500);
}

void loop()
{
  // 发布主题消息
  uint16_t packetIdPub = mqttClient.publish(PubTopic, 2, true, "welcome to Lingshunlab.com");
  Serial.print("Publisshing at QoS 2, packetId: ");
  Serial.println(packetIdPub);
  delay(2000);
}

 

上传代码后,程序将会先连接WIFI,然后连接MQTT服务器,再之后每隔2秒发布一个对应主题的消息

回调函数

订阅主题de完整代码

 

#include 

// 配置 WIFI 
#define WIFI_SSID         "***your wifi***"
#define WIFI_PASSWORD     "***your wifi password***"

// 加载 AsyncMQTT_ESP32 库
#include 

// 配置MQTT服务器地址和端口
#define MQTT_HOST         IPAddress(192,168,1,55)    // Broker IP
// #define MQTT_HOST         "broker.emqx.io"        // Broker address
#define MQTT_PORT         1883

const char *SubTopic  = "lingshunlab/ESP32";        // 订阅的主题

AsyncMqttClient mqttClient;

void onMqttConnect(bool sessionPresent)
{
  Serial.println("=====On MQTT Connect=====");
  Serial.print("Connected to MQTT broker: ");
  Serial.print(MQTT_HOST);
  Serial.print(", port: ");
  Serial.println(MQTT_PORT);
  Serial.print("PubTopic: ");
  Serial.println(SubTopic);

  // 订阅MQTT主题,并QoS设置为2
  uint16_t packetIdSub = mqttClient.subscribe(SubTopic, 2);
  Serial.print("Subscribing at QoS 2, packetId: ");
  Serial.println(packetIdSub);
}

void onMqttDisconnect(AsyncMqttClientDisconnectReason reason)
{
  (void) reason;

  Serial.println("Disconnected from MQTT.");
}

void onMqttSubscribe(const uint16_t& packetId, const uint8_t& qos)
{
  Serial.println("=====On MQTT Subscribe=====");
  Serial.println("Subscribe acknowledged.");
  Serial.print("  packetId: ");
  Serial.println(packetId);
  Serial.print("  qos: ");
  Serial.println(qos);
}

void onMqttUnsubscribe(const uint16_t& packetId)
{
  Serial.println("Unsubscribe acknowledged.");
  Serial.print("  packetId: ");
  Serial.println(packetId);
}

void onMqttMessage(char* topic, char* payload, const AsyncMqttClientMessageProperties& properties,
                   const size_t& len, const size_t& index, const size_t& total)
{
  (void) payload;
  Serial.println("=====On MQTT Message=====");
  Serial.println("Publish received.");
  Serial.print("  topic: ");
  Serial.println(topic);
  Serial.print("  qos: ");
  Serial.println(properties.qos);
  Serial.print("  dup: ");
  Serial.println(properties.dup);
  Serial.print("  retain: ");
  Serial.println(properties.retain);
  Serial.print("  len: ");
  Serial.println(len);
  Serial.print("  index: ");
  Serial.println(index);
  Serial.print("  total: ");
  Serial.println(total);
  Serial.print("payload: ");
  Serial.println(payload);  
}

void setup()
{
  Serial.begin(115200);
  while (!Serial && millis() < 5000);
  delay(500);

  // 连接WIFI
  WiFi.begin(WIFI_SSID, WIFI_PASSWORD);
  while (WiFi.waitForConnectResult() != WL_CONNECTED) {
        Serial.println("Connection Failed! Rebooting...");
        delay(5000);
        ESP.restart();
      }
  delay(500);

  mqttClient.onConnect(onMqttConnect); // 设置 当MQTT连接时的回调函数
  mqttClient.onDisconnect(onMqttDisconnect);  // 设置 当MQTT断开连接时的回调函数
  mqttClient.onSubscribe(onMqttSubscribe); // 设置 当MQTT订阅主题时的回调函数
  mqttClient.onUnsubscribe(onMqttUnsubscribe); // 设置 当取消MQTT订阅主题时的回调函数
  mqttClient.onMessage(onMqttMessage); // 设置 当MQTT收到主题消息时的回调函数
  mqttClient.setServer(MQTT_HOST, MQTT_PORT); // 设置 MQTT服务器信息
  mqttClient.connect(); // 连接 MQTT

  delay(500);
}

void loop()
{

}

 

上传代码后,程序将会先连接WIFI,然后连接MQTT服务器,当连接MQTT时,则会订阅主题,之后每隔2秒就会收到主题发布的消息

回调函数




审核编辑:刘清

打开APP阅读更多精彩内容
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉

全部0条评论

快来发表一下你的评论吧 !

×
20
完善资料,
赚取积分