Files
AVM360/test_serial.py
2026-04-01 17:27:55 +08:00

208 lines
7.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import serial
import sys
import time
import threading
import tkinter as tk
from tkinter import ttk, scrolledtext
# ==========================================================
# Serial port configuration
# ==========================================================
# Default device path; pre-fills the port entry and appears in the window title.
SERIAL_PORT = "/dev/ttyS0"
# Default baud rate pre-selected in the baud-rate combobox.
BAUD_RATE = 115200
# Passed as `timeout=` to serial.Serial (pyserial documents this as seconds).
TIMEOUT = 1
class SerialHexViewer:
    """Tkinter window that shows bytes received on a serial port as HEX rows.

    A background thread polls the port and hands received chunks to the Tk
    main thread via ``root.after``. Apart from that scheduling call, the
    worker never reads or modifies widgets or Tk variables; formatting and
    all widget updates happen in main-thread callbacks.
    """

    # Number of bytes rendered on one full display row.
    _ROW_BYTES = 16
    # Width of the HEX column for a full row: "XX " per byte minus the last space.
    _HEX_WIDTH = 3 * _ROW_BYTES - 1
    # A partial row is flushed if this long has passed while data keeps arriving.
    _BUSY_FLUSH_S = 0.05
    # A partial row is flushed if this long has passed with the line idle.
    _IDLE_FLUSH_S = 0.1
    # Sleep between polls while no data is waiting.
    _POLL_SLEEP_S = 0.005

    def __init__(self):
        """Build the main window, all widgets, and the disconnected state."""
        self.ser = None          # open serial.Serial handle, or None when closed
        self.running = False     # True while a reader thread should keep going
        self.rx_thread = None
        self.rx_count = 0        # total bytes received since the port was opened

        # ---- main window ----
        self.root = tk.Tk()
        self.root.title("串口 HEX 接收器 - {}".format(SERIAL_PORT))
        self.root.geometry("700x500")
        # Closing the window via the window manager must release the port too.
        self.root.protocol("WM_DELETE_WINDOW", self.shutdown)

        # ---- top configuration bar ----
        top = tk.Frame(self.root)
        top.pack(fill='x', padx=8, pady=6)

        tk.Label(top, text="串口:").pack(side='left')
        self.port_entry = tk.Entry(top, width=12, justify='center')
        self.port_entry.insert(0, SERIAL_PORT)
        self.port_entry.pack(side='left', padx=4)

        tk.Label(top, text="波特率:").pack(side='left')
        self.baud_cb = ttk.Combobox(
            top, width=10,
            values=["9600", "19200", "38400", "57600",
                    "115200", "230400", "460800", "921600"])
        self.baud_cb.set(str(BAUD_RATE))
        self.baud_cb.pack(side='left', padx=4)

        tk.Label(top, text="数据位:").pack(side='left')
        self.data_cb = ttk.Combobox(top, width=4, values=["5", "6", "7", "8"])
        self.data_cb.set("8")
        self.data_cb.pack(side='left', padx=2)

        tk.Label(top, text="停止位:").pack(side='left')
        self.stop_cb = ttk.Combobox(top, width=4, values=["1", "1.5", "2"])
        self.stop_cb.set("1")
        self.stop_cb.pack(side='left', padx=2)

        tk.Label(top, text="校验:").pack(side='left')
        self.parity_cb = ttk.Combobox(top, width=6, values=["None", "Odd", "Even"])
        self.parity_cb.set("None")
        self.parity_cb.pack(side='left', padx=2)

        # ---- buttons ----
        btn_frame = tk.Frame(self.root)
        btn_frame.pack(fill='x', padx=8, pady=2)

        self.btn_open = tk.Button(btn_frame, text="打开串口", bg="#2ecc71", fg="white",
                                  width=12, command=self.toggle_port)
        self.btn_open.pack(side='left', padx=4)
        tk.Button(btn_frame, text="清空显示", bg="#3498db", fg="white",
                  width=12, command=self.clear_display).pack(side='left', padx=4)
        tk.Button(btn_frame, text="退出", bg="#e74c3c", fg="white",
                  width=8, command=self.shutdown).pack(side='right', padx=4)

        # Display-format option: append an ASCII column to each HEX row.
        self.show_ascii = tk.BooleanVar(value=True)
        tk.Checkbutton(btn_frame, text="同时显示 ASCII",
                       variable=self.show_ascii).pack(side='left', padx=8)

        ttk.Separator(self.root, orient='horizontal').pack(fill='x')

        # ---- HEX display area (kept read-only except while inserting) ----
        self.text_area = scrolledtext.ScrolledText(
            self.root, font=("Courier New", 11),
            bg="#1e1e1e", fg="#00ff00",
            insertbackground="white",
            state='disabled'
        )
        self.text_area.pack(fill='both', expand=True, padx=8, pady=6)

        # ---- status bar ----
        self.status_var = tk.StringVar(value="状态: 未连接")
        tk.Label(self.root, textvariable=self.status_var,
                 anchor='w', fg="gray").pack(fill='x', padx=8, pady=2)

    # ----------------------------------------------------------
    def toggle_port(self):
        """Open the port if it is closed, otherwise close it."""
        if self.running:
            self.close_port()
        else:
            self.open_port()

    def open_port(self):
        """Open the serial port using the values currently entered in the UI.

        Any failure (unparsable field, invalid setting, device error) is
        reported in the status bar and leaves the viewer disconnected.
        """
        port = self.port_entry.get().strip()
        try:
            # The comboboxes are editable, so parsing may fail on user input;
            # it is done inside the try so the error reaches the status bar.
            baud = int(self.baud_cb.get())
            data = int(self.data_cb.get())
            stop = float(self.stop_cb.get())
            parity_map = {"None": serial.PARITY_NONE,
                          "Odd": serial.PARITY_ODD,
                          "Even": serial.PARITY_EVEN}
            parity = parity_map.get(self.parity_cb.get(), serial.PARITY_NONE)
            ser = serial.Serial(
                port=port, baudrate=baud,
                bytesize=data, stopbits=stop,
                parity=parity, timeout=TIMEOUT
            )
        except Exception as e:  # UI boundary: show the reason instead of raising
            self.status_var.set("状态: 打开失败 - {}".format(e))
            return

        self.ser = ser
        self.running = True
        self.rx_count = 0
        self.btn_open.config(text="关闭串口", bg="#e74c3c")
        self.status_var.set("状态: 已连接 {} @ {} bps".format(port, baud))
        self.rx_thread = threading.Thread(target=self._read_loop, daemon=True)
        self.rx_thread.start()

    def close_port(self):
        """Stop the reader, close the port and reset the UI to disconnected."""
        self.running = False
        # Detach the handle first so a reader still holding it can tell that
        # it has been retired, even if the port is reopened immediately.
        ser, self.ser = self.ser, None
        if ser is not None and ser.is_open:
            ser.close()
        self.btn_open.config(text="打开串口", bg="#2ecc71")
        self.status_var.set("状态: 已断开 | 累计接收: {} 字节".format(self.rx_count))

    # ----------------------------------------------------------
    def _read_loop(self):
        """Worker thread: poll the serial port and emit rows of received bytes.

        Full rows are emitted as soon as ``_ROW_BYTES`` bytes are buffered; a
        partial row is emitted once the flush interval has elapsed.
        """
        # Pin this thread to the handle it was started for. If the port is
        # closed and reopened, self.ser changes and this thread exits instead
        # of competing with the new reader.
        ser = self.ser
        buf = bytearray()
        last_flush = time.monotonic()
        while self.running and self.ser is ser:
            try:
                waiting = ser.in_waiting
                if waiting > 0:
                    data = ser.read(waiting)
                    buf.extend(data)
                    self.rx_count += len(data)
                    now = time.monotonic()
                    while len(buf) >= self._ROW_BYTES:
                        self._append_hex_line(bytes(buf[:self._ROW_BYTES]))
                        del buf[:self._ROW_BYTES]
                    if buf and (now - last_flush) >= self._BUSY_FLUSH_S:
                        self._append_hex_line(bytes(buf))
                        buf.clear()
                        last_flush = now
                else:
                    # Line is idle: flush whatever is left after a short wait.
                    if buf and (time.monotonic() - last_flush) >= self._IDLE_FLUSH_S:
                        self._append_hex_line(bytes(buf))
                        buf.clear()
                        last_flush = time.monotonic()
                    time.sleep(self._POLL_SLEEP_S)
            except Exception as e:
                # An error after close_port() is expected (the handle was
                # closed under us); only report errors on a live session.
                if self.running and self.ser is ser:
                    self.root.after(0, self._on_read_error, ser, str(e))
                break

    def _on_read_error(self, ser, message):
        """Main-thread handler for a reader failure: disconnect and report it."""
        if self.ser is not ser:
            return  # the user already closed or replaced this port
        self.close_port()
        self.status_var.set("状态: 读取错误 - {}".format(message))

    @classmethod
    def _format_hex_line(cls, data, ts, show_ascii):
        """Return one display row for *data*, without a trailing newline.

        Args:
            data: the bytes to render (at most one row's worth is expected).
            ts: timestamp text placed in the leading ``[...]``.
            show_ascii: when true, pad the HEX column to a full row width and
                append an ASCII column where non-printable bytes show as ``.``.
        """
        hex_str = " ".join("{:02X}".format(b) for b in data)
        if not show_ascii:
            return "[{}] {}".format(ts, hex_str)
        ascii_str = "".join(chr(b) if 32 <= b < 127 else '.' for b in data)
        return "[{}] {} |{}|".format(ts, hex_str.ljust(cls._HEX_WIDTH), ascii_str)

    def _append_hex_line(self, data: bytes):
        """Queue *data* for display as one HEX row (callable from the worker).

        Only the timestamp is taken here; formatting and every widget or Tk
        variable access is deferred to the main thread.
        """
        ts = time.strftime("%H:%M:%S")
        self.root.after(0, self._render_chunk, ts, data)

    def _render_chunk(self, ts, data):
        """Main-thread callback: format one chunk, show it, refresh the status."""
        self._insert_text(self._format_hex_line(data, ts, self.show_ascii.get()))
        # Rows queued just before a disconnect are still shown, but must not
        # overwrite the "disconnected" status message.
        if self.running:
            self.status_var.set(
                "状态: 接收中 | 累计: {} 字节".format(self.rx_count))

    def _insert_text(self, line):
        """Append *line* as its own row in the display area and scroll to it."""
        self.text_area.config(state='normal')
        # Terminate every row; without the newline all rows run together.
        self.text_area.insert('end', line + "\n")
        self.text_area.see('end')  # keep the newest row visible
        self.text_area.config(state='disabled')

    def clear_display(self):
        """Remove all rows from the display area."""
        self.text_area.config(state='normal')
        self.text_area.delete('1.0', 'end')
        self.text_area.config(state='disabled')

    def shutdown(self):
        """Close the port and destroy the main window."""
        self.close_port()
        self.root.destroy()

    def run(self):
        """Enter the Tk main loop; returns when the window is destroyed."""
        self.root.mainloop()
if __name__ == "__main__":
    # Script entry point: build the viewer window, then block in the Tk
    # event loop until the window is closed.
    viewer = SerialHexViewer()
    viewer.run()