https://cloud.emqx.com/console/微信扫描登陆
MQTT 发布/订阅模式
发布订阅模式(Publish-Subscribe Pattern)是一种消息传递模式,它将发送消息的客户端(发布者)与接收消息的客户端(订阅者)解耦,使得两者不需要建立直接的联系也不需要知道对方的存在。
MQTT 发布/订阅模式的精髓在于由一个被称为代理(Broker)的中间角色负责所有消息的路由和分发工作,发布者将带有主题的消息发送给代理,订阅者则向代理订阅主题来接收感兴趣的消息。
在 MQTT 中,主题和订阅无法被提前注册或创建,所以代理也无法预知某一个主题之后是否会有订阅者,以及会有多少订阅者,所以只能将消息转发给当前的订阅者,如果当前不存在任何订阅,那么消息将被直接丢弃。
MQTT 发布/订阅模式有 4 个主要组成部分:发布者、订阅者、代理和主题。
发布者(Publisher)
负责将消息发布到主题上,发布者一次只能向一个主题发送数据,发布者发布消息时也无需关心订阅者是否在线。
订阅者(Subscriber)
订阅者通过订阅主题接收消息,且可一次订阅多个主题。MQTT 还支持通过共享订阅的方式在多个订阅者之间实现订阅的负载均衡。
代理(Broker)
负责接收发布者的消息,并将消息转发至符合条件的订阅者。另外,代理也需要负责处理客户端发起的连接、断开连接、订阅、取消订阅等请求。
主题(Topic)
主题是 MQTT 进行消息路由的基础,它类似 URL 路径,使用斜杠 / 进行分层,比如 sensor/1/temperature。一个主题可以有多个订阅者,代理会将该主题下的消息转发给所有订阅者;一个主题也可以有多个发布者,代理将按照消息到达的顺序转发。
如何在Python中使用基于MQTT
https://docs.emqx.com/zh/cloud/latest/connect_to_deployments/python_sdk.html#%E8%AE%A2%E9%98%85%E5%92%8C%E5%8F%91%E5%B8%83
更多程序参考:https://github.com/emqx/MQTT-Client-Examples/tree/master/mqtt-client-Python3
1. 将CA 证书文件放入到文件目录下
2.新建sub.py和pub.py
# 消息订阅代码sub.py
import random
from paho.mqtt import client as mqtt_client
broker = 't1f3bfb6.ala.cn-hangzhou.emqxsl.cn'
port = 8883
topic = "python-mqtt/tls"
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 100)}'
username = 'pi'
password = 'raspberry'
def connect_mqtt():
client = mqtt_client.Client(client_id)
client.tls_set(ca_certs='./broker.emqx.io-ca.crt')
client.username_pw_set(username, password)
client.connect(broker, port, keepalive=3)
return client
def subscribe(client: mqtt_client):
def on_message(client, userdata, msg):
print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
client.subscribe(topic)
client.on_message = on_message
def run():
client = connect_mqtt()
subscribe(client)
client.loop_forever()
if __name__ == '__main__':
run()
# 消息发布代码pub.py
import random
import time
from paho.mqtt import client as mqtt_client
broker = 't1f3bfb6.ala.cn-hangzhou.emqxsl.cn'
port = 8883
topic = "python-mqtt/tls"
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 1000)}'
username = 'pi'
password = 'raspberry'
def connect_mqtt():
client = mqtt_client.Client(client_id)
client.tls_set(ca_certs='./broker.emqx.io-ca.crt')
client.username_pw_set(username, password)
client.connect(broker, port, keepalive=3)
return client
def publish(client):
msg_count = 0
while True:
time.sleep(1)
msg = f"messages: {msg_count}"
result = client.publish(topic, msg)
# result: [0, 1]
status = result[0]
if status == 0:
print(f"Send `{msg}` to topic `{topic}`")
else:
print(f"Failed to send message to topic {topic}")
msg_count += 1
def run():
client = connect_mqtt()
client.loop_start()
publish(client)
if __name__ == '__main__':
run()
进阶程序
以上是基本的发布与订阅程序,实际生产环境还需要解决断线重连、同时具有订阅与发布能力等功能,以下程序将解决这些问题。
# 使用Python编写的MQTT客户端程序,用于连接到MQTT服务器并发布消息。
# 导入所需库:json、logging、random、time和paho-mqtt库。
import json
import logging
import random
import time
from paho.mqtt import client as mqtt_client
# 设置MQTT服务器地址(BROKER)和端口(PORT),以及要订阅的主题(TOPIC)。
BROKER = 't1f3bfb6.ala.cn-hangzhou.emqxsl.cn'
PORT = 8883
TOPIC = "python-mqtt/tls"
# 生成一个带有pub前缀的随机client ID,设置用户名(USERNAME)和密码(PASSWORD)。
CLIENT_ID = f'python-mqtt-tls-pub-sub-{random.randint(0, 1000)}'
USERNAME = 'pi'
PASSWORD = 'raspberry'
# 定义连接失败时的重连间隔和速率(FIRST_RECONNECT_DELAY、RECONNECT_RATE)、最大重连次数(MAX_RECONNECT_COUNT)和最大重连延迟(MAX_RECONNECT_DELAY)。
FIRST_RECONNECT_DELAY = 1
RECONNECT_RATE = 2
MAX_RECONNECT_COUNT = 12
MAX_RECONNECT_DELAY = 60
FLAG_EXIT = False # 定义退出标志(FLAG_EXIT)。
# on_connect函数:当客户端成功连接到MQTT服务器时调用。如果连接成功,订阅指定主题;否则打印错误信息。
def on_connect(client, userdata, flags, rc):
if rc == 0 and client.is_connected():
print("Connected to MQTT Broker!")
client.subscribe(TOPIC)
else:
print(f'Failed to connect, return code {rc}')
# on_disconnect函数:当客户端与MQTT服务器断开连接时调用。尝试重新连接,直到达到最大重连次数。如果重连失败,将退出标志设为True。
def on_disconnect(client, userdata, rc):
logging.info("Disconnected with result code: %s", rc)
reconnect_count, reconnect_delay = 0, FIRST_RECONNECT_DELAY
while reconnect_count < MAX_RECONNECT_COUNT:
logging.info("Reconnecting in %d seconds...", reconnect_delay)
time.sleep(reconnect_delay)
try:
client.reconnect()
logging.info("Reconnected successfully!")
return
except Exception as err:
logging.error("%s. Reconnect failed. Retrying...", err)
reconnect_delay *= RECONNECT_RATE
reconnect_delay = min(reconnect_delay, MAX_RECONNECT_DELAY)
reconnect_count += 1
logging.info("Reconnect failed after %s attempts. Exiting...", reconnect_count)
global FLAG_EXIT
FLAG_EXIT = True
# on_message函数:当客户端收到订阅主题的消息时调用。打印接收到的消息内容和主题。
def on_message(client, userdata, msg):
print(f'Received `{msg.payload.decode()}` from `{msg.topic}` topic')
# connect_mqtt函数:创建一个MQTT客户端对象,设置TLS证书、用户名和密码,并绑定回调函数。返回客户端对象。
def connect_mqtt():
client = mqtt_client.Client(CLIENT_ID)
client.tls_set(ca_certs='./broker.emqx.io-ca.crt')
client.username_pw_set(USERNAME, PASSWORD)
client.on_connect = on_connect
client.on_message = on_message
client.connect(BROKER, PORT, keepalive=3)
client.on_disconnect = on_disconnect
return client
# publish函数:持续发送消息,直到退出标志被设置为True。每隔1秒发送一条消息,消息内容为当前计数值。
def publish(client):
msg_count = 0
while not FLAG_EXIT:
msg_dict = {
'msg': msg_count
}
msg = json.dumps(msg_dict)
if not client.is_connected():
logging.error("publish: MQTT client is not connected!")
time.sleep(1)
continue
result = client.publish(TOPIC, msg)
# result: [0, 1]
status = result[0]
if status == 0:
print(f'Send `{msg}` to topic `{TOPIC}`')
else:
print(f'Failed to send message to topic {TOPIC}')
msg_count += 1
time.sleep(1)
# run函数:配置日志记录,启动MQTT客户端循环,等待1秒后开始发送消息。如果连接成功,继续发送消息;否则停止循环。
def run():
logging.basicConfig(format='%(asctime)s - %(levelname)s: %(message)s',
level=logging.DEBUG)
client = connect_mqtt()
client.loop_start()
time.sleep(1)
if client.is_connected():
publish(client)
else:
client.loop_stop()
if __name__ == '__main__':
run() #在主程序中,调用run函数运行程序。