让我们谈谈如何使用Python与网络摄像头。我有一个简单的任务,即从摄像头读取帧,并在每个帧上运行神经网络。在使用一个特定的摄像头时,我在设置目标帧率方面遇到了问题(据我现在了解,这是因为摄像头可以以mjpeg格式运行30帧每秒,但不是raw格式),所以我决定研究一下FFmpeg是否有所帮助。
最终,我让OpenCV和FFmpeg都正常工作了,但我发现了一个非常有趣的事实:在我的主要用例中,FFmpeg的性能优于OpenCV。事实上,使用FFmpeg,我在读取帧时的速度提高了15倍,整个流水线的速度提高了32%。我无法相信这个结果,并多次重新检查了所有内容,但结果保持一致。
注意:当我只是连续读取帧时,性能完全相同,但当我在读取帧之后运行某些操作时(这需要时间),FFmpeg更快。我将在下面准确地说明我的意思。
现在,让我们来看一下代码。首先是使用OpenCV读取网络摄像头帧的类:
class VideoStreamCV: def __init__(self,src:int,fps:int,resolution:Tuple [int,int]): self.src = src self.fps = fps self.resolution = resolution self.cap = self._open_camera() self.wait_for_cam() def _open_camera(self): cap = cv2.VideoCapture(self.src) cap.set(cv2.CAP_PROP_FRAME_WIDTH,self.resolution [0]) cap.set(cv2.CAP_PROP_FRAME_HEIGHT,self.resolution [1]) fourcc = cv2.VideoWriter_fourcc(*“MJPG”) cap.set(cv2.CAP_PROP_FOURCC,fourcc) cap.set(cv2.CAP_PROP_FPS,self.fps) return cap def read(self): ret,frame = self.cap.read() if not ret: return None return frame def release(self): self.cap.release() def wait_for_cam(self): for _ in range(30): frame = self.read() if frame is not None: return True return False
我使用了wait_for_cam
函数,因为相机通常需要时间“热身”。与FFmpeg类一样,使用了相同的预热:
class VideoStreamFFmpeg: def __init__(self,src:int,fps:int,resolution:Tuple [int,int]): self.src = src self.fps = fps self.resolution = resolution self.pipe = self._open_ffmpeg() self.frame_shape =(self.resolution [1],self.resolution [0],3) self.frame_size = np.prod(self.frame_shape) self.wait_for_cam() def _open_ffmpeg(self): os_name = platform.system() if os_name ==“Darwin”: #macOS input_format =“avfoundation” video_device = f"{self.src}:none" elif os_name ==“Linux”: input_format =“v4l2” video_device = f"{self.src}" elif os_name ==“Windows”: input_format =“dshow” video_device = f"video = {self.src}" else: raise ValueError(“不支持的操作系统”) command = [ 'ffmpeg', '-f',input_format, '-r',str(self.fps), '-video_size',f'{self.resolution [0]} x {self.resolution [1]}', '-i',video_device, '-vcodec','mjpeg', #输入编解码器设置为mjpeg '-an','-vcodec','rawvideo', #将MJPEG流解码为原始视频 '-pix_fmt','bgr24', '-vsync','2', '-f','image2pipe','-' ] if os_name ==“Linux”: command.insert(2,“-input_format”) command.insert(3,“mjpeg”) return subprocess.Popen( command,stdout = subprocess.PIPE,stderr = subprocess.DEVNULL,bufsize = 10 ** 8 ) def read(self): raw_image = self.pipe.stdout.read(self.frame_size) if len(raw_image)!= self.frame_size: return None image = np.frombuffer(raw_image,dtype = np.uint8).reshape(self.frame_shape) return image def release(self): self.pipe.terminate() def wait_for_cam(self): for _ in range(30): frame = self.read() if frame is not None: return True return False
为了计时run
函数,我使用了装饰器:
def timeit(func): def wrapper(*args, **kwargs): t0 = time.perf_counter() result = func(*args, **kwargs) t1 = time.perf_counter() print(f"主函数时间:{round(t1-t0, 4)}秒") return result return wrapper
作为一个繁重的合成任务,我使用了这个简单的函数代替神经网络(它也可以只是time.sleep
)。这是非常重要的一部分,因为如果没有任何任务,对于OpenCV和FFmpeg来说,读取速度是相同的:
def computation_task(): for _ in range(5000000): 9999 * 9999
现在有一个循环的函数,我在其中读取帧,计时,运行computation_task
:
@timeitdef run(cam: VideoStreamCV | VideoStreamFFmpeg, run_task: bool): timer = [] for _ in range(100): t0 = time.perf_counter() cam.read() timer.append(time.perf_counter() - t0) if run_task: computation_task() cam.release() return round(np.mean(timer), 4)
最后是main
函数,我在其中设置了一些参数,使用OpenCV和FFmpeg初始化了两个视频流,并分别运行它们,一次没有computation_task
,一次有:
def main(): fsp = 30 resolution = (1920, 1080) for run_task in [False, True]: ff_cam = VideoStreamFFmpeg(src=0, fps=fsp, resolution=resolution) cv_cam = VideoStreamCV(src=0, fps=fsp, resolution=resolution) print(f"FFMPEG,任务{run_task}:") print(f"平均帧读取时间:{run(cam=ff_cam, run_task=run_task)}秒\n") print(f"CV2,任务{run_task}:") print(f"平均帧读取时间:{run(cam=cv_cam, run_task=run_task)}秒\n")
这是我得到的结果:
FFMPEG,任务False:主函数时间:3.2334秒平均帧读取时间:0.0323秒CV2,任务False:主函数时间:3.3934秒平均帧读取时间:0.0332秒FFMPEG,任务True:主函数时间:4.461秒平均帧读取时间:0.0014秒CV2,任务True:主函数时间:6.6833秒平均帧读取时间:0.023秒
所以,没有合成任务时,我得到的读取时间是相同的:0.0323
,0.0332
。但是有合成任务时:0.0014
和0.023
,因此FFmpeg的速度明显更快。美妙的是,我在神经网络应用中得到了真正的加速,而不仅仅是合成测试,所以我决定分享这些结果。
这是一个显示每次迭代所需时间的图表:读取帧,使用yolov8s模型(在CPU上)进行处理,并保存带有检测到的对象的帧:
这是一个带有合成测试的完整脚本:
import platformimport subprocessimport timefrom typing import Tupleimport cv2import numpy as npclass VideoStreamFFmpeg: def __init__(self, src: int, fps: int, resolution: Tuple[int, int]): self.src = src self.fps = fps self.resolution = resolution self.pipe = self._open_ffmpeg() self.frame_shape = (self.resolution[1], self.resolution[0], 3) self.frame_size = np.prod(self.frame_shape) self.wait_for_cam() def _open_ffmpeg(self): os_name = platform.system() if os_name == "Darwin": # macOS input_format = "avfoundation" video_device = f"{self.src}:none" elif os_name == "Linux": input_format = "v4l2" video_device = f"{self.src}" elif os_name == "Windows": input_format = "dshow" video_device = f"video={self.src}" else: raise ValueError("不支持的操作系统") command = [ 'ffmpeg', '-f', input_format, '-r', str(self.fps), '-video_size', f'{self.resolution[0]}x{self.resolution[1]}', '-i', video_device, '-vcodec', 'mjpeg', # 将输入编解码器设置为mjpeg '-an', '-vcodec', 'rawvideo', # 将MJPEG流解码为原始视频 '-pix_fmt', 'bgr24', '-vsync', '2', '-f', 'image2pipe', '-' ] if os_name == "Linux": command.insert(2, "-input_format") command.insert(3, "mjpeg") return subprocess.Popen( command, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, bufsize=10**8 ) def read(self): raw_image = self.pipe.stdout.read(self.frame_size) if len(raw_image) != self.frame_size: return None image = np.frombuffer(raw_image, dtype=np.uint8).reshape(self.frame_shape) return image def release(self): self.pipe.terminate() def wait_for_cam(self): for _ in range(30): frame = self.read() if frame is not None: return True return Falseclass VideoStreamCV: def __init__(self, src: int, fps: int, resolution: Tuple[int, int]): self.src = src self.fps = fps self.resolution = resolution self.cap = self._open_camera() self.wait_for_cam() def _open_camera(self): cap = cv2.VideoCapture(self.src) cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.resolution[0]) cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.resolution[1]) fourcc = cv2.VideoWriter_fourcc(*"MJPG") cap.set(cv2.CAP_PROP_FOURCC, fourcc) cap.set(cv2.CAP_PROP_FPS, self.fps) return cap def read(self): ret, frame = self.cap.read() if not ret: return None return frame def release(self): self.cap.release() def wait_for_cam(self): for _ in range(30): frame = self.read() if frame is not None: return True return Falsedef timeit(func): def wrapper(*args, **kwargs): t0 = time.perf_counter() result = func(*args, **kwargs) t1 = time.perf_counter() print(f"主函数时间:{round(t1-t0, 4)}秒") return result return wrapperdef computation_task(): for _ in range(5000000): 9999 * 9999@timeitdef run(cam: VideoStreamCV | VideoStreamFFmpeg, run_task: bool): timer = [] for _ in range(100): t0 = time.perf_counter() cam.read() timer.append(time.perf_counter() - t0) if run_task: computation_task() cam.release() return round(np.mean(timer), 4)def main(): fsp = 30 resolution = (1920, 1080) for run_task in [False, True]: ff_cam = VideoStreamFFmpeg(src=0, fps=fsp, resolution=resolution) cv_cam = VideoStreamCV(src=0, fps=fsp, resolution=resolution) print(f"FFMPEG,任务{run_task}:") print(f"平均帧读取时间:{run(cam=ff_cam, run_task=run_task)}秒\n") print(f"CV2,任务{run_task}:") print(f"平均帧读取时间:{run(cam=cv_cam, run_task=run_task)}秒\n")if __name__ == "__main__": main()
注意:此脚本在苹果的M1 Pro芯片上进行了测试。希望对您有所帮助!