Files
AVM360/getimg.py
2026-04-01 14:11:47 +08:00

165 lines
6.4 KiB
Python

# 相机参考工具 (Optimized by XSimple)
# 作用: 实时预览指定摄像头,并在主界面对比原始画面与去畸变画面,支持按键抓拍
# 特性: 彻底修复 RK3588 退出后的 MIPI_CSI2 内核报错刷屏问题
import cv2
import threading
import sys
import os
import argparse
import numpy as np
import time # <--- 修复:引入 time 模块以处理底层 timeout 重试
# ==================== 全局控制与参数 ====================
running = True
capture_flag = False
# Front 相机内参 (K) 与 畸变系数 (D)
K = np.array([[ 3.9875655282072148e+02, 0., 6.3788553583130647e+02],
[0.,3.7214138046449295e+02, 3.4915903287538981e+02],
[ 0., 0., 1. ]])
D = np.array([ 8.6662165805452579e-02, -2.8155159006680735e-02,
-3.5111914522638211e-02, 1.4621518110074484e-02 ])
W, H = 1280, 720
# ==================== 终端指令后台监听 ====================
def input_thread():
"""
负责在后台默默倾听 SSH 终端的输入。
由于 input() 是阻塞式的,将其放入独立后台线程可以防止卡死画面。
"""
global running, capture_flag
print("\n[XSimple Tool] Terminal commands Ready: 's' = screenshot, 'q' = quit")
while running:
try:
cmd = input().strip().lower()
if cmd == 's':
capture_flag = True
elif cmd == 'q':
print("[SSH] Quit command received. Shutting down...")
running = False
break
except EOFError:
break
except Exception:
pass
# ==================== 主控与渲染流程 ====================
def main():
global running, capture_flag
# 1. 解析参数
parser = argparse.ArgumentParser(description="Multi-Camera Hardware Reference Tool")
parser.add_argument("--i", type=str, required=True,
choices=["front", "back", "left", "right"],
help="摄像头方位 (front, back, left, right)")
args = parser.parse_args()
camera_mapping = {"front": 0, "left": 1, "back": 2, "right": 3}
which_camera = camera_mapping[args.i]
cam_name = args.i.upper()
print(f"[INFO] Initializing Camera: {cam_name} (Node: /dev/video{which_camera})")
# 2. 检查输出文件夹
save_dir = os.path.join(os.getcwd(), "images")
if not os.path.exists(save_dir):
os.makedirs(save_dir)
# 3. 开启终端监听线程
cmd_listener = threading.Thread(target=input_thread, daemon=True)
cmd_listener.start()
# 4. 初始化 V4L2 摄像头
cap = None
try:
cap = cv2.VideoCapture(which_camera, cv2.CAP_V4L2)
cap.set(cv2.CAP_PROP_FOURCC, cv2.VideoWriter_fourcc(*"YUYV"))
cap.set(cv2.CAP_PROP_FRAME_WIDTH, W)
cap.set(cv2.CAP_PROP_FRAME_HEIGHT, H)
# 【关键防内核报错修复】:保证底层的 V4L2 有充足缓冲池
cap.set(cv2.CAP_PROP_BUFFERSIZE, 3)
if not cap.isOpened():
print(f"[ERROR] Cannot open camera /dev/video{which_camera}", file=sys.stderr)
running = False
return
# 预计算鱼眼镜头的抗畸变映射矩阵
map1, map2 = cv2.fisheye.initUndistortRectifyMap(K, D, np.eye(3), K, (W, H), cv2.CV_16SC2)
cv2.namedWindow('Video old vs new', cv2.WND_PROP_FULLSCREEN)
cv2.setWindowProperty('Video old vs new', cv2.WND_PROP_FULLSCREEN, cv2.WINDOW_FULLSCREEN)
print("[INFO] Stream started. Press 'q' on windows or terminal to exit.")
# 5. 主线程 UI 渲染循环 (OpenCV 原生归属)
while running:
ret, f = cap.read()
if not ret or f is None:
# 出现 select() timeout 时,给物理层重试的机会,不直接崩溃
time.sleep(0.01)
continue
raw_frame = f.copy()
# —— 截图逻辑拦截 ——
if capture_flag:
filename = os.path.join(save_dir, f"{args.i.lower()}.png")
cv2.imwrite(filename, raw_frame)
print(f"[SUCCESS] Image saved accurately: {filename}")
capture_flag = False
# —— 图像处理与拼接 ——
undistorted = cv2.remap(f, map1, map2, interpolation=cv2.INTER_LINEAR, borderMode=cv2.BORDER_CONSTANT)
comparison = np.hstack((f, undistorted))
# 等比例缩放以适应屏幕
scale = min(1920 / comparison.shape[1], 1080 / comparison.shape[0])
if scale < 1:
comparison = cv2.resize(comparison, (int(comparison.shape[1] * scale), int(comparison.shape[0] * scale)))
# 为了绘制字体时流畅,确保内存连续并转换类型
comparison = np.ascontiguousarray(comparison, dtype=np.uint8)
# 绘制 OSD 信息层
text_info = f"Camera: {cam_name} | Press 'q'='Quit', 's'='Screenshot'"
cv2.putText(comparison, text_info, (20, 50),
cv2.FONT_HERSHEY_SIMPLEX, 1.2, (0, 255, 255), 3, cv2.LINE_AA)
# 区分原本与去畸变后的区域
w_half = comparison.shape[1] // 2
h_bottom = comparison.shape[0] - 30
cv2.putText(comparison, "RAW (FISHEYE)", (20, h_bottom),
cv2.FONT_HERSHEY_SIMPLEX, 1.5, (0, 255, 0), 3)
cv2.putText(comparison, "UNDISTORTED", (w_half + 20, h_bottom),
cv2.FONT_HERSHEY_SIMPLEX, 1.5, (0, 255, 0), 3)
# —— 显示与按键监控 ——
cv2.imshow('Video old vs new', comparison)
key = cv2.waitKey(1) & 0xFF
if key == ord('q'):
print("[GUI] Quit command triggered by UI window.")
running = False
break
elif key == ord('s'):
capture_flag = True
finally:
# 6. 安全清理底层资源 (无论遇到什么错误或中断,此代码块必会执行)
print("\n[INFO] Exiting and safely releasing hardware resources...")
running = False
if cap is not None and cap.isOpened():
cap.release()
cv2.destroyAllWindows()
print("[INFO] V4L2 Hardware closed peacefully. Goodbye.")
if __name__ == "__main__":
try:
main()
except KeyboardInterrupt:
print("\n[INFO] Ctrl+C Detected. Terminating safely...")
running = False