Files
AVM360/web.py

1859 lines
70 KiB
Python
Raw Normal View History

2026-04-01 14:11:47 +08:00
import cv2
import threading
import sys, os, time
import argparse
import numpy as np
import serial
import json
import glob
import shutil
from collections import deque
from datetime import datetime
from PyQt5.QtWidgets import (QApplication, QLabel, QWidget, QVBoxLayout,
QHBoxLayout, QMainWindow, QStackedWidget,
QFrame, QListWidget, QListWidgetItem,
QSizePolicy, QProgressBar, QGridLayout)
from PyQt5.QtGui import QImage, QPixmap, QFont
from PyQt5.QtCore import Qt, QTimer, pyqtSignal, QObject
from PIL import Image, ImageDraw, ImageFont
try:
from surround_view import FisheyeCameraModel, BirdView
import surround_view.param_settings as settings
except ImportError as e:
print("[ERROR] Cannot import surround_view modules: {}".format(e))
sys.exit(1)
# ==========================================================
# 遥控键码
# ==========================================================
KEY_UP = 0x1B
KEY_DOWN = 0x1A
KEY_LEFT = 0x04
KEY_RIGHT = 0x06
KEY_BACK = 0x12
KEY_OK = 0x05
# ==========================================================
# 录制参数
# ==========================================================
VIDEO_DIR = "/videos"
SEGMENT_SECS = 180
RECORD_FPS = 10
CELL_W = 640
CELL_H = 360
GRID_W = CELL_W * 2
GRID_H = CELL_H * 2
DISK_WARN_PCT = 90
# ==========================================================
# 显示参数 自适应屏幕大小显示 兼容7寸 10寸
# ==========================================================
SCREEN_W = 1024
SCREEN_H = 600
BIRD_W = SCREEN_W // 3
FRONT_W = SCREEN_W - BIRD_W
DIR_CN = {"front":"前方","left":"左方","back":"后方","right":"右方"}
WARN_COLOR = (255, 80, 80)
SAFE_COLOR = (100, 255, 100)
MODE_360 = 0
MODE_QUAD = 1
SEND_INTERVAL_MS = 100
# ==========================================================
# 配置
# ==========================================================
CONFIG_PATH = os.path.join(os.getcwd(), "surround_config.json")
DEFAULT_CONFIG = {
"carousel_interval": 4,
"brightness": 50,
"radar_unit": 0,
"show_radar": 1,
"radar_alarm_dist": 50,
"display_mode": MODE_360,
}
def load_config():
if os.path.exists(CONFIG_PATH):
try:
with open(CONFIG_PATH, "r") as f:
cfg = json.load(f)
for k, v in DEFAULT_CONFIG.items():
cfg.setdefault(k, v)
return cfg
except Exception:
pass
return dict(DEFAULT_CONFIG)
def save_config(cfg):
try:
with open(CONFIG_PATH, "w") as f:
json.dump(cfg, f, indent=2, ensure_ascii=False)
print("[CONFIG] 已保存: {}".format(CONFIG_PATH))
except Exception as e:
print("[CONFIG ERROR] {}".format(e))
# ==========================================================
# 磁盘管理
# ==========================================================
def get_disk_usage_pct(path):
try:
usage = shutil.disk_usage(path)
return usage.used * 100 // usage.total
except Exception:
return 0
def get_disk_info_str(path):
try:
usage = shutil.disk_usage(path)
used_gb = usage.used / 1024**3
total_gb = usage.total / 1024**3
pct = usage.used * 100 // usage.total
return "磁盘: {:.1f}GB / {:.1f}GB ({}%)".format(
used_gb, total_gb, pct), pct
except Exception:
return "磁盘: 未知", 0
def cleanup_old_videos():
files = []
for pat in ("rec_*.mp4", "rec_*.avi"):
files.extend(glob.glob(os.path.join(VIDEO_DIR, pat)))
files = sorted(files)
while files and get_disk_usage_pct(VIDEO_DIR) >= DISK_WARN_PCT:
oldest = files.pop(0)
try:
os.remove(oldest)
print("[清理] 已删除: {}".format(oldest))
except Exception as e:
print("[清理 ERROR] {}".format(e))
break
# ==========================================================
# 1. 字体管理
# ==========================================================
class FontManager:
_fonts = {}
_lock = threading.Lock()
@classmethod
def get(cls, size=20):
with cls._lock:
if size not in cls._fonts:
candidates = [
"/usr/share/fonts/truetype/wqy/wqy-zenhei.ttc",
"/usr/share/fonts/truetype/wqy/wqy-microhei.ttc",
"/usr/share/fonts/truetype/droid/DroidSansFallbackFull.ttf",
"/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc",
"/usr/share/fonts/truetype/noto/NotoSansCJK-Regular.ttc",
]
loaded = False
for p in candidates:
if os.path.exists(p):
try:
cls._fonts[size] = ImageFont.truetype(p, size)
loaded = True
break
except Exception:
continue
if not loaded:
cls._fonts[size] = ImageFont.load_default()
return cls._fonts[size]
# ==========================================================
# 2. 中文叠加
# ==========================================================
def add_texts_to_pil(pil_image, text_list):
draw = ImageDraw.Draw(pil_image)
for item in text_list:
if len(item) == 5:
x, y, text, color, size = item
font = FontManager.get(size)
else:
x, y, text, color = item
font = FontManager.get(20)
draw.text((x+1, y+1), text, font=font, fill=(0, 0, 0))
draw.text((x, y ), text, font=font, fill=color)
# ==========================================================
# 3. 串口1
# ==========================================================
class SerialWorker(threading.Thread):
def __init__(self, port="/dev/ttyS0", baudrate=115200):
super().__init__(daemon=True)
self.port = port
self.baudrate = baudrate
self.running = True
self._rx_lock = threading.Lock()
self._radar_dist = 0
self._stopped = False
self._reverse = False
self._active_dirs = []
self._tx_lock = threading.Lock()
self._tx_front = 0x00
self._tx_left = 0x00
self._tx_back = 0x00
self._tx_right = 0x00
self._tx_radar_thr = 0x00
self._rx_enabled = True
def set_rx_enabled(self, enabled):
self._rx_enabled = enabled
def update_send(self, front, left, back, right, radar_thr):
with self._tx_lock:
self._tx_front = front & 0xFF
self._tx_left = left & 0xFF
self._tx_back = back & 0xFF
self._tx_right = right & 0xFF
self._tx_radar_thr = radar_thr & 0xFF
def _build_tx_frame(self):
with self._tx_lock:
b1 = self._tx_front
b2 = self._tx_left
b3 = self._tx_back
b4 = self._tx_right
b5 = 0x00
b6 = self._tx_radar_thr
chk = (b1 + b2 + b3 + b4 + b5 + b6) & 0xFF
return bytes([0x55, b1, b2, b3, b4, b5, b6, chk, 0xAA])
def get_state(self):
with self._rx_lock:
2026-04-01 17:27:55 +08:00
return list(self._active_dirs), self._radar_dist, self._stopped, self._reverse #add 返回倒车状态
2026-04-01 14:11:47 +08:00
def _parse_rx_frame(self, frame):
2026-04-07 14:11:33 +08:00
2026-04-01 17:27:55 +08:00
if len(frame) != 10 or frame[0] != 0x55 or frame[9] != 0xAA:
2026-04-01 14:11:47 +08:00
return
front = frame[1]; left = frame[2]
back = frame[3]; right = frame[4]
radar = frame[5]; stop = frame[6]
2026-04-01 17:27:55 +08:00
reverse = frame[7] # 倒车信号 00=正常 01=倒车
2026-04-01 14:11:47 +08:00
active = []
for name, val in [("front", front), ("left", left),
2026-04-01 17:27:55 +08:00
("back", back), ("right", right)]:
2026-04-01 14:11:47 +08:00
if val != 0x00:
active.append(name)
2026-04-01 17:27:55 +08:00
2026-04-01 14:11:47 +08:00
with self._rx_lock:
self._active_dirs = active
self._radar_dist = radar
self._stopped = (stop != 0x00)
2026-04-01 17:27:55 +08:00
self._reverse = (reverse == 0x01) # add 倒车信号
2026-04-01 14:11:47 +08:00
def run(self):
try:
ser = serial.Serial(
port=self.port, baudrate=self.baudrate,
bytesize=serial.EIGHTBITS,
stopbits=serial.STOPBITS_ONE,
parity=serial.PARITY_NONE,
timeout=0.05)
except Exception as e:
print("[串口1 ERROR] 无法打开: {}".format(e))
return
def _send_loop():
interval = SEND_INTERVAL_MS / 1000.0
while self.running:
try:
frame = self._build_tx_frame()
ser.write(frame)
except Exception as e:
if self.running:
print("[串口1 TX ERROR] {}".format(e))
time.sleep(interval)
tx_thread = threading.Thread(target=_send_loop, daemon=True)
tx_thread.start()
buf = bytearray()
while self.running:
try:
if not self._rx_enabled:
time.sleep(0.05)
buf.clear()
continue
byte = ser.read(1)
if not byte:
continue
b = byte[0]
if len(buf) == 0:
if b == 0x55:
buf.append(b)
continue
buf.append(b)
2026-04-01 17:27:55 +08:00
if len(buf) == 10:
2026-04-01 14:11:47 +08:00
self._parse_rx_frame(bytes(buf))
buf.clear()
2026-04-01 17:27:55 +08:00
elif len(buf) > 10:
2026-04-01 14:11:47 +08:00
buf.clear()
except Exception as e:
if self.running:
print("[串口1 RX ERROR] {}".format(e))
time.sleep(0.1)
ser.close()
def stop(self):
self.running = False
# ==========================================================
# 4. 串口2红外遥控
# ==========================================================
REMOTE_HEADER = bytes([0x01, 0x52, 0xF0, 0xFF, 0x00, 0x86, 0x6B])
REMOTE_FRAME_LEN = 11
class RemoteSignals(QObject):
key_pressed = pyqtSignal(int)
enter_settings = pyqtSignal()
exit_settings = pyqtSignal()
class RemoteWorker(threading.Thread):
def __init__(self, port="/dev/ttyS3", baudrate=115200):
super().__init__(daemon=True)
self.port = port
self.baudrate = baudrate
self.running = True
self.signals = RemoteSignals()
def _parse_frame(self, frame):
if len(frame) != REMOTE_FRAME_LEN:
return
if frame[:7] != REMOTE_HEADER:
return
code = frame[7]
self.signals.key_pressed.emit(code)
if code == KEY_OK:
self.signals.enter_settings.emit()
elif code == KEY_BACK:
self.signals.exit_settings.emit()
def run(self):
try:
ser = serial.Serial(port=self.port, baudrate=self.baudrate,
bytesize=serial.EIGHTBITS,
stopbits=serial.STOPBITS_ONE,
parity=serial.PARITY_NONE, timeout=1)
except Exception as e:
print("[串口2 ERROR] {}".format(e))
return
buf = bytearray()
while self.running:
try:
byte = ser.read(1)
if not byte:
continue
b = byte[0]
if len(buf) == 0:
if b == 0x01:
buf.append(b)
continue
buf.append(b)
if len(buf) <= 7:
if buf[len(buf)-1] != REMOTE_HEADER[len(buf)-1]:
buf.clear()
continue
if len(buf) == REMOTE_FRAME_LEN:
self._parse_frame(bytes(buf))
buf.clear()
elif len(buf) > REMOTE_FRAME_LEN:
buf.clear()
except Exception as e:
if self.running:
print("[串口2 ERROR] {}".format(e))
time.sleep(0.1)
ser.close()
def stop(self):
self.running = False
# ==========================================================
# 5. 轮播调度器
# ==========================================================
class CarouselScheduler:
INTERVAL = 4.0
def __init__(self):
self._queue = []
self._cur_dir = None
self._cur_start = 0.0
def update(self, active_dirs):
if not active_dirs:
self._queue = []
self._cur_dir = None
return None, 0
for d in active_dirs:
if d not in self._queue:
self._queue.append(d)
self._queue = [d for d in self._queue
if d in active_dirs or d == self._cur_dir]
if self._cur_dir is None or self._cur_dir not in self._queue:
self._cur_dir = self._queue[0] if self._queue else None
self._cur_start = time.time()
self._skip_gone(active_dirs)
if time.time() - self._cur_start >= self.INTERVAL:
self._advance(active_dirs)
remaining = max(0.0, self.INTERVAL - (time.time() - self._cur_start))
return self._cur_dir, remaining
def _advance(self, active_dirs):
if not self._queue:
self._cur_dir = None
return
try:
idx = self._queue.index(self._cur_dir)
except ValueError:
idx = -1
self._cur_dir = self._queue[(idx+1) % len(self._queue)]
self._cur_start = time.time()
self._skip_gone(active_dirs)
def _skip_gone(self, active_dirs):
for _ in range(len(self._queue)):
if self._cur_dir in active_dirs:
break
self._advance(active_dirs)
# ==========================================================
# 6. 原始帧采集
# ==========================================================
class RawCaptureWorker(threading.Thread):
def __init__(self, caps, names):
super().__init__(daemon=True)
self.caps = caps
self.names = names
self.running = True
self.queues = {name: deque(maxlen=2) for name in names}
def get_frame(self, name):
q = self.queues.get(name)
if q:
try: return q[-1]
except: pass
return None
def run(self):
while self.running:
for cap, name in zip(self.caps, self.names):
ret, frame = cap.read()
if ret and frame is not None:
self.queues[name].append(frame)
def stop(self):
self.running = False
# ==========================================================
# 7. 鸟瞰处理
# ==========================================================
class BirdViewWorker(threading.Thread):
def __init__(self, raw_worker, camera_models, birdview, framestep=2):
super().__init__(daemon=True)
self.raw_worker = raw_worker
self.models = camera_models
self.names = raw_worker.names
self.birdview = birdview
self.framestep = framestep
self.running = True
self.frame_idx = 0
self._rq = deque(maxlen=2)
self._lock = threading.Lock()
def get_bird(self):
with self._lock:
try: return self._rq[-1]
except: return None
def run(self):
while self.running:
self.frame_idx += 1
if self.frame_idx % self.framestep != 0:
time.sleep(0.005)
continue
frames = []
for name, model in zip(self.names, self.models):
frame = self.raw_worker.get_frame(name)
if frame is None:
break
try:
frames.append(
model.flip(model.project(model.undistort(frame))))
except Exception as e:
print("[BIRD ERROR] {}".format(e))
break
if len(frames) != 4:
time.sleep(0.01)
continue
try:
self.birdview.update_frames(frames)
self.birdview.stitch_all_parts()
self.birdview.make_white_balance()
self.birdview.copy_car_image()
bird = self.birdview.image.copy()
with self._lock:
self._rq.append(bird)
except Exception as e:
print("[BIRDVIEW ERROR] {}".format(e))
time.sleep(0.01)
def stop(self):
self.running = False
# ==========================================================
# 8. 四路录制线程
# ==========================================================
class RecordWorker(threading.Thread):
ORDER = ["front", "left", "back", "right"]
CODEC_CANDIDATES = [
("mp4v", ".mp4"),
("MJPG", ".avi"),
("XVID", ".avi"),
]
def __init__(self, raw_worker):
super().__init__(daemon=True)
self.raw_worker = raw_worker
self.running = True
self._writer = None
self._seg_start = 0.0
self._cur_path = ""
self._ext = ".mp4"
self._fourcc = cv2.VideoWriter_fourcc(*"mp4v")
os.makedirs(VIDEO_DIR, exist_ok=True)
self._probe_codec()
def _probe_codec(self):
test_base = os.path.join(VIDEO_DIR, "_probe_test")
for codec, ext in self.CODEC_CANDIDATES:
test_path = test_base + ext
try:
fourcc = cv2.VideoWriter_fourcc(*codec)
w = cv2.VideoWriter(test_path, fourcc, RECORD_FPS,
(GRID_W, GRID_H))
if not w.isOpened():
w.release()
continue
blank = np.zeros((GRID_H, GRID_W, 3), dtype=np.uint8)
w.write(blank)
w.release()
if os.path.exists(test_path) and \
os.path.getsize(test_path) > 500:
self._fourcc = fourcc
self._ext = ext
print("[录制] 编码器: {} 格式: {}".format(codec, ext))
try: os.remove(test_path)
except: pass
return
try: os.remove(test_path)
except: pass
except Exception as e:
print("[录制] 编码器 {} 不可用: {}".format(codec, e))
print("[录制] 使用默认 mp4v")
def _new_segment(self):
if self._writer:
self._writer.release()
print("[录制] 已保存: {}".format(self._cur_path))
cleanup_old_videos()
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
self._cur_path = os.path.join(
VIDEO_DIR, "rec_{}{}".format(ts, self._ext))
self._writer = cv2.VideoWriter(
self._cur_path, self._fourcc, RECORD_FPS, (GRID_W, GRID_H))
if not self._writer.isOpened():
print("[录制 ERROR] 无法创建: {}".format(self._cur_path))
self._writer = None
self._seg_start = time.time()
print("[录制] 新段落: {}".format(self._cur_path))
def run(self):
self._new_segment()
frame_interval = 1.0 / RECORD_FPS
next_frame_time = time.time()
while self.running:
now = time.time()
if now - self._seg_start >= SEGMENT_SECS:
self._new_segment()
next_frame_time = time.time()
continue
wait = next_frame_time - now
if wait > 0:
time.sleep(wait)
next_frame_time += frame_interval
if self._writer is None:
self._new_segment()
continue
cells = {}
for name in self.ORDER:
f = self.raw_worker.get_frame(name)
cells[name] = (
cv2.resize(f, (CELL_W, CELL_H),
interpolation=cv2.INTER_LINEAR)
if f is not None
else np.zeros((CELL_H, CELL_W, 3), dtype=np.uint8)
)
top = np.hstack([cells["front"], cells["left"]])
bottom = np.hstack([cells["back"], cells["right"]])
grid = np.vstack([top, bottom])
for label, x, y in [
("Front", 10, 10),
("Left", CELL_W+10, 10),
("Back", 10, CELL_H+10),
("Right", CELL_W+10, CELL_H+10),
]:
cv2.putText(grid, label, (x, y+28),
cv2.FONT_HERSHEY_SIMPLEX, 1.0,
(0, 255, 100), 2)
self._writer.write(grid)
def stop(self):
self.running = False
if self._writer:
self._writer.release()
print("[录制] 最终段落: {}".format(self._cur_path))
@staticmethod
def list_videos():
files = []
for pat in ("rec_*.mp4", "rec_*.avi"):
files.extend(glob.glob(os.path.join(VIDEO_DIR, pat)))
return sorted(files, reverse=True)
@staticmethod
def parse_ts(path):
base = os.path.basename(path)
for rep in ("rec_", ".mp4", ".avi"):
base = base.replace(rep, "")
try:
return datetime.strptime(base, "%Y%m%d_%H%M%S").strftime(
"%Y-%m-%d %H:%M:%S")
except:
return base
@staticmethod
def file_size_str(path):
try:
sz = os.path.getsize(path)
if sz >= 1024**3: return " {:.1f}GB".format(sz / 1024**3)
if sz >= 1024**2: return " {:.1f}MB".format(sz / 1024**2)
return " {:.0f}KB".format(sz / 1024)
except:
return " ??"
# ==========================================================
# 9. 多摄像头初始化
# ==========================================================
class MultiCameraBirdView:
def __init__(self):
self.running = True
self.names = settings.camera_names
self.yamls = [
os.path.join(os.getcwd(), "yaml", n + ".yaml")
for n in self.names
]
try:
self.camera_models = [
FisheyeCameraModel(f, n)
for f, n in zip(self.yamls, self.names)
]
except Exception as e:
print("[ERROR] {}".format(e))
self.running = False
return
self.caps = []
self.which_cameras = {"front":0, "left":1, "back":2, "right":3}
for name in self.names:
cap_id = self.which_cameras.get(name, 0)
cap = cv2.VideoCapture(cap_id, cv2.CAP_V4L2)
cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"YUYV"))
cap.set(cv2.CAP_PROP_FRAME_WIDTH, 1280)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 720)
if not cap.isOpened():
print("[ERROR] 无法打开摄像头 {} ID:{}".format(name, cap_id))
self.running = False
return
self.caps.append(cap)
self.birdview = BirdView()
self._init_weights()
def _init_weights(self):
try:
imgs = [
os.path.join(os.getcwd(), "images", n + ".png")
for n in self.names
]
frames = []
for p, model in zip(imgs, self.camera_models):
img = cv2.imread(p)
if img is None:
continue
frames.append(
model.flip(model.project(model.undistort(img))))
if len(frames) == 4:
self.birdview.get_weights_and_masks(frames)
print("[INFO] 权重初始化完成")
else:
print("[WARNING] 静态图不完整")
except Exception as e:
print("[ERROR] 权重初始化失败: {}".format(e))
# ==========================================================
# 10. 360渲染线程全屏轮播模式 + 画面裁切)
# ==========================================================
class RenderWorker(threading.Thread):
TARGET_FPS = 30
FRAME_DELAY = 1.0 / TARGET_FPS
# 画面裁切比例配置左右裁切0-0.5
CROP_RATIO = {
"front": 0.15, # 前视裁切8%
"left": 0.15, # 左视裁切10%
"back": 0.15, # 后视裁切8%
"right": 0.15, # 右视裁切10%
}
def __init__(self, bird_worker, raw_worker, serial_worker, carousel,
cfg_ref, screen_w, screen_h):
super().__init__(daemon=True)
self.bird_worker = bird_worker
self.raw_worker = raw_worker
self.serial_worker = serial_worker
self.carousel = carousel
self.cfg_ref = cfg_ref
self.sw = screen_w
self.sh = screen_h
self.running = True
self._out_q = deque(maxlen=2)
self._lock = threading.Lock()
self._last_bird = None
self._fullscreen_mode = False # 全屏轮播模式标志
self._last_active_dirs = [] # 上次检测到的人方向
for sz in (18, 20, 24, 28, 32, 36):
FontManager.get(sz)
def crop_image(self, img, crop_ratio):
"""
对图像进行左右裁切
img: 输入图像 (h, w, c)
crop_ratio: 裁切比例 (0-0.5)表示左右各裁切的比例
返回: 裁切后的图像
"""
if crop_ratio <= 0 or crop_ratio >= 0.5:
return img
h, w = img.shape[:2]
crop_width = int(w * crop_ratio)
# 左右裁切
cropped = img[:, crop_width:w-crop_width]
return cropped
def get_qimage(self):
with self._lock:
try: return self._out_q[-1]
except: return None
def run(self):
while self.running:
t0 = time.time()
bird = self.bird_worker.get_bird()
if bird is not None:
self._last_bird = bird
if self._last_bird is None:
time.sleep(0.02)
continue
try:
sw = self.sw
sh = self.sh
2026-04-01 17:27:55 +08:00
# 获取状态 add 修改解包状态
active_dirs, radar_dist, is_stopped, is_reverse = \
2026-04-01 14:11:47 +08:00
self.serial_worker.get_state()
display_dir, remaining = self.carousel.update(active_dirs)
# 决定显示模式
# 当有人时active_dirs非空进入全屏轮播模式
2026-04-01 17:27:55 +08:00
# 决定显示模式(倒车优先级最高)
if is_reverse:
display_mode = "reverse"
elif len(active_dirs) > 0:
display_mode = "fullscreen"
else:
display_mode = "normal"
# self._fullscreen_mode = (display_mode == "fullscreen")
2026-04-01 14:11:47 +08:00
cfg = self.cfg_ref
brightness = cfg.get("brightness", 50)
2026-04-01 17:27:55 +08:00
if display_mode == "reverse":
# ==========================================
# 倒车模式:左侧鸟瞰 + 右侧后视摄像头
# ==========================================
bw = sw // 3
fw = sw - bw
back_frame = self.raw_worker.get_frame("back")
crop_ratio = self.CROP_RATIO.get("back", 0)
if back_frame is not None and crop_ratio > 0:
back_frame = self.crop_image(back_frame, crop_ratio)
bird_part = cv2.resize(self._last_bird, (bw, sh),
interpolation=cv2.INTER_LINEAR)
if back_frame is not None:
back_part = cv2.resize(back_frame, (fw, sh),
interpolation=cv2.INTER_LINEAR)
else:
back_part = np.zeros((sh, fw, 3), dtype=np.uint8)
2026-04-07 14:11:33 +08:00
# --------------------------
# 新增:绘制红/黄/绿倒车辅助线
# --------------------------
if back_frame is not None:
h, w = back_part.shape[:2]
h=h//2 # 只在下半部分绘制辅助线
# ==========================================
# 倒车线参数 - 梯形向远方伸长版本
# ==========================================
# 底部(车尾)两个端点 - 宽
bottom_left = (int(w * 0.10), h+h) # 左侧起始10%
bottom_right = (int(w * 0.90), h+h) # 右侧结束90%
# 顶部(远方)- 更窄,形成更强的透视感
# 绿色(最远,安全区)- 伸长到更高更窄的位置
green_top_left = (int(w * 0.35), int(h * 0.12+h)) # 更窄更远
green_top_right = (int(w * 0.65), int(h * 0.12+h))
# 黄色(中间,警示区)
yellow_top_left = (int(w * 0.28), int(h * 0.35+h)) # 稍宽
yellow_top_right = (int(w * 0.72), int(h * 0.35+h))
# 红色(最近,危险区)
red_top_left = (int(w * 0.21), int(h * 0.60+h)) # 更宽
red_top_right = (int(w * 0.79), int(h * 0.60+h))
# 颜色定义BGR格式
color_green = (0, 0, 255) # 绿色
color_red = (0, 255, 255) # 黄色
color_yellow = (0, 255, 0) # 红色
# 绘制梯形(填充半透明)
overlay = back_part.copy()
alpha = 0.2 # 透明度
# 绘制绿色区域(最远,安全区)
green_pts = np.array([bottom_left, green_top_left, green_top_right, bottom_right], np.int32)
cv2.fillPoly(overlay, [green_pts], color_green)
# 绘制黄色区域(中间,警示区)
yellow_pts = np.array([green_top_left, yellow_top_left, yellow_top_right, green_top_right], np.int32)
cv2.fillPoly(overlay, [yellow_pts], color_yellow)
# 绘制红色区域(最近,危险区)
red_pts = np.array([yellow_top_left, red_top_left, red_top_right, yellow_top_right], np.int32)
cv2.fillPoly(overlay, [red_pts], color_red)
# 绘制左右车道线(加强透视感)
# 左线
cv2.line(overlay, bottom_left, green_top_left, (255, 255, 255), 2)
# 右线
cv2.line(overlay, bottom_right, green_top_right, (255, 255, 255), 2)
# 混合半透明效果
back_part = cv2.addWeighted(overlay, alpha, back_part, 1 - alpha, 0)
# --------------------------
# 原有亮度调节逻辑(保留)
# --------------------------
2026-04-01 17:27:55 +08:00
if brightness != 50:
alpha_v = brightness / 50.0
back_part = cv2.convertScaleAbs(back_part, alpha=alpha_v, beta=0)
display = np.hstack((bird_part, back_part))
rgb = cv2.cvtColor(display, cv2.COLOR_BGR2RGB)
pil_img = Image.fromarray(rgb)
texts = []
# 倒车标题
texts.append((bw + 10, 10, "倒车模式 ◀ 后视", (255, 180, 0), 32))
# 雷达信息
if cfg.get("show_radar", 1):
unit_str = "厘米" if cfg.get("radar_unit", 0) else "分米"
dist_val = (radar_dist * 10 if cfg.get("radar_unit", 0)
else radar_dist)
alarm_d = cfg.get("radar_alarm_dist", 50)
rc = ((255, 80, 80) if dist_val <= alarm_d else (180, 220, 255))
2026-04-07 14:11:33 +08:00
texts.append((bw + 10, 48,
2026-04-01 17:27:55 +08:00
f"雷达: {dist_val} {unit_str} 报警值: {alarm_d}{unit_str}",
2026-04-07 14:11:33 +08:00
rc, 26))
2026-04-01 17:27:55 +08:00
# 停机状态
stop_text = "已停机" if is_stopped else "运行中"
stop_color = ((255, 100, 100) if is_stopped else (100, 255, 100))
2026-04-07 14:11:33 +08:00
texts.append((bw + 500, 20, f"状态: {stop_text}", stop_color, 30))
2026-04-01 17:27:55 +08:00
# 时间戳
2026-04-07 14:11:33 +08:00
# texts.append((bw + 10, sh - 26,
# time.strftime("%Y-%m-%d %H:%M:%S"), (160, 160, 160), 20))
2026-04-01 17:27:55 +08:00
elif display_mode == "fullscreen":
2026-04-01 14:11:47 +08:00
# ==========================================
# 全屏轮播模式:显示有人方向的画面
# ==========================================
# 获取当前轮播方向的画面
if display_dir:
frame = self.raw_worker.get_frame(display_dir)
else:
# 如果没有轮播方向,使用第一个有人方向
frame = self.raw_worker.get_frame(active_dirs[0]) if active_dirs else None
# 如果获取失败,使用前视作为备用
if frame is None:
frame = self.raw_worker.get_frame("front")
if frame is not None:
2026-04-01 17:27:55 +08:00
# 应用左右裁切
2026-04-01 14:11:47 +08:00
crop_ratio = self.CROP_RATIO.get(display_dir or active_dirs[0] if active_dirs else "front", 0)
if crop_ratio > 0:
frame = self.crop_image(frame, crop_ratio)
# 强制拉伸到全屏尺寸,无黑边
display = cv2.resize(frame, (sw, sh), interpolation=cv2.INTER_LINEAR)
# 亮度调节
if brightness != 50:
alpha = brightness / 50.0
display = cv2.convertScaleAbs(display, alpha=alpha, beta=0)
else:
# 无画面时显示黑色背景
display = np.zeros((sh, sw, 3), dtype=np.uint8)
# 转换为PIL图像添加文字
rgb = cv2.cvtColor(display, cv2.COLOR_BGR2RGB)
pil_img = Image.fromarray(rgb)
# 添加文字信息
texts = []
# 当前显示的方向
current_dir = display_dir or (active_dirs[0] if active_dirs else "front")
dir_cn = DIR_CN.get(current_dir, current_dir)
# 标题 - 全屏轮播模式
# texts.append((10, 10, "全景监控 - 全屏轮播", (0, 255, 100), 28))
# 显示当前方向和人员信息
texts.append((10, 50, f"检测到人员: {dir_cn}", WARN_COLOR, 36))
# 显示所有检测到的方向
# if len(active_dirs) > 1:
# dirs_str = "、".join([DIR_CN.get(d, d) for d in active_dirs])
# texts.append((10, 92, f"所有方向: {dirs_str}", (255, 200, 0), 24))
# # 显示轮播信息
# queue_str = " → ".join([DIR_CN.get(d, d) for d in self.carousel._queue])
# texts.append((10, 124, f"轮播队列: {queue_str}", (200, 200, 200), 22))
# texts.append((10, 154, f"当前: {dir_cn} 倒计时: {remaining:.1f}s", (200, 200, 200), 22))
# else:
# texts.append((10, 92, f"仅 {dir_cn} 方向有人", (200, 200, 200), 24))
# 雷达信息
if cfg.get("show_radar", 1):
unit_str = "厘米" if cfg.get("radar_unit", 0) else "分米"
dist_val = (radar_dist * 10 if cfg.get("radar_unit", 0) else radar_dist)
alarm_d = cfg.get("radar_alarm_dist", 50)
rc = ((255, 80, 80) if dist_val <= alarm_d else (180, 220, 255))
texts.append((10, sh-108, f"雷达: {dist_val} {unit_str} 报警值: {alarm_d}{unit_str}", rc, 22))
# 状态信息
stop_text = "已停机" if is_stopped else "运行中"
stop_color = ((255, 100, 100) if is_stopped else (100, 255, 100))
texts.append((10, sh-78, f"状态: {stop_text}", stop_color, 22))
# 时间戳
texts.append((10, sh-52, time.strftime("%Y-%m-%d %H:%M:%S"), (160, 160, 160), 20))
# 提示信息
# texts.append((sw-250, sh-30, "天煤机电科技", (100, 100, 100), 16))
else:
# ==========================================
# 正常模式:显示鸟瞰+前视
# ==========================================
bw = sw // 3
fw = sw - bw
# 获取画面
right_frame = (self.raw_worker.get_frame(display_dir) if display_dir else None)
if right_frame is None:
right_frame = self.raw_worker.get_frame("front")
if right_frame is None:
time.sleep(0.01)
continue
2026-04-01 17:27:55 +08:00
# 对右侧画面应用裁切
2026-04-01 14:11:47 +08:00
crop_ratio = self.CROP_RATIO.get(display_dir or "front", 0)
if crop_ratio > 0:
right_frame = self.crop_image(right_frame, crop_ratio)
# 强制拉伸到精确尺寸,无黑边
bird_part = cv2.resize(self._last_bird, (bw, sh), interpolation=cv2.INTER_LINEAR)
front_part = cv2.resize(right_frame, (fw, sh), interpolation=cv2.INTER_LINEAR)
# 亮度调节
if brightness != 50:
alpha = brightness / 50.0
front_part = cv2.convertScaleAbs(front_part, alpha=alpha, beta=0)
display = np.hstack((bird_part, front_part))
rgb = cv2.cvtColor(display, cv2.COLOR_BGR2RGB)
pil_img = Image.fromarray(rgb)
# 添加文字信息
texts = [(10, 10, "鸟瞰视图", (0, 255, 100), 24)]
texts.append((bw+10, 10, "周围安全", SAFE_COLOR, 28))
# if display_dir:
# dir_cn = DIR_CN.get(display_dir, display_dir)
# texts.append((bw+10, 10, f"{dir_cn} 检测到人员", WARN_COLOR, 36))
# if len(active_dirs) > 1:
# queue_cn = " → ".join(DIR_CN.get(d, d) for d in self.carousel._queue)
# texts.append((bw+10, 58, f"轮播: {queue_cn}", (255, 200, 0), 22))
# texts.append((bw+10, 88, f"当前: {dir_cn} 倒计时: {remaining:.1f}s", (200, 200, 200), 22))
# else:
# texts.append((bw+10, 58, f"仅 {dir_cn} 有人", (200, 200, 200), 22))
# else:
# texts.append((bw+10, 10, "前视监控 (周围无人)", SAFE_COLOR, 28))
# 雷达信息
if cfg.get("show_radar", 1):
unit_str = "厘米" if cfg.get("radar_unit", 0) else "分米"
dist_val = (radar_dist * 10 if cfg.get("radar_unit", 0) else radar_dist)
alarm_d = cfg.get("radar_alarm_dist", 50)
rc = ((255, 80, 80) if dist_val <= alarm_d else (180, 220, 255))
texts.append((bw+10, sh-80, f"雷达: {dist_val} {unit_str} 阈值: {alarm_d}{unit_str}", rc, 22))
# 状态信息
stop_text = "已停机" if is_stopped else "运行中"
stop_color = ((255, 100, 100) if is_stopped else (100, 255, 100))
texts.append((bw+10, sh-52, f"状态: {stop_text}", stop_color, 22))
texts.append((bw+380, 10, time.strftime("%Y-%m-%d %H:%M:%S"), (160, 160, 160), 28))
# 添加文字到图像
add_texts_to_pil(pil_img, texts)
arr = np.ascontiguousarray(np.array(pil_img))
h, w, ch = arr.shape
qimg = QImage(arr.tobytes(), w, h, w * ch, QImage.Format_RGB888)
with self._lock:
self._out_q.append(qimg)
# 更新发送内容
dir_map = {"front": 0, "left": 1, "back": 2, "right": 3}
presence = [0x00, 0x00, 0x00, 0x00]
for d in active_dirs:
idx = dir_map.get(d)
if idx is not None:
presence[idx] = 0x01
radar_thr = cfg.get("radar_alarm_dist", 50)
self.serial_worker.update_send(
presence[0], presence[1], presence[2], presence[3],
radar_thr)
except Exception as e:
print("[RENDER ERROR] {}".format(e))
import traceback
traceback.print_exc()
elapsed = time.time() - t0
sl = self.FRAME_DELAY - elapsed
if sl > 0:
time.sleep(sl)
def update_crop_ratio(self, camera_name, ratio):
"""动态更新裁切比例"""
if camera_name in self.CROP_RATIO:
self.CROP_RATIO[camera_name] = max(0, min(0.5, ratio))
print(f"[CROP] {camera_name} 裁切比例更新为: {ratio}")
def stop(self):
self.running = False
# ==========================================================
# 11. 四分屏渲染线程(带画面裁切)
# ==========================================================
class QuadRenderWorker(threading.Thread):
TARGET_FPS = 30
FRAME_DELAY = 1.0 / TARGET_FPS
ORDER = [("front",""), ("left",""),
("back", ""), ("right","")]
# 裁切比例配置左右裁切百分比0-0.5之间)
# 例如 0.1 表示左右各裁切10%
CROP_RATIO = {
"front": 0.15, # 前视左右各裁切8%
"left": 0.15, # 左视左右各裁切10%
"back": 0.15, # 后视左右各裁切8%
"right": 0.15, # 右视左右各裁切10%
}
def __init__(self, raw_worker, serial_worker, cfg_ref, screen_w, screen_h):
super().__init__(daemon=True)
self.raw_worker = raw_worker
self.serial_worker = serial_worker
self.cfg_ref = cfg_ref
self.sw = screen_w
self.sh = screen_h
self.running = True
self._out_q = deque(maxlen=2)
self._lock = threading.Lock()
for sz in (24, 28, 32):
FontManager.get(sz)
def crop_image(self, img, crop_ratio):
"""
对图像进行左右裁切
img: 输入图像 (h, w, c)
crop_ratio: 裁切比例 (0-0.5)表示左右各裁切的比例
返回: 裁切后的图像
"""
if crop_ratio <= 0 or crop_ratio >= 0.5:
return img
h, w = img.shape[:2]
crop_width = int(w * crop_ratio)
# 左右裁切
cropped = img[:, crop_width:w-crop_width]
return cropped
def get_qimage(self):
with self._lock:
try: return self._out_q[-1]
except: return None
def run(self):
while self.running:
t0 = time.time()
try:
sw = self.sw
sh = self.sh
cw = sw // 2
ch = sh // 2
cells = []
labels_info = []
for (name, label_cn) in self.ORDER:
f = self.raw_worker.get_frame(name)
if f is not None:
2026-04-01 17:27:55 +08:00
# 先裁切,再拉伸
2026-04-01 14:11:47 +08:00
crop_ratio = self.CROP_RATIO.get(name, 0)
if crop_ratio > 0:
f = self.crop_image(f, crop_ratio)
# 强制拉伸到格子尺寸,无黑边
cell = cv2.resize(f, (cw, ch),
interpolation=cv2.INTER_LINEAR)
brightness = self.cfg_ref.get("brightness", 50)
if brightness != 50:
alpha = brightness / 50.0
cell = cv2.convertScaleAbs(
cell, alpha=alpha, beta=0)
else:
cell = np.zeros((ch, cw, 3), dtype=np.uint8)
cells.append(cell)
labels_info.append(label_cn)
top = np.hstack([cells[0], cells[1]])
bottom = np.hstack([cells[2], cells[3]])
grid = np.vstack([top, bottom])
rgb = cv2.cvtColor(grid, cv2.COLOR_BGR2RGB)
pil_img = Image.fromarray(rgb)
positions = [
(10, 10, labels_info[0]),
(cw+10, 10, labels_info[1]),
(10, ch+10, labels_info[2]),
(cw+10, ch+10, labels_info[3]),
]
texts = []
for (x, y, lbl) in positions:
texts.append((x, y, lbl, (0, 255, 100), 32))
texts.append((10, sh-26,
time.strftime("%Y-%m-%d %H:%M:%S"),
(160, 160, 160), 20))
add_texts_to_pil(pil_img, texts)
arr = np.ascontiguousarray(np.array(pil_img))
h, w, ch2 = arr.shape
qimg = QImage(arr.tobytes(), w, h, w * ch2,
QImage.Format_RGB888)
with self._lock:
self._out_q.append(qimg)
radar_thr = self.cfg_ref.get("radar_alarm_dist", 50)
# self.serial_worker.update_send(0, 0, 0, 0, radar_thr)
except Exception as e:
print("[QUAD RENDER ERROR] {}".format(e))
elapsed = time.time() - t0
sl = self.FRAME_DELAY - elapsed
if sl > 0:
time.sleep(sl)
def stop(self):
self.running = False
def update_crop_ratio(self, camera_name, ratio):
"""动态更新裁切比例"""
if camera_name in self.CROP_RATIO:
self.CROP_RATIO[camera_name] = max(0, min(0.5, ratio))
print(f"[CROP] {camera_name} 裁切比例更新为: {ratio}")
# ==========================================================
# 12. 回放面板
2026-04-01 17:27:55 +08:00
# 视频充满右侧区域:强制拉伸到 label 尺寸(不保持宽高比)
# 字体放大
2026-04-01 14:11:47 +08:00
# ==========================================================
class PlaybackPanel(QWidget):
def __init__(self, parent=None):
super().__init__(parent)
self._video_list = []
self._video_cursor = 0
self._play_cap = None
self._play_timer = None
self._play_paused = False
self._play_fps = 15.0
self._play_total = 0
self._last_frame_t = 0.0
self._build_ui()
def _build_ui(self):
root = QHBoxLayout(self)
root.setContentsMargins(8, 8, 8, 8)
root.setSpacing(8)
# ---- 左侧列表 ----
left = QWidget()
left.setFixedWidth(260)
ll = QVBoxLayout(left)
ll.setContentsMargins(0, 0, 0, 0)
ll.setSpacing(6)
list_title = QLabel("📋 录像列表")
list_title.setStyleSheet(
"font-size:18px; color:#e94560; font-weight:bold;")
ll.addWidget(list_title)
self._list_widget = QListWidget()
self._list_widget.setFocusPolicy(Qt.NoFocus)
2026-04-01 17:27:55 +08:00
# 行高增大,字体加大
2026-04-01 14:11:47 +08:00
self._list_widget.setStyleSheet("""
QListWidget {
background:#0d1117; border:1px solid #0f3460;
font-size:15px; color:#ccc;
}
QListWidget::item {
padding:8px 4px;
border-bottom:1px solid #1a2540;
min-height:36px;
}
QListWidget::item:selected {
background:#e94560; color:#fff;
}
""")
ll.addWidget(self._list_widget, stretch=1)
self._disk_label = QLabel("")
self._disk_label.setStyleSheet("color:#aaa; font-size:13px;")
self._disk_label.setWordWrap(True)
ll.addWidget(self._disk_label)
self._disk_bar = QProgressBar()
self._disk_bar.setRange(0, 100)
self._disk_bar.setFixedHeight(10)
self._disk_bar.setTextVisible(False)
ll.addWidget(self._disk_bar)
hint = QLabel("↑↓选择 OK播放/暂停 返回退出")
hint.setStyleSheet("color:#555; font-size:13px;")
hint.setWordWrap(True)
ll.addWidget(hint)
root.addWidget(left)
# ---- 右侧播放区 ----
right = QWidget()
right.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
rl = QVBoxLayout(right)
rl.setContentsMargins(0, 0, 0, 0)
rl.setSpacing(4)
play_title = QLabel("▶ 回放画面")
play_title.setFixedHeight(32)
play_title.setStyleSheet(
"font-size:18px; color:#e94560; font-weight:bold;")
rl.addWidget(play_title)
2026-04-01 17:27:55 +08:00
# label 充满右侧Expanding + setScaledContents(True)
2026-04-01 14:11:47 +08:00
# setScaledContents(True) 让 pixmap 自动随 label 尺寸拉伸
self._play_label = QLabel("选择左侧录像后按 OK 播放")
self._play_label.setAlignment(Qt.AlignCenter)
self._play_label.setStyleSheet(
"background:#0d1117; color:#555; font-size:16px;"
" border:1px solid #0f3460;")
self._play_label.setSizePolicy(
QSizePolicy.Expanding, QSizePolicy.Expanding)
2026-04-01 17:27:55 +08:00
self._play_label.setScaledContents(True) # 自动拉伸填满
2026-04-01 14:11:47 +08:00
rl.addWidget(self._play_label, stretch=1)
self._status_label = QLabel("")
self._status_label.setAlignment(Qt.AlignCenter)
self._status_label.setFixedHeight(28)
self._status_label.setStyleSheet(
"color:#aaa; font-size:15px;")
rl.addWidget(self._status_label)
root.addWidget(right, stretch=1)
def load(self):
self._stop_play()
self._video_list = RecordWorker.list_videos()
self._list_widget.clear()
self._video_cursor = 0
if not self._video_list:
self._list_widget.addItem(" (暂无录像)")
else:
for path in self._video_list:
ts = RecordWorker.parse_ts(path)
sz = RecordWorker.file_size_str(path)
item = QListWidgetItem(" {}{}".format(ts, sz))
self._list_widget.addItem(item)
self._list_widget.setCurrentRow(0)
self._update_disk_ui()
self._play_label.setScaledContents(False)
self._play_label.setText("选择左侧录像后按 OK 播放")
self._status_label.setText("")
def _update_disk_ui(self):
info_str, pct = get_disk_info_str(VIDEO_DIR)
self._disk_label.setText(info_str)
self._disk_bar.setValue(pct)
color = "#e94560" if pct >= 90 else "#4CAF50"
self._disk_bar.setStyleSheet("""
QProgressBar {{ background:#1a2540; border-radius:4px; }}
QProgressBar::chunk {{ background:{}; border-radius:4px; }}
""".format(color))
def _stop_play(self):
if self._play_timer:
self._play_timer.stop()
self._play_timer = None
if self._play_cap:
self._play_cap.release()
self._play_cap = None
self._play_paused = False
def _start_play(self):
if not self._video_list:
return
self._stop_play()
path = self._video_list[self._video_cursor]
self._play_cap = cv2.VideoCapture(path)
if not self._play_cap.isOpened():
self._status_label.setText("无法打开文件")
return
self._play_total = int(self._play_cap.get(cv2.CAP_PROP_FRAME_COUNT))
self._play_fps = self._play_cap.get(cv2.CAP_PROP_FPS) or RECORD_FPS
self._play_paused = False
2026-04-01 17:27:55 +08:00
# 开始播放时启用 ScaledContents 填满 label
2026-04-01 14:11:47 +08:00
self._play_label.setScaledContents(True)
self._play_label.setText("")
interval_ms = max(1, int(1000.0 / self._play_fps))
self._play_timer = QTimer()
self._play_timer.timeout.connect(self._tick)
self._play_timer.start(interval_ms)
self._status_label.setText("▶ 播放中 | {}".format(
RecordWorker.parse_ts(path)))
def _tick(self):
if self._play_cap is None or self._play_paused:
return
ret, frame = self._play_cap.read()
if not ret:
self._play_timer.stop()
self._status_label.setText("✅ 播放完毕")
return
2026-04-01 17:27:55 +08:00
# 直接设置 pixmaplabel.setScaledContents(True) 自动填满
2026-04-01 14:11:47 +08:00
rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
h, w = rgb.shape[:2]
qimg = QImage(rgb.tobytes(), w, h, w * 3, QImage.Format_RGB888)
self._play_label.setPixmap(QPixmap.fromImage(qimg))
cur = int(self._play_cap.get(cv2.CAP_PROP_POS_FRAMES))
es = cur / self._play_fps
ts = self._play_total / self._play_fps
ps = " 【已暂停】" if self._play_paused else ""
self._status_label.setText(
"{:02d}:{:02d} / {:02d}:{:02d}{}".format(
int(es)//60, int(es)%60,
int(ts)//60, int(ts)%60, ps))
def on_key(self, code):
if code == KEY_UP:
if self._video_list:
self._video_cursor = max(0, self._video_cursor - 1)
self._list_widget.setCurrentRow(self._video_cursor)
self._stop_play()
self._play_label.setScaledContents(False)
self._play_label.setText("按 OK 播放")
self._status_label.setText("")
elif code == KEY_DOWN:
if self._video_list:
self._video_cursor = min(
len(self._video_list) - 1, self._video_cursor + 1)
self._list_widget.setCurrentRow(self._video_cursor)
self._stop_play()
self._play_label.setScaledContents(False)
self._play_label.setText("按 OK 播放")
self._status_label.setText("")
elif code == KEY_OK:
if self._play_cap and self._play_cap.isOpened():
self._play_paused = not self._play_paused
self._status_label.setText(
"⏸ 已暂停" if self._play_paused else "▶ 播放中")
else:
self._start_play()
def cleanup(self):
self._stop_play()
# ==========================================================
# 13. 设置面板
2026-04-01 17:27:55 +08:00
# 字体全面放大(列表 20px、标题 26px、值标签 36px
2026-04-01 14:11:47 +08:00
# ==========================================================
class SettingsPanel(QWidget):
SUB_MAIN = 0
SUB_PLAYBACK = 1
def __init__(self, cfg, parent=None):
super().__init__(parent)
self._cfg = cfg
self._cursor = 0
self._sub_page = self.SUB_MAIN
self._build_ui()
def _define_menu(self):
return [
{"name":"显示模式", "type":"toggle",
"key":"display_mode",
"values":[MODE_360, MODE_QUAD],
"labels":["360环视", "四分屏"]},
{"name":"轮播间隔(秒)", "type":"range",
"key":"carousel_interval", "min":1, "max":10, "step":1},
{"name":"画面亮度", "type":"range",
"key":"brightness", "min":0, "max":100, "step":5},
{"name":"雷达报警距离", "type":"range",
2026-04-01 17:27:55 +08:00
"key":"radar_alarm_dist", "min":5, "max":50, "step":5},
2026-04-01 14:11:47 +08:00
{"name":"雷达距离单位", "type":"toggle",
"key":"radar_unit",
"values":[0, 1], "labels":["分米", "厘米"]},
{"name":"显示雷达距离", "type":"toggle",
"key":"show_radar",
"values":[0, 1], "labels":["关闭", "开启"]},
{"name":"查看录像回放", "type":"page",
"action": self._enter_playback},
]
def _build_ui(self):
2026-04-01 17:27:55 +08:00
# 全面放大字号
2026-04-01 14:11:47 +08:00
self.setStyleSheet("""
QWidget {
background:#1a1a2e;
color:#e0e0e0;
font-size:20px;
}
QLabel { font-size:20px; }
QListWidget {
background:#16213e;
border:1px solid #0f3460;
font-size:20px;
color:#e0e0e0;
}
QListWidget::item {
padding:10px 8px;
border-bottom:1px solid #0f3460;
min-height:44px;
}
QListWidget::item:selected {
background:#e94560;
color:#ffffff;
}
""")
self._stack = QStackedWidget(self)
root = QVBoxLayout(self)
root.setContentsMargins(0, 0, 0, 0)
root.addWidget(self._stack)
# 主菜单页
mw = QWidget()
ml = QVBoxLayout(mw)
ml.setContentsMargins(50, 24, 50, 24)
ml.setSpacing(14)
title = QLabel("⚙ 系统设置")
title.setAlignment(Qt.AlignCenter)
title.setFixedHeight(50)
title.setStyleSheet(
"color:#e94560; font-size:30px; font-weight:bold;")
ml.addWidget(title)
sep = QFrame()
sep.setFrameShape(QFrame.HLine)
sep.setFixedHeight(2)
sep.setStyleSheet("background:#0f3460;")
ml.addWidget(sep)
self._list_widget = QListWidget()
self._list_widget.setFocusPolicy(Qt.NoFocus)
ml.addWidget(self._list_widget, stretch=1)
2026-04-01 17:27:55 +08:00
# 当前值标签放大
2026-04-01 14:11:47 +08:00
self._val_label = QLabel("")
self._val_label.setAlignment(Qt.AlignCenter)
self._val_label.setFixedHeight(52)
self._val_label.setStyleSheet(
"color:#e94560; font-size:36px; font-weight:bold;")
ml.addWidget(self._val_label)
hint = QLabel("↑↓ 移动 ←→ 调值 OK 切换/进入 返回 退出保存")
hint.setAlignment(Qt.AlignCenter)
hint.setFixedHeight(30)
hint.setStyleSheet("color:#555; font-size:16px;")
ml.addWidget(hint)
self._stack.addWidget(mw)
# 回放页
self._playback_panel = PlaybackPanel()
self._stack.addWidget(self._playback_panel)
self._stack.setCurrentIndex(0)
self._menu_items = self._define_menu()
def _refresh(self):
self._list_widget.clear()
for item in self._menu_items:
vstr = self._item_val_str(item)
self._list_widget.addItem(
" {} {}".format(item["name"], vstr))
self._list_widget.setCurrentRow(self._cursor)
item = self._menu_items[self._cursor]
if item["type"] == "range":
self._val_label.setText(str(self._cfg.get(item["key"], "")))
elif item["type"] == "toggle":
try:
idx = item["values"].index(
self._cfg.get(item["key"], item["values"][0]))
except ValueError:
idx = 0
self._val_label.setText(item["labels"][idx])
else:
self._val_label.setText("")
def _item_val_str(self, item):
if item["type"] == "range":
return "[ {} ]".format(self._cfg.get(item["key"], "?"))
elif item["type"] == "toggle":
try:
idx = item["values"].index(
self._cfg.get(item["key"], item["values"][0]))
except ValueError:
idx = 0
return "[ {} ]".format(item["labels"][idx])
return ""
def _enter_playback(self):
self._playback_panel.load()
self._sub_page = self.SUB_PLAYBACK
self._stack.setCurrentIndex(1)
def on_key(self, code):
if self._sub_page == self.SUB_MAIN:
n = len(self._menu_items)
if code == KEY_UP:
self._cursor = (self._cursor - 1) % n
self._refresh()
elif code == KEY_DOWN:
self._cursor = (self._cursor + 1) % n
self._refresh()
elif code == KEY_LEFT:
item = self._menu_items[self._cursor]
if item["type"] == "range":
v = self._cfg.get(item["key"], item["min"])
self._cfg[item["key"]] = max(
item["min"], v - item["step"])
self._refresh()
elif code == KEY_RIGHT:
item = self._menu_items[self._cursor]
if item["type"] == "range":
v = self._cfg.get(item["key"], item["min"])
self._cfg[item["key"]] = min(
item["max"], v + item["step"])
self._refresh()
elif code == KEY_OK:
item = self._menu_items[self._cursor]
if item["type"] == "toggle":
cur = self._cfg.get(item["key"], item["values"][0])
try:
idx = item["values"].index(cur)
except ValueError:
idx = 0
self._cfg[item["key"]] = \
item["values"][(idx+1) % len(item["values"])]
self._refresh()
elif item["type"] == "page":
item["action"]()
elif self._sub_page == self.SUB_PLAYBACK:
if code == KEY_BACK:
self._playback_panel.cleanup()
self._sub_page = self.SUB_MAIN
self._stack.setCurrentIndex(0)
self._refresh()
else:
self._playback_panel.on_key(code)
def load(self):
self._sub_page = self.SUB_MAIN
self._stack.setCurrentIndex(0)
self._cursor = 0
self._refresh()
def save(self):
save_config(self._cfg)
def get_settings(self):
return dict(self._cfg)
# ==========================================================
# 14. 主窗口
2026-04-01 17:27:55 +08:00
# 真全屏showFullScreen() + image_label 用 setScaledContents(True)
2026-04-01 14:11:47 +08:00
# 渲染线程按真实屏幕尺寸生成图像label 自动填满无黑边
# ==========================================================
class SurroundViewWindow(QMainWindow):
PAGE_MAIN = 0
PAGE_SETTINGS = 1
def __init__(self, multi_cam, serial_worker, remote_worker, cfg):
super().__init__()
self.multi_cam = multi_cam
self.serial_worker = serial_worker
self.remote_worker = remote_worker
self.cfg = cfg
self.carousel = CarouselScheduler()
self.setWindowTitle("环视系统")
2026-04-01 17:27:55 +08:00
# 先全屏,再取实际尺寸传给渲染线程
2026-04-01 14:11:47 +08:00
self.showFullScreen()
self.sw = 1024
self.sh = 600
print("[INFO] 实际屏幕尺寸: {}x{}".format(self.sw, self.sh))
self.stack = QStackedWidget()
self.setCentralWidget(self.stack)
# 主显示页
main_page = QWidget()
main_page.setStyleSheet("background:black;")
ml = QVBoxLayout(main_page)
ml.setContentsMargins(0, 0, 0, 0)
ml.setSpacing(0)
2026-04-01 17:27:55 +08:00
# setScaledContents(True) 让 pixmap 自动充满 label无黑边
2026-04-01 14:11:47 +08:00
self.image_label = QLabel()
self.image_label.setAlignment(Qt.AlignCenter)
self.image_label.setStyleSheet("background:black;")
self.image_label.setSizePolicy(
QSizePolicy.Expanding, QSizePolicy.Expanding)
2026-04-01 17:27:55 +08:00
self.image_label.setScaledContents(True) # 关键
2026-04-01 14:11:47 +08:00
ml.addWidget(self.image_label)
self.stack.addWidget(main_page)
# 设置页(全屏)
self.settings_panel = SettingsPanel(cfg)
self.stack.addWidget(self.settings_panel)
self.stack.setCurrentIndex(self.PAGE_MAIN)
# 工作线程(传入实际屏幕尺寸)
self.raw_worker = RawCaptureWorker(multi_cam.caps, multi_cam.names)
self.raw_worker.start()
self.bird_worker = BirdViewWorker(
self.raw_worker, multi_cam.camera_models,
multi_cam.birdview, framestep=2)
self.bird_worker.start()
2026-04-01 17:27:55 +08:00
# 渲染线程用实际屏幕尺寸
2026-04-01 14:11:47 +08:00
self.render_360 = RenderWorker(
self.bird_worker, self.raw_worker,
serial_worker, self.carousel, cfg,
self.sw, self.sh)
self.render_360.start()
self.render_quad = QuadRenderWorker(
self.raw_worker, serial_worker, cfg,
self.sw, self.sh)
self.render_quad.start()
self.record_worker = RecordWorker(self.raw_worker)
self.record_worker.start()
self.timer = QTimer()
self.timer.timeout.connect(self._flush)
self.timer.start(16)
remote_worker.signals.key_pressed.connect(self._on_key)
remote_worker.signals.enter_settings.connect(self._on_enter_settings)
remote_worker.signals.exit_settings.connect(self._on_exit_settings)
self._apply_display_mode()
def _apply_display_mode(self):
mode = self.cfg.get("display_mode", MODE_360)
self.serial_worker.set_rx_enabled(mode == MODE_360)
print("[UI] 显示模式: {}".format("360环视" if mode == MODE_360
else "四分屏"))
def _flush(self):
if self.stack.currentIndex() != self.PAGE_MAIN:
return
mode = self.cfg.get("display_mode", MODE_360)
qimg = (self.render_quad.get_qimage()
if mode == MODE_QUAD
else self.render_360.get_qimage())
if qimg is None:
return
2026-04-01 17:27:55 +08:00
# 直接 setPixmapsetScaledContents(True) 自动填满,无黑边
2026-04-01 14:11:47 +08:00
self.image_label.setPixmap(QPixmap.fromImage(qimg))
def _on_key(self, code):
if self.stack.currentIndex() == self.PAGE_SETTINGS:
if (code == KEY_BACK and
self.settings_panel._sub_page == SettingsPanel.SUB_MAIN):
self._on_exit_settings()
else:
self.settings_panel.on_key(code)
def _on_enter_settings(self):
if self.stack.currentIndex() == self.PAGE_SETTINGS:
return
self.settings_panel.load()
self.stack.setCurrentIndex(self.PAGE_SETTINGS)
print("[UI] 进入设置界面")
def _on_exit_settings(self):
if self.stack.currentIndex() != self.PAGE_SETTINGS:
return
self.settings_panel.save()
CarouselScheduler.INTERVAL = float(
self.cfg.get("carousel_interval", 4))
self._apply_display_mode()
self.stack.setCurrentIndex(self.PAGE_MAIN)
print("[UI] 退出设置,返回主视图")
def closeEvent(self, event):
self.timer.stop()
self.render_360.stop()
self.render_quad.stop()
self.bird_worker.stop()
self.raw_worker.stop()
self.record_worker.stop()
self.serial_worker.stop()
self.remote_worker.stop()
self.multi_cam.running = False
for cap in self.multi_cam.caps:
cap.release()
event.accept()
# ==========================================================
# 15. 主函数
# ==========================================================
def main():
parser = argparse.ArgumentParser(description="环视系统")
parser.add_argument("--port", type=str, default="/dev/ttyS0")
parser.add_argument("--remote-port", type=str, default="/dev/ttyS3")
parser.add_argument("--baudrate", type=int, default=115200)
parser.add_argument("--framestep", type=int, default=2)
args = parser.parse_args()
if True:
cfg = load_config()
CarouselScheduler.INTERVAL = float(cfg.get("carousel_interval", 4))
serial_worker = SerialWorker(port=args.port, baudrate=args.baudrate)
serial_worker.start()
print("[INFO] 雷达串口: {} @ {} 发送周期: {}ms".format(
args.port, args.baudrate, SEND_INTERVAL_MS))
remote_worker = RemoteWorker(port=args.remote_port,
baudrate=args.baudrate)
remote_worker.start()
print("[INFO] 遥控串口: {} @ {}".format(
args.remote_port, args.baudrate))
multi_cam = MultiCameraBirdView()
if not multi_cam.running:
print("[ERROR] 摄像头初始化失败")
serial_worker.stop()
remote_worker.stop()
return
app = QApplication(sys.argv)
2026-04-01 17:27:55 +08:00
# 禁用系统缩放干扰,确保全屏像素对齐
2026-04-01 14:11:47 +08:00
app.setAttribute(Qt.AA_DisableHighDpiScaling, True)
win = SurroundViewWindow(
multi_cam, serial_worker, remote_worker, cfg)
win.bird_worker.framestep = args.framestep
sys.exit(app.exec_())
elif args.mode == "static":
print("[INFO] Static mode not implemented.")
if __name__ == "__main__":
main()