https://cloud.emqx.com/console/微信扫描登陆
订阅与发布.png

MQTT 发布/订阅模式

发布订阅模式(Publish-Subscribe Pattern)是一种消息传递模式,它将发送消息的客户端(发布者)与接收消息的客户端(订阅者)解耦,使得两者不需要建立直接的联系也不需要知道对方的存在。

MQTT 发布/订阅模式的精髓在于由一个被称为代理(Broker)的中间角色负责所有消息的路由和分发工作,发布者将带有主题的消息发送给代理,订阅者则向代理订阅主题来接收感兴趣的消息。

在 MQTT 中,主题和订阅无法被提前注册或创建,所以代理也无法预知某一个主题之后是否会有订阅者,以及会有多少订阅者,所以只能将消息转发给当前的订阅者,如果当前不存在任何订阅,那么消息将被直接丢弃。

MQTT 发布/订阅模式有 4 个主要组成部分:发布者、订阅者、代理和主题。

发布者(Publisher)

负责将消息发布到主题上,发布者一次只能向一个主题发送数据,发布者发布消息时也无需关心订阅者是否在线。

订阅者(Subscriber)

订阅者通过订阅主题接收消息,且可一次订阅多个主题。MQTT 还支持通过共享订阅的方式在多个订阅者之间实现订阅的负载均衡。

代理(Broker)

负责接收发布者的消息,并将消息转发至符合条件的订阅者。另外,代理也需要负责处理客户端发起的连接、断开连接、订阅、取消订阅等请求。

主题(Topic)

主题是 MQTT 进行消息路由的基础,它类似 URL 路径,使用斜杠 / 进行分层,比如 sensor/1/temperature。一个主题可以有多个订阅者,代理会将该主题下的消息转发给所有订阅者;一个主题也可以有多个发布者,代理将按照消息到达的顺序转发。

如何在Python中使用基于MQTT

https://docs.emqx.com/zh/cloud/latest/connect_to_deployments/python_sdk.html#%E8%AE%A2%E9%98%85%E5%92%8C%E5%8F%91%E5%B8%83
更多程序参考:https://github.com/emqx/MQTT-Client-Examples/tree/master/mqtt-client-Python3

1. 将CA 证书文件放入到文件目录下

broker.emqx.io-ca.rar

2.新建sub.py和pub.py

# 消息订阅代码sub.py

import random

from paho.mqtt import client as mqtt_client


broker = 't1f3bfb6.ala.cn-hangzhou.emqxsl.cn'
port = 8883
topic = "python-mqtt/tls"
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 100)}'
username = 'pi'
password = 'raspberry'


def connect_mqtt():
    client = mqtt_client.Client(client_id)
    client.tls_set(ca_certs='./broker.emqx.io-ca.crt')
    client.username_pw_set(username, password)
    client.connect(broker, port, keepalive=3)
    return client


def subscribe(client: mqtt_client):
    def on_message(client, userdata, msg):
        print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")

    client.subscribe(topic)
    client.on_message = on_message


def run():
    client = connect_mqtt()
    subscribe(client)
    client.loop_forever()


if __name__ == '__main__':
    run()
# 消息发布代码pub.py

import random
import time

from paho.mqtt import client as mqtt_client


broker = 't1f3bfb6.ala.cn-hangzhou.emqxsl.cn'
port = 8883
topic = "python-mqtt/tls"
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 1000)}'
username = 'pi'
password = 'raspberry'


def connect_mqtt():
    client = mqtt_client.Client(client_id)
    client.tls_set(ca_certs='./broker.emqx.io-ca.crt')
    client.username_pw_set(username, password)
    client.connect(broker, port, keepalive=3)
    return client

def publish(client):
    msg_count = 0
    while True:
        time.sleep(1)
        msg = f"messages: {msg_count}"
        result = client.publish(topic, msg)
        # result: [0, 1]
        status = result[0]
        if status == 0:
            print(f"Send `{msg}` to topic `{topic}`")
        else:
            print(f"Failed to send message to topic {topic}")
        msg_count += 1


def run():
    client = connect_mqtt()
    client.loop_start()
    publish(client)


if __name__ == '__main__':
    run()

进阶程序

以上是基本的发布与订阅程序,实际生产环境还需要解决断线重连、同时具有订阅与发布能力等功能,以下程序将解决这些问题。

# 使用Python编写的MQTT客户端程序,用于连接到MQTT服务器并发布消息。

# 导入所需库:json、logging、random、time和paho-mqtt库。
import json 
import logging
import random
import time

from paho.mqtt import client as mqtt_client

# 设置MQTT服务器地址(BROKER)和端口(PORT),以及要订阅的主题(TOPIC)。
BROKER = 't1f3bfb6.ala.cn-hangzhou.emqxsl.cn'
PORT = 8883
TOPIC = "python-mqtt/tls"
# 生成一个带有pub前缀的随机client ID,设置用户名(USERNAME)和密码(PASSWORD)。
CLIENT_ID = f'python-mqtt-tls-pub-sub-{random.randint(0, 1000)}'
USERNAME = 'pi'
PASSWORD = 'raspberry'

# 定义连接失败时的重连间隔和速率(FIRST_RECONNECT_DELAY、RECONNECT_RATE)、最大重连次数(MAX_RECONNECT_COUNT)和最大重连延迟(MAX_RECONNECT_DELAY)。
FIRST_RECONNECT_DELAY = 1
RECONNECT_RATE = 2
MAX_RECONNECT_COUNT = 12
MAX_RECONNECT_DELAY = 60

FLAG_EXIT = False # 定义退出标志(FLAG_EXIT)。

# on_connect函数:当客户端成功连接到MQTT服务器时调用。如果连接成功,订阅指定主题;否则打印错误信息。
def on_connect(client, userdata, flags, rc):
    if rc == 0 and client.is_connected():
        print("Connected to MQTT Broker!")
        client.subscribe(TOPIC)
    else:
        print(f'Failed to connect, return code {rc}')

# on_disconnect函数:当客户端与MQTT服务器断开连接时调用。尝试重新连接,直到达到最大重连次数。如果重连失败,将退出标志设为True。
def on_disconnect(client, userdata, rc):
    logging.info("Disconnected with result code: %s", rc)
    reconnect_count, reconnect_delay = 0, FIRST_RECONNECT_DELAY
    while reconnect_count < MAX_RECONNECT_COUNT:
        logging.info("Reconnecting in %d seconds...", reconnect_delay)
        time.sleep(reconnect_delay)

        try:
            client.reconnect()
            logging.info("Reconnected successfully!")
            return
        except Exception as err:
            logging.error("%s. Reconnect failed. Retrying...", err)

        reconnect_delay *= RECONNECT_RATE
        reconnect_delay = min(reconnect_delay, MAX_RECONNECT_DELAY)
        reconnect_count += 1
    logging.info("Reconnect failed after %s attempts. Exiting...", reconnect_count)
    global FLAG_EXIT
    FLAG_EXIT = True

# on_message函数:当客户端收到订阅主题的消息时调用。打印接收到的消息内容和主题。
def on_message(client, userdata, msg):
    print(f'Received `{msg.payload.decode()}` from `{msg.topic}` topic')

# connect_mqtt函数:创建一个MQTT客户端对象,设置TLS证书、用户名和密码,并绑定回调函数。返回客户端对象。
def connect_mqtt():
    client = mqtt_client.Client(CLIENT_ID)
    client.tls_set(ca_certs='./broker.emqx.io-ca.crt')
    client.username_pw_set(USERNAME, PASSWORD)
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect(BROKER, PORT, keepalive=3)
    client.on_disconnect = on_disconnect
    return client

# publish函数:持续发送消息,直到退出标志被设置为True。每隔1秒发送一条消息,消息内容为当前计数值。
def publish(client):
    msg_count = 0
    while not FLAG_EXIT:
        msg_dict = {
            'msg': msg_count
        }
        msg = json.dumps(msg_dict)
        if not client.is_connected():
            logging.error("publish: MQTT client is not connected!")
            time.sleep(1)
            continue
        result = client.publish(TOPIC, msg)
        # result: [0, 1]
        status = result[0]
        if status == 0:
            print(f'Send `{msg}` to topic `{TOPIC}`')
        else:
            print(f'Failed to send message to topic {TOPIC}')
        msg_count += 1
        time.sleep(1)

# run函数:配置日志记录,启动MQTT客户端循环,等待1秒后开始发送消息。如果连接成功,继续发送消息;否则停止循环。
def run():
    logging.basicConfig(format='%(asctime)s - %(levelname)s: %(message)s',
                        level=logging.DEBUG)
    client = connect_mqtt()
    client.loop_start()
    time.sleep(1)
    if client.is_connected():
        publish(client)
    else:
        client.loop_stop()


if __name__ == '__main__':
    run() #在主程序中,调用run函数运行程序。

发表评论