Press "Enter" to skip to content

使用预选算法在Amazon SageMaker自动模型调整中实现定制的AutoML作业

AutoML允许您在机器学习(ML)项目生命周期的早期阶段快速得出与数据相关的普遍见解。事先了解哪些预处理技术和算法类型可以提供最佳结果可以减少开发、训练和部署正确模型所需的时间。它在每个模型的开发过程中起着至关重要的作用,并允许数据科学家专注于最有前景的ML技术。此外,AutoML还提供了一个基准模型性能,可以作为数据科学团队的参考点。

AutoML工具将不同算法和各种预处理技术应用于您的数据。例如,它可以对数据进行缩放,进行单变量特征选择,以不同方差阈值水平进行PCA,并进行聚类。这些预处理技术可以单独应用,也可以结合在一个流水线中。随后,AutoML工具将在经过预处理的数据集的不同版本上训练不同类型的模型,例如线性回归、弹性网或随机森林,并进行超参数优化(HPO)。Amazon SageMaker Autopilot消除了构建ML模型的繁重工作。提供数据集后,SageMaker Autopilot会自动探索不同的解决方案,找到最佳模型。但是,如果您想要部署自定义版本的AutoML工作流呢?

本文介绍了如何在Amazon SageMaker上使用Amazon SageMaker自动模型调优创建定制的AutoML工作流程,其中提供了GitHub资源库中的示例代码。

方案概述

对于这个应用案例,假设您是一个专门开发模型的数据科学团队的成员。您已经开发了一组自定义的预处理技术,并选择了一些通常与您的ML问题配合良好的算法。在处理新的ML用例时,您首先希望使用自定义的预处理技术和算法运行AutoML,以缩小潜在解决方案的范围。

对于这个示例,您不使用专门的数据集;相反,您将使用从Amazon Simple Storage Service(Amazon S3)导入的加州住房数据集。重点是演示使用SageMaker HPO实施解决方案的技术实现,后续可以应用于任何数据集和领域。

以下图表介绍了整体解决方案的工作流程。

Architecture diagram showing steps explained in the following Walkthrough section.

先决条件

完成本文中演示步骤的先决条件如下:

实施解决方案

完整代码可在GitHub资源库中找到。

实施解决方案的步骤(如工作流程图中所示)如下:

  1. 创建一个笔记本实例并指定以下内容:
    1. 对于笔记本实例类型,选择ml.t3.medium
    2. 对于Elastic Inference,选择none
    3. 对于平台标识符,选择Amazon Linux 2,Jupyter Lab 3
    4. 对于IAM角色,选择默认的AmazonSageMaker-ExecutionRole。如果不存在,则创建一个新的AWS身份和访问管理(IAM)角色,并附加AmazonSageMakerFullAccess IAM策略

请注意,在生产环境中,您应该创建一个最小范围的执行角色和策略。

  1. 打开笔记本实例的JupyterLab界面并克隆GitHub存储库。

您可以通过启动一个新的终端会话并运行git clone <REPO>命令,或者使用UI功能来操作,如下面的截图所示。

JupyterLab git集成按钮

  1. 打开automl.ipynb笔记本文件,选择conda_python3内核,并按照说明触发一组HPO任务

要在不进行任何更改的情况下运行代码,您需要增加用于训练作业的ml.m5.large的服务配额所有训练作业的实例数。AWS默认只允许同时运行20个SageMaker训练作业的配额。您需要申请将其增加到30个的配额。这两个配额更改通常在几分钟内获得批准。有关更多信息,请参阅请求配额增加

AWS服务配额页面,允许请求增加特定实例类型并发训练作业

如果您不想更改配额,您可以在脚本中简单修改MAX_PARALLEL_JOBS变量的值(例如修改为5)。

  1. 每个HPO作业将完成一组训练作业试验,并指示具有最佳超参数的模型。
  2. 分析结果并部署效果最佳的模型

此解决方案将在您的AWS账户中产生费用。此解决方案的费用将根据HPO训练作业的数量和持续时间而定。随着这些增加,费用也会增加。您可以通过限制训练时间并根据本文后面讨论的说明配置TuningJobCompletionCriteriaConfig来降低成本。有关定价信息,请参阅Amazon SageMaker定价

在下面的部分中,我们将通过代码示例和分析结果的步骤来详细讨论笔记本,并选择最佳模型。

初始设置

首先运行 custom-automl.ipynb 笔记本中的导入和设置部分。它会安装和导入所有所需的依赖项,实例化一个SageMaker会话和客户端,并设置默认的地区和用于存储数据的S3存储桶。

数据准备

通过运行笔记本的下载数据部分,下载加利福尼亚住房数据集并进行准备。将数据集分为训练和测试数据帧,并上传到SageMaker会话的默认S3存储桶。

整个数据集共有20,640条记录和9列,包括目标值。目标是预测房屋的中位数价值( medianHouseValue 列)。以下截图显示了数据集的前几行。

显示表格结构的加利福尼亚住房数据帧的前五行

训练脚本模板

本文中的AutoML工作流基于scikit-learn预处理管道和算法。目标是生成大量不同预处理管道和算法的组合,以找到表现最佳的设置。让我们从创建一个通用的训练脚本开始,该脚本将在笔记本实例上保持不变。在此脚本中,有两个空注释块:一个用于注入超参数,另一个用于预处理模型管道对象。它们将动态注入到每个预处理模型候选中。拥有一个通用脚本的目的是保持实现的DRY(不重复自己)。

#create base script_script = """import argparseimport joblibimport osimport numpy as npimport pandas as pdfrom sklearn.metrics import mean_squared_errorfrom sklearn.pipeline import Pipeline, FeatureUnionfrom sklearn.preprocessing import StandardScalerfrom sklearn.decomposition import PCAfrom sklearn.impute import SimpleImputerfrom sklearn.cluster import KMeansfrom sklearn.linear_model import ElasticNetfrom sklearn.ensemble import RandomForestRegressorfrom sklearn.ensemble import GradientBoostingRegressor############################## 推理函数 ##############################def model_fn(model_dir):clf = joblib.load(os.path.join(model_dir, "model.joblib"))return clfif __name__ == "__main__":print("提取参数")parser = argparse.ArgumentParser()# 超参数##### 在此动态插入 #####{}############################# 数据、模型和输出目录parser.add_argument("--model-dir", type=str, default=os.environ.get("SM_MODEL_DIR"))parser.add_argument("--train", type=str, default=os.environ.get("SM_CHANNEL_TRAIN"))parser.add_argument("--test", type=str, default=os.environ.get("SM_CHANNEL_TEST"))parser.add_argument("--train-file", type=str, default="train.parquet")parser.add_argument("--test-file", type=str, default="test.parquet")parser.add_argument("--features", type=str)parser.add_argument("--target", type=str)args, _ = parser.parse_known_args()# 加载和准备数据train_df = pd.read_parquet(os.path.join(args.train, args.train_file))test_df = pd.read_parquet(os.path.join(args.test, args.test_file))X_train = train_df[args.features.split()]X_test = test_df[args.features.split()]y_train = train_df[args.target]y_test = test_df[args.target]# 训练模型##### 在此动态插入 #####{}{}############################pipeline = Pipeline([('preprocessor', preprocessor), ('model', model)])pipeline.fit(X_train, y_train)# 验证模型并打印指标rmse = mean_squared_error(y_test, pipeline.predict(X_test), squared=False)print("RMSE: " + str(rmse))# 保存模型path = os.path.join(args.model_dir, "model.joblib")joblib.dump(pipeline, path)"""# 将_script写入文件以备后用with open("script_draft.py", "w") as f:print(_script, file=f)

创建预处理和模型组合

preprocessors字典包含应用于模型的所有输入特征的预处理技术的规范。每个配方使用scikit-learn的PipelineFeatureUnion对象定义,将单个数据转换链接在一起并堆叠在一起。例如,mean-imp-scale是一个简单的配方,确保使用各列的均值来填充缺失值,并使用StandardScaler对所有特征进行缩放。相比之下,mean-imp-scale-pca配方将更多操作链接在一起:

  1. 用平均值填充列中的缺失值。
  2. 使用平均值和标准差进行特征缩放。
  3. 在指定方差阈值上对输入数据进行主成分分析(PCA),并将其与填充和缩放的输入特征合并。

在本文中,所有的输入特征都是数值型的。如果您的输入数据集包含更多的数据类型,您应该指定一个更复杂的流程,将不同的预处理分支应用于不同的特征类型集。

preprocessors = {    "mean-imp-scale": "preprocessor = Pipeline([('imputer', SimpleImputer(strategy='mean')), ('scaler', StandardScaler())])\n",    "mean-imp-scale-knn": "preprocessor = FeatureUnion([('base-features', Pipeline([('imputer', SimpleImputer(strategy='mean')), ('scaler', StandardScaler())])), ('knn', Pipeline([('imputer', SimpleImputer(strategy='mean')), ('scaler', StandardScaler()), ('knn', KMeans(n_clusters=10))]))])\n",    "mean-imp-scale-pca": "preprocessor = FeatureUnion([('base-features', Pipeline([('imputer', SimpleImputer(strategy='mean')), ('scaler', StandardScaler())])), ('pca', Pipeline([('imputer', SimpleImputer(strategy='mean')), ('scaler', StandardScaler()), ('pca', PCA(n_components=0.9))]))])\n"   }

models 字典包含对数据集拟合的不同算法的规范。字典中的每个模型类型都具有以下规范:

  • script_output – 指向估算器使用的训练脚本的位置。当将 models 字典与 preprocessors 字典组合时,动态填充此字段。
  • insertions – 定义将插入到 script_draft.py 并随后保存在 script_output 下的代码。键 “preprocessor” 故意留空,因为此位置将用预处理器之一填充,以创建多个模型-预处理器组合。
  • hyperparameters – 一组超参数,由 HPO 作业进行优化。
  • include_cls_metadata – SageMaker Tuner 类所需的更多配置详细信息。

GitHub 存储库中提供了 models 字典的完整示例。

models = {    "rf": {        "script_output": None,        "insertions": {            # 参数            "arguments" :             "parser.add_argument('--n_estimators', type=int, default=100)\n"+            "    parser.add_argument('--max_depth', type=int, default=None)\n"+                        "    parser.add_argument('--min_samples_leaf', type=int, default=1)\n"+            "    parser.add_argument('--min_samples_split', type=int, default=2)\n"+                        "    parser.add_argument('--max_features', type=str, default='auto')\n",            # 模型调用            "preprocessor": None,            "model_call" : "model = RandomForestRegressor(n_estimators=args.n_estimators,max_depth=args.max_depth,min_samples_leaf=args.min_samples_leaf,min_samples_split=args.min_samples_split,max_features=args.max_features)\n"        },        "hyperparameters": {            "n_estimators": IntegerParameter(100, 2000, "Linear"),            "max_depth": IntegerParameter(1, 100, "Logarithmic"),            "min_samples_leaf": IntegerParameter(1, 6, "Linear"),            "min_samples_split": IntegerParameter(2, 20, "Linear"),            "max_features": CategoricalParameter(["auto", "sqrt", "log2"]),        },        "include_cls_metadata": False,    }}

接下来,让我们遍历 preprocessorsmodels 字典,并创建所有可能的组合。例如,如果您的 preprocessors 字典包含 10 个配方,并且 models 字典中有 5 个模型定义,则新创建的 pipelines 字典包含 50 个预处理器-模型流程,在 HPO 过程中进行评估。注意,此时个别的流程脚本尚未创建。Jupyter 笔记本的下一个代码块(第 9 个单元格)遍历 pipelines 字典中的所有预处理器-模型对象,插入所有相关的代码片段,并在笔记本中本地保持特定于管道的版本的脚本。在下一步中创建插入到 HPO 作业中的个别估算器时会使用这些脚本。

pipelines = {}for model_name, model_spec in models.items():    pipelines[model_name] = {}    for preprocessor_name, preprocessor_spec in preprocessors.items():        pipeline_name = f"{model_name}-{preprocessor_name}"        pipelines[model_name][pipeline_name] = {}        pipelines[model_name][pipeline_name]["insertions"] = {}        pipelines[model_name][pipeline_name]["insertions"]["preprocessor"] = preprocessor_spec        pipelines[model_name][pipeline_name]["hyperparameters"] = model_spec["hyperparameters"]        pipelines[model_name][pipeline_name]["include_cls_metadata"] = model_spec["include_cls_metadata"]                pipelines[model_name][pipeline_name]["insertions"]["arguments"] = model_spec["insertions"]["arguments"]        pipelines[model_name][pipeline_name]["insertions"]["model_call"] = model_spec["insertions"]["model_call"]        pipelines[model_name][pipeline_name]["script_output"] = f"scripts/{model_name}/script-{pipeline_name}.py"

定义估算器

现在,您可以开始定义在脚本准备好后由HPO作业使用的SageMaker估算器。让我们从创建一个包含所有估算器共同属性的包装类开始。它继承自SKLearn类,并指定了角色、实例数、类型,以及脚本将使用的特征列和目标。

class SKLearnBase(SKLearn):    def __init__(        self,         entry_point=".", # 故意留空,将在下一个函数中重写        framework_version="1.2-1",        role=sm_role,        instance_count=1,        instance_type="ml.c5.xlarge",        hyperparameters={           "features": "medianIncome housingMedianAge totalRooms totalBedrooms population households latitude longitude",            "target": "medianHouseValue",        },          **kwargs,        ):        super(SKLearnBase, self).__init__(            entry_point=entry_point,            framework_version=framework_version,            role=role,            instance_count=instance_count,            instance_type=instance_type,            hyperparameters=hyperparameters,            **kwargs        )

通过迭代之前生成的位于scripts目录中的所有脚本,我们可以构建estimators字典。您可以使用SKLearnBase类实例化一个新估算器,使用唯一的估算器名称和其中一个脚本。请注意,estimators字典有两个级别:顶级定义pipeline_family。这是一个基于要评估的模型类型的逻辑分组,等于models字典的长度。第二级包含与给定的pipeline_family组合的各个预处理器类型。在创建HPO作业时,这个逻辑分组是必需的。

estimators = {}for pipeline_family in pipelines.keys():    estimators[pipeline_family] = {}    scripts = os.listdir(f"scripts/{pipeline_family}")    for script in scripts:        if script.endswith(".py"):            estimator_name = script.split(".")[0].replace("_", "-").replace("script", "estimator")            estimators[pipeline_family][estimator_name] = SKLearnBase(                entry_point=f"scripts/{pipeline_family}/{script}",                base_job_name=estimator_name,            )

定义HPO调谐器参数

为了优化将参数传递给HPO Tuner类,我们使用HyperparameterTunerArgs数据类初始化必需的HPO类参数。它带有一组函数,用于确保以在一次部署多个模型定义时所期望的格式返回HPO参数。

@dataclassclass HyperparameterTunerArgs:    base_job_names: list[str]    estimators: list[object]    inputs: dict[str]    objective_metric_name: str    hyperparameter_ranges: list[dict]    metric_definition: dict[str]    include_cls_metadata: list[bool]    def get_estimator_dict(self) -> dict:        return {k:v for (k, v) in zip(self.base_job_names, self.estimators)}    def get_inputs_dict(self) -> dict:        return {k:v for (k, v) in zip(self.base_job_names, [self.inputs]*len(self.base_job_names))}    def get_objective_metric_name_dict(self) -> dict:        return {k:v for (k, v) in zip(self.base_job_names, [self.objective_metric_name]*len(self.base_job_names))}    def get_hyperparameter_ranges_dict(self) -> dict:        return {k:v for (k, v) in zip(self.base_job_names, self.hyperparameter_ranges)}    def get_metric_definition_dict(self) -> dict:        return {k:[v] for (k, v) in zip(self.base_job_names, [self.metric_definition]*len(self.base_job_names))}    def get_include_cls_metadata_dict(self) -> dict:        return {k:v for (k, v) in zip(self.base_job_names, self.include_cls_metadata)}

下一个代码块使用之前介绍的HyperparameterTunerArgs数据类。您创建了另一个名为hp_args的字典,并从estimators字典中生成一组特定于每个estimator_family的输入参数。在下一步中,这些参数将用于为每个模型族初始化HPO作业。

hp_args = {}for estimator_family, estimators in estimators.items():    hp_args[estimator_family] = HyperparameterTunerArgs(        base_job_names=list(estimators.keys()),        estimators=list(estimators.values()),        inputs={"train": s3_data_train.uri, "test": s3_data_test.uri},        objective_metric_name="RMSE",        hyperparameter_ranges=[pipeline.get("hyperparameters") for pipeline in pipelines[estimator_family].values()],        metric_definition={"Name": "RMSE", "Regex": "RMSE: ([0-9.]+).*$"},        include_cls_metadata=[pipeline.get("include_cls_metadata") for pipeline in pipelines[estimator_family].values()],    )

创建HPO调谐器对象

在此步骤中,您为每个estimator_family创建单独的调谐器。为什么要创建三个单独的HPO作业,而不是在所有估计器上启动一个作业?HyperparameterTuner类中限制了附加到它的10个模型定义。因此,每个HPO负责找到特定模型族的性能最佳的预处理器,并调整该模型族的超参数。

以下是关于设置的几点说明:

  • 优化策略是贝叶斯的,这意味着HPO积极监视所有试验的性能,并将优化导向更有前途的超参数组合。在使用贝叶斯策略时,应将提前停止设置为禁用自动,这将由逻辑本身处理。
  • 每个HPO作业运行最多100个作业,并且同时运行10个作业。如果您处理更大的数据集,可能需要增加总作业数量。
  • 此外,您可能希望使用设置来控制作业运行时间和HPO触发的作业数量。一种方法是将最大运行时间设置为以秒为单位(在本文中,我们将其设置为1小时)。另一种方法是使用最近发布的TuningJobCompletionCriteriaConfig。它提供了一组设置,用于监视作业的进度并决定是否有可能通过更多作业来改善结果。在本文中,我们将未改善的最大训练作业数设置为20。因此,如果得分不再提高(例如从第40次试验开始),则您不必为其他试验支付费用,直到达到max_jobs
STRATEGY = "Bayesian"OBJECTIVE_TYPE = "Minimize"MAX_JOBS = 100MAX_PARALLEL_JOBS = 10MAX_RUNTIME_IN_SECONDS = 3600EARLY_STOPPING_TYPE = "Off"# RANDOM_SEED = 42 # 如果需要在多次运行之间实现可重复性,请取消注释TUNING_JOB_COMPLETION_CRITERIA_CONFIG = TuningJobCompletionCriteriaConfig(    max_number_of_training_jobs_not_improving=20,    )tuners = {}for estimator_family, hp in hp_args.items():    tuners[estimator_family] = HyperparameterTuner.create(        estimator_dict=hp.get_estimator_dict(),        objective_metric_name_dict=hp.get_objective_metric_name_dict(),        hyperparameter_ranges_dict=hp.get_hyperparameter_ranges_dict(),        metric_definitions_dict=hp.get_metric_definition_dict(),        strategy=STRATEGY,        completion_criteria_config=TUNING_JOB_COMPLETION_CRITERIA_CONFIG,        objective_type=OBJECTIVE_TYPE,        max_jobs=MAX_JOBS,        max_parallel_jobs=MAX_PARALLEL_JOBS,        max_runtime_in_seconds=MAX_RUNTIME_IN_SECONDS,        base_tuning_job_name=f"custom-automl-{estimator_family}",        early_stopping_type=EARLY_STOPPING_TYPE, # 当使用多个训练作业定义时,不支持使用提前停止的训练作业        # random_seed=RANDOM_SEED,    )

现在,让我们遍历tunershp_args字典,并在SageMaker中触发所有HPO作业。请注意,将wait参数设置为False,这意味着内核不会等待结果完成,您可以一次性触发所有作业。

很可能并非所有的训练工作都会完成,其中一些可能会被HPO工作停止。这是由于TuningJobCompletionCriteriaConfig,即如果满足任何指定的条件,则优化完成。在本例中,当连续的20个作业的优化标准没有改善时。

for tuner, hpo in zip(tuners.values(), hp_args.values()):    tuner.fit(        inputs=hpo.get_inputs_dict(),        include_cls_metadata=hpo.get_include_cls_metadata_dict(),        wait=False,        )

分析结果

笔记本的第15单元格检查所有HPO作业是否完成,并将所有结果合并为pandas数据框以进行进一步分析。在详细分析结果之前,让我们先对SageMaker控制台进行高级查看。

Hyperparameter tuning jobs页面的顶部,您可以看到您启动的三个HPO作业。所有这些作业都提前完成,并没有执行所有的100个训练作业。在下面的截图中,您可以看到Elastic-Net模型系列完成了最多的试验,而其他模型系列则不需要那么多的训练作业来找到最佳结果。

SageMaker Hyperparameter tuning jobs console showing all three triggered HPO jobs status

您可以打开HPO作业以获取更多详细信息,例如单独的训练作业、作业配置和最佳训练作业的信息和性能。

Detailed view of one of the selected HPO jobs

让我们根据结果生成一个可视化图表,以更深入地了解AutoML工作流在所有模型系列中的性能。

从下图中,您可以得出结论,Elastic-Net模型的性能在70,000到80,000的RMSE之间波动,并最终停止,因为算法无法改善其性能,尽管尝试了各种预处理技术和超参数值。看起来随机森林的性能根据HPO探索的超参数集合变化很大,但是尽管尝试了许多次,它仍无法降低到50,000的RMSE误差以下。GradientBoosting模型从一开始就实现了最佳性能,RMSE低于50,000。HPO试图进一步改善该结果,但未能在其他超参数组合上实现更好的性能。所有HPO作业的一个总体结论是,为每个算法找到最佳超参数组合并不需要太多的作业。为了进一步改善结果,您需要尝试创建更多特征和执行其他特征工程。

Changes in HPO objective value over time by each model family

您还可以检查模型-预处理器组合的更详细视图,以了解最有希望的组合。

Changes in HPO objective value over time by each model-preprocessor combination

选择最佳模型并部署

以下代码段根据达到的最低目标值选择最佳模型。然后,您可以将模型部署为SageMaker端点。

df_best_job = df_tuner_results.loc[df_tuner_results["FinalObjectiveValue"] == df_tuner_results["FinalObjectiveValue"].min()]df_best_jobBEST_MODEL_FAMILY = df_best_job["TrainingJobFamily"].values[0]tuners.get(BEST_MODEL_FAMILY).best_training_job()tuners.get(BEST_MODEL_FAMILY).best_estimator()predictor = tuners.get(BEST_MODEL_FAMILY).deploy(    initial_instance_count=1,    instance_type="ml.c4.large",    endpoint_name=f"custom-automl-endpoint-{BEST_MODEL_FAMILY}",)

清理

为了防止对您的AWS账户产生不必要的费用,我们建议删除您在本篇文章中使用的AWS资源:

  1. 在Amazon S3控制台上,清空存储训练数据的S3存储桶中的数据。

Amazon S3控制台显示如何清空或完全删除存储桶

  1. 在SageMaker控制台上,停止笔记本实例。

SageMaker笔记本实例控制台显示如何停止实例

  1. 如果已部署模型端点,请删除它。不再使用时应删除端点,因为它们按部署时间计费。
sm_client.delete_endpoint(EndpointName=predictor.endpoint)

结论

在本篇文章中,我们展示了如何在SageMaker中创建自定义的HPO作业,使用自定义的算法和预处理技术。特别是,这个示例演示了如何自动化生成多个训练脚本的过程,以及如何使用Python编程结构来高效地部署多个并行优化作业。我们希望这个解决方案能够成为您在使用SageMaker进行自定义模型调优作业时的基本框架,以实现更高的性能和加速ML工作流程。

请查看以下资源,以进一步深入了解如何使用SageMaker HPO:

Leave a Reply

Your email address will not be published. Required fields are marked *