Press "Enter" to skip to content

在亚马逊SageMaker上使用Ray进行有效的负载均衡

提高DNN训练效率和降低培训成本的方法

Fineas Anton在Unsplash上的照片

在之前的帖子中(例如,这里),我们详细介绍了优化和优化DNN训练工作负载性能的重要性。训练深度学习模型,尤其是大型模型,可能是一项昂贵的任务。您能够以一种既加速模型收敛又最小化培训成本的方式,最大限度地利用您的培训资源,可能是项目成功的决定性因素。性能优化是一个迭代过程,在该过程中,我们识别和解决应用程序中的性能瓶颈,即阻止我们增加资源利用率和/或加速运行时间的部分。

本文是一系列关注训练深度学习模型时遇到的较常见性能瓶颈之一的帖子的第三篇,即数据预处理瓶颈。当我们的GPU(或替代加速器)-通常是培训设置中最昂贵的资源-在等待过多负载的CPU资源输入数据时,就会出现数据预处理瓶颈。

TensorBoard分析器标签中显示的数据输入管道瓶颈的典型足迹。我们可以清楚地看到每七个训练步骤都有长时间的GPU空闲时间(作者提供)

在我们关于该主题的第一篇帖子中,我们讨论并演示了解决这种类型瓶颈的不同方法,包括:

  1. 选择具有适合您工作负载的CPU到GPU计算比率的训练实例,
  2. 通过将一些CPU操作移动到GPU来改善CPU和GPU之间的工作负载平衡,以及
  3. 将一些CPU计算卸载到辅助的CPU工作设备上。

我们使用TensorFlow Data Service API演示了第三个选项,这是一种特定于TensorFlow的解决方案,其中一部分输入数据处理可以使用gRPC作为底层通信协议卸载到其他设备上。

在我们的第二篇帖子中,我们提出了一种更通用的基于gRPC的解决方案,用于使用辅助的CPU工作者,并在一个玩具PyTorch模型上进行了演示。尽管与TensorFlow Data Service API相比,它需要更多的手动编码和调整,但该解决方案提供了更大的稳定性,并允许在培训性能上进行相同的优化。

使用Ray进行负载均衡

在本文中,我们将演示一种使用辅助的CPU工作者的附加方法,旨在将通用解决方案的稳定性与TensorFlow特定API的简单性和易用性相结合。我们将使用Ray Data库的Ray Datasets来实现这种演示。通过充分利用Ray的资源管理和分布式调度系统的全部能力,Ray Data能够以可扩展和分布式的方式运行我们的训练数据输入管道。特别是,我们将以这样的方式配置我们的Ray Dataset,以便库能够自动检测和利用所有可用的CPU资源来预处理训练数据。我们还将使用Ray AIR Trainer来包装我们的模型训练循环,以实现无缝扩展到多GPU设置。

在Amazon SageMaker上部署Ray集群

在多节点环境中使用Ray框架和它提供的实用程序的先决条件是部署Ray集群。一般来说,设计、部署、管理和维护这样的计算集群可能是一项艰巨的任务,通常需要专门的DevOps工程师(或工程团队)。这可能对一些开发团队构成难以逾越的障碍。在本文中,我们将演示如何使用AWS的托管培训服务Amazon SageMaker来克服这个障碍。特别是,我们将创建一个SageMaker异构集群,其中包含GPU实例和CPU实例,并在启动时使用它来部署一个Ray集群。然后,我们将在此Ray集群上运行Ray AIR训练应用程序,依靠Ray的后端在集群中的所有资源上进行有效的负载均衡。当训练应用程序完成时,Ray集群将自动关闭。以这种方式使用SageMaker,使我们能够在不常见的集群管理开销的情况下部署和使用Ray集群。

Ray是一个强大的框架,可以支持各种机器学习工作负载。在本文中,我们将展示Ray版本2.6.1的一些功能和API。但是请注意,本文不能替代Ray文档。请务必查阅官方文档以获取最适合和最新的Ray工具的使用方式。

在开始之前,特别感谢Boruch Chalk介绍我认识到Ray Data库及其独特的功能。

玩具示例

为了方便我们的讨论,我们将定义并训练一个简单的PyTorch(2.0)Vision Transformer分类模型,该模型将在由随机图像和标签组成的合成数据集上进行训练。Ray AIR文档中包含了各种示例,展示了如何使用Ray AIR构建不同类型的训练工作负载。我们在这里创建的脚本基本上遵循了PyTorch图像分类器示例中描述的步骤。

定义Ray数据集和预处理器

Ray AIR Trainer API区分原始数据集和在将数据集输入训练循环之前应用于数据集元素的预处理流程。对于我们的原始Ray数据集,我们创建了一个简单的整数范围,大小为num_records。接下来,我们定义要应用于数据集的预处理器。我们的Ray Preprocessor包含两个组件:第一个是BatchMapper,它将原始整数映射为随机图像-标签对。第二个是TorchVisionPreprocessor,它对我们的随机批次执行torchvision转换,将它们转换为PyTorch张量,并应用一系列的GaussianBlur操作。GaussianBlur操作旨在模拟相对重的数据预处理流程。这两个预处理器通过Chain Preprocessor组合在一起。下面的代码块中演示了Ray数据集和预处理器的创建:

import rayfrom typing import Dict, Tupleimport numpy as npimport torchvision.transforms as transformsfrom ray.data.preprocessors import Chain, BatchMapper, TorchVisionPreprocessordef get_ds(batch_size, num_records):    # 创建一个原始的Ray表格数据集    ds = ray.data.range(num_records)    # 将整数映射为随机图像-标签对    def synthetic_ds(batch: Tuple[int]) -> Dict[str, np.ndarray]:        labels = batch['id']        batch_size = len(labels)        images = np.random.randn(batch_size, 224, 224, 3).astype(np.float32)        labels = np.array([label % 1000 for label in labels]).astype(                                                               dtype=np.int64)        return {"image": images, "label": labels}    # 预处理器的第一步是将整数批次映射为随机图像-标签对    synthetic_data = BatchMapper(synthetic_ds,                                  batch_size=batch_size,                                  batch_format="numpy")    # 定义一个torchvision转换,将numpy对转换为张量,并应用一系列的高斯模糊,以模拟较重的预处理       transform = transforms.Compose(        [transforms.ToTensor()] + [transforms.GaussianBlur(11)]*10    )    # 预处理器的第二步是应用torchvision转换    vision_preprocessor = TorchVisionPreprocessor(columns=["image"],                                                   transform=transform)    # 组合预处理步骤    preprocessor = Chain(synthetic_data, vision_preprocessor)    return ds, preprocessor

请注意,Ray数据流程将自动使用Ray集群中可用的所有CPU资源。这包括GPU实例上的CPU资源以及集群中任何其他辅助实例的CPU资源。

定义训练循环

下一步是定义将在每个训练工作器(例如GPU)上运行的训练序列。首先,我们使用流行的timm(0.6.13)Python包定义模型,并使用train.torch.prepare_model API进行包装。接下来,我们从数据集中提取适当的分片,并定义一个迭代器,该迭代器产生具有请求的批次大小的数据批次,并将其复制到训练设备上。然后,训练循环本身由标准的PyTorch代码组成。循环结束时,我们报告结果损失指标。下面的代码块演示了每个工作器的训练序列:

import timefrom ray import trainfrom ray.air import sessionimport torch.nn as nnimport torch.optim as optimfrom timm.models.vision_transformer import VisionTransformer# 使用timm构建一个ViT模型def build_model():    return VisionTransformer()# 每个工作器的训练循环def train_loop_per_worker(config):    # 使用Ray对象包装PyTorch模型    model = train.torch.prepare_model(build_model())    criterion = nn.CrossEntropyLoss()    optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)    # 获取适当的数据集分片    train_dataset_shard = session.get_dataset_shard("train")    # 创建一个从数据集返回批次的迭代器    train_dataset_batches = train_dataset_shard.iter_torch_batches(        batch_size=config["batch_size"],        prefetch_batches=config["prefetch_batches"],        device=train.torch.get_device()    )    t0 = time.perf_counter()    for i, batch in enumerate(train_dataset_batches):        # 获取输入和标签        inputs, labels = batch["image"], batch["label"]        # 将参数梯度置零        optimizer.zero_grad()        # 前向传播 + 反向传播 + 优化        outputs = model(inputs)        loss = criterion(outputs, labels)        loss.backward()        optimizer.step()        # 打印统计信息        if i % 100 == 99:  # 每100个小批次打印一次            avg_time = (time.perf_counter()-t0)/100            print(f"Iteration {i+1}: avg time per step {avg_time:.3f}")            t0 = time.perf_counter()    metrics = dict(running_loss=loss.item())    session.report(metrics)

定义Ray Torch Trainer

一旦我们定义了数据管道和训练循环,我们就可以开始设置Ray TorchTrainer。我们根据集群中可用的资源来配置Trainer。具体来说,我们根据GPU的数量设置训练工作器的数量,并根据目标GPU上可用的内存设置批次大小。我们构建的数据集包含足够的记录来训练1000个步骤。

from ray.train.torch import TorchTrainerfrom ray.air.config import ScalingConfigdef train_model():    # 我们将根据可用的资源配置工作器的数量、数据集的大小和数据存储的大小     num_gpus = int(ray.available_resources().get("GPU", 0))        # 根据GPU的数量设置训练工作器的数量    num_workers = num_gpus if num_gpus > 0 else 1    # 根据 Amazon EC2 g5 实例系列的 GPU 内存容量设置批次大小    batch_size = 64    # 创建一个合成数据集,其中包含足够的数据来训练 1000 个步骤    num_records = batch_size * 1000 * num_workers    ds, preprocessor = get_ds(batch_size, num_records)    ds = preprocessor(ds)     trainer = TorchTrainer(        train_loop_per_worker=train_loop_per_worker,        train_loop_config={"batch_size": batch_size},        datasets={"train": ds},        scaling_config=ScalingConfig(num_workers=num_workers,                                      use_gpu=num_gpus > 0),    )    trainer.fit()

部署 Ray 集群并运行训练序列

现在我们来定义训练脚本的入口点。在这里,我们设置 Ray 集群并在主节点上启动训练序列。我们使用 sagemaker-training 库中的 Environment 类来发现异构 SageMaker 集群中的实例,如本教程所述。我们将 GPU 实例组的第一个节点定义为 Ray 集群的主节点,并在其他所有节点上运行适当的命令将它们连接到集群中(有关创建集群的更多详细信息,请参阅 Ray 文档)。我们编写主节点的代码,等待所有节点连接后再开始训练序列。这样可以确保 Ray 在定义和分配底层 Ray 任务时利用所有可用的资源。

import timeimport subprocessfrom sagemaker_training import environmentif __name__ == "__main__":    # 使用 Environment() 类自动发现 SageMaker 集群    env = environment.Environment()    if env.current_instance_group == 'gpu' and \             env.current_instance_group_hosts.index(env.current_host) == 0:        # 主节点启动 Ray 集群        p = subprocess.Popen('ray start --head --port=6379',                             shell=True).wait()        ray.init()        # 计算集群中的总节点数        groups = env.instance_groups_dict.values()        cluster_size = sum(len(v['hosts']) for v in list(groups))        # 等待所有 SageMaker 节点连接到 Ray 集群        connected_nodes = 1        while connected_nodes < cluster_size:            time.sleep(1)            resources = ray.available_resources().keys()            connected_nodes = sum(1 for s in list(resources) if 'node' in s)        # 调用训练序列        train_model()        # 关闭 Ray 集群        p = subprocess.Popen("ray down", shell=True).wait()    else:        # 工作节点连接到主节点        head = env.instance_groups_dict['gpu']['hosts'][0]        p = subprocess.Popen(            f"ray start --address='{head}:6379'",            shell=True).wait()        # 检查集群是否仍然存活的实用函数        def is_alive():            from subprocess import Popen            p = Popen('ray status', shell=True)            p.communicate()[0]            return p.returncode        # 保持节点存活,直到主节点上的进程完成        while is_alive() == 0:            time.sleep(10)

在亚马逊 SageMaker 异构集群上训练

完成训练脚本后,我们现在需要将其部署到亚马逊 SageMaker 异构集群中。为此,我们按照本教程中描述的步骤进行操作。首先,我们创建一个 source_dir 目录,将 train.py 脚本和一个包含我们脚本所依赖的两个 pip 包(timm 和 ray[air])的 requirements.txt 文件放入其中。这些包会自动安装在 SageMaker 集群中的每个节点上。我们定义了两个 SageMaker 实例组,第一个实例组包含一个 ml.g5.xlarge 实例(包含 1 个 GPU 和 4 个 vCPU),第二个实例组包含一个 ml.c5.4xlarge 实例(包含 16 个 vCPU)。然后,我们使用 SageMaker PyTorch estimator 来定义和部署我们的训练作业到云端。

from sagemaker.pytorch import PyTorch
from sagemaker.instance_group import InstanceGroup

cpu_group = InstanceGroup("cpu", "ml.c5.4xlarge", 1)
gpu_group = InstanceGroup("gpu", "ml.g5.xlarge", 1)

estimator = PyTorch(
    entry_point='train.py',
    source_dir='./source_dir',
    framework_version='2.0.0',
    role='',
    py_version='py310',
    job_name='hetero-cluster',
    instance_groups=[gpu_group, cpu_group]
)

estimator.fit()

结果

在下表中,我们比较了在两种不同设置下运行训练脚本的运行时结果:一个单独的ml.g5.xlarge GPU实例和一个包含一个ml.g5.xlarge实例和一个ml.c5.4xlarge实例的异构集群。我们使用Amazon CloudWatch评估系统资源利用率,并根据Amazon SageMaker定价(此时为每小时$0.816的ml.c5.4xlarge实例和$1.408的ml.g5.xlarge实例)估算培训成本。

Comparative Performance Results (By Author)

单个实例实验中相对较高的CPU利用率和较低的GPU利用率表明数据预处理管道存在性能瓶颈。当转移到异构集群时,这些问题得到了明显解决。不仅GPU利用率提高,训练速度也提高。总体而言,训练的价格效率提高了23%。

我们应强调,这些示例实验纯粹是为了演示Ray生态系统的自动负载平衡功能。调整控制参数可能会改善性能。选择不同的解决方案来解决CPU瓶颈(例如选择EC2 g5系列中具有更多CPU的实例)可能会带来更好的性价比。

总结

在本文中,我们演示了如何使用Ray数据集来平衡集群中所有可用的CPU工作器的重型数据预处理管道的负载。这使我们可以通过简单地将辅助CPU实例添加到训练环境中轻松解决CPU瓶颈问题。Amazon SageMaker的异构集群支持是在云中运行Ray训练作业的一种引人注目的方式,它处理集群管理的所有方面,避免了对专门的DevOps支持的需求。

请记住,这里提出的解决方案只是解决CPU瓶颈的众多不同方式之一。最适合您的解决方案将高度依赖于您项目的细节。

如往常一样,欢迎您提出评论、修正和问题。

Leave a Reply

Your email address will not be published. Required fields are marked *