机器学习(ML)在客户解决越来越具有挑战性的问题时变得越来越复杂。这种复杂性经常导致需要分布式ML,即使用多台机器来训练一个模型。尽管这样可以实现跨多个节点的任务并行化,从而加快训练时间、增强可扩展性和提高性能,但在有效利用分布式硬件方面存在着重大挑战。数据科学家必须应对数据分区、负载均衡、容错和可扩展性等挑战。ML工程师必须手动处理并行化、调度、故障和重试,需要复杂的基础架构代码。
本文将讨论使用Ray和Amazon SageMaker进行分布式ML的优势,并提供一个逐步指南,介绍如何使用这些框架构建和部署可扩展的ML工作流程。
Ray是一个开源的分布式计算框架,提供了一个灵活的框架用于ML模型的分布式训练和服务。它通过简单、可扩展的库来抽象出低级分布式系统的细节,用于常见的ML任务,如数据预处理、分布式训练、超参数调优、强化学习和模型服务。
SageMaker是一个完全托管的用于构建、训练和部署ML模型的服务。Ray与SageMaker功能无缝集成,可以构建和部署高效可靠的复杂ML工作负载。Ray和SageMaker的结合为可扩展的ML工作流程提供了端到端的功能,并具有以下突出特点:
- Ray中的分布式actor和并行构造简化了开发分布式应用程序。
- Ray AI Runtime(AIR)减少了从开发到生产的摩擦。通过Ray和AIR,相同的Python代码可以从笔记本电脑无缝扩展到大型集群。
- SageMaker的托管基础架构和处理作业、训练作业、超参数调优作业等功能可以在Ray库下使用分布式计算。
- Amazon SageMaker Experiments可以快速迭代和跟踪试验。
- Amazon SageMaker Feature Store提供了一个可扩展的存储库,用于存储、检索和共享ML模型训练所需的特征。
- 训练好的模型可以在Amazon SageMaker Model Registry中进行存储、版本管理和跟踪,以进行治理和管理。
- Amazon SageMaker Pipelines允许编排从数据准备和训练到模型部署的端到端ML生命周期的自动化工作流。
解决方案概述
本文重点介绍了同时使用Ray和SageMaker的优势。我们建立了一个使用Ray为基础的端到端ML工作流程,并使用SageMaker Pipelines进行编排。该工作流程包括使用Ray actor并行摄取数据到特征存储中,使用Ray Data进行数据预处理,使用Ray Train和超参数优化(HPO)调优作业进行大规模模型训练和超参数调优,最后对模型进行评估并将其注册到模型注册表中。
我们使用一个合成的住房数据集作为我们的数据,该数据集包含八个特征(YEAR_BUILT
、SQUARE_FEET
、NUM_BEDROOM
、NUM_BATHROOMS
、LOT_ACRES
、GARAGE_SPACES
、FRONT_PORCH
和DECK
),我们的模型将预测房屋的PRICE
。
ML工作流程中的每个阶段都被分解为离散的步骤,每个步骤都有自己的脚本,接受输入和输出参数。在下一节中,我们将介绍每个步骤的关键代码片段。完整的代码可以在aws-samples-for-ray GitHub存储库中找到。
先决条件
要使用SageMaker Python SDK并运行与本文相关的代码,您需要具备以下先决条件:
- 包含所有AWS资源的AWS帐户
- 具有访问Amazon SageMaker Studio笔记本、SageMaker Feature Store、SageMaker Model Registry和SageMaker Pipelines的IAM角色
将数据摄入SageMaker特征存储
ML工作流程中的第一步是从Amazon Simple Storage Service(Amazon S3)读取CSV格式的源数据文件,并将其摄入SageMaker特征存储。SageMaker特征存储是一个专门构建的存储库,使团队能够轻松创建、共享和管理ML特征。它简化了特征的发现、重用和共享,加快了开发速度,增加了客户团队之间的协作,并降低了成本。
将特征摄入特征存储包括以下步骤:
- 定义一个特征组并在特征存储中创建该特征组。
- 通过为每一行数据添加事件时间和记录ID,准备源数据以用于特征存储。
- 使用Boto3 SDK将准备好的数据摄入特征组。
在本节中,我们只突出了第3步,因为这是涉及使用Ray进行摄取任务的并行处理的部分。您可以在GitHub仓库中查看此过程的完整代码。
ingest_features方法定义在名为Featurestore
的类中。请注意,Featurestore
类被装饰为@ray.remote
。这表示此类的实例是一个Ray actor,即Ray中的一个有状态且并发的计算单元。它是一种编程模型,允许您创建维护内部状态并且可以被Ray集群中不同节点上运行的多个任务并发访问的分布式对象。actor提供了一种管理和封装可变状态的方式,使其在分布式环境中构建复杂的、有状态的应用程序非常有价值。您还可以在actor中指定资源需求。在这种情况下,FeatureStore
类的每个实例将需要0.5个CPU。请参阅以下代码:
@ray.remote(num_cpus=0.5)
class Featurestore:
def ingest_features(self,feature_group_name, df, region):
"""
将特征摄取到特征存储组
Args:
feature_group_name (str): 特征组名称
df (str): 以CSV格式存储的训练/验证/测试数据的路径。
"""
...
您可以通过调用remote
操作符与actor进行交互。在以下代码中,所需的actor数量作为输入参数传递给脚本。然后,根据actor的数量对数据进行分区,并将其传递给远程并行处理过程以摄取到特征存储中。您可以调用get
方法来阻塞当前任务的执行,直到远程计算完成并且结果可用。当结果可用时,ray.get
将返回结果,并继续执行当前任务。
import modin.pandas as pd
import ray
df = pd.read_csv(s3_path)
data = prepare_df_for_feature_store(df)
# 将数据分割为分区
partitions = [ray.put(part) for part in np.array_split(data, num_actors)]
# 启动actor并循环分配分区
actors = [Featurestore.remote() for _ in range(args.num_actors)]
results = []
for actor, partition in zip(actors, input_partitions):
results.append(actor.ingest_features.remote(
args.feature_group_name,
partition, args.region
)
)
ray.get(results)
为训练、验证和测试准备数据
在这一步中,我们使用Ray Dataset来高效地拆分、转换和缩放我们的数据,为机器学习做准备。Ray Dataset提供了一种将分布式数据加载到Ray中的标准方式,支持各种存储系统和文件格式。它具有用于常见ML数据预处理操作的API,如并行转换、洗牌、分组和聚合。Ray Dataset还处理需要有状态设置和GPU加速的操作。它与其他数据处理库(如Spark、Pandas、NumPy等)以及TensorFlow和PyTorch等ML框架无缝集成。这使得在Ray上构建端到端的数据管道和ML工作流程成为可能。目标是使从业者和研究人员更容易进行分布式数据处理和机器学习。
让我们来看看执行这种数据预处理的脚本部分。我们首先从特征存储中加载数据:
def load_dataset(feature_group_name, region):
"""
从离线特征存储的S3位置加载数据作为ray数据集
Args:
feature_group_name (str): 特征组的名称
Returns:
ds (ray.data.dataset): 包含从特征存储中请求的数据的Ray数据集
"""
session = sagemaker.Session(boto3.Session(region_name=region))
fs_group = FeatureGroup(
name=feature_group_name,
sagemaker_session=session
)
fs_data_loc = fs_group.describe().get("OfflineStoreConfig").get("S3StorageConfig").get("ResolvedOutputS3Uri")
# 删除特征存储添加的列
# 因为这些与当前ML问题无关
cols_to_drop = ["record_id", "event_time","write_time",
"api_invocation_time", "is_deleted",
"year", "month", "day", "hour"]
ds = ray.data.read_parquet(fs_data_loc)
ds = ds.drop_columns(cols_to_drop)
print(f"{fs_data_loc}的计数是{ds.count()}")
return ds
然后,我们使用ray.data
库提供的更高级抽象来拆分和缩放数据:
def split_dataset(dataset, train_size, val_size, test_size, random_state=None):
"""
将数据集拆分为训练集、验证集和测试集
参数:
dataset (ray.data.Dataset): 输入数据
train_size (float): 作为训练集使用的数据比例
val_size (float): 作为验证集使用的数据比例
test_size (float): 作为测试集使用的数据比例
random_state (int): 传递一个整数以在多次函数调用之间产生可重现的输出
返回:
train_set (ray.data.Dataset): 训练集
val_set (ray.data.Dataset): 验证集
test_set (ray.data.Dataset): 测试集
"""
# 使用固定的随机种子对数据集进行洗牌
shuffled_ds = dataset.random_shuffle(seed=random_state)
# 将数据分割为训练集、验证集和测试集
train_set, val_set, test_set = shuffled_ds.split_proportionately([train_size, val_size])
return train_set, val_set, test_set
def scale_dataset(train_set, val_set, test_set, target_col):
"""
对训练集进行标准化,并将其应用于验证集和测试集
参数:
train_set (ray.data.Dataset): 训练集
val_set (ray.data.Dataset): 验证集
test_set (ray.data.Dataset): 测试集
target_col (str): 目标列
返回:
train_transformed (ray.data.Dataset): 缩放后的训练数据
val_transformed (ray.data.Dataset): 缩放后的验证数据
test_transformed (ray.data.Dataset): 缩放后的测试数据
"""
tranform_cols = dataset.columns()
# 从要缩放的列中移除目标列
tranform_cols.remove(target_col)
# 设置标准化缩放器
standard_scaler = StandardScaler(tranform_cols)
# 将缩放器拟合到训练数据集
print("将缩放应用于训练数据集并转换数据集...")
train_set_transformed = standard_scaler.fit_transform(train_set)
# 将缩放器应用于验证集和测试集
print("转换验证集和测试集...")
val_set_transformed = standard_scaler.transform(val_set)
test_set_transformed = standard_scaler.transform(test_set)
return train_set_transformed, val_set_transformed, test_set_transformed
处理后的训练、验证和测试数据集存储在Amazon S3中,并将作为输入参数传递给后续步骤。
进行模型训练和超参数优化
经过预处理并准备好进行建模,现在是训练一些机器学习模型并微调其超参数以最大化预测性能的时候了。我们使用XGBoost-Ray,这是一个建立在Ray上的XGBoost分布式后端,通过使用多个节点和GPU在大规模数据集上训练XGBoost模型。它提供了简单的替代XGBoost的训练和预测API,并在幕后处理分布式数据管理和训练的复杂性。
为了实现训练在多个节点上的分布,我们使用一个名为RayHelper的辅助类。如下面的代码所示,我们使用训练作业的资源配置,并选择第一个主机作为头节点:
class RayHelper():
def __init__(self, ray_port:str="9339", redis_pass:str="redis_password"):
....
self.resource_config = self.get_resource_config()
self.head_host = self.resource_config["hosts"][0]
self.n_hosts = len(self.resource_config["hosts"])
我们可以使用主机信息来决定如何在每个训练作业实例上初始化Ray:
def start_ray(self):
head_ip = self._get_ip_from_host()
# 如果当前主机是选择为头节点的主机
# 使用指定的--head标志运行`ray start`,将其设置为头节点
if self.resource_config["current_host"] == self.head_host:
output = subprocess.run(['ray', 'start', '--head', '-vvv', '--port',
self.ray_port, '--redis-password', self.redis_pass,
'--include-dashboard', 'false'], stdout=subprocess.PIPE)
print(output.stdout.decode("utf-8"))
ray.init(address="auto", include_dashboard=False)
self._wait_for_workers()
print("所有工作节点已经就绪")
print(ray.cluster_resources())
else:
# 如果当前主机不是头节点,
# 运行`ray start`并将ip地址指定为头节点head_host作为头节点
time.sleep(10)
output = subprocess.run(['ray', 'start',
f"--address={head_ip}:{self.ray_port}",
'--redis-password', self.redis_pass, "--block"], stdout=subprocess.PIPE)
print(output.stdout.decode("utf-8"))
sys.exit(0)
当启动训练作业时,可以通过在RayHelper
实例上调用start_ray()
方法来初始化Ray集群:
if __name__ == '__main__':
ray_helper = RayHelper()
ray_helper.start_ray()
args = read_parameters()
sess = sagemaker.Session(boto3.Session(region_name=args.region))
然后我们使用XGBoost-Ray进行训练:
def train_xgboost(ds_train, ds_val, params, num_workers, target_col = "price") -> Result:
"""
创建XGBoost训练器,训练并返回结果。
参数:
ds_train (ray.data.dataset):训练数据集
ds_val (ray.data.dataset):验证数据集
params (dict):超参数
num_workers (int):分布式训练的工作节点数量
target_col (str):目标列
返回:
result (ray.air.result.Result):训练作业的结果
"""
train_set = RayDMatrix(ds_train, 'PRICE')
val_set = RayDMatrix(ds_val, 'PRICE')
evals_result = {}
trainer = train(
params=params,
dtrain=train_set,
evals_result=evals_result,
evals=[(val_set, "validation")],
verbose_eval=False,
num_boost_round=100,
ray_params=RayParams(num_actors=num_workers, cpus_per_actor=1),
)
output_path=os.path.join(args.model_dir, 'model.xgb')
trainer.save_model(output_path)
valMAE = evals_result["validation"]["mae"][-1]
valRMSE = evals_result["validation"]["rmse"][-1]
print('[3] #011validation-mae:{}'.format(valMAE))
print('[4] #011validation-rmse:{}'.format(valRMSE))
local_testing = False
try:
load_run(sagemaker_session=sess)
except:
local_testing = True
if not local_testing: # 如果使用SageMaker Training,则跟踪实验
with load_run(sagemaker_session=sess) as run:
run.log_metric('validation-mae', valMAE)
run.log_metric('validation-rmse', valRMSE)
注意,在实例化trainer
时,我们传递了RayParams
,它接受工作节点数量和每个工作节点的CPU数量。XGBoost-Ray使用这些信息将训练分布到附加到Ray集群的所有节点上。
我们现在基于SageMaker Python SDK创建一个XGBoost估计器对象,并将其用于HPO作业。
使用SageMaker Pipelines编排前面的步骤
为了构建一个端到端的可扩展且可重用的机器学习工作流,我们需要使用CI/CD工具来编排前面的步骤形成一个流水线。SageMaker Pipelines与SageMaker、SageMaker Python SDK和SageMaker Studio直接集成。这种集成允许您使用易于使用的Python SDK创建机器学习工作流,并使用SageMaker Studio可视化和管理工作流。您还可以跟踪流水线执行中的数据历史,并为缓存指定步骤。
SageMaker Pipelines创建一个包含构建机器学习工作流所需步骤的有向无环图(DAG)。每个流水线是一系列相互连接的步骤,由步骤之间的数据依赖关系进行编排,并且可以进行参数化,允许您为每次运行的流水线提供输入变量作为参数。SageMaker Pipelines有四种类型的流水线参数:ParameterString
、ParameterInteger
、ParameterFloat
和ParameterBoolean
。在本节中,我们对一些输入变量进行参数化并设置步骤缓存配置:
processing_instance_count = ParameterInteger(
name='ProcessingInstanceCount',
default_value=1
)
feature_group_name = ParameterString(
name='FeatureGroupName',
default_value='fs-ray-synthetic-housing-data'
)
bucket_prefix = ParameterString(
name='Bucket_Prefix',
default_value='aws-ray-mlops-workshop/feature-store'
)
rmse_threshold = ParameterFloat(name="RMSEThreshold", default_value=15000.0)
train_size = ParameterString(
name='TrainSize',
default_value="0.6"
)
val_size = ParameterString(
name='ValidationSize',
default_value="0.2"
)
test_size = ParameterString(
name='TestSize',
default_value="0.2"
)
cache_config = CacheConfig(enable_caching=True, expire_after="PT12H")
我们定义了两个处理步骤:一个用于SageMaker特征存储摄取,另一个用于数据准备。这应该看起来与之前描述的步骤非常相似。唯一的新代码是在步骤定义之后的ProcessingStep
,它允许我们将处理作业配置包含为流水线步骤。我们还进一步指定了数据准备步骤对SageMaker特征存储摄取步骤的依赖关系。请参考以下代码:
feature_store_ingestion_step = ProcessingStep(
name='FeatureStoreIngestion',
step_args=fs_processor_args,
cache_config=cache_config
)
preprocess_dataset_step = ProcessingStep(
name='PreprocessData',
step_args=processor_args,
cache_config=cache_config
)
preprocess_dataset_step.add_depends_on([feature_store_ingestion_step])
同样地,要构建模型训练和调优步骤,我们需要在模型训练步骤的代码后面添加TuningStep
的定义,以便将SageMaker超参数调整作为流水线中的一个步骤运行。请参考以下代码:
tuning_step = TuningStep(
name="HPTuning",
tuner=tuner,
inputs={
"train": TrainingInput(
s3_data=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs[
"train"
].S3Output.S3Uri,
content_type="text/csv"
),
"validation": TrainingInput(
s3_data=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs[
"validation"
].S3Output.S3Uri,
content_type="text/csv"
)
},
cache_config=cache_config,
)
tuning_step.add_depends_on([preprocess_dataset_step])
在调优步骤之后,我们选择将最佳模型注册到SageMaker模型注册表中。为了控制模型质量,我们实现了一个最小质量门限,将最佳模型的目标指标(RMSE)与流水线的输入参数rmse_threshold
进行比较。为了进行评估,我们创建了另一个处理步骤来运行评估脚本。模型评估结果将存储为属性文件。属性文件在分析处理步骤的结果以决定如何运行其他步骤时特别有用。请参考以下代码:
# 指定我们将存储模型评估结果的位置,以便其他步骤可以访问这些结果
evaluation_report = PropertyFile(
name='EvaluationReport',
output_name='evaluation',
path='evaluation.json',
)
# 使用ProcessingStep来评估HPO步骤中选择的模型的性能。在这种情况下,评估的是表现最好的模型。
evaluation_step = ProcessingStep(
name='EvaluateModel',
processor=evaluation_processor,
inputs=[
ProcessingInput(
source=tuning_step.get_top_model_s3_uri(
top_k=0, s3_bucket=bucket, prefix=s3_prefix
),
destination='/opt/ml/processing/model',
),
ProcessingInput(
source=preprocess_dataset_step.properties.ProcessingOutputConfig.Outputs['test'].S3Output.S3Uri,
destination='/opt/ml/processing/test',
),
],
outputs=[
ProcessingOutput(
output_name='evaluation', source='/opt/ml/processing/evaluation'
),
],
code='./pipeline_scripts/evaluate/script.py',
property_files=[evaluation_report],
)
我们定义了一个ModelStep
,将最佳模型注册到SageMaker模型注册表中。如果最佳模型未通过预定的质量检查,我们还指定了FailStep
来输出错误消息。请参考以下代码:
register_step = ModelStep(
name='RegisterTrainedModel',
step_args=model_registry_args
)
metrics_fail_step = FailStep(
name="RMSEFail",
error_message=Join(on=" ", values=["Execution failed due to RMSE >", rmse_threshold]),
)
接下来,我们使用ConditionStep
来评估在流水线中接下来应采取模型注册步骤还是失败步骤。在我们的情况下,如果最佳模型的RMSE评分低于阈值,将注册最佳模型。
# 用于评估模型质量并分支执行的条件步骤
cond_lte = ConditionLessThanOrEqualTo(
left=JsonGet(
step_name=evaluation_step.name,
property_file=evaluation_report,
json_path='regression_metrics.rmse.value',
),
right=rmse_threshold,
)
condition_step = ConditionStep(
name='CheckEvaluation',
conditions=[cond_lte],
if_steps=[register_step],
else_steps=[metrics_fail_step],
)
最后,我们将所有定义的步骤编排到一个流水线中:
pipeline_name = 'synthetic-housing-training-sm-pipeline-ray'
step_list = [
feature_store_ingestion_step,
preprocess_dataset_step,
tuning_step,
evaluation_step,
condition_step
]
training_pipeline = Pipeline(
name=pipeline_name,
parameters=[
processing_instance_count,
feature_group_name,
train_size,
val_size,
test_size,
bucket_prefix,
rmse_threshold
],
steps=step_list
)
# 注意:如果已存在同名流水线,它将被覆盖。
training_pipeline.upsert(role_arn=role_arn)
上述流水线可以直接在SageMaker Studio中可视化和执行,或者通过调用execution = training_pipeline.start()
来执行。下图说明了流水线的流程。
此外,我们可以查看由流水线执行生成的工件的血统。
from sagemaker.lineage.visualizer import LineageTableVisualizer
viz = LineageTableVisualizer(sagemaker.session.Session())
for execution_step in reversed(execution.list_steps()):
print(execution_step)
display(viz.show(pipeline_execution_step=execution_step))
time.sleep(5)
部署模型
在通过流水线运行将最佳模型注册到SageMaker模型注册表后,我们使用SageMaker完全托管的模型部署功能将模型部署到实时终端节点。SageMaker还提供其他模型部署选项,以满足不同用例的需求。有关详细信息,请参阅选择正确的选项为您的用例部署推理模型。首先,让我们在SageMaker模型注册表中注册模型:
xgb_regressor_model = ModelPackage(
role_arn,
model_package_arn=model_package_arn,
name=model_name
)
模型的当前状态为PendingApproval
。在部署之前,我们需要将其状态设置为Approved
:
sagemaker_client.update_model_package(
ModelPackageArn=xgb_regressor_model.model_package_arn,
ModelApprovalStatus='Approved'
)
xgb_regressor_model.deploy(
initial_instance_count=1,
instance_type='ml.m5.xlarge',
endpoint_name=endpoint_name
)
清理资源
在完成实验后,请记得清理资源以避免不必要的费用。要进行清理,请调用DeleteEndpoint、DeleteModelPackageGroup、DeletePipeline和DeleteFeatureGroup等API来删除实时终端节点、模型组、流水线和特征组,并关闭所有SageMaker Studio笔记本实例。
结论
本文逐步演示了如何使用SageMaker Pipelines来编排基于Ray的机器学习工作流程。我们还演示了SageMaker Pipelines与第三方机器学习工具集成的能力。AWS提供了各种支持Ray工作负载的可扩展和安全的云服务,以确保卓越的性能和操作效率。现在,轮到您了,开始利用Amazon SageMaker Pipelines和Ray优化您的机器学习工作流程。立即行动,释放您的机器学习项目的全部潜力!