在群晖上基于Mosquitto搭建MQTT服务器

首先,在群晖的套件中心搜索 mosquitto(或直接搜索 mqtt),找到如图所示的套件并安装。
QQ截图20230628232632.png

然后,在路由器上做好端口映射:将外网的 1883 端口映射到群晖(192.168.1.118)的 1883 端口。此时服务器没有启用用户名/密码验证,因此下文示例代码中设置的用户名和密码仅作演示,服务器并不校验。

用 Python 实现订阅与发布

订阅者

# -*- coding: utf-8 -*-
import paho.mqtt.client as mqtt

def on_connect(client, userdata, flags, rc):
    """Print the result code delivered to the client's ``on_connect`` callback.

    Only ``rc`` is used; ``client``, ``userdata`` and ``flags`` are accepted
    because the callback signature requires them.
    """
    print(f"Connected with result code: {rc!s}")

def on_message(client, userdata, msg):
    """Print the topic and the raw payload of a received message.

    The payload is rendered with ``str()``, so a ``bytes`` payload shows up
    in its ``b'...'`` form.
    """
    line = " ".join((msg.topic, str(msg.payload)))
    print(line)

client = mqtt.Client()
# Register the callbacks first; an on_disconnect handler could be attached
# here as well, but this snippet does not define one.
client.on_connect = on_connect
client.on_message = on_message
# Username and password to present to the broker.
client.username_pw_set(username="mosquitto", password="mosquitto")
# Broker host, TCP port and keepalive interval (seconds).
client.connect('nbzch.cn', port=1883, keepalive=600)
# Topic to subscribe to.
# NOTE(review): the subscription is issued once here rather than inside
# on_connect, so it is presumably not renewed if the client reconnects.
client.subscribe('test', qos=0)
# Process network traffic and dispatch callbacks until interrupted.
client.loop_forever()

发布者

# -*- coding: utf-8 -*-
import paho.mqtt.client as mqtt

def on_connect(client, userdata, flags, rc):
    """Print the result code delivered to the client's ``on_connect`` callback.

    Only ``rc`` is used; ``client``, ``userdata`` and ``flags`` are accepted
    because the callback signature requires them.
    """
    print(f"Connected with result code: {rc!s}")

def on_message(client, userdata, msg):
    """Print the topic and the raw payload of a received message.

    The payload is rendered with ``str()``, so a ``bytes`` payload shows up
    in its ``b'...'`` form.
    """
    line = " ".join((msg.topic, str(msg.payload)))
    print(line)

client = mqtt.Client()
# Username and password to present to the broker.
client.username_pw_set("mosquitto", "mosquitto")
client.on_connect = on_connect
client.on_message = on_message
# Broker host, TCP port and keepalive interval (seconds).
client.connect('nbzch.cn', 1883, 600)
# Run the network loop in a background thread so that the broker's reply to
# the connection request is processed (which fires on_connect) and queued
# outgoing packets are written to the socket.
client.loop_start()
# Publish a single message on the 'test' topic.
info = client.publish('test', payload='amazing', qos=0)
# Wait until the message has actually left the client, then end the session
# cleanly instead of letting the process exit with the connection still open.
info.wait_for_publish()
client.disconnect()
client.loop_stop()

先启动订阅者,再启动发布者。订阅者连接成功后会打印 Connected with result code: 0(返回码为 0 表示连接成功)。发布者运行后,订阅者会收到主题为 test、内容为 amazing 的消息,并输出:

test b'amazing'

进阶程序

以上是基本的发布与订阅程序。实际生产环境中还需要支持断线重连,并让同一个客户端同时具备订阅与发布能力,以下程序实现了这些功能。

# python 3.6

import json
import logging
import random
import time

from paho.mqtt import client as mqtt_client

# Broker endpoint and the single topic this script both publishes to and
# subscribes to.
BROKER, PORT = 'nbzch.cn', 1883
TOPIC = "python-mqtt/tcp"

# Credentials handed to the client before connecting.
USERNAME, PASSWORD = 'emqx', 'public'

# Client id: a fixed prefix followed by a random integer in [0, 1000].
CLIENT_ID = 'python-mqtt-tcp-pub-sub-' + str(random.randint(0, 1000))

# Reconnect policy used by on_disconnect: the delay starts at
# FIRST_RECONNECT_DELAY seconds, is multiplied by RECONNECT_RATE after each
# failed attempt, never exceeds MAX_RECONNECT_DELAY seconds, and at most
# MAX_RECONNECT_COUNT attempts are made.
FIRST_RECONNECT_DELAY = 1
RECONNECT_RATE = 2
MAX_RECONNECT_DELAY = 60
MAX_RECONNECT_COUNT = 12

# Set to True by on_disconnect once reconnecting has been given up; the
# publish loop checks it to know when to stop.
FLAG_EXIT = False

def on_connect(client, userdata, flags, rc):
    """Subscribe to ``TOPIC`` once the connection is confirmed.

    The connection counts as established only when ``rc`` is 0 *and* the
    client itself reports being connected; otherwise the return code is
    printed and nothing is subscribed.
    """
    established = rc == 0 and client.is_connected()
    if not established:
        print(f'Failed to connect, return code {rc}')
        return

    print("Connected to MQTT Broker!")
    client.subscribe(TOPIC)

def on_disconnect(client, userdata, rc):
    """Try to re-establish a lost connection, backing off between attempts.

    Args:
        client: The MQTT client whose connection ended; ``client.reconnect()``
            is called on it.
        userdata: Unused; present because the callback signature requires it.
        rc: Disconnect result code. ``0`` means the client itself requested
            the disconnect, so no reconnect is attempted in that case.

    Side effects:
        Sleeps (in whichever thread invokes the callback) between attempts,
        and sets the module-level ``FLAG_EXIT`` to ``True`` when all
        ``MAX_RECONNECT_COUNT`` attempts have failed.
    """
    global FLAG_EXIT

    logging.info("Disconnected with result code: %s", rc)
    if rc == 0:
        # A clean disconnect was asked for by the client; reconnecting here
        # would silently undo that request.
        return

    reconnect_count, reconnect_delay = 0, FIRST_RECONNECT_DELAY
    while reconnect_count < MAX_RECONNECT_COUNT:
        logging.info("Reconnecting in %d seconds...", reconnect_delay)
        time.sleep(reconnect_delay)

        try:
            client.reconnect()
            logging.info("Reconnected successfully!")
            return
        except Exception as err:
            # Best effort: any failure just leads to the next, later attempt.
            logging.error("%s. Reconnect failed. Retrying...", err)

        # Grow the delay geometrically, but never beyond the configured cap.
        reconnect_delay *= RECONNECT_RATE
        reconnect_delay = min(reconnect_delay, MAX_RECONNECT_DELAY)
        reconnect_count += 1
    logging.info("Reconnect failed after %s attempts. Exiting...", reconnect_count)
    # Tell the publish loop to stop.
    FLAG_EXIT = True

def on_message(client, userdata, msg):
    """Print the decoded payload of a received message and its topic."""
    text = msg.payload.decode()
    topic = msg.topic
    print(f'Received `{text}` from `{topic}` topic')

def connect_mqtt():
    """Create the MQTT client, attach the callbacks and connect to the broker.

    Returns:
        The client after ``connect()`` has been called on it. The network
        loop is not started here; the caller has to do that.
    """
    # paho-mqtt 2.x expects the callback API version as the first constructor
    # argument and rejects a bare client id in that position; 1.x has no such
    # argument. VERSION1 keeps the callback signatures used in this file.
    api_versions = getattr(mqtt_client, "CallbackAPIVersion", None)
    if api_versions is None:
        client = mqtt_client.Client(CLIENT_ID)
    else:
        client = mqtt_client.Client(api_versions.VERSION1, CLIENT_ID)

    client.username_pw_set(USERNAME, PASSWORD)
    # Register every callback before connecting so none can be missed.
    client.on_connect = on_connect
    client.on_message = on_message
    client.on_disconnect = on_disconnect
    client.connect(BROKER, PORT, keepalive=3)
    return client

def publish(client):
    """Publish a JSON counter message to ``TOPIC`` once per second.

    Runs until ``FLAG_EXIT`` becomes true. While the client is not
    connected, an error is logged each second and the counter is left
    unchanged; otherwise the counter advances after every publish attempt,
    whether or not it succeeded.
    """
    sent = 0
    while not FLAG_EXIT:
        payload = json.dumps({'msg': sent})
        if client.is_connected():
            outcome = client.publish(TOPIC, payload)
            # The first element of the publish result is the status code;
            # 0 is treated as success.
            if outcome[0] == 0:
                print(f'Send `{payload}` to topic `{TOPIC}`')
            else:
                print(f'Failed to send message to topic {TOPIC}')
            sent += 1
        else:
            logging.error("publish: MQTT client is not connected!")
        time.sleep(1)

def run():
    """Configure logging, connect, and publish while the connection lasts.

    The network loop is started in the background and given one second to
    connect; if the client is still not connected after that, the loop is
    stopped and the function returns without publishing.
    """
    logging.basicConfig(level=logging.DEBUG,
                        format='%(asctime)s - %(levelname)s: %(message)s')
    client = connect_mqtt()
    client.loop_start()
    time.sleep(1)
    if not client.is_connected():
        client.loop_stop()
        return
    publish(client)

# Start the demo only when this file is executed as a script, not on import.
if __name__ == '__main__':
    run()

发表评论