import serial import sys import time import threading import tkinter as tk from tkinter import ttk, scrolledtext # ========================================================== # 串口配置 # ========================================================== SERIAL_PORT = "/dev/ttyS0" BAUD_RATE = 115200 TIMEOUT = 1 class SerialHexViewer: def __init__(self): self.ser = None self.running = False self.rx_thread = None # ---- 主窗口 ---- self.root = tk.Tk() self.root.title("串口 HEX 接收器 - {}".format(SERIAL_PORT)) self.root.geometry("700x500") # ---- 顶部配置栏 ---- top = tk.Frame(self.root) top.pack(fill='x', padx=8, pady=6) tk.Label(top, text="串口:").pack(side='left') self.port_entry = tk.Entry(top, width=12, justify='center') self.port_entry.insert(0, SERIAL_PORT) self.port_entry.pack(side='left', padx=4) tk.Label(top, text="波特率:").pack(side='left') self.baud_cb = ttk.Combobox(top, width=10, values=["9600","19200","38400","57600","115200","230400","460800","921600"]) self.baud_cb.set(str(BAUD_RATE)) self.baud_cb.pack(side='left', padx=4) tk.Label(top, text="数据位:").pack(side='left') self.data_cb = ttk.Combobox(top, width=4, values=["5","6","7","8"]) self.data_cb.set("8") self.data_cb.pack(side='left', padx=2) tk.Label(top, text="停止位:").pack(side='left') self.stop_cb = ttk.Combobox(top, width=4, values=["1","1.5","2"]) self.stop_cb.set("1") self.stop_cb.pack(side='left', padx=2) tk.Label(top, text="校验:").pack(side='left') self.parity_cb = ttk.Combobox(top, width=6, values=["None","Odd","Even"]) self.parity_cb.set("None") self.parity_cb.pack(side='left', padx=2) # ---- 按钮 ---- btn_frame = tk.Frame(self.root) btn_frame.pack(fill='x', padx=8, pady=2) self.btn_open = tk.Button(btn_frame, text="打开串口", bg="#2ecc71", fg="white", width=12, command=self.toggle_port) self.btn_open.pack(side='left', padx=4) tk.Button(btn_frame, text="清空显示", bg="#3498db", fg="white", width=12, command=self.clear_display).pack(side='left', padx=4) tk.Button(btn_frame, text="退出", bg="#e74c3c", fg="white", width=8, command=self.shutdown).pack(side='right', padx=4) # 显示格式选择 self.show_ascii = tk.BooleanVar(value=True) tk.Checkbutton(btn_frame, text="同时显示 ASCII", variable=self.show_ascii).pack(side='left', padx=8) ttk.Separator(self.root, orient='horizontal').pack(fill='x') # ---- HEX 显示区 ---- self.text_area = scrolledtext.ScrolledText( self.root, font=("Courier New", 11), bg="#1e1e1e", fg="#00ff00", insertbackground="white", state='disabled' ) self.text_area.pack(fill='both', expand=True, padx=8, pady=6) # ---- 状态栏 ---- self.status_var = tk.StringVar(value="状态: 未连接") tk.Label(self.root, textvariable=self.status_var, anchor='w', fg="gray").pack(fill='x', padx=8, pady=2) self.rx_count = 0 # ---------------------------------------------------------- def toggle_port(self): if not self.running: self.open_port() else: self.close_port() def open_port(self): port = self.port_entry.get().strip() baud = int(self.baud_cb.get()) data = int(self.data_cb.get()) stop = float(self.stop_cb.get()) parity_map = {"None": serial.PARITY_NONE, "Odd": serial.PARITY_ODD, "Even": serial.PARITY_EVEN} parity = parity_map.get(self.parity_cb.get(), serial.PARITY_NONE) try: self.ser = serial.Serial( port=port, baudrate=baud, bytesize=data, stopbits=stop, parity=parity, timeout=TIMEOUT ) self.running = True self.rx_count = 0 self.btn_open.config(text="关闭串口", bg="#e74c3c") self.status_var.set("状态: 已连接 {} @ {} bps".format(port, baud)) self.rx_thread = threading.Thread(target=self._read_loop, daemon=True) self.rx_thread.start() except Exception as e: self.status_var.set("状态: 打开失败 - {}".format(e)) def close_port(self): self.running = False if self.ser and self.ser.is_open: self.ser.close() self.btn_open.config(text="打开串口", bg="#2ecc71") self.status_var.set("状态: 已断开 | 累计接收: {} 字节".format(self.rx_count)) # ---------------------------------------------------------- def _read_loop(self): """独立线程:持续读取串口数据""" buf = bytearray() last_flush = time.time() while self.running: try: waiting = self.ser.in_waiting if waiting > 0: data = self.ser.read(waiting) buf.extend(data) self.rx_count += len(data) # 每满 16 字节 或 超过 50ms 未刷新 则输出一行 now = time.time() while len(buf) >= 16: self._append_hex_line(bytes(buf[:16])) buf = buf[16:] if buf and (now - last_flush) >= 0.05: self._append_hex_line(bytes(buf)) buf.clear() last_flush = now else: # 无数据时刷新剩余缓冲 if buf and (time.time() - last_flush) >= 0.1: self._append_hex_line(bytes(buf)) buf.clear() last_flush = time.time() time.sleep(0.005) except Exception as e: if self.running: self.root.after(0, self.status_var.set, "状态: 读取错误 - {}".format(e)) break def _append_hex_line(self, data: bytes): """格式化为 HEX 行并追加到显示区(线程安全)""" ts = time.strftime("%H:%M:%S") hex_str = " ".join("{:02X}".format(b) for b in data) # 对齐:16字节 = 47字符宽 hex_padded = "{:<47}".format(hex_str) if self.show_ascii.get(): ascii_str = "".join(chr(b) if 32 <= b < 127 else '.' for b in data) line = "[{}] {} |{}|".format(ts, hex_padded, ascii_str) else: line = "[{}] {}".format(ts, hex_str) self.root.after(0, self._insert_text, line) self.root.after(0, self.status_var.set, "状态: 接收中 | 累计: {} 字节".format(self.rx_count)) def _insert_text(self, line): self.text_area.config(state='normal') self.text_area.insert('end', line) self.text_area.see('end') # 自动滚动到最新 self.text_area.config(state='disabled') def clear_display(self): self.text_area.config(state='normal') self.text_area.delete('1.0', 'end') self.text_area.config(state='disabled') def shutdown(self): self.close_port() self.root.destroy() def run(self): self.root.mainloop() if __name__ == "__main__": viewer = SerialHexViewer() viewer.run()