07 树莓派5 GPS数据解析
硬件连接与配置
1. 物理连接
GPS模块 树莓派5 =================== VCC -> 5V (Pin 1) GND -> GND (Pin 6) RX -> TX (GPIO14, Pin 8) TX -> RX (GPIO15, Pin 10)
2.安装解析 NMEA 格式 GPS 数据的常用库
pip install pynmea2
Python GPS数据解析程序
一定要注意自己的GPS模块的串口波特率,一般是9600,但是我的这个正点原子家的gps串口波特率是38400。
import serial import pynmea2 import time import json import socket import math from datetime import datetime, timezone class GPSParser: def __init__(self, port='/dev/ttyAMA0', baudrate=38400): self.port = port self.baudrate = baudrate self.serial_conn = None self.udp_socket = None self.udp_target = ('127.0.0.1', 5000) self.last_position = None self.data = { 'timestamp': None, 'latitude': None, 'longitude': None, 'altitude': None, 'speed': None, # 单位:米/秒 'course': None, # 航向(度) 'satellites': None, # 使用卫星数量 'hdop': None, # 水平精度因子(float) 'fix_quality': None, # 定位质量 'pdop': None, # 位置精度因子 'vdop': None, # 垂直精度因子 'distance': 0.0, # 累计距离(米) 'total_distance': 0.0 # 总距离(米) } def connect_serial(self): """连接串口设备""" try: self.serial_conn = serial.Serial( port=self.port, baudrate=self.baudrate, parity=serial.PARITY_NONE, stopbits=serial.STOPBITS_ONE, bytesize=serial.EIGHTBITS, timeout=1.0 ) print(f"成功连接到 {self.port} @ {self.baudrate} bps") return True except serial.SerialException as e: print(f"串口连接失败: {e}") return False def connect_udp(self, ip='127.0.0.1', port=5000): """配置UDP连接""" try: self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.udp_target = (ip, port) print(f"UDP目标: {ip}:{port}") return True except socket.error as e: print(f"UDP配置失败: {e}") return False def parse_nmea(self, nmea_string): """解析NMEA语句""" try: msg = pynmea2.parse(nmea_string) if isinstance(msg, pynmea2.GGA): # 全球定位系统定位数据 self.data['timestamp'] = datetime.combine( datetime.utcnow().date(), msg.timestamp ).replace(tzinfo=timezone.utc).isoformat() self.data['latitude'] = msg.latitude self.data['longitude'] = msg.longitude self.data['altitude'] = msg.altitude self.data['satellites'] = msg.num_sats self.data['hdop'] = float(msg.horizontal_dil) if msg.horizontal_dil else None # 确保是float self.data['fix_quality'] = msg.gps_qual return 'GGA' elif isinstance(msg, pynmea2.RMC): # 推荐最小定位信息 if msg.status == 'A': # 有效定位 self.data['speed'] = msg.spd_over_grnd * 0.51444 # 节转米/秒 self.data['course'] = msg.true_course self._calculate_distance() return 'RMC' return None elif isinstance(msg, pynmea2.GSA): # 当前卫星信息 self.data['pdop'] = float(msg.pdop) if msg.pdop else None self.data['hdop'] = float(msg.hdop) if msg.hdop else None self.data['vdop'] = float(msg.vdop) if msg.vdop else None return 'GSA' elif isinstance(msg, pynmea2.GSV): # 可见卫星信息 return 'GSV' except pynmea2.ParseError: return None except Exception as e: print(f"解析错误: {e}") return None def _calculate_distance(self): """计算距离(使用Haversine公式)""" if None in (self.data['latitude'], self.data['longitude']): return if self.last_position is None: self.last_position = (self.data['latitude'], self.data['longitude']) return lat1, lon1 = self.last_position lat2, lon2 = self.data['latitude'], self.data['longitude'] # 将角度转换为弧度 lat1_rad = math.radians(lat1) lon1_rad = math.radians(lon1) lat2_rad = math.radians(lat2) lon2_rad = math.radians(lon2) # 地球半径(米) R = 6371000 # 计算差值 dlat = lat2_rad - lat1_rad dlon = lon2_rad - lon1_rad # Haversine公式 a = math.sin(dlat/2)**2 + math.cos(lat1_rad) * math.cos(lat2_rad) * math.sin(dlon/2)**2 c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a)) distance = R * c self.data['distance'] = distance self.data['total_distance'] += distance self.last_position = (lat2, lon2) def send_data(self): """通过UDP发送JSON格式数据""" if not self.udp_socket: return False try: json_data = json.dumps(self.data, ensure_ascii=False) self.udp_socket.sendto(json_data.encode('utf-8'), self.udp_target) return True except Exception as e: print(f"UDP发送失败: {e}") return False def get_status(self): """获取GPS状态信息""" status = { 'connected': self.serial_conn is not None and self.serial_conn.is_open, 'valid_fix': self.data['fix_quality'] is not None and self.data['fix_quality'] > 0, 'satellites': self.data['satellites'], 'hdop': self.data['hdop'], 'position_accuracy': "未知" } if isinstance(status['hdop'], (int, float)): # 根据HDOP估算精度 if status['hdop'] <= 1: status['position_accuracy'] = "极佳 (<2.5m)" elif status['hdop'] <= 2: status['position_accuracy'] = "良好 (2.5-5m)" elif status['hdop'] <= 5: status['position_accuracy'] = "一般 (5-10m)" else: status['position_accuracy'] = "较差 (>10m)" return status def run(self, udp_ip='127.0.0.1', udp_port=5000, output_interval=1.0): """主运行循环""" if not self.connect_serial(): return False if not self.connect_udp(udp_ip, udp_port): return False print("开始接收GPS数据...") print("按Ctrl+C退出") last_output = time.time() last_status = time.time() try: while True: # 读取串口数据 try: raw_data = self.serial_conn.readline().decode('utf-8', errors='ignore').strip() except serial.SerialException as e: print(f"串口读取错误: {e}") time.sleep(1) continue # 解析NMEA语句 if raw_data.startswith('$'): sentence_type = self.parse_nmea(raw_data) # 每秒输出一次状态 current_time = time.time() if current_time - last_status >= 1.0: status = self.get_status() if status['valid_fix']: fix_status = ( f"定位有效 | 卫星: {status['satellites']} | " f"HDOP: {status['hdop']:.1f} | " f"精度: {status['position_accuracy']}" ) else: fix_status = "等待定位..." print(f"\r状态: {fix_status}", end='', flush=True) last_status = current_time # 定时发送数据 current_time = time.time() if current_time - last_output >= output_interval: if self.send_data(): last_output = current_time except KeyboardInterrupt: print("\n程序已终止") finally: if self.serial_conn and self.serial_conn.is_open: self.serial_conn.close() if self.udp_socket: self.udp_socket.close() def main(): # 创建GPS解析器 gps = GPSParser() # 配置参数 udp_ip = '127.0.0.1' # 目标IP地址 udp_port = 5000 # 目标端口 output_interval = 0.5 # 数据发送间隔(秒) # 启动主循环 gps.run(udp_ip, udp_port, output_interval) if __name__ == "__main__": main()
成果验证
#端口监听 nc -ul 5000
由于在室内收不到星,无法解析出经纬度,但是可以看到时间已经解析出来了。
将gps移到窗户旁,成功搜到星。
可以看到已经成功解析出经纬度。