随着机器学习(ML)进入主流并得到更广泛的应用,基于ML的推断应用程序在解决一系列复杂业务问题方面越来越常见。解决这些复杂业务问题通常需要使用多个ML模型和步骤。本文向您展示如何使用自定义容器在Amazon SageMaker上构建和托管一个ML应用。
Amazon SageMaker提供内置算法和预构建的SageMaker Docker镜像以进行模型部署。但是,如果它们不满足您的需求,则可以自己带入容器(BYOC)以在Amazon SageMaker上托管。
有几种情况下,用户可能需要BYOC来在Amazon SageMaker上进行托管。
- 自定义ML框架或库:如果您打算使用Amazon SageMaker内置算法或预构建容器不支持的ML框架或库,则需要创建自定义容器。
- 专用模型:对于某些领域或行业,您可能需要特定的模型架构或定制的预处理步骤,而这些在内置的Amazon SageMaker提供中不可用。
- 专有算法:如果您在公司内部开发了自己的专有算法,则需要一个自定义容器来在Amazon SageMaker上部署它们。
- 复杂推断流程:如果您的ML推断工作流涉及自定义业务逻辑 – 一系列需要按特定顺序执行的复杂步骤 – 则BYOC可以帮助您更高效地管理和编排这些步骤。
解决方案概述
在本解决方案中,我们展示了如何使用最新的scikit-learn
和xgboost
包,在Amazon SageMaker上托管一个ML串行推断应用,并使用实时端点。
第一个容器使用scikit-learn
模型将原始数据转换为特征化列。它对数值列应用StandardScaler,对分类列应用OneHotEncoder。
第二个容器托管了一个预训练的XGboost
模型(即预测器)。预测器模型接受特征化的输入并输出预测结果。
最后,我们将特征化器和预测器部署到Amazon SageMaker实时端点中的串行推断流水线中。
以下是将在推断应用程序中使用单独容器的一些不同考虑因素。
- 解耦 – 流水线的各个步骤具有明确定义的目的,并且由于涉及到的底层依赖关系,需要在单独的容器上运行。这也有助于保持流水线的良好结构。
- 框架 – 流水线的各个步骤使用特定的适用于目的的框架(如scikit或Spark ML),因此需要在单独的容器上运行。
- 资源隔离 – 流水线的各个步骤具有不同的资源消耗需求,因此需要在单独的容器中运行,以获得更灵活和可控的资源隔离。
- 维护和升级 – 从运营角度来看,这促进了功能隔离,您可以继续更轻松地升级或修改各个步骤,而不影响其他模型。
此外,个别容器的本地构建有助于使用喜爱的工具和集成开发环境(IDE)进行开发和测试的迭代过程。容器准备好后,您可以使用Amazon SageMaker端点将它们部署到AWS云中进行推断。
完整的实现,包括代码片段,可以在此Github存储库中找到这里。
前提条件
在我们首先在本地测试这些自定义容器之前,您需要在本地计算机上安装Docker桌面。您应该熟悉构建Docker容器的方法。
您还需要一个具有访问Amazon SageMaker、Amazon ECR和Amazon S3的AWS账户,以便测试这个应用程序的端到端功能。
确保您已安装了最新版本的Boto3
和Amazon SageMaker的Python包:
pip install --upgrade boto3 sagemaker scikit-learn
解决方案演练
构建自定义特征提取器容器
为了构建第一个容器,即特征提取器容器,我们训练一个scikit-learn
模型来处理鲍鱼数据集中的原始特征。预处理脚本使用SimpleImputer来处理缺失值,StandardScaler来对数值列进行归一化,以及OneHotEncoder来对分类列进行转换。在拟合转换器后,我们将模型保存为joblib格式。然后,我们压缩和上传此保存的模型工件到Amazon简单存储服务(Amazon S3)桶。
下面是一个演示此过程的示例代码片段。详细实现请参考featurizer.ipynb:
```pythonnumeric_features = list(feature_columns_names)numeric_features.remove("sex")numeric_transformer = Pipeline( steps=[ ("imputer", SimpleImputer(strategy="median")), ("scaler", StandardScaler()), ])categorical_features = ["sex"]categorical_transformer = Pipeline( steps=[ ("imputer", SimpleImputer(strategy="constant", fill_value="missing")), ("onehot", OneHotEncoder(handle_unknown="ignore")), ])preprocess = ColumnTransformer( transformers=[ ("num", numeric_transformer, numeric_features), ("cat", categorical_transformer, categorical_features), ])# 调用ColumnTransformer的fit方法来将所有转换器拟合到X、ypreprocessor = preprocess.fit(df_train_val)# 将处理器模型保存到磁盘joblib.dump(preprocess, os.path.join(model_dir, "preprocess.joblib"))```
接下来,为特征提取器模型创建一个自定义推断容器,我们构建一个包含nginx、gunicorn、flask包以及其他特征提取器模型所需依赖项的Docker镜像。
Nginx、gunicorn和Flask应用程序将作为Amazon SageMaker实时端点的模型服务堆栈。
当将自定义容器用于在Amazon SageMaker上托管时,我们需要确保推断脚本在容器内启动后执行以下任务:
- 模型加载:推断脚本(
preprocessing.py
)应该引用/opt/ml/model
目录来加载容器中的模型。Amazon S3中的模型工件将被下载并挂载到容器的路径/opt/ml/model
上。 - 环境变量:要将自定义环境变量传递给容器,在Model创建步骤或从训练作业创建Endpoint时,必须指定这些变量。
- API要求:推断脚本必须在Flask应用程序中实现
/ping
和/invocations
路由。其中,/ping
API用于健康检查,/invocations
API用于处理推断请求。 - 日志记录:推断脚本中的输出日志必须写入标准输出(stdout)和标准错误(stderr)流。然后,Amazon SageMaker将这些日志流式传输到Amazon CloudWatch。
下面是preprocessing.py
中的代码片段,展示了/ping
和/invocations
的实现:
请参考featurizer文件夹下的preprocessing.py以获取完整的实现。
```pythondef load_model(): # 构建featurizer模型文件的路径 ft_model_path = os.path.join(MODEL_PATH, "preprocess.joblib") featurizer = None try: # 打开模型文件并使用joblib加载featurizer with open(ft_model_path, "rb") as f: featurizer = joblib.load(f) print("加载Featurizer模型成功", flush=True) except FileNotFoundError: print(f"错误:找不到featurizer模型文件:{ft_model_path}", flush=True) except Exception as e: print(f"加载featurizer模型时发生错误:{e}", flush=True) # 返回加载的featurizer模型,如果发生错误则返回None return featurizerdef transform_fn(request_body, request_content_type): """ 将请求体转换为可用于模型的numpy数组。 此函数将请求体和内容类型作为输入, 并返回一个转换后的numpy数组,可用作预测模型的输入。 参数: request_body (str):包含输入数据的请求体。 request_content_type (str): 请求体的内容类型。 返回: data (np.ndarray): 转换后的输入数据,以numpy数组形式返回。 """ # 定义输入数据的列名 feature_columns_names = [ "性别", "长度", "直径", "高度", "总重量", "去壳重量", "内脏重量", "壳重量", ] label_column = "环数" # 检查请求体的内容类型是否支持(text/csv) if request_content_type == "text/csv": # 加载featurizer模型 featurizer = load_model() # 检查featurizer是否是ColumnTransformer if isinstance( featurizer, sklearn.compose._column_transformer.ColumnTransformer ): print(f"加载featurizer模型成功", flush=True) # 从请求体中以CSV文件形式读取输入数据 df = pd.read_csv(StringIO(request_body), header=None) # 根据输入数据的列数指定列名 if len(df.columns) == len(feature_columns_names) + 1: # 这是一个带标签的例子,包含环数标签 df.columns = feature_columns_names + [label_column] elif len(df.columns) == len(feature_columns_names): # 这是一个无标签的例子 df.columns = feature_columns_names # 使用featurizer对输入数据进行转换 data = featurizer.transform(df) # 以numpy数组形式返回转换后的数据 return data else: # 如果内容类型不支持,则抛出错误 raise ValueError("不支持的内容类型:{}".format(request_content_type))@app.route("/ping", methods=["GET"])def ping(): # 检查是否可以加载模型,根据情况设置状态 featurizer = load_model() status = 200 if featurizer is not None else 500 # 返回确定的状态码响应 return flask.Response(response="\n", status=status, mimetype="application/json")@app.route("/invocations", methods=["POST"])def invocations(): # 从JSON转换为字典 print(f"接收到的内容类型:{flask.request.content_type}") if flask.request.content_type == "text/csv": # 解码输入数据并进行转换 input = flask.request.data.decode("utf-8") transformed_data = transform_fn(input, flask.request.content_type) # 将transformed_data格式化为CSV字符串 csv_buffer = io.StringIO() csv_writer = csv.writer(csv_buffer) for row in transformed_data: csv_writer.writerow(row) csv_buffer.seek(0) # 以响应中的CSV字符串形式返回转换后的数据 return flask.Response(response=csv_buffer, status=200, mimetype="text/csv") else: print(f"接收到的内容类型:{flask.request.content_type}", flush=True) return flask.Response( response="Transformer:该预测器只支持CSV数据", status=415, mimetype="text/plain", )```
使用featurizer和模型服务栈构建Docker镜像
接下来我们将使用自定义基础映像构建Dockerfile,并安装所需的依赖项。
为此,我们使用python:3.9-slim-buster
作为基础镜像。您可以根据您的用例更改为其他基础镜像。
然后,我们将nginx配置、gunicorn的Web服务器网关文件和推理脚本复制到容器中。我们还创建了一个名为serve的Python脚本,其中在后台启动nginx和gunicorn进程,并将推理脚本(即preprocessing.py Flask应用程序)设置为容器的入口点。
以下是托管特征提取模型的Dockerfile片段。有关完整实现,请参阅featurizer文件夹下的Dockerfile。
```dockerFROM python:3.9-slim-buster…# 将requirements.txt复制到/opt/program文件夹COPY requirements.txt /opt/program/requirements.txt# 安装requirements.txt中列出的软件包RUN pip3 install --no-cache-dir -r /opt/program/requirements.txt# 将code/文件夹的内容复制到/opt/program文件夹COPY code/ /opt/program/# 设置工作目录为/opt/program,其中包含serve和inference.py脚本的路径WORKDIR /opt/program# 将端口8080暴露出来用于服务EXPOSE 8080ENTRYPOINT ["python"]# serve是code/目录下的一个python脚本,用于启动nginx和gunicorn进程CMD [ "serve" ]```
本地测试使用特征提取器的自定义推理图像
现在,使用Amazon SageMaker本地模式构建并测试带有特征提取器的自定义推理容器。本地模式非常适合在Amazon SageMaker上启动任何任务之前测试处理、训练和推理脚本。确认本地测试结果后,您可以轻松地对训练和推理脚本进行适应,以进行部署到Amazon SageMaker并进行最小更改。
要在本地测试特征提取器自定义镜像,请首先使用先前定义的Dockerfile构建镜像。然后,通过将包含特征提取器模型(preprocess.joblib
)的目录挂载到容器内的/opt/ml/model
目录中来启动容器。此外,将容器中的端口8080映射到主机。
启动后,您可以向http://localhost:8080/invocations发送推理请求。
要构建和启动容器,请打开终端并运行以下命令。
请注意,您应该根据以下代码中所示,将<IMAGE_NAME>
替换为您容器的镜像名称。
以下命令还假设训练好的scikit-learn
模型(preprocess.joblib
)存在于一个名为models
的目录下。
```shelldocker build -t <IMAGE_NAME> .``````shelldocker run –rm -v $(pwd)/models:/opt/ml/model -p 8080:8080 <IMAGE_NAME>```
容器运行后,我们可以使用curl命令测试/ping和/invocations路径。
从终端运行下面的命令
```shell# 在本地端点上测试/ping路由curl http://localhost:8080/ping# 将原始的csv字符串发送到/invocations。端点应返回转换后的数据curl --data-raw 'I,0.365,0.295,0.095,0.25,0.1075,0.0545,0.08,9.0' -H 'Content-Type: text/csv' -v http://localhost:8080/invocations```
当将原始(未经转换的)数据发送到http://localhost:8080/invocations时,端点将返回转换后的数据。
您应该会看到类似以下内容的响应:
```shell* 正在尝试 127.0.0.1:8080...* 已连接到本地主机 (127.0.0.1) 端口 8080 (#0)> POST /invocations HTTP/1.1> 主机: localhost:8080> 用户代理: curl/7.87.0> 接受: */*> 内容类型: text/csv> 内容长度: 47>* 标记捆绑包不支持多用途> HTTP/1.1 200 OK> 服务器: nginx/1.14.2> 日期: Sun, 09 Apr 2023 20:47:48 GMT> 内容类型: text/csv; charset=utf-8> 内容长度: 150> 连接: keep-alive-1.3317586042173168, -1.1425409076053987, -1.0579488602777858, -1.177706547272754, -1.130662184748842,* 连接 #0 to host localhost left intact```
我们现在终止运行的容器,然后标记和推送本地自定义镜像到私有的Amazon Elastic Container Registry(Amazon ECR)存储库。
请参考以下命令登录到Amazon ECR,该命令会将本地镜像标记为完整的Amazon ECR镜像路径,然后将镜像推送到Amazon ECR。请确保将region
和account
变量替换为与您的环境匹配的值。
```shell# 使用您的凭证登录到 ECRaws ecr get-login-password - -region "${region}" |\docker login - -username AWS - -password-stdin ${account}".dkr.ecr."${region}".amazonaws.com# 将镜像标记并推送到私有的 Amazon ECRdocker tag ${image} ${fullname}docker push $ {fullname}```
有关更多信息,请参阅使用Amazon ECR仓库的创建仓库和推送镜像到Amazon ECR AWS命令行界面(AWS CLI)命令。
可选步骤
您还可以通过使用Amazon ECR中的自定义Docker镜像将特征提取器模型部署到实时端点来执行实时测试。有关完整的创建、测试和推送自定义镜像到Amazon ECR的实现,请参阅featurizer.ipynb。笔记本。
Amazon SageMaker初始化推理端点并将模型工件复制到容器内的/opt/ml/model
目录。请参阅SageMaker如何加载模型工件。
构建自定义XGBoost预测容器
为构建XGBoost推理容器,我们按照构建特征提取器容器时的类似步骤进行操作:
- 从Amazon S3下载预训练的
XGBoost
模型。 - 创建
inference.py
脚本,该脚本加载预训练的XGBoost
模型,将从特征提取器接收到的转换输入数据转换为XGBoost.DMatrix
格式,在booster上运行predict
,并以json格式返回预测结果。 - 构成模型服务堆栈的脚本和配置文件(即
nginx.conf
,wsgi.py
和serve
)保持不变,无需修改。 - 我们使用
Ubuntu:18.04
作为Dockerfile的基础映像。这不是必需的。我们使用Ubuntu基础映像来演示容器可以使用任何基础映像构建。 - 构建自定义docker镜像、在本地测试镜像以及将测试过的镜像推送到Amazon ECR的步骤与之前相同。
为了简洁起见,因为步骤与上面显示的类似;但是,我们仅展示以下示例中发生的更改的代码。
首先是inference.py
脚本。以下是显示/ping
和/invocations
实现的代码片段。有关此文件的完整实现,请参阅inference.py,位于predictor文件夹下。
```python@app.route("/ping", methods=["GET"])def ping(): """ 检查模型服务器的健康状况,验证模型是否已加载。 如果模型成功加载,则返回状态码200;如果出现错误,则返回状态码500。 返回: flask.Response: 包含状态码和MIME类型的响应对象。 """ status = 200 if model is not None else 500 return flask.Response(response="\n", status=status, mimetype="application/json")@app.route("/invocations", methods=["POST"])def invocations(): """ 处理预测请求,对输入数据进行预处理,进行预测,并将预测结果返回为JSON对象。 此函数会检查请求的内容类型是否受支持(text/csv; charset=utf-8), 如果受支持,则解码输入数据,进行预处理、进行预测,并将预测结果作为JSON对象返回。 如果不支持所请求的内容类型,将返回状态码415。 返回: flask.Response: 包含预测结果、状态码和MIME类型的响应对象。 """ print(f"Predictor: 接收到的内容类型: {flask.request.content_type}") if flask.request.content_type == "text/csv; charset=utf-8": input = flask.request.data.decode("utf-8") transformed_data = preprocess(input, flask.request.content_type) predictions = predict(transformed_data) # 将预测结果作为JSON对象返回 return json.dumps({"result": predictions}) else: print(f"接收到的内容类型: {flask.request.content_type}", flush=True) return flask.Response( response=f"XGBPredictor: 该预测器仅支持CSV数据;接收到的内容类型: {flask.request.content_type}", status=415, mimetype="text/plain", )```
这是用于托管预测器模型的Dockerfile片段。完整的实现请参考 Dockerfile 中预测器文件夹下的代码。
```dockerFROM ubuntu:18.04…# 安装所需的依赖库,包括flask、gunicorn、xgboost等等RUN pip3 --no-cache-dir install flask gunicorn gevent numpy pandas xgboost# 将code/文件夹中的内容复制到/opt/program目录COPY code /opt/program# 设置工作目录为/opt/program目录,其中有serve和inference.py脚本WORKDIR /opt/program# 将端口8080暴露用于服务EXPOSE 8080ENTRYPOINT ["python"]# serve是code/目录下的一个Python脚本,用于启动nginx和gunicorn进程CMD ["serve"]```
接着,我们继续构建、测试并将自定义预测器镜像推送到Amazon ECR的私有仓库中。完整的构建、测试和推送自定义镜像至Amazon ECR的实现请参考 predictor.ipynb 笔记本。
部署串行推理流水线
在我们已经测试完特征工程器和预测器镜像,并将它们推送至Amazon ECR之后,我们现在将模型工件上传到Amazon S3存储桶中。
然后,我们创建两个模型对象:一个用于featurizer
(即preprocess.joblib
),另一个用于predictor
(即xgboost-model
),并指定之前构建的自定义镜像URI。
以下是代码片段示例,请参考 serial-inference-pipeline.ipynb 了解完整的实现。
```pythonsuffix = f"{str(uuid4())[:5]}-{datetime.now().strftime('%d%b%Y')}"# 特征工程器模型(SKLearn 模型)image_name = "<FEATURIZER_IMAGE_NAME>"sklearn_image_uri = f"{account_id}.dkr.ecr.{region}.amazonaws.com/{image_name}:latest"featurizer_model_name = f""<FEATURIZER_MODEL_NAME>-{suffix}"print(f"创建特征工程器模型: {featurizer_model_name}")sklearn_model = Model( image_uri=featurizer_ecr_repo_uri, name=featurizer_model_name, model_data=featurizer_model_data, role=role,)# ECR 仓库的完整名称predictor_image_name = "<PREDICTOR_IMAGE_NAME>"predictor_ecr_repo_uri= f"{account_id}.dkr.ecr.{region}.amazonaws.com/{predictor_image_name}:latest"# 预测器模型(XGBoost 模型)predictor_model_name = f"""<PREDICTOR_MODEL_NAME>-{suffix}"print(f"创建预测器模型: {predictor_model_name}")xgboost_model = Model( image_uri=predictor_ecr_repo_uri, name=predictor_model_name, model_data=predictor_model_data, role=role,)```
现在,要按顺序部署这些容器,我们首先创建一个PipelineModel对象,并按相同的顺序将featurizer
模型和predictor
模型传递给Python列表对象。
然后,我们在PipelineModel上调用.deploy()
方法,指定实例类型和实例数量。
```pythonfrom sagemaker.pipeline import PipelineModelpipeline_model_name = f"Abalone-pipeline-{suffix}"pipeline_model = PipelineModel( name=pipeline_model_name, role=role, models=[sklearn_model, xgboost_model], sagemaker_session=sm_session,)print(f"正在部署管道模型 {pipeline_model_name}...")predictor = pipeline_model.deploy( initial_instance_count=1, instance_type="ml.m5.xlarge",)```
在此阶段,Amazon SageMaker将序列推理管道部署到实时端点。我们等待端点变为InService
状态。
现在,我们可以通过向此实时端点发送一些推理请求来测试端点。
请参考serial-inference-pipeline.ipynb以获取完整的实现。
清理
在完成测试后,请按照笔记本清理部分的说明删除本文中提供的资源,以避免不必要的费用。有关推理实例费用的详细信息,请参阅Amazon SageMaker Pricing。
```python# 删除端点和模型try: print(f"正在删除模型:{pipeline_model_name}") predictor.delete_model()except Exception as e: print(f"删除模型时出错:{pipeline_model_name}\n{e}") passtry: print(f"正在删除端点:{endpoint_name}") predictor.delete_endpoint()except Exception as e: print(f"删除端点时出错:{endpoint_name}\n{e}") pass```
结论
在本文中,我展示了如何使用自定义推理容器在Amazon SageMaker上构建和部署串行ML推理应用程序到实时端点。
此解决方案演示了客户如何以成本效益的方式将自定义容器带入Amazon SageMaker进行托管。通过BYOC选项,客户可以快速构建和调整ML应用程序,以部署到Amazon SageMaker上。
我们鼓励您尝试此解决方案,并使用与您业务关键绩效指标(KPI)相关的数据集。您可以在此GitHub存储库中找到整个解决方案。
参考资料