165 lines
6.4 KiB
Python
165 lines
6.4 KiB
Python
# 相机参考工具 (Optimized by XSimple)
|
|
# 作用: 实时预览指定摄像头,并在主界面对比原始画面与去畸变画面,支持按键抓拍
|
|
# 特性: 彻底修复 RK3588 退出后的 MIPI_CSI2 内核报错刷屏问题
|
|
|
|
import cv2
|
|
import threading
|
|
import sys
|
|
import os
|
|
import argparse
|
|
import numpy as np
|
|
import time # <--- 修复:引入 time 模块以处理底层 timeout 重试
|
|
|
|
# ==================== 全局控制与参数 ====================
|
|
running = True
|
|
capture_flag = False
|
|
|
|
# Front 相机内参 (K) 与 畸变系数 (D)
|
|
K = np.array([[ 3.9875655282072148e+02, 0., 6.3788553583130647e+02],
|
|
[0.,3.7214138046449295e+02, 3.4915903287538981e+02],
|
|
[ 0., 0., 1. ]])
|
|
D = np.array([ 8.6662165805452579e-02, -2.8155159006680735e-02,
|
|
-3.5111914522638211e-02, 1.4621518110074484e-02 ])
|
|
|
|
W, H = 1280, 720
|
|
|
|
# ==================== 终端指令后台监听 ====================
|
|
def input_thread():
|
|
"""
|
|
负责在后台默默倾听 SSH 终端的输入。
|
|
由于 input() 是阻塞式的,将其放入独立后台线程可以防止卡死画面。
|
|
"""
|
|
global running, capture_flag
|
|
print("\n[XSimple Tool] Terminal commands Ready: 's' = screenshot, 'q' = quit")
|
|
while running:
|
|
try:
|
|
cmd = input().strip().lower()
|
|
if cmd == 's':
|
|
capture_flag = True
|
|
elif cmd == 'q':
|
|
print("[SSH] Quit command received. Shutting down...")
|
|
running = False
|
|
break
|
|
except EOFError:
|
|
break
|
|
except Exception:
|
|
pass
|
|
|
|
# ==================== 主控与渲染流程 ====================
|
|
def main():
|
|
global running, capture_flag
|
|
|
|
# 1. 解析参数
|
|
parser = argparse.ArgumentParser(description="Multi-Camera Hardware Reference Tool")
|
|
parser.add_argument("--i", type=str, required=True,
|
|
choices=["front", "back", "left", "right"],
|
|
help="摄像头方位 (front, back, left, right)")
|
|
args = parser.parse_args()
|
|
|
|
camera_mapping = {"front": 0, "left": 1, "back": 2, "right": 3}
|
|
which_camera = camera_mapping[args.i]
|
|
cam_name = args.i.upper()
|
|
print(f"[INFO] Initializing Camera: {cam_name} (Node: /dev/video{which_camera})")
|
|
|
|
# 2. 检查输出文件夹
|
|
save_dir = os.path.join(os.getcwd(), "images")
|
|
if not os.path.exists(save_dir):
|
|
os.makedirs(save_dir)
|
|
|
|
# 3. 开启终端监听线程
|
|
cmd_listener = threading.Thread(target=input_thread, daemon=True)
|
|
cmd_listener.start()
|
|
|
|
# 4. 初始化 V4L2 摄像头
|
|
cap = None
|
|
try:
|
|
cap = cv2.VideoCapture(which_camera, cv2.CAP_V4L2)
|
|
cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"YUYV"))
|
|
cap.set(cv2.CAP_PROP_FRAME_WIDTH, W)
|
|
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, H)
|
|
|
|
# 【关键防内核报错修复】:保证底层的 V4L2 有充足缓冲池
|
|
cap.set(cv2.CAP_PROP_BUFFERSIZE, 3)
|
|
|
|
if not cap.isOpened():
|
|
print(f"[ERROR] Cannot open camera /dev/video{which_camera}", file=sys.stderr)
|
|
running = False
|
|
return
|
|
|
|
# 预计算鱼眼镜头的抗畸变映射矩阵
|
|
map1, map2 = cv2.fisheye.initUndistortRectifyMap(K, D, np.eye(3), K, (W, H), cv2.CV_16SC2)
|
|
|
|
cv2.namedWindow('Video old vs new', cv2.WND_PROP_FULLSCREEN)
|
|
cv2.setWindowProperty('Video old vs new', cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
|
|
|
|
print("[INFO] Stream started. Press 'q' on windows or terminal to exit.")
|
|
|
|
# 5. 主线程 UI 渲染循环 (OpenCV 原生归属)
|
|
while running:
|
|
ret, f = cap.read()
|
|
if not ret or f is None:
|
|
# 出现 select() timeout 时,给物理层重试的机会,不直接崩溃
|
|
time.sleep(0.01)
|
|
continue
|
|
|
|
raw_frame = f.copy()
|
|
|
|
# —— 截图逻辑拦截 ——
|
|
if capture_flag:
|
|
filename = os.path.join(save_dir, f"{args.i.lower()}.png")
|
|
cv2.imwrite(filename, raw_frame)
|
|
print(f"[SUCCESS] Image saved accurately: {filename}")
|
|
capture_flag = False
|
|
|
|
# —— 图像处理与拼接 ——
|
|
undistorted = cv2.remap(f, map1, map2, interpolation=cv2.INTER_LINEAR, borderMode=cv2.BORDER_CONSTANT)
|
|
comparison = np.hstack((f, undistorted))
|
|
|
|
# 等比例缩放以适应屏幕
|
|
scale = min(1920 / comparison.shape[1], 1080 / comparison.shape[0])
|
|
if scale < 1:
|
|
comparison = cv2.resize(comparison, (int(comparison.shape[1] * scale), int(comparison.shape[0] * scale)))
|
|
|
|
# 为了绘制字体时流畅,确保内存连续并转换类型
|
|
comparison = np.ascontiguousarray(comparison, dtype=np.uint8)
|
|
|
|
# 绘制 OSD 信息层
|
|
text_info = f"Camera: {cam_name} | Press 'q'='Quit', 's'='Screenshot'"
|
|
cv2.putText(comparison, text_info, (20, 50),
|
|
cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 255, 255), 3, cv2.LINE_AA)
|
|
|
|
# 区分原本与去畸变后的区域
|
|
w_half = comparison.shape[1] // 2
|
|
h_bottom = comparison.shape[0] - 30
|
|
cv2.putText(comparison, "RAW (FISHEYE)", (20, h_bottom),
|
|
cv2.FONT_HERSHEY_SIMPLEX, 1.5, (0, 255, 0), 3)
|
|
cv2.putText(comparison, "UNDISTORTED", (w_half + 20, h_bottom),
|
|
cv2.FONT_HERSHEY_SIMPLEX, 1.5, (0, 255, 0), 3)
|
|
|
|
# —— 显示与按键监控 ——
|
|
cv2.imshow('Video old vs new', comparison)
|
|
|
|
key = cv2.waitKey(1) & 0xFF
|
|
if key == ord('q'):
|
|
print("[GUI] Quit command triggered by UI window.")
|
|
running = False
|
|
break
|
|
elif key == ord('s'):
|
|
capture_flag = True
|
|
|
|
finally:
|
|
# 6. 安全清理底层资源 (无论遇到什么错误或中断,此代码块必会执行)
|
|
print("\n[INFO] Exiting and safely releasing hardware resources...")
|
|
running = False
|
|
if cap is not None and cap.isOpened():
|
|
cap.release()
|
|
cv2.destroyAllWindows()
|
|
print("[INFO] V4L2 Hardware closed peacefully. Goodbye.")
|
|
|
|
if __name__ == "__main__":
|
|
try:
|
|
main()
|
|
except KeyboardInterrupt:
|
|
print("\n[INFO] Ctrl+C Detected. Terminating safely...")
|
|
running = False
|