嵌入式技术
使用两个摄像头来计算深度图,在没有RGBD的情况下可以使用,不过为了保持效率,需要使用Jetson边缘计算设备。
先输入的是两个IMX219的视频流,捕获的视频流是1080P的,接着传为G流这个后处理器,看图是处理了一下颜色空间。
这个VPI我查一下,是英伟达家的视觉编程接口,总之就是进行了一些转换的过程。
大概是这样的
最后进行这个合成运算和对齐
capture->rectify->SGBM->depth calc.
数据流管道
处理流程
IMX219,Sony家的东西
sudo apt install libnvvpi1 python3-vpi1 vpi1-dev
需要安装的库
https://repo.download.nvidia.com/jetson/
但是这个源码我找不到了,就是找到了deb的库。
英伟达的库好全啊
当然了,算法很全面
安装后里面有的内容
可以处理的视频流后端,这里是支持jetson的(废话)
后端使用CUDA的结果
写一个,Python是快,但是不能进行内存管理,CSDK可以。U8是八位无符号数。
#include#include #if CV_MAJOR_VERSION >= 3#include #else#include #endif#include #include #include #include #include #include int main(int argc, char *argv[]){ if (argc != 2) { std::cerr << "Must pass an input image to be blurred" << std::endl; return 1; } cv::Mat cvImage = cv::imread(argv[1]); if (cvImage.data == NULL) { std::cerr << "Can't open input image" << std::endl; return 2; } VPIStream stream; vpiStreamCreate(0, &stream); VPIImage image; vpiImageCreateWrapperOpenCVMat(cvImage, 0, &image); VPIImage imageGray; vpiImageCreate(cvImage.cols, cvImage.rows, VPI_IMAGE_FORMAT_U8, 0, &imageGray); VPIImage blurred; vpiImageCreate(cvImage.cols, cvImage.rows, VPI_IMAGE_FORMAT_U8, 0, &blurred); vpiSubmitConvertImageFormat(stream, VPI_BACKEND_CUDA, image, imageGray, NULL); vpiStreamSync(stream); VPIImageData outData; vpiImageLockData(blurred, VPI_LOCK_READ, VPI_IMAGE_BUFFER_HOST_PITCH_LINEAR, &outData); cv::Mat cvOut; vpiImageDataExportOpenCVMat(outData, &cvOut); imwrite("tutorial_blurred.png", cvOut); vpiImageUnlock(blurred); vpiStreamDestroy(stream); vpiImageDestroy(image); vpiImageDestroy(imageGray); vpiImageDestroy(blurred); return 0;}
CPP的版本,还需要Cmake编译
主要是为了加速运行视觉的算法
算法:表示不可分割的计算操作。
后端:代表负责实际计算的硬件引擎。
Streams:充当提交算法的异步队列,最终在给定后端按顺序执行。流和事件是计算管道的构建块。
缓冲区:存储输入和输出数据。
事件:提供在流和/或应用程序线程上操作的同步原语。
上下文:保存 VPI 和创建对象的状态。
可以运行的硬件
补一个硬件的淘汰路线
最近一个版本是有了Python的支持
真新啊
这个项目一开始需要的是进行摄像头的标定。
导入的库
将棋盘格引入
wget https://www.dropbox.com/sh/j4w6tg9b0ov209g/AAA1HfT7Yy33tmGI1tWaqDKCa?dl=1 -O /tmp/calib_files.zipunzip /tmp/calib_files.zip
需要提前执行一下
接着开始处理
ret, K_l, dist_coeff_l, rvecs, tvecs = cv2.calibrateCamera(obj_pts, img_pts, (arr.shape[1], arr.shape[0]), None,None)
得到校准的参数
参数到手
img = Image.open(FOLDER+"001.png")arr = np.array(img) arr_corr = cv2.undistort(arr, K_l, dist_coeff_l, None, K_l) fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(20,10))ax1.imshow(arr, cmap='gray')ax2.imshow(arr_corr, cmap='gray')plt.show()
因为是鱼眼的相机,这里使用标定的参数进行校正。
就是这样
因为是两个相机,记得标定两次,然后保存参数:
np.save("K_l.npy", K_l)np.save("K_r.npy", K_r) np.save("dist_coeff_l.npy", dist_coeff_l)np.save("dist_coeff_r.npy", dist_coeff_r)
重新开个Jupyter,导入库,生成一张新图,把相机的照片准备好
建立一些列表来存放我们的数据,使用ZIP来进行遍历,处理角点
现在就只相当于左右两个棋盘图像对齐操作
将摄像头对齐
使用立体校正
rect_l, rect_r, proj_mat_l, proj_mat_r, Q, roiL, roiR = cv2.stereoRectify(K_l, dist_l, K_r, dist_r, gray_l.shape[::-1], Rot, Trns, 1 ,(0,0))
left_stereo_maps = cv2.initUndistortRectifyMap(K_l, dist_l, rect_l, proj_mat_l, gray_l.shape[::-1], cv2.CV_16SC2)right_stereo_maps = cv2.initUndistortRectifyMap(K_r, dist_r, rect_r, proj_mat_r, gray_l.shape[::-1], cv2.CV_16SC2)
初始化每个校正过的视图
arr_l_rect = cv2.remap(arr_l, left_stereo_maps[0],left_stereo_maps[1], cv2.INTER_LANCZOS4, cv2.BORDER_CONSTANT, 0)arr_r_rect = cv2.remap(arr_r, right_stereo_maps[0],right_stereo_maps[1], cv2.INTER_LANCZOS4, cv2.BORDER_CONSTANT, 0)
将函数进行进行映射
开始进行校正
看一张就行
效果不错的情况下都安排了
读取所有的图像
对单个图像校准
fig, axs = plt.subplots(2, 2, figsize=(20,10)) # beforeaxs[0][0].title.set_text('Original L')axs[0][0].imshow(arr_l) axs[0][1].title.set_text('Original R')axs[0][1].imshow(arr_r) axs[1][0].title.set_text('Rectified L')axs[1][0].imshow(arr_l_rect)axs[1][0].title.set_text('Rectified R')axs[1][1].imshow(arr_r_rect)
结果
开始融合
可以先看下数组形状
X和Y的
组合一下
最终的结果
import timeimport queueimport numpy as npimport cv2from threading import Threadfrom camera import Camerafrom concurrent.futures import ThreadPoolExecutor MAX_DISP = 128WINDOW_SIZE = 10 class Depth(Thread): def __init__(self): super().__init__() print("Reading camera calibration...") self._map_l, self._map_r = get_calibration() self._cam_r = CameraThread(0) self._cam_l = CameraThread(1) self._disp_arr = None self._should_run = True self._dq = queue.deque(maxlen=3) self._executor = ThreadPoolExecutor(max_workers=4) self.start() # Wait for the deque to start filling up while len(self._dq) < 1: time.sleep(0.1) def stop(self): self._should_run = False self._cam_l.stop() self._cam_r.stop() def disparity(self): while len(self._dq) == 0: time.sleep(0.01) return self._dq.pop() def enqueue_async(self, vpi_image): arr = vpi_image.cpu().copy() self._dq.append(arr) del vpi_image def run(self): import vpi # Don't import vpi in main thread to avoid creating a global context i = 0 self._warp_l = make_vpi_warpmap(self._map_l) self._warp_r = make_vpi_warpmap(self._map_r) ts_history = [] while self._should_run: i += 1 ts = [] ts.append(time.perf_counter()) with vpi.Backend.CUDA: ts.append(time.perf_counter()) # confidenceMap = vpi.Image(vpi_l.size, vpi.Format.U16) # Read Images arr_l = self._cam_l.image arr_r = self._cam_r.image ts.append(time.perf_counter()) # Convert to VPI image vpi_l = vpi.asimage(arr_l) vpi_r = vpi.asimage(arr_r) ts.append(time.perf_counter()) # Rectify vpi_l = vpi_l.remap(self._warp_l) vpi_r = vpi_r.remap(self._warp_r) ts.append(time.perf_counter()) # Resize vpi_l = vpi_l.rescale( (480, 270), interp=vpi.Interp.LINEAR, border=vpi.Border.ZERO ) vpi_r = vpi_r.rescale( (480, 270), interp=vpi.Interp.LINEAR, border=vpi.Border.ZERO ) ts.append(time.perf_counter()) # Convert to 16bpp vpi_l_16bpp = vpi_l.convert(vpi.Format.U16, scale=1) vpi_r_16bpp = vpi_r.convert(vpi.Format.U16, scale=1) ts.append(time.perf_counter()) # Disparity disparity_16bpp = vpi.stereodisp( vpi_l_16bpp, vpi_r_16bpp, out_confmap=None, backend=vpi.Backend.CUDA, window=WINDOW_SIZE, maxdisp=MAX_DISP, ) ts.append(time.perf_counter()) # Convert again disparity_8bpp = disparity_16bpp.convert( vpi.Format.U8, scale=255.0 / (32 * MAX_DISP) ) ts.append(time.perf_counter()) # CPU mapping # self._disp_arr = disparity_8bpp self._executor.submit(self.enqueue_async, disparity_8bpp) # global frames # frames.append(self._disp_arr) ts.append(time.perf_counter()) # Render # cv2.imshow("Disparity", disp_arr) # cv2.waitKey(1) ts.append(time.perf_counter()) ts = np.array(ts) ts_deltas = np.diff(ts) ts_history.append(ts_deltas) debug_str = f"Iter {i} " tasks = [ "Enter context", "Read images", "Convert to VPI image", "Rectify", "Resize 1080p->270p", "Convert to 16bpp", "Disparity", "Convert 16bpp->8bpp", ".cpu() mapping", "OpenCV colormap", "Render", ] debug_str += f"Sum: {sum(ts_deltas):0.2f} " task_acc_time = 1000 * np.array(ts_history).mean(axis=0) for task, dt in zip(tasks, task_acc_time): debug_str += f"{task.ljust(20)} {dt:0.2f} " # print(debug_str) if i % 10 == 0: vpi.clear_cache() def disp2depth(disp_arr): F, B, PIXEL_SIZE = (CAM_PARAMS.F, CAM_PARAMS.B, CAM_PARAMS.PIXEL_SIZE) disp_arr[disp_arr < 10] = 10 disp_arr_mm = disp_arr * PIXEL_SIZE # convert disparites from px -> mm depth_arr_mm = F * B / (disp_arr_mm + 1e-10) # calculate depth return depth_arr_mm def get_calibration() -> tuple: fs = cv2.FileStorage( "calib/rectify_map_imx219_160deg_1080p.yaml", cv2.FILE_STORAGE_READ ) map_l = (fs.getNode("map_l_x").mat(), fs.getNode("map_l_y").mat()) map_r = (fs.getNode("map_r_x").mat(), fs.getNode("map_r_y").mat()) fs.release() return map_l, map_r def make_vpi_warpmap(cv_maps): import vpi src_map = cv_maps[0] idk_what_that_is = cv_maps[1] map_y, map_x = src_map[:, :, 0], src_map[:, :, 1] warp = vpi.WarpMap(vpi.WarpGrid((1920, 1080))) # (H, W, C) -> (C,H,W) arr_warp = np.asarray(warp) wx = arr_warp[:, :, 0] wy = arr_warp[:, :, 1] wy[:1080, :] = map_x wx[:1080, :] = map_y return warp class CameraThread(Thread): def __init__(self, sensor_id) -> None: super().__init__() self._camera = Camera(sensor_id) self._should_run = True self._image = self._camera.read() self.start() def run(self): while self._should_run: self._image = self._camera.read() @property def image(self): # NOTE: if we care about atomicity of reads, we can add a lock here return self._image def stop(self): self._should_run = False self._camera.stop() def calculate_depth(disp_arr): F, B, PIXEL_SIZE = (CAM_PARAMS.F, CAM_PARAMS.B, CAM_PARAMS.PIXEL_SIZE) disp_arr_mm = disp_arr * PIXEL_SIZE # disparites in mm depth_arr_mm = F * B / (disp_arr_mm + 1e-1) # distances return depth_arr_mm if __name__ == "__main__": DISPLAY = True SAVE = True frames_d = [] frames_rgb = [] depth = Depth() t1 = time.perf_counter() for i in range(100): disp_arr = depth.disparity() frames_d.append(disp_arr) frames_rgb.append(depth._cam_l.image) print(i) # print(disp_arr) # depth_arr = calculate_depth(disp_arr) # print(f"{i}/20: {disp_arr:0.2f}") # time.sleep(1) if DISPLAY: disp_arr = cv2.applyColorMap(disp_arr, cv2.COLORMAP_TURBO) cv2.imshow("Depth", disp_arr) cv2.imshow("Image", cv2.resize(depth._cam_l.image, (480, 270))) cv2.waitKey(1) depth.stop() t2 = time.perf_counter() print(f"Approx framerate: {len(frames_d)/(t2-t1)} FPS") # Save frames for i, (disp_arr, rgb_arr) in enumerate(zip(frames_d, frames_rgb)): print(f"{i}/{len(frames_d)}", end=" ") disp_arr = cv2.applyColorMap(disp_arr, cv2.COLORMAP_TURBO) cv2.imwrite(f"images/depth_{i}.jpg", disp_arr) cv2.imwrite(f"images/rgb_{i}.jpg", rgb_arr)
写了这么多了,不写了,给个demo。
给出相机的打印件
后面
由远及近
编辑:黄飞
全部0条评论
快来发表一下你的评论吧 !