本文用的EMQ的服务器,速度慢,请看更新文章《基于群晖服务器的高性能xbox手柄信号传输》

屏幕截图 2023-06-28 224124.png

# 使用Python编写的MQTT客户端程序,用于连接到MQTT服务器并发布消息。

# 导入所需库:json、logging、random、time和paho-mqtt库。
import json 
import logging
import random
import time
import pygame
from paho.mqtt import client as mqtt_client

# 设置MQTT服务器地址(BROKER)和端口(PORT),以及要订阅的主题(TOPIC)。
BROKER = 't1f3bfb6.ala.cn-hangzhou.emqxsl.cn'
PORT = 8883
TOPIC = "python-mqtt/tls"
# 生成一个带有pub前缀的随机client ID,设置用户名(USERNAME)和密码(PASSWORD)。
CLIENT_ID = f'python-mqtt-tls-pub-sub-{random.randint(0, 1000)}'
USERNAME = 'pi'
PASSWORD = 'raspberry'

# 定义连接失败时的重连间隔和速率(FIRST_RECONNECT_DELAY、RECONNECT_RATE)、最大重连次数(MAX_RECONNECT_COUNT)和最大重连延迟(MAX_RECONNECT_DELAY)。
FIRST_RECONNECT_DELAY = 1
RECONNECT_RATE = 2
MAX_RECONNECT_COUNT = 12
MAX_RECONNECT_DELAY = 60

FLAG_EXIT = False # 定义退出标志(FLAG_EXIT)。

# on_connect函数:当客户端成功连接到MQTT服务器时调用。如果连接成功,订阅指定主题;否则打印错误信息。
def on_connect(client, userdata, flags, rc):
    if rc == 0 and client.is_connected():
        print("Connected to MQTT Broker!")
        client.subscribe(TOPIC)
    else:
        print(f'Failed to connect, return code {rc}')

# on_disconnect函数:当客户端与MQTT服务器断开连接时调用。尝试重新连接,直到达到最大重连次数。如果重连失败,将退出标志设为True。
def on_disconnect(client, userdata, rc):
    logging.info("Disconnected with result code: %s", rc)
    reconnect_count, reconnect_delay = 0, FIRST_RECONNECT_DELAY
    while reconnect_count < MAX_RECONNECT_COUNT:
        logging.info("Reconnecting in %d seconds...", reconnect_delay)
        time.sleep(reconnect_delay)
        try:
            client.reconnect()
            logging.info("Reconnected successfully!")
            return
        except Exception as err:
            logging.error("%s. Reconnect failed. Retrying...", err)

        reconnect_delay *= RECONNECT_RATE
        reconnect_delay = min(reconnect_delay, MAX_RECONNECT_DELAY)
        reconnect_count += 1
    logging.info("Reconnect failed after %s attempts. Exiting...", reconnect_count)
    global FLAG_EXIT
    FLAG_EXIT = True

# on_message函数:当客户端收到订阅主题的消息时调用。打印接收到的消息内容和主题。
def on_message(client, userdata, msg):
    print(f'Received `{msg.payload.decode()}` from `{msg.topic}` topic')

# connect_mqtt函数:创建一个MQTT客户端对象,设置TLS证书、用户名和密码,并绑定回调函数。返回客户端对象。
def connect_mqtt():
    client = mqtt_client.Client(CLIENT_ID)
    client.tls_set(ca_certs='./broker.emqx.io-ca.crt')
    client.username_pw_set(USERNAME, PASSWORD)
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect(BROKER, PORT, keepalive=3)
    client.on_disconnect = on_disconnect
    return client

# publish函数
def publish(client):
    # 上一次发送的摇杆状态
    last_axis_data = {}
    # 循环读取手柄状态并发送到MQTT
    while not FLAG_EXIT:
        # 从事件队列中获取事件
        for event in pygame.event.get():
                if event.type == pygame.JOYAXISMOTION:
                    # 获取摇杆状态
                    axis_data = {
                        "axis": event.axis,
                        "value": round(event.value, 2)
                    }

                    # 判断摇杆状态变化是否超过阈值
                    if event.axis not in last_axis_data or abs(axis_data['value'] - last_axis_data[event.axis]) >= 0.1:
                        client.publish(TOPIC, json.dumps(axis_data))
                        last_axis_data[event.axis] = axis_data['value']

                elif event.type == pygame.JOYBUTTONDOWN or event.type == pygame.JOYBUTTONUP:
                    # 获取按键状态
                    button_data = {
                        "button": event.button,
                        "value": event.type == pygame.JOYBUTTONDOWN
                    }
                    client.publish(TOPIC, json.dumps(button_data)) 

# run函数
def run():
    # 初始化pygame
    pygame.init()
    # 等待手柄准备好
    while pygame.joystick.get_count() == 0:
        print("等待手柄连接...")
        pygame.time.wait(500)
    # 获取第一个手柄
    joystick = pygame.joystick.Joystick(0)
    joystick.init()
    # 配置日志记录,启动MQTT客户端循环,等待1秒后开始发送消息。如果连接成功,继续发送消息;否则停止循环。
    logging.basicConfig(format='%(asctime)s - %(levelname)s: %(message)s',
                        level=logging.DEBUG)
    client = connect_mqtt()
    client.loop_start()
    time.sleep(1)
    if client.is_connected():
        publish(client)
    else:
        client.loop_stop()


if __name__ == '__main__':
    run() #在主程序中,调用run函数运行程序。

发表评论