Files
AVM360/test_serial.py

208 lines
7.6 KiB
Python
Raw Normal View History

2026-04-01 14:11:47 +08:00
import serial
import sys
import time
import threading
import tkinter as tk
from tkinter import ttk, scrolledtext
# ==========================================================
# 串口配置
# ==========================================================
SERIAL_PORT = "/dev/ttyS3"
BAUD_RATE = 115200
TIMEOUT = 1
class SerialHexViewer:
def __init__(self):
self.ser = None
self.running = False
self.rx_thread = None
# ---- 主窗口 ----
self.root = tk.Tk()
self.root.title("串口 HEX 接收器 - {}".format(SERIAL_PORT))
self.root.geometry("700x500")
# ---- 顶部配置栏 ----
top = tk.Frame(self.root)
top.pack(fill='x', padx=8, pady=6)
tk.Label(top, text="串口:").pack(side='left')
self.port_entry = tk.Entry(top, width=12, justify='center')
self.port_entry.insert(0, SERIAL_PORT)
self.port_entry.pack(side='left', padx=4)
tk.Label(top, text="波特率:").pack(side='left')
self.baud_cb = ttk.Combobox(top, width=10,
values=["9600","19200","38400","57600","115200","230400","460800","921600"])
self.baud_cb.set(str(BAUD_RATE))
self.baud_cb.pack(side='left', padx=4)
tk.Label(top, text="数据位:").pack(side='left')
self.data_cb = ttk.Combobox(top, width=4, values=["5","6","7","8"])
self.data_cb.set("8")
self.data_cb.pack(side='left', padx=2)
tk.Label(top, text="停止位:").pack(side='left')
self.stop_cb = ttk.Combobox(top, width=4, values=["1","1.5","2"])
self.stop_cb.set("1")
self.stop_cb.pack(side='left', padx=2)
tk.Label(top, text="校验:").pack(side='left')
self.parity_cb = ttk.Combobox(top, width=6, values=["None","Odd","Even"])
self.parity_cb.set("None")
self.parity_cb.pack(side='left', padx=2)
# ---- 按钮 ----
btn_frame = tk.Frame(self.root)
btn_frame.pack(fill='x', padx=8, pady=2)
self.btn_open = tk.Button(btn_frame, text="打开串口", bg="#2ecc71", fg="white",
width=12, command=self.toggle_port)
self.btn_open.pack(side='left', padx=4)
tk.Button(btn_frame, text="清空显示", bg="#3498db", fg="white",
width=12, command=self.clear_display).pack(side='left', padx=4)
tk.Button(btn_frame, text="退出", bg="#e74c3c", fg="white",
width=8, command=self.shutdown).pack(side='right', padx=4)
# 显示格式选择
self.show_ascii = tk.BooleanVar(value=True)
tk.Checkbutton(btn_frame, text="同时显示 ASCII",
variable=self.show_ascii).pack(side='left', padx=8)
ttk.Separator(self.root, orient='horizontal').pack(fill='x')
# ---- HEX 显示区 ----
self.text_area = scrolledtext.ScrolledText(
self.root, font=("Courier New", 11),
bg="#1e1e1e", fg="#00ff00",
insertbackground="white",
state='disabled'
)
self.text_area.pack(fill='both', expand=True, padx=8, pady=6)
# ---- 状态栏 ----
self.status_var = tk.StringVar(value="状态: 未连接")
tk.Label(self.root, textvariable=self.status_var,
anchor='w', fg="gray").pack(fill='x', padx=8, pady=2)
self.rx_count = 0
# ----------------------------------------------------------
def toggle_port(self):
if not self.running:
self.open_port()
else:
self.close_port()
def open_port(self):
port = self.port_entry.get().strip()
baud = int(self.baud_cb.get())
data = int(self.data_cb.get())
stop = float(self.stop_cb.get())
parity_map = {"None": serial.PARITY_NONE,
"Odd": serial.PARITY_ODD,
"Even": serial.PARITY_EVEN}
parity = parity_map.get(self.parity_cb.get(), serial.PARITY_NONE)
try:
self.ser = serial.Serial(
port=port, baudrate=baud,
bytesize=data, stopbits=stop,
parity=parity, timeout=TIMEOUT
)
self.running = True
self.rx_count = 0
self.btn_open.config(text="关闭串口", bg="#e74c3c")
self.status_var.set("状态: 已连接 {} @ {} bps".format(port, baud))
self.rx_thread = threading.Thread(target=self._read_loop, daemon=True)
self.rx_thread.start()
except Exception as e:
self.status_var.set("状态: 打开失败 - {}".format(e))
def close_port(self):
self.running = False
if self.ser and self.ser.is_open:
self.ser.close()
self.btn_open.config(text="打开串口", bg="#2ecc71")
self.status_var.set("状态: 已断开 | 累计接收: {} 字节".format(self.rx_count))
# ----------------------------------------------------------
def _read_loop(self):
"""独立线程:持续读取串口数据"""
buf = bytearray()
last_flush = time.time()
while self.running:
try:
waiting = self.ser.in_waiting
if waiting > 0:
data = self.ser.read(waiting)
buf.extend(data)
self.rx_count += len(data)
# 每满 16 字节 或 超过 50ms 未刷新 则输出一行
now = time.time()
while len(buf) >= 16:
self._append_hex_line(bytes(buf[:16]))
buf = buf[16:]
if buf and (now - last_flush) >= 0.05:
self._append_hex_line(bytes(buf))
buf.clear()
last_flush = now
else:
# 无数据时刷新剩余缓冲
if buf and (time.time() - last_flush) >= 0.1:
self._append_hex_line(bytes(buf))
buf.clear()
last_flush = time.time()
time.sleep(0.005)
except Exception as e:
if self.running:
self.root.after(0, self.status_var.set,
"状态: 读取错误 - {}".format(e))
break
def _append_hex_line(self, data: bytes):
"""格式化为 HEX 行并追加到显示区(线程安全)"""
ts = time.strftime("%H:%M:%S")
hex_str = " ".join("{:02X}".format(b) for b in data)
# 对齐16字节 = 47字符宽
hex_padded = "{:<47}".format(hex_str)
if self.show_ascii.get():
ascii_str = "".join(chr(b) if 32 <= b < 127 else '.' for b in data)
line = "[{}] {} |{}|".format(ts, hex_padded, ascii_str)
else:
line = "[{}] {}".format(ts, hex_str)
self.root.after(0, self._insert_text, line)
self.root.after(0, self.status_var.set,
"状态: 接收中 | 累计: {} 字节".format(self.rx_count))
def _insert_text(self, line):
self.text_area.config(state='normal')
self.text_area.insert('end', line)
self.text_area.see('end') # 自动滚动到最新
self.text_area.config(state='disabled')
def clear_display(self):
self.text_area.config(state='normal')
self.text_area.delete('1.0', 'end')
self.text_area.config(state='disabled')
def shutdown(self):
self.close_port()
self.root.destroy()
def run(self):
self.root.mainloop()
if __name__ == "__main__":
viewer = SerialHexViewer()
viewer.run()