Python 是一种跨平台的计算机程序设计语言,是 ABC 语言的替代品,属于面向对象的动态类型语言。它最初被设计用于编写自动化脚本,随着版本的不断更新和语言新功能的添加,越来越多被用于独立的、大型项目的开发。
MQTT 是一个物联网传输协议,用于轻量级的发布/订阅式消息传输,旨在为低带宽和不稳定的网络环境中的物联网设备提供可靠的网络服务。其轻量、简单、开放和易于实现等特点,使得它适用范围更加广泛。
本文主要介绍如何在 Python 项目中使用 paho-mqtt 客户端库 ,实现客户端与 MQTT 服务器的连接、订阅、取消订阅、收发消息等功能。
本项目使用 Python 3.10 进行开发测试。
用户可用以下命令来确认 Python 的版本:
python3 --version
Python 3.10.9
测试设备:
瑞科慧联(RAK)网关 RAK7268 V2、带温湿度传感器的数据采集器 Sensor Hub
paho-mqtt 是目前 Python 中使用较多的 MQTT 客户端库。它为 Python 2.7 或 3.x 版本以上的客户端类提供了对 MQTT v3.1 和 v3.1.1 的支持,还提供了一些帮助程序功能。这使得消息发布到 MQTT 服务器变得更简单。
Pip 是 Python 包管理工具。该工具提供了对 Python 包的查找、下载、安装、卸载的功能。
pip3 install paho.mqtt
本文将使用瑞科慧联 LoRaWAN® 网关提供的内置 MQTT 服务,该服务基于 Mosquitto 的开源消息代理。服务器接入信息如下:
from paho.mqtt import client as mqtt
设置 MQTT Broker 连接地址,端口以及 topic,同时调用 Python random.randint 函数随机生成 MQTT 客户端 id。
MQTT_SERVER_IP = "192.168.230.1"
MQTT_PORT = 1883
编写连接回调函数 on_connect,该函数将在客户端连接后会被调用。在该函数中可以依据 rc 来判断客户端是否连接成功。同时可创建一个 MQTT 客户端连接到 broker.emqx.io。
def mqtt_connect(MQTT_SERVER_IP, MQTT_PORT):
"""连接MQTT服务器"""
client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
mqttClient = mqtt.Client(client_id)
mqttClient.on_connect = on_connect # 返回连接状态的回调函数
mqttClient.on_message = on_message # 返回订阅消息回调函数
MQTT_HOST = MQTT_SERVER_IP # MQTT服务器地址
# MQTT_PORT = MQTT_PORT # MQTT端口
mqttClient.username_pw_set("username", "password") # mqtt服务器账号密码
mqttClient.connect(MQTT_HOST, MQTT_PORT, 60)
mqttClient.loop_start() # 启用线程连接
return mqttClient
定义一个 while 循环语句,在循环中设置每秒调用 MQTT 客户端 publish 函数向 /python/mqtt 主题发送消息。
ddef on_publish():
# 发布消息
msg_count = 0
while True:
time.sleep(1)
mqttClient = mqtt_connect(MQTT_SERVER_IP, MQTT_PORT)
topic = 'application/1/device/0000000000000444/tx' # 发布的主题,订阅时需要使用这个主题才能订阅此消息
msg = '{"confirmed": true,"data": "SGVsbG8=","fPort": 10}'
result = mqttClient.publish(topic, msg)
status = result[0]
if status == 0:
print('第{}条消息发送成功'.format(msg_count))
else:
print('第{}条消息发送失败'.format(msg_count))
msg_count += 1
编写消息回调函数 on_message,函数将在客户端从 MQTT Broker 收到消息后被调用,并打印出订阅的 topic 名称以及接收到的消息内容。
def on_subscribe():
"""订阅主题:mqtt/demo"""
mqttClient = mqtt_connect(MQTT_SERVER_IP, MQTT_PORT)
while True:
mqttClient.subscribe("application/#", 2)
time.sleep(1)
消息订阅代码
#!/usr/bin/python
from paho.mqtt import client as mqtt
import time
import json
# from settings import *
import base64
"""
网关通过mqtt发出数据
json - ok
probuf - no
"""
MQTT_SERVER_IP = "192.168.230.1"
MQTT_PORT = 1883
def on_connect(client, userdata, flags, rc):
"""一旦连接成功, 回调此方法"""
rc_status = ["连接成功", "协议版本错误", "无效的客户端标识", "服务器无法使用", "用户名或密码错误", "无授权"]
print("connect:",rc_status[rc])
def on_message(client, userdata, msg):
"""一旦订阅到消息, 回调此方法"""
print("主题" + msg.topic + " 消息" + str(msg.payload.decode('gbk')))
print("主题" + msg.topic + " 消息" + str(msg.payload.decode()))
try:
temp = json.loads(msg.payload.decode())
# client.disconnect()
deveui = temp['devEUI']
print("devEUI: ", deveui)
data = temp['data']
print("解码前的data为: ", data)
data_decode = base64.b64decode(data).hex()
print("解码后的data为: ", data_decode)
str1 = data_decode[4:]
if str1[0:4]=="0167":
a = int(str1[4:8],16)*0.1
print("温度:", a,"℃")
if str1[8:12]=="0268":
b = int(str1[12:16],16)
print("湿度:", b,"%RH")
elif str1[0:4]=="0268":
c = int(str1[4:8],16)
print("湿度:", c,"%RH")
except Exception as e:
print(e)
def mqtt_connect(MQTT_SERVER_IP, MQTT_PORT):
"""连接MQTT服务器"""
client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
mqttClient = mqtt.Client(client_id)
mqttClient.on_connect = on_connect # 返回连接状态的回调函数
mqttClient.on_message = on_message # 返回订阅消息回调函数
MQTT_HOST = MQTT_SERVER_IP # MQTT服务器地址
# MQTT_PORT = MQTT_PORT # MQTT端口
mqttClient.username_pw_set("username", "password") # mqtt服务器账号密码
mqttClient.connect(MQTT_HOST, MQTT_PORT, 60)
mqttClient.loop_start() # 启用线程连接
return mqttClient
def on_subscribe():
"""订阅主题:mqtt/demo"""
mqttClient = mqtt_connect(MQTT_SERVER_IP, MQTT_PORT)
while True:
mqttClient.subscribe("application/#", 2)
# allure.attach("gateway/" + GATEWAY_EUI + "/event/up", name="topic")
# mqttClient.subscribe("gateway/ac1f09fffe08f099/event/up", 2)
time.sleep(1)
if __name__ == '__main__':
on_subscribe()
消息发布代码
#!/usr/bin/python
from paho.mqtt import client as mqtt
import time
import json
# from settings import *
import base64
"""
网关通过mqtt发出数据
json - ok
probuf - no
"""
MQTT_SERVER_IP = "192.168.230.1"
MQTT_PORT = 1883
def on_connect(client, userdata, flags, rc):
"""一旦连接成功, 回调此方法"""
rc_status = ["连接成功", "协议版本错误", "无效的客户端标识", "服务器无法使用", "用户名或密码错误", "无授权"]
print("connect:",rc_status[rc])
def mqtt_connect(MQTT_SERVER_IP, MQTT_PORT):
"""连接MQTT服务器"""
client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
mqttClient = mqtt.Client(client_id)
mqttClient.on_connect = on_connect # 返回连接状态的回调函数
MQTT_HOST = MQTT_SERVER_IP # MQTT服务器地址
# MQTT_PORT = MQTT_PORT # MQTT端口
mqttClient.username_pw_set("username", "password") # mqtt服务器账号密码
mqttClient.connect(MQTT_HOST, MQTT_PORT, 60)
mqttClient.loop_start() # 启用线程连接
return mqttClient
def on_publish():
# 发布消息
msg_count = 0
while True:
time.sleep(1)
mqttClient = mqtt_connect(MQTT_SERVER_IP, MQTT_PORT)
topic = 'application/x/device/x/tx' # 发布的主题,订阅时需要使用这个主题才能订阅此消息
msg = '{"confirmed": true,"data": "SGVsbG8=","fPort": 10}' #需要发布的消息内容
result = mqttClient.publish(topic, msg)
status = result[0]
if status == 0:
print('第{}条消息发送成功'.format(msg_count))
else:
print('第{}条消息发送失败'.format(msg_count))
msg_count += 1
if __name__ == '__main__':
on_publish()
测试
消息发布
运行 MQTT 消息发布代码,将看到客户端连接成功,并且成功将消息发布。
消息订阅
通过瑞科慧联带温湿度传感器的 Sensor hub 进行数据传输,订阅并解析数据结果如下:
至此,我们完成了使用 paho-mqtt 客户端连接到 LoRaWAN® 网关内置 MQTT 服务器,并实现了测试客户端与 MQTT 服务器的连接、消息发布和订阅并解析。
与 C ++ 或 Java 之类的高级语言不同,Python 比较适合设备侧的业务逻辑实现。使用 Python 可以减少代码上的逻辑复杂度,降低与设备的交互成本。未来,我们相信在物联网领域 Python 将会有更广泛的应用!
全部0条评论
快来发表一下你的评论吧 !