Press "Enter" to skip to content

使用Python实现高效的相机流

Rahul Chakraborty在Unsplash上的照片

让我们谈谈如何使用Python与网络摄像头。我有一个简单的任务,即从摄像头读取帧,并在每个帧上运行神经网络。在使用一个特定的摄像头时,我在设置目标帧率方面遇到了问题(据我现在了解,这是因为摄像头可以以mjpeg格式运行30帧每秒,但不是raw格式),所以我决定研究一下FFmpeg是否有所帮助。

最终,我让OpenCV和FFmpeg都正常工作了,但我发现了一个非常有趣的事实:在我的主要用例中,FFmpeg的性能优于OpenCV。事实上,使用FFmpeg,我在读取帧时的速度提高了15倍,整个流水线的速度提高了32%。我无法相信这个结果,并多次重新检查了所有内容,但结果保持一致。

注意:当我只是连续读取帧时,性能完全相同,但当我在读取帧之后运行某些操作时(这需要时间),FFmpeg更快。我将在下面准确地说明我的意思。

现在,让我们来看一下代码。首先是使用OpenCV读取网络摄像头帧的类:

class VideoStreamCV:    def __init__(self,src:int,fps:int,resolution:Tuple [int,int]):        self.src = src        self.fps = fps        self.resolution = resolution        self.cap = self._open_camera()        self.wait_for_cam()    def _open_camera(self):        cap = cv2.VideoCapture(self.src)        cap.set(cv2.CAP_PROP_FRAME_WIDTH,self.resolution [0])        cap.set(cv2.CAP_PROP_FRAME_HEIGHT,self.resolution [1])        fourcc = cv2.VideoWriter_fourcc(*“MJPG”)        cap.set(cv2.CAP_PROP_FOURCC,fourcc)        cap.set(cv2.CAP_PROP_FPS,self.fps)        return cap    def read(self):        ret,frame = self.cap.read()        if not ret:            return None        return frame    def release(self):        self.cap.release()    def wait_for_cam(self):        for _ in range(30):            frame = self.read()        if frame is not None:            return True        return False

我使用了wait_for_cam函数,因为相机通常需要时间“热身”。与FFmpeg类一样,使用了相同的预热:

class VideoStreamFFmpeg:    def __init__(self,src:int,fps:int,resolution:Tuple [int,int]):        self.src = src        self.fps = fps        self.resolution = resolution        self.pipe = self._open_ffmpeg()        self.frame_shape =(self.resolution [1],self.resolution [0],3)        self.frame_size = np.prod(self.frame_shape)        self.wait_for_cam()    def _open_ffmpeg(self):        os_name = platform.system()        if os_name ==“Darwin”:  #macOS            input_format =“avfoundation”            video_device = f"{self.src}:none"        elif os_name ==“Linux”:            input_format =“v4l2”            video_device = f"{self.src}"        elif os_name ==“Windows”:            input_format =“dshow”            video_device = f"video = {self.src}"        else:            raise ValueError(“不支持的操作系统”)        command = [            'ffmpeg',            '-f',input_format,            '-r',str(self.fps),            '-video_size',f'{self.resolution [0]} x {self.resolution [1]}',            '-i',video_device,            '-vcodec','mjpeg',  #输入编解码器设置为mjpeg            '-an','-vcodec','rawvideo',  #将MJPEG流解码为原始视频            '-pix_fmt','bgr24',            '-vsync','2',            '-f','image2pipe','-'        ]        if os_name ==“Linux”:            command.insert(2,“-input_format”)            command.insert(3,“mjpeg”)        return subprocess.Popen(            command,stdout = subprocess.PIPE,stderr = subprocess.DEVNULL,bufsize = 10 ** 8        )    def read(self):        raw_image = self.pipe.stdout.read(self.frame_size)        if len(raw_image)!= self.frame_size:            return None        image = np.frombuffer(raw_image,dtype = np.uint8).reshape(self.frame_shape)        return image    def release(self):        self.pipe.terminate()    def wait_for_cam(self):        for _ in range(30):            frame = self.read()        if frame is not None:            return True        return False

为了计时run函数,我使用了装饰器:

def timeit(func):    def wrapper(*args, **kwargs):        t0 = time.perf_counter()        result = func(*args, **kwargs)        t1 = time.perf_counter()        print(f"主函数时间:{round(t1-t0, 4)}秒")        return result    return wrapper

作为一个繁重的合成任务,我使用了这个简单的函数代替神经网络(它也可以只是time.sleep)。这是非常重要的一部分,因为如果没有任何任务,对于OpenCV和FFmpeg来说,读取速度是相同的:

def computation_task():    for _ in range(5000000):        9999 * 9999

现在有一个循环的函数,我在其中读取帧,计时,运行computation_task

@timeitdef run(cam: VideoStreamCV | VideoStreamFFmpeg, run_task: bool):    timer = []    for _ in range(100):        t0 = time.perf_counter()        cam.read()        timer.append(time.perf_counter() - t0)        if run_task:            computation_task()    cam.release()    return round(np.mean(timer), 4)

最后是main函数,我在其中设置了一些参数,使用OpenCV和FFmpeg初始化了两个视频流,并分别运行它们,一次没有computation_task,一次有:

def main():    fsp = 30    resolution = (1920, 1080)    for run_task in [False, True]:        ff_cam = VideoStreamFFmpeg(src=0, fps=fsp, resolution=resolution)        cv_cam = VideoStreamCV(src=0, fps=fsp, resolution=resolution)        print(f"FFMPEG,任务{run_task}:")        print(f"平均帧读取时间:{run(cam=ff_cam, run_task=run_task)}秒\n")        print(f"CV2,任务{run_task}:")        print(f"平均帧读取时间:{run(cam=cv_cam, run_task=run_task)}秒\n")

这是我得到的结果:

FFMPEG,任务False:主函数时间:3.2334秒平均帧读取时间:0.0323秒CV2,任务False:主函数时间:3.3934秒平均帧读取时间:0.0332秒FFMPEG,任务True:主函数时间:4.461秒平均帧读取时间:0.0014秒CV2,任务True:主函数时间:6.6833秒平均帧读取时间:0.023秒

所以,没有合成任务时,我得到的读取时间是相同的:0.03230.0332。但是合成任务时:0.00140.023,因此FFmpeg的速度明显更快。美妙的是,我在神经网络应用中得到了真正的加速,而不仅仅是合成测试,所以我决定分享这些结果。

这是一个显示每次迭代所需时间的图表:读取帧,使用yolov8s模型(在CPU上)进行处理,并保存带有检测到的对象的帧:

使用Python实现高效的相机流 四海 第2张

这是一个带有合成测试的完整脚本:

import platformimport subprocessimport timefrom typing import Tupleimport cv2import numpy as npclass VideoStreamFFmpeg:    def __init__(self, src: int, fps: int, resolution: Tuple[int, int]):        self.src = src        self.fps = fps        self.resolution = resolution        self.pipe = self._open_ffmpeg()        self.frame_shape = (self.resolution[1], self.resolution[0], 3)        self.frame_size = np.prod(self.frame_shape)        self.wait_for_cam()    def _open_ffmpeg(self):        os_name = platform.system()        if os_name == "Darwin":  # macOS            input_format = "avfoundation"            video_device = f"{self.src}:none"        elif os_name == "Linux":            input_format = "v4l2"            video_device = f"{self.src}"        elif os_name == "Windows":            input_format = "dshow"            video_device = f"video={self.src}"        else:            raise ValueError("不支持的操作系统")        command = [            'ffmpeg',            '-f', input_format,            '-r', str(self.fps),            '-video_size', f'{self.resolution[0]}x{self.resolution[1]}',            '-i', video_device,            '-vcodec', 'mjpeg',  # 将输入编解码器设置为mjpeg            '-an', '-vcodec', 'rawvideo',  # 将MJPEG流解码为原始视频            '-pix_fmt', 'bgr24',            '-vsync', '2',            '-f', 'image2pipe', '-'        ]        if os_name == "Linux":            command.insert(2, "-input_format")            command.insert(3, "mjpeg")        return subprocess.Popen(            command, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, bufsize=10**8        )    def read(self):        raw_image = self.pipe.stdout.read(self.frame_size)        if len(raw_image) != self.frame_size:            return None        image = np.frombuffer(raw_image, dtype=np.uint8).reshape(self.frame_shape)        return image    def release(self):        self.pipe.terminate()    def wait_for_cam(self):        for _ in range(30):            frame = self.read()        if frame is not None:            return True        return Falseclass VideoStreamCV:    def __init__(self, src: int, fps: int, resolution: Tuple[int, int]):        self.src = src        self.fps = fps        self.resolution = resolution        self.cap = self._open_camera()        self.wait_for_cam()    def _open_camera(self):        cap = cv2.VideoCapture(self.src)        cap.set(cv2.CAP_PROP_FRAME_WIDTH, self.resolution[0])        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, self.resolution[1])        fourcc = cv2.VideoWriter_fourcc(*"MJPG")        cap.set(cv2.CAP_PROP_FOURCC, fourcc)        cap.set(cv2.CAP_PROP_FPS, self.fps)        return cap    def read(self):        ret, frame = self.cap.read()        if not ret:            return None        return frame    def release(self):        self.cap.release()    def wait_for_cam(self):        for _ in range(30):            frame = self.read()        if frame is not None:            return True        return Falsedef timeit(func):    def wrapper(*args, **kwargs):        t0 = time.perf_counter()        result = func(*args, **kwargs)        t1 = time.perf_counter()        print(f"主函数时间:{round(t1-t0, 4)}秒")        return result    return wrapperdef computation_task():    for _ in range(5000000):        9999 * 9999@timeitdef run(cam: VideoStreamCV | VideoStreamFFmpeg, run_task: bool):    timer = []    for _ in range(100):        t0 = time.perf_counter()        cam.read()        timer.append(time.perf_counter() - t0)        if run_task:            computation_task()    cam.release()    return round(np.mean(timer), 4)def main():    fsp = 30    resolution = (1920, 1080)    for run_task in [False, True]:        ff_cam = VideoStreamFFmpeg(src=0, fps=fsp, resolution=resolution)        cv_cam = VideoStreamCV(src=0, fps=fsp, resolution=resolution)        print(f"FFMPEG,任务{run_task}:")        print(f"平均帧读取时间:{run(cam=ff_cam, run_task=run_task)}秒\n")        print(f"CV2,任务{run_task}:")        print(f"平均帧读取时间:{run(cam=cv_cam, run_task=run_task)}秒\n")if __name__ == "__main__":    main()

注意:此脚本在苹果的M1 Pro芯片上进行了测试。希望对您有所帮助!

Leave a Reply

Your email address will not be published. Required fields are marked *