在群晖上基于Mosquitto搭建MQTT服务器
首先,在套件中心搜索 mosquitto(或者直接搜索 mqtt),如图所示,然后安装该套件。
然后,在路由器上做好端口映射,将外网1883端口映射到群晖192.168.1.118的1883端口上,无用户名密码验证。
用 Python 实现订阅与发布
订阅者
# -*- coding: utf-8 -*-
import paho.mqtt.client as mqtt
def on_connect(client, userdata, flags, rc):
    """Print the connection result and subscribe once the broker accepts us.

    Called by paho-mqtt when the broker answers the connection request;
    ``rc`` is the result code, where 0 means the connection was accepted.
    """
    print("Connected with result code: " + str(rc))
    if rc == 0:
        # Subscribing here instead of once at start-up renews the
        # subscription every time loop_forever() re-establishes a dropped
        # connection.
        client.subscribe('test', qos=0)


def on_message(client, userdata, msg):
    """Print the topic and the raw (bytes) payload of a received message."""
    print(msg.topic + " " + str(msg.payload))


# paho-mqtt 2.x expects the callback API version as the first constructor
# argument; VERSION1 keeps the callback signatures used above. paho-mqtt 1.x
# has no such enum and takes no version argument.
if hasattr(mqtt, "CallbackAPIVersion"):
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1)
else:
    client = mqtt.Client()
# Set the user name and password.
client.username_pw_set("mosquitto", "mosquitto")
client.on_connect = on_connect
client.on_message = on_message
# Connect: host, port, keepalive (seconds).
client.connect('nbzch.cn', 1883, 600)
# Block here, processing network traffic and dispatching callbacks.
client.loop_forever()
发布者
# -*- coding: utf-8 -*-
import paho.mqtt.client as mqtt
def on_connect(client, userdata, flags, rc):
    """Print the result code of the connection attempt (0 means accepted)."""
    print("Connected with result code: " + str(rc))


def on_message(client, userdata, msg):
    """Print the topic and the raw (bytes) payload of a received message."""
    print(msg.topic + " " + str(msg.payload))


# paho-mqtt 2.x expects the callback API version as the first constructor
# argument; VERSION1 keeps the callback signatures used above. paho-mqtt 1.x
# has no such enum and takes no version argument.
if hasattr(mqtt, "CallbackAPIVersion"):
    client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1)
else:
    client = mqtt.Client()
# Set the user name and password.
client.username_pw_set("mosquitto", "mosquitto")
client.on_connect = on_connect
client.on_message = on_message
# Connect: host, port, keepalive (seconds).
client.connect('nbzch.cn', 1883, 600)
# Run the network loop in a background thread so the broker's replies are
# processed and the outgoing packet is actually written to the socket.
client.loop_start()
# Publish: topic, payload.
info = client.publish('test', payload='amazing', qos=0)
# Do not tear the connection down before the message has been sent.
info.wait_for_publish()
client.disconnect()
client.loop_stop()
先启动订阅者,再启动发布者。订阅者连接成功后会打印 Connected with result code: 0(返回码为 0 表示连接成功)。发布者运行后,订阅者会收到主题为 test、内容为 amazing 的消息,输出如下:
test b'amazing'
进阶程序
以上是基本的发布与订阅程序。实际生产环境中还需要处理断线重连,并让同一个客户端同时具备订阅与发布能力,以下程序将解决这些问题。
# python 3.6
import json
import logging
import random
import time
from paho.mqtt import client as mqtt_client
# Host name and TCP port of the MQTT broker to connect to.
BROKER = 'nbzch.cn'
PORT = 1883
# Topic this program both subscribes to (in on_connect) and publishes to.
TOPIC = "python-mqtt/tcp"
# Client ID with a random integer suffix in [0, 1000], chosen once at import
# time -- presumably so that parallel instances rarely share an ID.
CLIENT_ID = f'python-mqtt-tcp-pub-sub-{random.randint(0, 1000)}'
# Credentials passed to username_pw_set() in connect_mqtt().
# NOTE(review): the basic examples earlier in this article log in to the same
# host as "mosquitto"/"mosquitto" -- confirm which credentials the broker
# actually expects.
USERNAME = 'emqx'
PASSWORD = 'public'
# Reconnect back-off used by on_disconnect(): the wait starts at
# FIRST_RECONNECT_DELAY seconds, is multiplied by RECONNECT_RATE after every
# failed attempt, is capped at MAX_RECONNECT_DELAY seconds, and at most
# MAX_RECONNECT_COUNT attempts are made.
FIRST_RECONNECT_DELAY = 1
RECONNECT_RATE = 2
MAX_RECONNECT_COUNT = 12
MAX_RECONNECT_DELAY = 60
# Set to True by on_disconnect() once reconnecting has been given up; the
# publish() loop stops when it sees this flag.
FLAG_EXIT = False
def on_connect(client, userdata, flags, rc):
    """Subscribe to TOPIC after a successful connection, else report failure.

    The connection counts as successful only when ``rc`` is 0 and the client
    itself reports that it is connected.
    """
    if rc != 0 or not client.is_connected():
        print(f'Failed to connect, return code {rc}')
        return
    print("Connected to MQTT Broker!")
    client.subscribe(TOPIC)
def on_disconnect(client, userdata, rc):
    """Try to reconnect with exponential back-off after an unexpected drop.

    ``rc`` is the disconnect result code reported by paho-mqtt. A value of 0
    means the disconnect was requested through ``client.disconnect()``, so
    no reconnect is attempted in that case.

    Otherwise up to MAX_RECONNECT_COUNT attempts are made. The wait before
    each attempt starts at FIRST_RECONNECT_DELAY seconds and is multiplied by
    RECONNECT_RATE after every failure, capped at MAX_RECONNECT_DELAY. If all
    attempts fail, the module-level FLAG_EXIT is set to True.
    """
    global FLAG_EXIT
    logging.info("Disconnected with result code: %s", rc)
    if rc == 0:
        # Deliberate disconnect: reconnecting would undo what the caller
        # explicitly asked for.
        return
    reconnect_count, reconnect_delay = 0, FIRST_RECONNECT_DELAY
    while reconnect_count < MAX_RECONNECT_COUNT:
        logging.info("Reconnecting in %d seconds...", reconnect_delay)
        time.sleep(reconnect_delay)
        try:
            client.reconnect()
        except Exception as err:
            # Any failure here only means "try again later"; the retry limit
            # bounds how long this goes on.
            logging.error("%s. Reconnect failed. Retrying...", err)
        else:
            logging.info("Reconnected successfully!")
            return
        reconnect_delay = min(reconnect_delay * RECONNECT_RATE,
                              MAX_RECONNECT_DELAY)
        reconnect_count += 1
    logging.info("Reconnect failed after %s attempts. Exiting...", reconnect_count)
    FLAG_EXIT = True
def on_message(client, userdata, msg):
    """Print the text payload and topic of a received message.

    The payload is decoded as UTF-8. Bytes that are not valid UTF-8 are
    shown as the replacement character instead of raising, so a single
    malformed message cannot raise out of the callback.
    """
    text = msg.payload.decode(errors='replace')
    print(f'Received `{text}` from `{msg.topic}` topic')
def connect_mqtt():
    """Create the MQTT client, register the callbacks and connect to BROKER.

    Returns the client. The network loop is not started here; the caller is
    expected to start it.
    """
    # paho-mqtt 2.x expects the callback API version as the first constructor
    # argument; VERSION1 keeps the callback signatures used in this file.
    # paho-mqtt 1.x has no such enum and takes the client ID first.
    api_versions = getattr(mqtt_client, "CallbackAPIVersion", None)
    if api_versions is None:
        client = mqtt_client.Client(CLIENT_ID)
    else:
        client = mqtt_client.Client(api_versions.VERSION1, CLIENT_ID)
    client.username_pw_set(USERNAME, PASSWORD)
    # Register every callback before connecting so that none can be missed.
    client.on_connect = on_connect
    client.on_message = on_message
    client.on_disconnect = on_disconnect
    # NOTE(review): a 3-second keepalive is very short (the basic examples in
    # this article use 600) -- confirm it is intentional.
    client.connect(BROKER, PORT, keepalive=3)
    return client
def publish(client):
    """Publish an incrementing JSON counter to TOPIC once per second.

    Runs until the module-level FLAG_EXIT becomes true. While the client
    reports that it is not connected, nothing is sent and the counter is left
    unchanged; an error is logged and the check is repeated a second later.
    """
    sequence = 0
    while not FLAG_EXIT:
        if client.is_connected():
            msg = json.dumps({'msg': sequence})
            outcome = client.publish(TOPIC, msg)
            # Element 0 of the publish result is the status code; 0 is
            # treated as success.
            if outcome[0] == 0:
                print(f'Send `{msg}` to topic `{TOPIC}`')
            else:
                print(f'Failed to send message to topic {TOPIC}')
            sequence += 1
        else:
            logging.error("publish: MQTT client is not connected!")
        time.sleep(1)
def run():
    """Configure logging, connect, and publish until told to stop.

    The network loop runs in a background thread started with
    ``loop_start()``. It is always stopped again, whether the connection
    fails, ``publish()`` returns, or an exception (for example Ctrl+C)
    interrupts it.
    """
    logging.basicConfig(format='%(asctime)s - %(levelname)s: %(message)s',
                        level=logging.DEBUG)
    client = connect_mqtt()
    client.loop_start()
    try:
        # Give the background loop a moment to complete the MQTT handshake.
        time.sleep(1)
        if client.is_connected():
            publish(client)
        else:
            logging.error("MQTT client did not connect within 1 second.")
    finally:
        client.loop_stop()


if __name__ == '__main__':
    run()