Xedu手的识别,pwm输出,mqtt通信相结合。mqtt只在信息改变时发送。
gpt:
请编写程序,实现以下功能:
- 摄像头中手的位置位于画面上半部分时,通过mqtt协议向nbzch.cn发送主题为“led”信息为“on”,pwm输出200。
- 摄像头中手的位置位于画面下半部分时,通过mqtt协议向nbzch.cn发送主题为“led”信息为“off”,pwm输出100。
- on和off只需要在改变后重新发送,没改变无需重复发送。
只需要在手的位置状态发生改变时才发送MQTT消息,这样可以减少不必要的通信。你可以通过添加一个变量来保存上一次发送的状态,然后在手的位置状态发生改变时才发送新的MQTT消息。下面是修改后的代码:
python程序
import cv2
import paho.mqtt.client as mqtt
from XEdu.hub import Workflow as wf
from pyfirmata import Arduino, util
import time
# MQTT Broker的地址和端口
broker_address = "nbzch.cn"
broker_port = 1883
# MQTT Topic
led_topic = "led"
# 初始化摄像头
cap = cv2.VideoCapture(0)
# 初始化手部检测模型
det = wf(task='det_hand')
hand = wf(task='pose_hand21')
# 初始化Arduino板
board = Arduino('COM4')
# 获取PWM引脚
pin3 = board.get_pin('d:3:p')
def set_pwm(pin, value):
board.digital[pin].write(value / 255.0)
# MQTT连接回调函数
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
# 订阅LED主题
client.subscribe(led_topic)
# MQTT接收消息回调函数
def on_message(client, userdata, msg):
print(msg.topic+" "+str(msg.payload))
# 初始化MQTT客户端
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
# 连接MQTT Broker
client.connect(broker_address, broker_port, 60)
# 开启MQTT客户端循环
client.loop_start()
# 初始化上一次发送的状态
last_status = None
while cap.isOpened():
ret, frame = cap.read()
if not ret:
break
bboxs = det.inference(data=frame, thr=0.3)
img = frame
for i in bboxs:
keypoints, img = hand.inference(data=frame, img_type='cv2', bbox=i)
# 获取手部关键点的坐标
x, y = keypoints[0][0], keypoints[0][1]
# 获取摄像头画面的高度
height = img.shape[0]
# 根据手部位置计算PWM输出值和MQTT消息
if y <= height // 2: # 上半部分
pwm_value = 200
new_status = "on"
else: # 下半部分
pwm_value = 100
new_status = "off"
# 发送MQTT消息
if new_status != last_status:
client.publish(led_topic, new_status)
last_status = new_status
# 设置PWM输出
set_pwm(3, pwm_value)
cv2.imshow('video', img)
if cv2.waitKey(1) & 0xFF == ord('q'):
break
# 停止MQTT客户端循环
client.loop_stop()
# 断开MQTT连接
client.disconnect()
# 释放摄像头资源
cap.release()
cv2.destroyAllWindows()
esp32程序
#include <WiFi.h>
#include <PubSubClient.h>
const char* ssid = "42101";
const char* password = "15657859912";
const char* mqttServer = "nbzch.cn";
const int mqttPort = 1883;
const char* mqttUser = "Your_MQTT_Username";
const char* mqttPassword = "Your_MQTT_Password";
const int ledPin = 2;
WiFiClient espClient;
PubSubClient client(espClient);
void setup() {
pinMode(ledPin, OUTPUT);
digitalWrite(ledPin, LOW);
Serial.begin(115200);
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) {
delay(1000);
Serial.println("Connecting to WiFi...");
}
Serial.println("Connected to WiFi");
digitalWrite(ledPin, HIGH);
delay(2000);
digitalWrite(ledPin, LOW);
client.setServer(mqttServer, mqttPort);
client.setCallback(callback);
while (!client.connected()) {
if (client.connect("ESP32Client", mqttUser, mqttPassword )) {
Serial.println("Connected to MQTT broker");
client.subscribe("led");
} else {
Serial.print("Failed to connect to MQTT broker, rc=");
Serial.print(client.state());
Serial.println(" Retrying in 5 seconds...");
delay(5000);
}
}
}
void callback(char* topic, byte* payload, unsigned int length) {
String message;
for (unsigned int i = 0; i < length; i++) {
message += (char)payload[i];
}
Serial.print("Message received on topic '");
Serial.print(topic);
Serial.print("': ");
Serial.println(message);
if (strcmp(topic, "led") == 0) {
if (message.equals("on")) {
digitalWrite(ledPin, HIGH);
Serial.println("LED turned ON");
} else if (message.equals("off")) {
digitalWrite(ledPin, LOW);
Serial.println("LED turned OFF");
}
}
}
void reconnect() {
while (!client.connected()) {
Serial.println("Attempting MQTT connection...");
if (client.connect("ESP32Client", mqttUser, mqttPassword )) {
Serial.println("Connected to MQTT broker");
client.subscribe("led");
} else {
Serial.print("Failed to connect to MQTT broker, rc=");
Serial.print(client.state());
Serial.println(" Retrying in 5 seconds...");
delay(5000);
}
}
}
void loop() {
if (!client.connected()) {
reconnect();
}
client.loop();
}