import cv2 import threading import sys, os, time import argparse import numpy as np import serial import json import glob import shutil from collections import deque from datetime import datetime from PyQt5.QtWidgets import (QApplication, QLabel, QWidget, QVBoxLayout, QHBoxLayout, QMainWindow, QStackedWidget, QFrame, QListWidget, QListWidgetItem, QSizePolicy, QProgressBar, QGridLayout) from PyQt5.QtGui import QImage, QPixmap, QFont from PyQt5.QtCore import Qt, QTimer, pyqtSignal, QObject from PIL import Image, ImageDraw, ImageFont # ========================================================== # OpenCL GPU 加速初始化 # ========================================================== _OCL_ENABLED = False try: cv2.ocl.setUseOpenCL(True) _OCL_ENABLED = cv2.ocl.useOpenCL() if _OCL_ENABLED: ocl_devs = cv2.ocl.Device.getDefault() print("[INFO] OpenCL GPU 加速已启用: {}".format(ocl_devs.name())) else: print("[WARNING] OpenCL 不可用,将使用 CPU") except Exception as e: print("[WARNING] OpenCL 初始化失败 ({}), 将使用 CPU".format(e)) try: from surround_view import FisheyeCameraModel, BirdView import surround_view.param_settings as settings except ImportError as e: print("[ERROR] Cannot import surround_view modules: {}".format(e)) sys.exit(1) # ========================================================== # 遥控键码 # ========================================================== KEY_UP = 0x1B KEY_DOWN = 0x1A KEY_LEFT = 0x04 KEY_RIGHT = 0x06 KEY_BACK = 0x12 KEY_OK = 0x05 # ========================================================== # 录制参数 # ========================================================== VIDEO_DIR = "/videos" SEGMENT_SECS = 180 RECORD_FPS = 10 CELL_W = 640 CELL_H = 360 GRID_W = CELL_W * 2 GRID_H = CELL_H * 2 DISK_WARN_PCT = 90 # ========================================================== # 显示参数 自适应屏幕大小显示 兼容7寸 10寸 # ========================================================== SCREEN_W = 1024 SCREEN_H = 600 BIRD_W = SCREEN_W // 3 FRONT_W = SCREEN_W - BIRD_W DIR_CN = {"front":"前方","left":"左方","back":"后方","right":"右方"} WARN_COLOR = (255, 80, 80) SAFE_COLOR = (100, 255, 100) MODE_360 = 0 MODE_QUAD = 1 SEND_INTERVAL_MS = 100 # ========================================================== # 配置 # ========================================================== CONFIG_PATH = os.path.join(os.getcwd(), "surround_config.json") DEFAULT_CONFIG = { "carousel_interval": 4, "brightness": 50, "radar_unit": 0, "show_radar": 1, "radar_alarm_dist": 50, "display_mode": MODE_360, } def load_config(): if os.path.exists(CONFIG_PATH): try: with open(CONFIG_PATH, "r") as f: cfg = json.load(f) for k, v in DEFAULT_CONFIG.items(): cfg.setdefault(k, v) return cfg except Exception: pass return dict(DEFAULT_CONFIG) def save_config(cfg): try: with open(CONFIG_PATH, "w") as f: json.dump(cfg, f, indent=2, ensure_ascii=False) print("[CONFIG] 已保存: {}".format(CONFIG_PATH)) except Exception as e: print("[CONFIG ERROR] {}".format(e)) # ========================================================== # 磁盘管理 # ========================================================== def get_disk_usage_pct(path): try: usage = shutil.disk_usage(path) return usage.used * 100 // usage.total except Exception: return 0 def get_disk_info_str(path): try: usage = shutil.disk_usage(path) used_gb = usage.used / 1024**3 total_gb = usage.total / 1024**3 pct = usage.used * 100 // usage.total return "磁盘: {:.1f}GB / {:.1f}GB ({}%)".format( used_gb, total_gb, pct), pct except Exception: return "磁盘: 未知", 0 def cleanup_old_videos(): files = [] for pat in ("rec_*.mp4", "rec_*.avi"): files.extend(glob.glob(os.path.join(VIDEO_DIR, pat))) files = sorted(files) while files and get_disk_usage_pct(VIDEO_DIR) >= DISK_WARN_PCT: oldest = files.pop(0) try: os.remove(oldest) print("[清理] 已删除: {}".format(oldest)) except Exception as e: print("[清理 ERROR] {}".format(e)) break # ========================================================== # 1. 字体管理 # ========================================================== class FontManager: _fonts = {} _lock = threading.Lock() @classmethod def get(cls, size=20): with cls._lock: if size not in cls._fonts: candidates = [ "/usr/share/fonts/truetype/wqy/wqy-zenhei.ttc", "/usr/share/fonts/truetype/wqy/wqy-microhei.ttc", "/usr/share/fonts/truetype/droid/DroidSansFallbackFull.ttf", "/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc", "/usr/share/fonts/truetype/noto/NotoSansCJK-Regular.ttc", ] loaded = False for p in candidates: if os.path.exists(p): try: cls._fonts[size] = ImageFont.truetype(p, size) loaded = True break except Exception: continue if not loaded: cls._fonts[size] = ImageFont.load_default() return cls._fonts[size] # ========================================================== # 2. 中文叠加 # ========================================================== def add_texts_to_pil(pil_image, text_list): draw = ImageDraw.Draw(pil_image) for item in text_list: if len(item) == 5: x, y, text, color, size = item font = FontManager.get(size) else: x, y, text, color = item font = FontManager.get(20) draw.text((x+1, y+1), text, font=font, fill=(0, 0, 0)) draw.text((x, y ), text, font=font, fill=color) # ========================================================== # 3. 串口1 # ========================================================== class SerialWorker(threading.Thread): def __init__(self, port="/dev/ttyS0", baudrate=115200): super().__init__(daemon=True) self.port = port self.baudrate = baudrate self.running = True self._rx_lock = threading.Lock() self._radar_dist = 0 self._stopped = False self._reverse = False self._active_dirs = [] self._tx_lock = threading.Lock() self._tx_front = 0x00 self._tx_left = 0x00 self._tx_back = 0x00 self._tx_right = 0x00 self._tx_radar_thr = 0x00 self._rx_enabled = True def set_rx_enabled(self, enabled): self._rx_enabled = enabled def update_send(self, front, left, back, right, radar_thr): with self._tx_lock: self._tx_front = front & 0xFF self._tx_left = left & 0xFF self._tx_back = back & 0xFF self._tx_right = right & 0xFF self._tx_radar_thr = radar_thr & 0xFF def _build_tx_frame(self): with self._tx_lock: b1 = self._tx_front b2 = self._tx_left b3 = self._tx_back b4 = self._tx_right b5 = 0x00 b6 = self._tx_radar_thr chk = (b1 + b2 + b3 + b4 + b5 + b6) & 0xFF return bytes([0x55, b1, b2, b3, b4, b5, b6, chk, 0xAA]) def get_state(self): with self._rx_lock: return list(self._active_dirs), self._radar_dist, self._stopped, self._reverse #add 返回倒车状态 def _parse_rx_frame(self, frame): if len(frame) != 10 or frame[0] != 0x55 or frame[9] != 0xAA: return front = frame[1]; left = frame[2] back = frame[3]; right = frame[4] radar = frame[5]; stop = frame[6] reverse = frame[7] # 倒车信号 00=正常 01=倒车 active = [] for name, val in [("front", front), ("left", left), ("back", back), ("right", right)]: if val != 0x00: active.append(name) with self._rx_lock: self._active_dirs = active self._radar_dist = radar self._stopped = (stop != 0x00) self._reverse = (reverse == 0x01) # add 倒车信号 def run(self): try: ser = serial.Serial( port=self.port, baudrate=self.baudrate, bytesize=serial.EIGHTBITS, stopbits=serial.STOPBITS_ONE, parity=serial.PARITY_NONE, timeout=0.05) except Exception as e: print("[串口1 ERROR] 无法打开: {}".format(e)) return def _send_loop(): interval = SEND_INTERVAL_MS / 1000.0 while self.running: try: frame = self._build_tx_frame() ser.write(frame) except Exception as e: if self.running: print("[串口1 TX ERROR] {}".format(e)) time.sleep(interval) tx_thread = threading.Thread(target=_send_loop, daemon=True) tx_thread.start() buf = bytearray() while self.running: try: if not self._rx_enabled: time.sleep(0.05) buf.clear() continue byte = ser.read(1) if not byte: continue b = byte[0] if len(buf) == 0: if b == 0x55: buf.append(b) continue buf.append(b) if len(buf) == 10: self._parse_rx_frame(bytes(buf)) print("[RX] active: {} radar: {} stopped: {} reverse: {}".format( self._active_dirs, self._radar_dist, self._stopped, self._reverse)) buf.clear() elif len(buf) > 10: buf.clear() except Exception as e: if self.running: print("[串口1 RX ERROR] {}".format(e)) time.sleep(0.1) ser.close() def stop(self): self.running = False # ========================================================== # 4. 串口2:红外遥控 # ========================================================== REMOTE_HEADER = bytes([0x01, 0x52, 0xF0, 0xFF, 0x00, 0x86, 0x6B]) REMOTE_FRAME_LEN = 11 class RemoteSignals(QObject): key_pressed = pyqtSignal(int) enter_settings = pyqtSignal() exit_settings = pyqtSignal() class RemoteWorker(threading.Thread): def __init__(self, port="/dev/ttyS3", baudrate=9600): super().__init__(daemon=True) self.port = port self.baudrate = baudrate self.running = True self.signals = RemoteSignals() def _parse_frame(self, frame): if len(frame) != REMOTE_FRAME_LEN: return if frame[:7] != REMOTE_HEADER: return code = frame[7] self.signals.key_pressed.emit(code) if code == KEY_OK: self.signals.enter_settings.emit() elif code == KEY_BACK: self.signals.exit_settings.emit() def run(self): try: ser = serial.Serial(port=self.port, baudrate=self.baudrate, bytesize=serial.EIGHTBITS, stopbits=serial.STOPBITS_ONE, parity=serial.PARITY_NONE, timeout=1) except Exception as e: print("[串口2 ERROR] {}".format(e)) return buf = bytearray() while self.running: try: byte = ser.read(1) if not byte: continue b = byte[0] if len(buf) == 0: if b == 0x01: buf.append(b) continue buf.append(b) if len(buf) <= 7: if buf[len(buf)-1] != REMOTE_HEADER[len(buf)-1]: buf.clear() continue if len(buf) == REMOTE_FRAME_LEN: self._parse_frame(bytes(buf)) buf.clear() elif len(buf) > REMOTE_FRAME_LEN: buf.clear() except Exception as e: if self.running: print("[串口2 ERROR] {}".format(e)) time.sleep(0.1) ser.close() def stop(self): self.running = False # ========================================================== # 5. 轮播调度器 # ========================================================== class CarouselScheduler: INTERVAL = 4.0 def __init__(self): self._queue = [] self._cur_dir = None self._cur_start = 0.0 def update(self, active_dirs): if not active_dirs: self._queue = [] self._cur_dir = None return None, 0 for d in active_dirs: if d not in self._queue: self._queue.append(d) self._queue = [d for d in self._queue if d in active_dirs or d == self._cur_dir] if self._cur_dir is None or self._cur_dir not in self._queue: self._cur_dir = self._queue[0] if self._queue else None self._cur_start = time.time() self._skip_gone(active_dirs) if time.time() - self._cur_start >= self.INTERVAL: self._advance(active_dirs) remaining = max(0.0, self.INTERVAL - (time.time() - self._cur_start)) return self._cur_dir, remaining def _advance(self, active_dirs): if not self._queue: self._cur_dir = None return try: idx = self._queue.index(self._cur_dir) except ValueError: idx = -1 self._cur_dir = self._queue[(idx+1) % len(self._queue)] self._cur_start = time.time() self._skip_gone(active_dirs) def _skip_gone(self, active_dirs): for _ in range(len(self._queue)): if self._cur_dir in active_dirs: break self._advance(active_dirs) # ========================================================== # 6. 原始帧采集 # ========================================================== class RawCaptureWorker(threading.Thread): def __init__(self, caps, names): super().__init__(daemon=True) self.caps = caps self.names = names self.running = True self.queues = {name: deque(maxlen=2) for name in names} def get_frame(self, name): q = self.queues.get(name) if q: try: return q[-1] except: pass return None def run(self): while self.running: for cap, name in zip(self.caps, self.names): ret, frame = cap.read() if ret and frame is not None: self.queues[name].append(frame) def stop(self): self.running = False # ========================================================== # 7. 鸟瞰处理 # ========================================================== class BirdViewWorker(threading.Thread): def __init__(self, raw_worker, camera_models, birdview, framestep=2): super().__init__(daemon=True) self.raw_worker = raw_worker self.models = camera_models self.names = raw_worker.names self.birdview = birdview self.framestep = framestep self.running = True self.frame_idx = 0 self._rq = deque(maxlen=2) self._lock = threading.Lock() def get_bird(self): with self._lock: try: return self._rq[-1] except: return None def run(self): while self.running: self.frame_idx += 1 if self.frame_idx % self.framestep != 0: time.sleep(0.033) continue frames = [] for name, model in zip(self.names, self.models): frame = self.raw_worker.get_frame(name) if frame is None: break try: frame = model.process_frame(frame) if frame is not None: frames.append(frame) except Exception as e: print("[BIRD ERROR] {}".format(e)) break if len(frames) != 4: time.sleep(0.033) continue try: self.birdview.update_frames(frames) self.birdview.stitch_all_parts() self.birdview.make_white_balance() self.birdview.copy_car_image() bird = self.birdview.image.copy() with self._lock: self._rq.append(bird) except Exception as e: print("[BIRDVIEW ERROR] {}".format(e)) time.sleep(0.01) def stop(self): self.running = False # ========================================================== # 8. 四路录制线程 # ========================================================== class RecordWorker(threading.Thread): ORDER = ["front", "left", "back", "right"] CODEC_CANDIDATES = [ ("mp4v", ".mp4"), ("MJPG", ".avi"), ("XVID", ".avi"), ] def __init__(self, raw_worker): super().__init__(daemon=True) self.raw_worker = raw_worker self.running = True self._writer = None self._seg_start = 0.0 self._cur_path = "" self._ext = ".mp4" self._fourcc = cv2.VideoWriter_fourcc(*"mp4v") os.makedirs(VIDEO_DIR, exist_ok=True) self._probe_codec() def _probe_codec(self): test_base = os.path.join(VIDEO_DIR, "_probe_test") for codec, ext in self.CODEC_CANDIDATES: test_path = test_base + ext try: fourcc = cv2.VideoWriter_fourcc(*codec) w = cv2.VideoWriter(test_path, fourcc, RECORD_FPS, (GRID_W, GRID_H)) if not w.isOpened(): w.release() continue blank = np.zeros((GRID_H, GRID_W, 3), dtype=np.uint8) w.write(blank) w.release() if os.path.exists(test_path) and \ os.path.getsize(test_path) > 500: self._fourcc = fourcc self._ext = ext print("[录制] 编码器: {} 格式: {}".format(codec, ext)) try: os.remove(test_path) except: pass return try: os.remove(test_path) except: pass except Exception as e: print("[录制] 编码器 {} 不可用: {}".format(codec, e)) print("[录制] 使用默认 mp4v") def _new_segment(self): if self._writer: self._writer.release() print("[录制] 已保存: {}".format(self._cur_path)) cleanup_old_videos() ts = datetime.now().strftime("%Y%m%d_%H%M%S") self._cur_path = os.path.join( VIDEO_DIR, "rec_{}{}".format(ts, self._ext)) self._writer = cv2.VideoWriter( self._cur_path, self._fourcc, RECORD_FPS, (GRID_W, GRID_H)) if not self._writer.isOpened(): print("[录制 ERROR] 无法创建: {}".format(self._cur_path)) self._writer = None self._seg_start = time.time() print("[录制] 新段落: {}".format(self._cur_path)) def run(self): self._new_segment() frame_interval = 1.0 / RECORD_FPS next_frame_time = time.time() while self.running: now = time.time() if now - self._seg_start >= SEGMENT_SECS: self._new_segment() next_frame_time = time.time() continue wait = next_frame_time - now if wait > 0: time.sleep(wait) next_frame_time += frame_interval if self._writer is None: self._new_segment() continue cells = {} for name in self.ORDER: f = self.raw_worker.get_frame(name) cells[name] = ( cv2.resize(f, (CELL_W, CELL_H), interpolation=cv2.INTER_LINEAR) if f is not None else np.zeros((CELL_H, CELL_W, 3), dtype=np.uint8) ) top = np.hstack([cells["front"], cells["left"]]) bottom = np.hstack([cells["back"], cells["right"]]) grid = np.vstack([top, bottom]) for label, x, y in [ ("Front", 10, 10), ("Left", CELL_W+10, 10), ("Back", 10, CELL_H+10), ("Right", CELL_W+10, CELL_H+10), ]: cv2.putText(grid, label, (x, y+28), cv2.FONT_HERSHEY_SIMPLEX, 1.0, (0, 255, 100), 2) self._writer.write(grid) def stop(self): self.running = False if self._writer: self._writer.release() print("[录制] 最终段落: {}".format(self._cur_path)) @staticmethod def list_videos(): files = [] for pat in ("rec_*.mp4", "rec_*.avi"): files.extend(glob.glob(os.path.join(VIDEO_DIR, pat))) return sorted(files, reverse=True) @staticmethod def parse_ts(path): base = os.path.basename(path) for rep in ("rec_", ".mp4", ".avi"): base = base.replace(rep, "") try: return datetime.strptime(base, "%Y%m%d_%H%M%S").strftime( "%Y-%m-%d %H:%M:%S") except: return base @staticmethod def file_size_str(path): try: sz = os.path.getsize(path) if sz >= 1024**3: return " {:.1f}GB".format(sz / 1024**3) if sz >= 1024**2: return " {:.1f}MB".format(sz / 1024**2) return " {:.0f}KB".format(sz / 1024) except: return " ??" # ========================================================== # 9. 多摄像头初始化 # ========================================================== class MultiCameraBirdView: def __init__(self): self.running = True self.names = settings.camera_names self.yamls = [ os.path.join(os.getcwd(), "yaml", n + ".yaml") for n in self.names ] try: self.camera_models = [ FisheyeCameraModel(f, n) for f, n in zip(self.yamls, self.names) ] except Exception as e: print("[ERROR] {}".format(e)) self.running = False return self.caps = [] self.which_cameras = {"front":2, "left":3, "back":0, "right":1} for name in self.names: cap_id = self.which_cameras.get(name, 0) cap = cv2.VideoCapture(cap_id, cv2.CAP_V4L2) cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"YUYV")) cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280) cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720) if not cap.isOpened(): print("[ERROR] 无法打开摄像头 {} ID:{}".format(name, cap_id)) self.running = False return self.caps.append(cap) self.birdview = BirdView() self._init_weights() def _init_weights(self): try: imgs = [ os.path.join(os.getcwd(), "images", n + ".png") for n in self.names ] frames = [] for p, model in zip(imgs, self.camera_models): img = cv2.imread(p) if img is None: continue frames.append( model.process_frame(img)) if len(frames) == 4: self.birdview.get_weights_and_masks(frames) print("[INFO] 权重初始化完成") else: print("[WARNING] 静态图不完整") except Exception as e: print("[ERROR] 权重初始化失败: {}".format(e)) # ========================================================== # 10. 360渲染线程(全屏轮播模式 + 画面裁切) # ========================================================== class RenderWorker(threading.Thread): TARGET_FPS = 30 FRAME_DELAY = 1.0 / TARGET_FPS # 画面裁切比例配置(左右裁切,0-0.5) CROP_RATIO = { "front": 0.15, # 前视裁切8% "left": 0.15, # 左视裁切10% "back": 0.15, # 后视裁切8% "right": 0.15, # 右视裁切10% } def __init__(self, bird_worker, raw_worker, serial_worker, carousel, cfg_ref, screen_w, screen_h): super().__init__(daemon=True) self.bird_worker = bird_worker self.raw_worker = raw_worker self.serial_worker = serial_worker self.carousel = carousel self.cfg_ref = cfg_ref self.sw = screen_w self.sh = screen_h self.running = True self._out_q = deque(maxlen=2) self._lock = threading.Lock() self._last_bird = None self._fullscreen_mode = False # 全屏轮播模式标志 self._last_active_dirs = [] # 上次检测到的人方向 for sz in (18, 20, 24, 28, 32, 36): FontManager.get(sz) def crop_image(self, img, crop_ratio): """ 对图像进行左右裁切 img: 输入图像 (h, w, c) crop_ratio: 裁切比例 (0-0.5),表示左右各裁切的比例 返回: 裁切后的图像 """ if crop_ratio <= 0 or crop_ratio >= 0.5: return img h, w = img.shape[:2] crop_width = int(w * crop_ratio) # 左右裁切 cropped = img[:, crop_width:w-crop_width] return cropped def get_qimage(self): with self._lock: try: return self._out_q[-1] except: return None def run(self): while self.running: t0 = time.time() bird = self.bird_worker.get_bird() if bird is not None: self._last_bird = bird if self._last_bird is None: time.sleep(0.02) continue try: sw = self.sw sh = self.sh # 获取状态 add 修改解包状态 active_dirs, radar_dist, is_stopped, is_reverse = \ self.serial_worker.get_state() display_dir, remaining = self.carousel.update(active_dirs) # 决定显示模式 # 当有人时(active_dirs非空)进入全屏轮播模式 # 决定显示模式(倒车优先级最高) if is_reverse: display_mode = "reverse" elif len(active_dirs) > 0: display_mode = "fullscreen" else: display_mode = "normal" # self._fullscreen_mode = (display_mode == "fullscreen") cfg = self.cfg_ref brightness = cfg.get("brightness", 50) if display_mode == "reverse": # ========================================== # 倒车模式:左侧鸟瞰 + 右侧后视摄像头 # ========================================== bw = sw // 3 fw = sw - bw back_frame = self.raw_worker.get_frame("back") crop_ratio = self.CROP_RATIO.get("back", 0) if back_frame is not None and crop_ratio > 0: back_frame = self.crop_image(back_frame, crop_ratio) bird_part = cv2.resize(self._last_bird, (bw, sh), interpolation=cv2.INTER_LINEAR) if back_frame is not None: back_part = cv2.resize(back_frame, (fw, sh), interpolation=cv2.INTER_LINEAR) else: back_part = np.zeros((sh, fw, 3), dtype=np.uint8) # -------------------------- # 新增:绘制红/黄/绿倒车辅助线 # -------------------------- if back_frame is not None: h, w = back_part.shape[:2] h=h//2 # 只在下半部分绘制辅助线 # ========================================== # 倒车线参数 - 梯形向远方伸长版本 # ========================================== # 底部(车尾)两个端点 - 宽 bottom_left = (int(w * 0.10), h+h) # 左侧起始10% bottom_right = (int(w * 0.90), h+h) # 右侧结束90% # 顶部(远方)- 更窄,形成更强的透视感 # 绿色(最远,安全区)- 伸长到更高更窄的位置 green_top_left = (int(w * 0.35), int(h * 0.12+h)) # 更窄更远 green_top_right = (int(w * 0.65), int(h * 0.12+h)) # 黄色(中间,警示区) yellow_top_left = (int(w * 0.28), int(h * 0.35+h)) # 稍宽 yellow_top_right = (int(w * 0.72), int(h * 0.35+h)) # 红色(最近,危险区) red_top_left = (int(w * 0.21), int(h * 0.60+h)) # 更宽 red_top_right = (int(w * 0.79), int(h * 0.60+h)) # 颜色定义(BGR格式) color_green = (0, 0, 255) # 绿色 color_red = (0, 255, 255) # 黄色 color_yellow = (0, 255, 0) # 红色 # 绘制梯形(填充半透明) overlay = back_part.copy() alpha = 0.2 # 透明度 # 绘制绿色区域(最远,安全区) green_pts = np.array([bottom_left, green_top_left, green_top_right, bottom_right], np.int32) cv2.fillPoly(overlay, [green_pts], color_green) # 绘制黄色区域(中间,警示区) yellow_pts = np.array([green_top_left, yellow_top_left, yellow_top_right, green_top_right], np.int32) cv2.fillPoly(overlay, [yellow_pts], color_yellow) # 绘制红色区域(最近,危险区) red_pts = np.array([yellow_top_left, red_top_left, red_top_right, yellow_top_right], np.int32) cv2.fillPoly(overlay, [red_pts], color_red) # 绘制左右车道线(加强透视感) # 左线 cv2.line(overlay, bottom_left, green_top_left, (255, 255, 255), 2) # 右线 cv2.line(overlay, bottom_right, green_top_right, (255, 255, 255), 2) # 混合半透明效果 back_part = cv2.addWeighted(overlay, alpha, back_part, 1 - alpha, 0) # -------------------------- # 原有亮度调节逻辑(保留) # -------------------------- if brightness != 50: alpha_v = brightness / 50.0 back_part = cv2.convertScaleAbs(back_part, alpha=alpha_v, beta=0) display = np.hstack((bird_part, back_part)) rgb = cv2.cvtColor(display, cv2.COLOR_BGR2RGB) pil_img = Image.fromarray(rgb) texts = [] # 倒车标题 texts.append((bw + 10, 10, "倒车模式 ◀ 后视", (255, 180, 0), 32)) # 雷达信息 if cfg.get("show_radar", 1): radar_dist_dm = radar_dist # 原始数据是分米 alarm_d_dm = cfg.get("radar_alarm_dist", 50) # 报警值单位也是分米 # 当距离大于60分米(6米)时不显示 if radar_dist_dm > 60: pass # 不显示 else: # 转换为米(保留1位小数)或厘米 if radar_dist_dm >= 10: # 大于等于1米,用米显示 dist_val = radar_dist_dm / 10.0 unit_str = "米" dist_text = f"{dist_val:.1f}" else: # 小于1米,用厘米显示 dist_val = radar_dist_dm * 10 unit_str = "厘米" dist_text = f"{dist_val:.0f}" # 报警值也转换为对应单位 if alarm_d_dm >= 10: alarm_val = alarm_d_dm / 10.0 alarm_unit = "米" alarm_text = f"{alarm_val:.1f}" else: alarm_val = alarm_d_dm * 10 alarm_unit = "厘米" alarm_text = f"{alarm_val:.0f}" # 颜色逻辑:使用转换后的分米值进行比较(报警值原始单位是分米) rc = ((255, 80, 80) if radar_dist_dm <= alarm_d_dm else (180, 220, 255)) texts.append((10, sh-108, f"雷达: {dist_text}{unit_str} 报警值: {alarm_text}{alarm_unit}", rc, 22)) # 停机状态 stop_text = "已停机" if is_stopped else "运行中" stop_color = ((255, 100, 100) if is_stopped else (100, 255, 100)) texts.append((bw + 500, 20, f"状态: {stop_text}", stop_color, 30)) # 时间戳 # texts.append((bw + 10, sh - 26, # time.strftime("%Y-%m-%d %H:%M:%S"), (160, 160, 160), 20)) elif display_mode == "fullscreen": # ========================================== # 全屏轮播模式:显示有人方向的画面 # ========================================== # 获取当前轮播方向的画面 if display_dir: frame = self.raw_worker.get_frame(display_dir) else: # 如果没有轮播方向,使用第一个有人方向 frame = self.raw_worker.get_frame(active_dirs[0]) if active_dirs else None # 如果获取失败,使用前视作为备用 if frame is None: frame = self.raw_worker.get_frame("front") if frame is not None: # 应用左右裁切 crop_ratio = self.CROP_RATIO.get(display_dir or active_dirs[0] if active_dirs else "front", 0) if crop_ratio > 0: frame = self.crop_image(frame, crop_ratio) # 强制拉伸到全屏尺寸,无黑边 display = cv2.resize(frame, (sw, sh), interpolation=cv2.INTER_LINEAR) # 亮度调节 if brightness != 50: alpha = brightness / 50.0 display = cv2.convertScaleAbs(display, alpha=alpha, beta=0) else: # 无画面时显示黑色背景 display = np.zeros((sh, sw, 3), dtype=np.uint8) # 转换为PIL图像添加文字 rgb = cv2.cvtColor(display, cv2.COLOR_BGR2RGB) pil_img = Image.fromarray(rgb) # 添加文字信息 texts = [] # 当前显示的方向 current_dir = display_dir or (active_dirs[0] if active_dirs else "front") dir_cn = DIR_CN.get(current_dir, current_dir) # 标题 - 全屏轮播模式 # texts.append((10, 10, "全景监控 - 全屏轮播", (0, 255, 100), 28)) # 显示当前方向和人员信息 texts.append((10, 50, f"检测到人员: {dir_cn}", WARN_COLOR, 36)) # 显示所有检测到的方向 # if len(active_dirs) > 1: # dirs_str = "、".join([DIR_CN.get(d, d) for d in active_dirs]) # texts.append((10, 92, f"所有方向: {dirs_str}", (255, 200, 0), 24)) # # 显示轮播信息 # queue_str = " → ".join([DIR_CN.get(d, d) for d in self.carousel._queue]) # texts.append((10, 124, f"轮播队列: {queue_str}", (200, 200, 200), 22)) # texts.append((10, 154, f"当前: {dir_cn} 倒计时: {remaining:.1f}s", (200, 200, 200), 22)) # else: # texts.append((10, 92, f"仅 {dir_cn} 方向有人", (200, 200, 200), 24)) # 雷达信息 if cfg.get("show_radar", 1): radar_dist_dm = radar_dist # 原始数据是分米 alarm_d_dm = cfg.get("radar_alarm_dist", 50) # 报警值单位也是分米 # 当距离大于60分米(6米)时不显示 if radar_dist_dm > 60: pass # 不显示 else: # 转换为米(保留1位小数)或厘米 if radar_dist_dm >= 10: # 大于等于1米,用米显示 dist_val = radar_dist_dm / 10.0 unit_str = "米" dist_text = f"{dist_val:.1f}" else: # 小于1米,用厘米显示 dist_val = radar_dist_dm * 10 unit_str = "厘米" dist_text = f"{dist_val:.0f}" # 报警值也转换为对应单位 if alarm_d_dm >= 10: alarm_val = alarm_d_dm / 10.0 alarm_unit = "米" alarm_text = f"{alarm_val:.1f}" else: alarm_val = alarm_d_dm * 10 alarm_unit = "厘米" alarm_text = f"{alarm_val:.0f}" # 颜色逻辑:使用转换后的分米值进行比较(报警值原始单位是分米) rc = ((255, 80, 80) if radar_dist_dm <= alarm_d_dm else (180, 220, 255)) texts.append((10, sh-108, f"雷达: {dist_text}{unit_str} 报警值: {alarm_text}{alarm_unit}", rc, 22)) # 状态信息 stop_text = "已停机" if is_stopped else "运行中" stop_color = ((255, 100, 100) if is_stopped else (100, 255, 100)) texts.append((10, sh-78, f"状态: {stop_text}", stop_color, 22)) # 时间戳 texts.append((10, sh-52, time.strftime("%Y-%m-%d %H:%M:%S"), (160, 160, 160), 20)) # 提示信息 # texts.append((sw-250, sh-30, "天煤机电科技", (100, 100, 100), 16)) else: # ========================================== # 正常模式:显示鸟瞰+前视 # ========================================== bw = sw // 3 fw = sw - bw # 获取画面 right_frame = (self.raw_worker.get_frame(display_dir) if display_dir else None) if right_frame is None: right_frame = self.raw_worker.get_frame("front") if right_frame is None: time.sleep(0.01) continue # 对右侧画面应用裁切 crop_ratio = self.CROP_RATIO.get(display_dir or "front", 0) if crop_ratio > 0: right_frame = self.crop_image(right_frame, crop_ratio) # 强制拉伸到精确尺寸,无黑边 bird_part = cv2.resize(self._last_bird, (bw, sh), interpolation=cv2.INTER_LINEAR) front_part = cv2.resize(right_frame, (fw, sh), interpolation=cv2.INTER_LINEAR) # 亮度调节 if brightness != 50: alpha = brightness / 50.0 front_part = cv2.convertScaleAbs(front_part, alpha=alpha, beta=0) display = np.hstack((bird_part, front_part)) rgb = cv2.cvtColor(display, cv2.COLOR_BGR2RGB) pil_img = Image.fromarray(rgb) # 添加文字信息 texts = [(10, 10, "鸟瞰视图", (0, 255, 100), 24)] texts.append((bw+10, 10, "周围安全", SAFE_COLOR, 28)) # if display_dir: # dir_cn = DIR_CN.get(display_dir, display_dir) # texts.append((bw+10, 10, f"{dir_cn} 检测到人员", WARN_COLOR, 36)) # if len(active_dirs) > 1: # queue_cn = " → ".join(DIR_CN.get(d, d) for d in self.carousel._queue) # texts.append((bw+10, 58, f"轮播: {queue_cn}", (255, 200, 0), 22)) # texts.append((bw+10, 88, f"当前: {dir_cn} 倒计时: {remaining:.1f}s", (200, 200, 200), 22)) # else: # texts.append((bw+10, 58, f"仅 {dir_cn} 有人", (200, 200, 200), 22)) # else: # texts.append((bw+10, 10, "前视监控 (周围无人)", SAFE_COLOR, 28)) # 雷达信息 if cfg.get("show_radar", 1): radar_dist_dm = radar_dist # 原始数据是分米 alarm_d_dm = cfg.get("radar_alarm_dist", 50) # 报警值单位也是分米 # 当距离大于60分米(6米)时不显示 if radar_dist_dm > 60: pass # 不显示 else: # 转换为米(保留1位小数)或厘米 if radar_dist_dm >= 10: # 大于等于1米,用米显示 dist_val = radar_dist_dm / 10.0 unit_str = "米" dist_text = f"{dist_val:.1f}" else: # 小于1米,用厘米显示 dist_val = radar_dist_dm * 10 unit_str = "厘米" dist_text = f"{dist_val:.0f}" # 报警值也转换为对应单位 if alarm_d_dm >= 10: alarm_val = alarm_d_dm / 10.0 alarm_unit = "米" alarm_text = f"{alarm_val:.1f}" else: alarm_val = alarm_d_dm * 10 alarm_unit = "厘米" alarm_text = f"{alarm_val:.0f}" # 颜色逻辑:使用转换后的分米值进行比较(报警值原始单位是分米) rc = ((255, 80, 80) if radar_dist_dm <= alarm_d_dm else (180, 220, 255)) texts.append((10, sh-108, f"雷达: {dist_text}{unit_str} 报警值: {alarm_text}{alarm_unit}", rc, 22)) # 状态信息 stop_text = "已停机" if is_stopped else "运行中" stop_color = ((255, 100, 100) if is_stopped else (100, 255, 100)) texts.append((bw+10, sh-52, f"状态: {stop_text}", stop_color, 22)) texts.append((bw+380, 10, time.strftime("%Y-%m-%d %H:%M:%S"), (160, 160, 160), 28)) # 添加文字到图像 add_texts_to_pil(pil_img, texts) arr = np.ascontiguousarray(np.array(pil_img)) h, w, ch = arr.shape qimg = QImage(arr.tobytes(), w, h, w * ch, QImage.Format_RGB888) with self._lock: self._out_q.append(qimg) # 更新发送内容 dir_map = {"front": 0, "left": 1, "back": 2, "right": 3} presence = [0x00, 0x00, 0x00, 0x00] for d in active_dirs: idx = dir_map.get(d) if idx is not None: presence[idx] = 0x01 radar_thr = cfg.get("radar_alarm_dist", 50) self.serial_worker.update_send( presence[0], presence[1], presence[2], presence[3], radar_thr) except Exception as e: print("[RENDER ERROR] {}".format(e)) import traceback traceback.print_exc() elapsed = time.time() - t0 sl = self.FRAME_DELAY - elapsed if sl > 0: time.sleep(sl) def update_crop_ratio(self, camera_name, ratio): """动态更新裁切比例""" if camera_name in self.CROP_RATIO: self.CROP_RATIO[camera_name] = max(0, min(0.5, ratio)) print(f"[CROP] {camera_name} 裁切比例更新为: {ratio}") def stop(self): self.running = False # ========================================================== # 11. 四分屏渲染线程(带画面裁切) # ========================================================== class QuadRenderWorker(threading.Thread): TARGET_FPS = 30 FRAME_DELAY = 1.0 / TARGET_FPS ORDER = [("front","前"), ("left","左"), ("back", "后"), ("right","右")] # 裁切比例配置(左右裁切百分比,0-0.5之间) # 例如 0.1 表示左右各裁切10% CROP_RATIO = { "front": 0.15, # 前视左右各裁切8% "left": 0.15, # 左视左右各裁切10% "back": 0.15, # 后视左右各裁切8% "right": 0.15, # 右视左右各裁切10% } def __init__(self, raw_worker, serial_worker, cfg_ref, screen_w, screen_h): super().__init__(daemon=True) self.raw_worker = raw_worker self.serial_worker = serial_worker self.cfg_ref = cfg_ref self.sw = screen_w self.sh = screen_h self.running = True self._out_q = deque(maxlen=2) self._lock = threading.Lock() for sz in (24, 28, 32): FontManager.get(sz) def crop_image(self, img, crop_ratio): """ 对图像进行左右裁切 img: 输入图像 (h, w, c) crop_ratio: 裁切比例 (0-0.5),表示左右各裁切的比例 返回: 裁切后的图像 """ if crop_ratio <= 0 or crop_ratio >= 0.5: return img h, w = img.shape[:2] crop_width = int(w * crop_ratio) # 左右裁切 cropped = img[:, crop_width:w-crop_width] return cropped def get_qimage(self): with self._lock: try: return self._out_q[-1] except: return None def run(self): while self.running: t0 = time.time() try: sw = self.sw sh = self.sh cw = sw // 2 ch = sh // 2 cells = [] labels_info = [] for (name, label_cn) in self.ORDER: f = self.raw_worker.get_frame(name) if f is not None: # 先裁切,再拉伸 crop_ratio = self.CROP_RATIO.get(name, 0) if crop_ratio > 0: f = self.crop_image(f, crop_ratio) # 强制拉伸到格子尺寸,无黑边 cell = cv2.resize(f, (cw, ch), interpolation=cv2.INTER_LINEAR) brightness = self.cfg_ref.get("brightness", 50) if brightness != 50: alpha = brightness / 50.0 cell = cv2.convertScaleAbs( cell, alpha=alpha, beta=0) else: cell = np.zeros((ch, cw, 3), dtype=np.uint8) cells.append(cell) labels_info.append(label_cn) top = np.hstack([cells[0], cells[1]]) bottom = np.hstack([cells[2], cells[3]]) grid = np.vstack([top, bottom]) rgb = cv2.cvtColor(grid, cv2.COLOR_BGR2RGB) pil_img = Image.fromarray(rgb) positions = [ (10, 10, labels_info[0]), (cw+10, 10, labels_info[1]), (10, ch+10, labels_info[2]), (cw+10, ch+10, labels_info[3]), ] texts = [] for (x, y, lbl) in positions: texts.append((x, y, lbl, (0, 255, 100), 32)) texts.append((10, sh-26, time.strftime("%Y-%m-%d %H:%M:%S"), (160, 160, 160), 20)) add_texts_to_pil(pil_img, texts) arr = np.ascontiguousarray(np.array(pil_img)) h, w, ch2 = arr.shape qimg = QImage(arr.tobytes(), w, h, w * ch2, QImage.Format_RGB888) with self._lock: self._out_q.append(qimg) radar_thr = self.cfg_ref.get("radar_alarm_dist", 50) # self.serial_worker.update_send(0, 0, 0, 0, radar_thr) except Exception as e: print("[QUAD RENDER ERROR] {}".format(e)) elapsed = time.time() - t0 sl = self.FRAME_DELAY - elapsed if sl > 0: time.sleep(sl) def stop(self): self.running = False def update_crop_ratio(self, camera_name, ratio): """动态更新裁切比例""" if camera_name in self.CROP_RATIO: self.CROP_RATIO[camera_name] = max(0, min(0.5, ratio)) print(f"[CROP] {camera_name} 裁切比例更新为: {ratio}") # ========================================================== # 12. 回放面板 # 视频充满右侧区域:强制拉伸到 label 尺寸(不保持宽高比) # 字体放大 # ========================================================== class PlaybackPanel(QWidget): def __init__(self, parent=None): super().__init__(parent) self._video_list = [] self._video_cursor = 0 self._play_cap = None self._play_timer = None self._play_paused = False self._play_fps = 15.0 self._play_total = 0 self._last_frame_t = 0.0 self._build_ui() def _build_ui(self): root = QHBoxLayout(self) root.setContentsMargins(8, 8, 8, 8) root.setSpacing(8) # ---- 左侧列表 ---- left = QWidget() left.setFixedWidth(260) ll = QVBoxLayout(left) ll.setContentsMargins(0, 0, 0, 0) ll.setSpacing(6) list_title = QLabel("📋 录像列表") list_title.setStyleSheet( "font-size:18px; color:#e94560; font-weight:bold;") ll.addWidget(list_title) self._list_widget = QListWidget() self._list_widget.setFocusPolicy(Qt.NoFocus) # 行高增大,字体加大 self._list_widget.setStyleSheet(""" QListWidget { background:#0d1117; border:1px solid #0f3460; font-size:15px; color:#ccc; } QListWidget::item { padding:8px 4px; border-bottom:1px solid #1a2540; min-height:36px; } QListWidget::item:selected { background:#e94560; color:#fff; } """) ll.addWidget(self._list_widget, stretch=1) self._disk_label = QLabel("") self._disk_label.setStyleSheet("color:#aaa; font-size:13px;") self._disk_label.setWordWrap(True) ll.addWidget(self._disk_label) self._disk_bar = QProgressBar() self._disk_bar.setRange(0, 100) self._disk_bar.setFixedHeight(10) self._disk_bar.setTextVisible(False) ll.addWidget(self._disk_bar) hint = QLabel("↑↓选择 OK播放/暂停 返回退出") hint.setStyleSheet("color:#555; font-size:13px;") hint.setWordWrap(True) ll.addWidget(hint) root.addWidget(left) # ---- 右侧播放区 ---- right = QWidget() right.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding) rl = QVBoxLayout(right) rl.setContentsMargins(0, 0, 0, 0) rl.setSpacing(4) play_title = QLabel("▶ 回放画面") play_title.setFixedHeight(32) play_title.setStyleSheet( "font-size:18px; color:#e94560; font-weight:bold;") rl.addWidget(play_title) # label 充满右侧:Expanding + setScaledContents(True) # setScaledContents(True) 让 pixmap 自动随 label 尺寸拉伸 self._play_label = QLabel("选择左侧录像后按 OK 播放") self._play_label.setAlignment(Qt.AlignCenter) self._play_label.setStyleSheet( "background:#0d1117; color:#555; font-size:16px;" " border:1px solid #0f3460;") self._play_label.setSizePolicy( QSizePolicy.Expanding, QSizePolicy.Expanding) self._play_label.setScaledContents(True) # 自动拉伸填满 rl.addWidget(self._play_label, stretch=1) self._status_label = QLabel("") self._status_label.setAlignment(Qt.AlignCenter) self._status_label.setFixedHeight(28) self._status_label.setStyleSheet( "color:#aaa; font-size:15px;") rl.addWidget(self._status_label) root.addWidget(right, stretch=1) def load(self): self._stop_play() self._video_list = RecordWorker.list_videos() self._list_widget.clear() self._video_cursor = 0 if not self._video_list: self._list_widget.addItem(" (暂无录像)") else: for path in self._video_list: ts = RecordWorker.parse_ts(path) sz = RecordWorker.file_size_str(path) item = QListWidgetItem(" {}{}".format(ts, sz)) self._list_widget.addItem(item) self._list_widget.setCurrentRow(0) self._update_disk_ui() self._play_label.setScaledContents(False) self._play_label.setText("选择左侧录像后按 OK 播放") self._status_label.setText("") def _update_disk_ui(self): info_str, pct = get_disk_info_str(VIDEO_DIR) self._disk_label.setText(info_str) self._disk_bar.setValue(pct) color = "#e94560" if pct >= 90 else "#4CAF50" self._disk_bar.setStyleSheet(""" QProgressBar {{ background:#1a2540; border-radius:4px; }} QProgressBar::chunk {{ background:{}; border-radius:4px; }} """.format(color)) def _stop_play(self): if self._play_timer: self._play_timer.stop() self._play_timer = None if self._play_cap: self._play_cap.release() self._play_cap = None self._play_paused = False def _start_play(self): if not self._video_list: return self._stop_play() path = self._video_list[self._video_cursor] self._play_cap = cv2.VideoCapture(path) if not self._play_cap.isOpened(): self._status_label.setText("无法打开文件") return self._play_total = int(self._play_cap.get(cv2.CAP_PROP_FRAME_COUNT)) self._play_fps = self._play_cap.get(cv2.CAP_PROP_FPS) or RECORD_FPS self._play_paused = False # 开始播放时启用 ScaledContents 填满 label self._play_label.setScaledContents(True) self._play_label.setText("") interval_ms = max(1, int(1000.0 / self._play_fps)) self._play_timer = QTimer() self._play_timer.timeout.connect(self._tick) self._play_timer.start(interval_ms) self._status_label.setText("▶ 播放中 | {}".format( RecordWorker.parse_ts(path))) def _tick(self): if self._play_cap is None or self._play_paused: return ret, frame = self._play_cap.read() if not ret: self._play_timer.stop() self._status_label.setText("✅ 播放完毕") return # 直接设置 pixmap,label.setScaledContents(True) 自动填满 rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) h, w = rgb.shape[:2] qimg = QImage(rgb.tobytes(), w, h, w * 3, QImage.Format_RGB888) self._play_label.setPixmap(QPixmap.fromImage(qimg)) cur = int(self._play_cap.get(cv2.CAP_PROP_POS_FRAMES)) es = cur / self._play_fps ts = self._play_total / self._play_fps ps = " 【已暂停】" if self._play_paused else "" self._status_label.setText( "{:02d}:{:02d} / {:02d}:{:02d}{}".format( int(es)//60, int(es)%60, int(ts)//60, int(ts)%60, ps)) def on_key(self, code): if code == KEY_UP: if self._video_list: self._video_cursor = max(0, self._video_cursor - 1) self._list_widget.setCurrentRow(self._video_cursor) self._stop_play() self._play_label.setScaledContents(False) self._play_label.setText("按 OK 播放") self._status_label.setText("") elif code == KEY_DOWN: if self._video_list: self._video_cursor = min( len(self._video_list) - 1, self._video_cursor + 1) self._list_widget.setCurrentRow(self._video_cursor) self._stop_play() self._play_label.setScaledContents(False) self._play_label.setText("按 OK 播放") self._status_label.setText("") elif code == KEY_OK: if self._play_cap and self._play_cap.isOpened(): self._play_paused = not self._play_paused self._status_label.setText( "⏸ 已暂停" if self._play_paused else "▶ 播放中") else: self._start_play() def cleanup(self): self._stop_play() # ========================================================== # 13. 设置面板 # 字体全面放大(列表 20px、标题 26px、值标签 36px) # ========================================================== class SettingsPanel(QWidget): SUB_MAIN = 0 SUB_PLAYBACK = 1 def __init__(self, cfg, parent=None): super().__init__(parent) self._cfg = cfg self._cursor = 0 self._sub_page = self.SUB_MAIN self._build_ui() def _define_menu(self): return [ {"name":"显示模式", "type":"toggle", "key":"display_mode", "values":[MODE_360, MODE_QUAD], "labels":["360环视", "四分屏"]}, {"name":"轮播间隔(秒)", "type":"range", "key":"carousel_interval", "min":1, "max":10, "step":1}, {"name":"画面亮度", "type":"range", "key":"brightness", "min":0, "max":100, "step":5}, {"name":"雷达报警距离", "type":"range", "key":"radar_alarm_dist", "min":5, "max":50, "step":5}, {"name":"雷达距离单位", "type":"toggle", "key":"radar_unit", "values":[0, 1], "labels":["米", "厘米"]}, {"name":"显示雷达距离", "type":"toggle", "key":"show_radar", "values":[0, 1], "labels":["关闭", "开启"]}, {"name":"查看录像回放", "type":"page", "action": self._enter_playback}, ] def _build_ui(self): # 全面放大字号 self.setStyleSheet(""" QWidget { background:#1a1a2e; color:#e0e0e0; font-size:20px; } QLabel { font-size:20px; } QListWidget { background:#16213e; border:1px solid #0f3460; font-size:20px; color:#e0e0e0; } QListWidget::item { padding:10px 8px; border-bottom:1px solid #0f3460; min-height:44px; } QListWidget::item:selected { background:#e94560; color:#ffffff; } """) self._stack = QStackedWidget(self) root = QVBoxLayout(self) root.setContentsMargins(0, 0, 0, 0) root.addWidget(self._stack) # 主菜单页 mw = QWidget() ml = QVBoxLayout(mw) ml.setContentsMargins(50, 24, 50, 24) ml.setSpacing(14) title = QLabel("⚙ 系统设置") title.setAlignment(Qt.AlignCenter) title.setFixedHeight(50) title.setStyleSheet( "color:#e94560; font-size:30px; font-weight:bold;") ml.addWidget(title) sep = QFrame() sep.setFrameShape(QFrame.HLine) sep.setFixedHeight(2) sep.setStyleSheet("background:#0f3460;") ml.addWidget(sep) self._list_widget = QListWidget() self._list_widget.setFocusPolicy(Qt.NoFocus) ml.addWidget(self._list_widget, stretch=1) # 当前值标签放大 self._val_label = QLabel("") self._val_label.setAlignment(Qt.AlignCenter) self._val_label.setFixedHeight(52) self._val_label.setStyleSheet( "color:#e94560; font-size:36px; font-weight:bold;") ml.addWidget(self._val_label) hint = QLabel("↑↓ 移动 ←→ 调值 OK 切换/进入 返回 退出保存") hint.setAlignment(Qt.AlignCenter) hint.setFixedHeight(30) hint.setStyleSheet("color:#555; font-size:16px;") ml.addWidget(hint) self._stack.addWidget(mw) # 回放页 self._playback_panel = PlaybackPanel() self._stack.addWidget(self._playback_panel) self._stack.setCurrentIndex(0) self._menu_items = self._define_menu() def _refresh(self): self._list_widget.clear() for item in self._menu_items: vstr = self._item_val_str(item) self._list_widget.addItem( " {} {}".format(item["name"], vstr)) self._list_widget.setCurrentRow(self._cursor) item = self._menu_items[self._cursor] if item["type"] == "range": self._val_label.setText(str(self._cfg.get(item["key"], ""))) elif item["type"] == "toggle": try: idx = item["values"].index( self._cfg.get(item["key"], item["values"][0])) except ValueError: idx = 0 self._val_label.setText(item["labels"][idx]) else: self._val_label.setText("") def _item_val_str(self, item): if item["type"] == "range": return "[ {} ]".format(self._cfg.get(item["key"], "?")) elif item["type"] == "toggle": try: idx = item["values"].index( self._cfg.get(item["key"], item["values"][0])) except ValueError: idx = 0 return "[ {} ]".format(item["labels"][idx]) return "→" def _enter_playback(self): self._playback_panel.load() self._sub_page = self.SUB_PLAYBACK self._stack.setCurrentIndex(1) def on_key(self, code): if self._sub_page == self.SUB_MAIN: n = len(self._menu_items) if code == KEY_UP: self._cursor = (self._cursor - 1) % n self._refresh() elif code == KEY_DOWN: self._cursor = (self._cursor + 1) % n self._refresh() elif code == KEY_LEFT: item = self._menu_items[self._cursor] if item["type"] == "range": v = self._cfg.get(item["key"], item["min"]) self._cfg[item["key"]] = max( item["min"], v - item["step"]) self._refresh() elif code == KEY_RIGHT: item = self._menu_items[self._cursor] if item["type"] == "range": v = self._cfg.get(item["key"], item["min"]) self._cfg[item["key"]] = min( item["max"], v + item["step"]) self._refresh() elif code == KEY_OK: item = self._menu_items[self._cursor] if item["type"] == "toggle": cur = self._cfg.get(item["key"], item["values"][0]) try: idx = item["values"].index(cur) except ValueError: idx = 0 self._cfg[item["key"]] = \ item["values"][(idx+1) % len(item["values"])] self._refresh() elif item["type"] == "page": item["action"]() elif self._sub_page == self.SUB_PLAYBACK: if code == KEY_BACK: self._playback_panel.cleanup() self._sub_page = self.SUB_MAIN self._stack.setCurrentIndex(0) self._refresh() else: self._playback_panel.on_key(code) def load(self): self._sub_page = self.SUB_MAIN self._stack.setCurrentIndex(0) self._cursor = 0 self._refresh() def save(self): save_config(self._cfg) def get_settings(self): return dict(self._cfg) # ========================================================== # 14. 主窗口 # 真全屏:showFullScreen() + image_label 用 setScaledContents(True) # 渲染线程按真实屏幕尺寸生成图像,label 自动填满无黑边 # ========================================================== class SurroundViewWindow(QMainWindow): PAGE_MAIN = 0 PAGE_SETTINGS = 1 def __init__(self, multi_cam, serial_worker, remote_worker, cfg): super().__init__() self.multi_cam = multi_cam self.serial_worker = serial_worker self.remote_worker = remote_worker self.cfg = cfg self.carousel = CarouselScheduler() self.setWindowTitle("环视系统") # 先全屏,再取实际尺寸传给渲染线程 self.showFullScreen() self.sw = 1024 self.sh = 600 print("[INFO] 实际屏幕尺寸: {}x{}".format(self.sw, self.sh)) self.stack = QStackedWidget() self.setCentralWidget(self.stack) # 主显示页 main_page = QWidget() main_page.setStyleSheet("background:black;") ml = QVBoxLayout(main_page) ml.setContentsMargins(0, 0, 0, 0) ml.setSpacing(0) # setScaledContents(True) 让 pixmap 自动充满 label,无黑边 self.image_label = QLabel() self.image_label.setAlignment(Qt.AlignCenter) self.image_label.setStyleSheet("background:black;") self.image_label.setSizePolicy( QSizePolicy.Expanding, QSizePolicy.Expanding) self.image_label.setScaledContents(True) # 关键 ml.addWidget(self.image_label) self.stack.addWidget(main_page) # 设置页(全屏) self.settings_panel = SettingsPanel(cfg) self.stack.addWidget(self.settings_panel) self.stack.setCurrentIndex(self.PAGE_MAIN) # 工作线程(传入实际屏幕尺寸) self.raw_worker = RawCaptureWorker(multi_cam.caps, multi_cam.names) self.raw_worker.start() self.bird_worker = BirdViewWorker( self.raw_worker, multi_cam.camera_models, multi_cam.birdview, framestep=2) self.bird_worker.start() # 渲染线程用实际屏幕尺寸 self.render_360 = RenderWorker( self.bird_worker, self.raw_worker, serial_worker, self.carousel, cfg, self.sw, self.sh) self.render_360.start() self.render_quad = QuadRenderWorker( self.raw_worker, serial_worker, cfg, self.sw, self.sh) self.render_quad.start() self.record_worker = RecordWorker(self.raw_worker) self.record_worker.start() self.timer = QTimer() self.timer.timeout.connect(self._flush) self.timer.start(16) remote_worker.signals.key_pressed.connect(self._on_key) remote_worker.signals.enter_settings.connect(self._on_enter_settings) remote_worker.signals.exit_settings.connect(self._on_exit_settings) self._apply_display_mode() def _apply_display_mode(self): mode = self.cfg.get("display_mode", MODE_360) self.serial_worker.set_rx_enabled(mode == MODE_360) print("[UI] 显示模式: {}".format("360环视" if mode == MODE_360 else "四分屏")) def _flush(self): if self.stack.currentIndex() != self.PAGE_MAIN: return mode = self.cfg.get("display_mode", MODE_360) qimg = (self.render_quad.get_qimage() if mode == MODE_QUAD else self.render_360.get_qimage()) if qimg is None: return # 直接 setPixmap,setScaledContents(True) 自动填满,无黑边 self.image_label.setPixmap(QPixmap.fromImage(qimg)) def _on_key(self, code): if self.stack.currentIndex() == self.PAGE_SETTINGS: if (code == KEY_BACK and self.settings_panel._sub_page == SettingsPanel.SUB_MAIN): self._on_exit_settings() else: self.settings_panel.on_key(code) def _on_enter_settings(self): if self.stack.currentIndex() == self.PAGE_SETTINGS: return self.settings_panel.load() self.stack.setCurrentIndex(self.PAGE_SETTINGS) print("[UI] 进入设置界面") def _on_exit_settings(self): if self.stack.currentIndex() != self.PAGE_SETTINGS: return self.settings_panel.save() CarouselScheduler.INTERVAL = float( self.cfg.get("carousel_interval", 4)) self._apply_display_mode() self.stack.setCurrentIndex(self.PAGE_MAIN) print("[UI] 退出设置,返回主视图") def closeEvent(self, event): self.timer.stop() self.render_360.stop() self.render_quad.stop() self.bird_worker.stop() self.raw_worker.stop() self.record_worker.stop() self.serial_worker.stop() self.remote_worker.stop() self.multi_cam.running = False for cap in self.multi_cam.caps: cap.release() event.accept() # ========================================================== # 15. 主函数 # ========================================================== def main(): parser = argparse.ArgumentParser(description="环视系统") parser.add_argument("--port", type=str, default="/dev/ttyS0") parser.add_argument("--remote-port", type=str, default="/dev/ttyS3") parser.add_argument("--baudrate", type=int, default=115200) parser.add_argument("--framestep", type=int, default=2) args = parser.parse_args() if True: cfg = load_config() CarouselScheduler.INTERVAL = float(cfg.get("carousel_interval", 4)) serial_worker = SerialWorker(port=args.port, baudrate=args.baudrate) serial_worker.start() print("[INFO] 雷达串口: {} @ {} 发送周期: {}ms".format( args.port, args.baudrate, SEND_INTERVAL_MS)) remote_worker = RemoteWorker(port=args.remote_port, baudrate=args.baudrate) remote_worker.start() print("[INFO] 遥控串口: {} @ {}".format( args.remote_port, args.baudrate)) multi_cam = MultiCameraBirdView() if not multi_cam.running: print("[ERROR] 摄像头初始化失败") serial_worker.stop() remote_worker.stop() return app = QApplication(sys.argv) # 禁用系统缩放干扰,确保全屏像素对齐 app.setAttribute(Qt.AA_DisableHighDpiScaling, True) win = SurroundViewWindow( multi_cam, serial_worker, remote_worker, cfg) win.bird_worker.framestep = args.framestep sys.exit(app.exec_()) elif args.mode == "static": print("[INFO] Static mode not implemented.") if __name__ == "__main__": main()