
# 使用Python编写的MQTT客户端程序,用于连接到MQTT服务器并发布消息。
# 导入所需库:json、logging、random、time和paho-mqtt库。
import json
import logging
import random
import time
import pygame
from paho.mqtt import client as mqtt_client
# 设置MQTT服务器地址(BROKER)和端口(PORT),以及要订阅的主题(TOPIC)。
BROKER = 't1f3bfb6.ala.cn-hangzhou.emqxsl.cn'
PORT = 8883
TOPIC = "python-mqtt/tls"
# 生成一个带有pub前缀的随机client ID,设置用户名(USERNAME)和密码(PASSWORD)。
CLIENT_ID = f'python-mqtt-tls-pub-sub-{random.randint(0, 1000)}'
USERNAME = 'pi'
PASSWORD = 'raspberry'
# 定义连接失败时的重连间隔和速率(FIRST_RECONNECT_DELAY、RECONNECT_RATE)、最大重连次数(MAX_RECONNECT_COUNT)和最大重连延迟(MAX_RECONNECT_DELAY)。
FIRST_RECONNECT_DELAY = 1
RECONNECT_RATE = 2
MAX_RECONNECT_COUNT = 12
MAX_RECONNECT_DELAY = 60
FLAG_EXIT = False # 定义退出标志(FLAG_EXIT)。
# on_connect函数:当客户端成功连接到MQTT服务器时调用。如果连接成功,订阅指定主题;否则打印错误信息。
def on_connect(client, userdata, flags, rc):
if rc == 0 and client.is_connected():
print("Connected to MQTT Broker!")
client.subscribe(TOPIC)
else:
print(f'Failed to connect, return code {rc}')
# on_disconnect函数:当客户端与MQTT服务器断开连接时调用。尝试重新连接,直到达到最大重连次数。如果重连失败,将退出标志设为True。
def on_disconnect(client, userdata, rc):
logging.info("Disconnected with result code: %s", rc)
reconnect_count, reconnect_delay = 0, FIRST_RECONNECT_DELAY
while reconnect_count < MAX_RECONNECT_COUNT:
logging.info("Reconnecting in %d seconds...", reconnect_delay)
time.sleep(reconnect_delay)
try:
client.reconnect()
logging.info("Reconnected successfully!")
return
except Exception as err:
logging.error("%s. Reconnect failed. Retrying...", err)
reconnect_delay *= RECONNECT_RATE
reconnect_delay = min(reconnect_delay, MAX_RECONNECT_DELAY)
reconnect_count += 1
logging.info("Reconnect failed after %s attempts. Exiting...", reconnect_count)
global FLAG_EXIT
FLAG_EXIT = True
# on_message函数:当客户端收到订阅主题的消息时调用。打印接收到的消息内容和主题。
def on_message(client, userdata, msg):
print(f'Received `{msg.payload.decode()}` from `{msg.topic}` topic')
# connect_mqtt函数:创建一个MQTT客户端对象,设置TLS证书、用户名和密码,并绑定回调函数。返回客户端对象。
def connect_mqtt():
client = mqtt_client.Client(CLIENT_ID)
client.tls_set(ca_certs='./broker.emqx.io-ca.crt')
client.username_pw_set(USERNAME, PASSWORD)
client.on_connect = on_connect
client.on_message = on_message
client.connect(BROKER, PORT, keepalive=3)
client.on_disconnect = on_disconnect
return client
# publish函数
def publish(client):
# 上一次发送的摇杆状态
last_axis_data = {}
# 循环读取手柄状态并发送到MQTT
while not FLAG_EXIT:
# 从事件队列中获取事件
for event in pygame.event.get():
if event.type == pygame.JOYAXISMOTION:
# 获取摇杆状态
axis_data = {
"axis": event.axis,
"value": round(event.value, 2)
}
# 判断摇杆状态变化是否超过阈值
if event.axis not in last_axis_data or abs(axis_data['value'] - last_axis_data[event.axis]) >= 0.1:
client.publish(TOPIC, json.dumps(axis_data))
last_axis_data[event.axis] = axis_data['value']
elif event.type == pygame.JOYBUTTONDOWN or event.type == pygame.JOYBUTTONUP:
# 获取按键状态
button_data = {
"button": event.button,
"value": event.type == pygame.JOYBUTTONDOWN
}
client.publish(TOPIC, json.dumps(button_data))
# run函数
def run():
# 初始化pygame
pygame.init()
# 等待手柄准备好
while pygame.joystick.get_count() == 0:
print("等待手柄连接...")
pygame.time.wait(500)
# 获取第一个手柄
joystick = pygame.joystick.Joystick(0)
joystick.init()
# 配置日志记录,启动MQTT客户端循环,等待1秒后开始发送消息。如果连接成功,继续发送消息;否则停止循环。
logging.basicConfig(format='%(asctime)s - %(levelname)s: %(message)s',
level=logging.DEBUG)
client = connect_mqtt()
client.loop_start()
time.sleep(1)
if client.is_connected():
publish(client)
else:
client.loop_stop()
if __name__ == '__main__':
run() #在主程序中,调用run函数运行程序。