Press "Enter" to skip to content

如何使用自定义PyTorch运算符优化您的深度学习数据输入流程

PyTorch模型性能分析与优化 —— 第5部分

Photo by Alexander Grey on Unsplash

本篇文章是关于GPU-based PyTorch工作负载性能分析和优化的系列文章中的第五篇,也是第四篇的直接续篇。在第四篇文章中,我们演示了如何使用PyTorch Profiler和TensorBoard来识别、分析和解决DL训练工作负载中数据预处理流程的性能瓶颈。本篇文章中,我们讨论PyTorch对于创建自定义操作符的支持,并演示了它如何帮助我们解决数据输入流程中的性能瓶颈,加速DL工作负载,并降低训练成本。感谢Yitzhak Levi和Gilad Wasserman对本文的贡献。本文相关的代码可以在GitHub代码库中找到。

构建PyTorch扩展

PyTorch提供了多种方法来创建自定义操作,包括使用自定义模块和/或函数扩展torch.nn。在本文中,我们关注PyTorch对于集成自定义C++代码的支持。由于一些操作在C++中可以比在Python中实现得更高效和/或更容易,因此这种能力非常重要。使用指定的PyTorch工具,例如CppExtension,这些操作可以轻松地作为“扩展”包含到PyTorch中,而无需拉取和重新编译整个PyTorch代码库。关于这个功能背后的动机以及如何使用它的详细信息,请参阅官方PyTorch教程中关于自定义C++和CUDA扩展的部分。由于本文的兴趣在于加速基于CPU的数据预处理流程,我们将仅使用C++扩展,而不需要CUDA代码。在未来的文章中,我们希望演示如何使用这个功能来实现自定义的CUDA扩展,以加速在GPU上运行的训练代码。

示例

在我们之前的文章中,我们定义了一个数据输入流程,该流程从解码一个533×800的JPEG图像开始,然后提取一个随机的256×256裁剪图像,经过一些额外的变换后,输入训练循环。我们使用PyTorch Profiler和TensorBoard来测量从文件加载图像所需的时间,并意识到解码过程的浪费性。为了完整起见,我们在下面复制了代码:

import numpy as np
from PIL import Image
from torchvision.datasets.vision import VisionDataset

input_img_size = [533, 800]
img_size = 256

class FakeDataset(VisionDataset):
    def __init__(self, transform):
        super().__init__(root=None, transform=transform)
        size = 10000
        self.img_files = [f'{i}.jpg' for i in range(size)]
        self.targets = np.random.randint(low=0, high=num_classes,
                                         size=(size), dtype=np.uint8).tolist()
    
    def __getitem__(self, index):
        img_file, target = self.img_files[index], self.targets[index]
        img = Image.open(img_file)
        if self.transform is not None:
            img = self.transform(img)
        return img, target
    
    def __len__(self):
        return len(self.img_files)

transform = T.Compose(
    [T.PILToTensor(),
     T.RandomCrop(img_size),
     RandomMask(),
     ConvertColor(),
     Scale()])

回顾我们之前的文章,我们达到的优化后的平均步骤时间是0.72秒。可以想象,如果我们只解码我们感兴趣的裁剪图像,我们的流程会更快。不幸的是,截至本文撰写时,PyTorch不包含支持此功能的函数。然而,使用自定义操作创建工具,我们可以定义和实现自己的函数!

自定义JPEG图像解码和裁剪函数

libjpeg-turbo库是一个JPEG图像编解码器,与libjpeg相比,它包含了一些增强和优化。特别是,libjpeg-turbo包含了一些函数,使我们能够仅解码图像中预定义的裁剪部分,例如jpeg_skip_scanlines和jpeg_crop_scanline。如果您在conda环境中运行,可以使用以下命令安装:

conda install -c conda-forge libjpeg-turbo

请注意,libjpeg-turbo已预装在我们将在下面的实验中使用的官方AWS PyTorch 2.0深度学习Docker映像中。

在下面的代码块中,我们修改了torchvision 0.15的decode_jpeg函数,以解码并返回输入的JPEG编码图像的请求裁剪。

torch::Tensor decode_and_crop_jpeg(const torch::Tensor& data,                                   unsigned int crop_y,                                   unsigned int crop_x,                                   unsigned int crop_height,                                   unsigned int crop_width) {  struct jpeg_decompress_struct cinfo;  struct torch_jpeg_error_mgr jerr;  auto datap = data.data_ptr<uint8_t>();  // 设置解压缩结构  cinfo.err = jpeg_std_error(&jerr.pub);  jerr.pub.error_exit = torch_jpeg_error_exit;  /* 为my_error_exit建立setjmp返回上下文。 */  setjmp(jerr.setjmp_buffer);  jpeg_create_decompress(&cinfo);  torch_jpeg_set_source_mgr(&cinfo, datap, data.numel());  // 从头部读取信息  jpeg_read_header(&cinfo, TRUE);  int channels = cinfo.num_components;  jpeg_start_decompress(&cinfo);  int stride = crop_width * channels;  auto tensor =     torch::empty({int64_t(crop_height), int64_t(crop_width), channels},                  torch::kU8);  auto ptr = tensor.data_ptr<uint8_t>();  unsigned int update_width = crop_width;  jpeg_crop_scanline(&cinfo, &crop_x, &update_width);  jpeg_skip_scanlines(&cinfo, crop_y);  const int offset = (cinfo.output_width - crop_width) * channels;  uint8_t* temp = nullptr;  if(offset > 0) temp = new uint8_t[cinfo.output_width * channels];  while (cinfo.output_scanline < crop_y + crop_height) {    /* jpeg_read_scanlines期望一个指向扫描线的指针数组。     * 这里的数组只有一个元素,但如果更方便,可以要求     * 一次请求多个扫描线。     */    if(offset>0){      jpeg_read_scanlines(&cinfo, &temp, 1);      memcpy(ptr, temp + offset, stride);    }    else      jpeg_read_scanlines(&cinfo, &ptr, 1);    ptr += stride;  }  if(offset > 0){    delete[] temp;    temp = nullptr;  }  if (cinfo.output_scanline < cinfo.output_height) {    // 跳过剩余的扫描线,这是jpeg_destroy_decompress所必需的。    jpeg_skip_scanlines(&cinfo,                        cinfo.output_height - crop_y - crop_height);  }  jpeg_finish_decompress(&cinfo);  jpeg_destroy_decompress(&cinfo);  return tensor.permute({2, 0, 1});}PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) {  m.def("decode_and_crop_jpeg",&decode_and_crop_jpeg,"decode_and_crop_jpeg");}

完整的C++文件可以在此处找到。

在下一节中,我们将按照PyTorch教程中的步骤,将其转换为我们可以在预处理流水线中使用的PyTorch操作符。

部署PyTorch扩展

如PyTorch教程所述,部署自定义操作符有不同的方法。有一些考虑因素可能会影响您的部署设计。以下是我们认为重要的一些示例:

  1. 即时编译:为了确保我们的C++扩展与我们训练所使用的PyTorch版本相匹配,我们的部署脚本在训练过程中在训练环境内编译代码。
  2. 多进程支持:部署脚本必须支持C++扩展从多个进程中加载的可能性(例如,多个DataLoader工作器)。
  3. 托管训练支持:由于我们经常在托管的训练环境中进行训练(例如Amazon SageMaker),我们要求部署脚本支持此选项。(有关定制托管训练环境的更多信息,请参见此处。)

在下面的代码块中,我们定义了一个简单的setup.py脚本,用于编译和安装我们的自定义函数,如此处所述。

from setuptools import setupfrom torch.utils import cpp_extensionsetup(name='decode_and_crop_jpeg',      ext_modules=[cpp_extension.CppExtension('decode_and_crop_jpeg',                                               ['decode_and_crop_jpeg.cpp'],                                               libraries=['jpeg'])],      cmdclass={'build_ext': cpp_extension.BuildExtension})

我们将我们的C++文件和setup.py脚本放在一个名为custom_op的文件夹中,并定义一个__init__.py文件,以确保设置脚本仅执行一次且由单个进程运行:

import osimport sysimport subprocessimport shleximport filelockp_dir = os.path.dirname(__file__)with filelock.FileLock(os.path.join(pkg_dir, f".lock")):  try:    from custom_op.decode_and_crop_jpeg import decode_and_crop_jpeg  except ImportError:    install_cmd = f"{sys.executable} setup.py build_ext --inplace"    subprocess.run(shlex.split(install_cmd), capture_output=True, cwd=p_dir)    from custom_op.decode_and_crop_jpeg import decode_and_crop_jpeg

最后,我们修改了我们的数据输入流水线,使用我们新创建的自定义函数:

from torchvision.datasets.vision import VisionDatasetinput_img_size = [533, 800]class FakeDataset(VisionDataset):    def __init__(self, transform):        super().__init__(root=None, transform=transform)        size = 10000        self.img_files = [f'{i}.jpg' for i in range(size)]        self.targets = np.random.randint(low=0,high=num_classes,                                        size=(size),dtype=np.uint8).tolist()    def __getitem__(self, index):        img_file, target = self.img_files[index], self.targets[index]        with torch.profiler.record_function('decode_and_crop_jpeg'):            import random            from custom_op.decode_and_crop_jpeg import decode_and_crop_jpeg            with open(img_file, 'rb') as f:                x = torch.frombuffer(f.read(), dtype=torch.uint8)            h_offset = random.randint(0, input_img_size[0] - img_size)            w_offset = random.randint(0, input_img_size[1] - img_size)            img = decode_and_crop_jpeg(x, h_offset, w_offset,                                        img_size, img_size)        if self.transform is not None:            img = self.transform(img)        return img, target    def __len__(self):        return len(self.img_files)transform = T.Compose(    [RandomMask(),     ConvertColor(),     Scale()])

结果

根据我们描述的优化,我们的步骤时间从0.72秒降低到0.48秒,性能提升了50%!当然,我们的优化的影响与原始JPEG图像的大小和裁剪尺寸的选择直接相关。

总结

数据预处理流水线中的瓶颈是常见的情况,可能会导致GPU饥饿和训练速度变慢。考虑到潜在的成本影响,您必须具备各种工具和技术来分析和解决这些问题。在本文中,我们回顾了通过创建自定义的C++ PyTorch扩展来优化数据输入流水线的选项,展示了其易用性并展示了其潜在影响。当然,这种优化机制的潜在收益将根据项目和性能瓶颈的细节而大大不同。

接下来做什么? 在我们的博客文章中,我们讨论了许多输入流水线优化方法,鼓励您查阅它们(例如,从这里开始)。

Leave a Reply

Your email address will not be published. Required fields are marked *