E108-GN03+Series_UserManual_CN_V1.2.pdf
E108-GN03D GNSS多模卫星定位模块
芯片方案:AT6558R
通信接口:UART
工作电压:2.7-3.6V
通信协议:NMEA0183 V4.1及以前版本
产品尺寸:20.022.07.8mm
产品简介:E108-GN03D是一款高性能、高集成度、低功耗、低成本的多模卫星定位导航模块,基于AT6558R方案,可用于车载导航、智能穿戴、无人机等GNSS定位导航的应用中,支持BDS/GPS/GLONASS,体积小、功耗低,而且提供了和其他模块厂商兼容的软、硬件接口,大幅减少了用户的开发周期。
import serial
import re
import json
import paho.mqtt.client as mqtt
def parse_nmea_sentence(nmea):
"""Parses an NMEA sentence and extracts latitude and longitude."""
# Check for GNGGA or GNRMC sentences which contain latitude and longitude
if nmea.startswith("$GNGGA") or nmea.startswith("$GNRMC"):
parts = nmea.split(",")
try:
# Latitude
raw_lat = parts[2]
lat_direction = parts[3]
# Longitude
raw_lon = parts[4]
lon_direction = parts[5]
# Convert latitude and longitude to decimal degrees
lat = convert_to_decimal_degrees(raw_lat, lat_direction)
lon = convert_to_decimal_degrees(raw_lon, lon_direction)
return lat, lon
except (IndexError, ValueError):
return None, None
return None, None
def convert_to_decimal_degrees(raw_value, direction):
"""Converts raw NMEA coordinate format to decimal degrees."""
if not raw_value or not direction:
return None
# Degrees are the first two digits for latitude, first three for longitude
if len(raw_value) < 4:
return None
if direction in ['N', 'S']:
degrees = int(raw_value[:2])
minutes = float(raw_value[2:])
elif direction in ['E', 'W']:
degrees = int(raw_value[:3])
minutes = float(raw_value[3:])
else:
return None
decimal_degrees = degrees + minutes / 60
if direction in ['S', 'W']:
decimal_degrees = -decimal_degrees
return decimal_degrees
def publish_to_mqtt(lat, lon):
"""Publishes latitude and longitude to an MQTT server."""
mqtt_broker = "nbzch.cn"
topic = "gps/location"
payload = {
"latitude": lat,
"longitude": lon
}
client = mqtt.Client()
try:
client.connect(mqtt_broker, 1883, 60)
client.publish(topic, json.dumps(payload))
print(f"Published to MQTT: {payload}")
except Exception as e:
print(f"Error publishing to MQTT: {e}")
def read_nmea_from_serial(port):
"""Reads and processes NMEA sentences from a serial port."""
try:
with serial.Serial(port, baudrate=9600, timeout=1) as ser:
print(f"Listening on {port}...")
while True:
line = ser.readline().decode('ascii', errors='ignore').strip()
if line:
lat, lon = parse_nmea_sentence(line)
if lat is not None and lon is not None:
print(f"Latitude: {lat:.6f}, Longitude: {lon:.6f}")
publish_to_mqtt(lat, lon)
except serial.SerialException as e:
print(f"Error accessing serial port: {e}")
if __name__ == "__main__":
read_nmea_from_serial("COM4")