Press "Enter" to skip to content

从头开始构建和部署使用Amazon SageMaker的机器学习推理应用程序

随着机器学习(ML)进入主流并得到更广泛的应用,基于ML的推断应用程序在解决一系列复杂业务问题方面越来越常见。解决这些复杂业务问题通常需要使用多个ML模型和步骤。本文向您展示如何使用自定义容器在Amazon SageMaker上构建和托管一个ML应用。

Amazon SageMaker提供内置算法预构建的SageMaker Docker镜像以进行模型部署。但是,如果它们不满足您的需求,则可以自己带入容器(BYOC)以在Amazon SageMaker上托管。

有几种情况下,用户可能需要BYOC来在Amazon SageMaker上进行托管。

  1. 自定义ML框架或库:如果您打算使用Amazon SageMaker内置算法或预构建容器不支持的ML框架或库,则需要创建自定义容器。
  2. 专用模型:对于某些领域或行业,您可能需要特定的模型架构或定制的预处理步骤,而这些在内置的Amazon SageMaker提供中不可用。
  3. 专有算法:如果您在公司内部开发了自己的专有算法,则需要一个自定义容器来在Amazon SageMaker上部署它们。
  4. 复杂推断流程:如果您的ML推断工作流涉及自定义业务逻辑 – 一系列需要按特定顺序执行的复杂步骤 – 则BYOC可以帮助您更高效地管理和编排这些步骤。

解决方案概述

在本解决方案中,我们展示了如何使用最新的scikit-learnxgboost包,在Amazon SageMaker上托管一个ML串行推断应用,并使用实时端点。

第一个容器使用scikit-learn模型将原始数据转换为特征化列。它对数值列应用StandardScaler,对分类列应用OneHotEncoder

从头开始构建和部署使用Amazon SageMaker的机器学习推理应用程序 四海 第1张

第二个容器托管了一个预训练的XGboost模型(即预测器)。预测器模型接受特征化的输入并输出预测结果。

从头开始构建和部署使用Amazon SageMaker的机器学习推理应用程序 四海 第2张

最后,我们将特征化器和预测器部署到Amazon SageMaker实时端点中的串行推断流水线中。

以下是将在推断应用程序中使用单独容器的一些不同考虑因素。

  • 解耦 – 流水线的各个步骤具有明确定义的目的,并且由于涉及到的底层依赖关系,需要在单独的容器上运行。这也有助于保持流水线的良好结构。
  • 框架 – 流水线的各个步骤使用特定的适用于目的的框架(如scikit或Spark ML),因此需要在单独的容器上运行。
  • 资源隔离 – 流水线的各个步骤具有不同的资源消耗需求,因此需要在单独的容器中运行,以获得更灵活和可控的资源隔离。
  • 维护和升级 – 从运营角度来看,这促进了功能隔离,您可以继续更轻松地升级或修改各个步骤,而不影响其他模型。

此外,个别容器的本地构建有助于使用喜爱的工具和集成开发环境(IDE)进行开发和测试的迭代过程。容器准备好后,您可以使用Amazon SageMaker端点将它们部署到AWS云中进行推断。

完整的实现,包括代码片段,可以在此Github存储库中找到这里

从头开始构建和部署使用Amazon SageMaker的机器学习推理应用程序 四海 第3张

前提条件

在我们首先在本地测试这些自定义容器之前,您需要在本地计算机上安装Docker桌面。您应该熟悉构建Docker容器的方法。

您还需要一个具有访问Amazon SageMaker、Amazon ECR和Amazon S3的AWS账户,以便测试这个应用程序的端到端功能。

确保您已安装了最新版本的Boto3和Amazon SageMaker的Python包:

pip install --upgrade boto3 sagemaker scikit-learn

解决方案演练

构建自定义特征提取器容器

为了构建第一个容器,即特征提取器容器,我们训练一个scikit-learn模型来处理鲍鱼数据集中的原始特征。预处理脚本使用SimpleImputer来处理缺失值,StandardScaler来对数值列进行归一化,以及OneHotEncoder来对分类列进行转换。在拟合转换器后,我们将模型保存为joblib格式。然后,我们压缩和上传此保存的模型工件到Amazon简单存储服务(Amazon S3)桶。

下面是一个演示此过程的示例代码片段。详细实现请参考featurizer.ipynb

```pythonnumeric_features = list(feature_columns_names)numeric_features.remove("sex")numeric_transformer = Pipeline(    steps=[        ("imputer", SimpleImputer(strategy="median")),        ("scaler", StandardScaler()),    ])categorical_features = ["sex"]categorical_transformer = Pipeline(    steps=[        ("imputer", SimpleImputer(strategy="constant", fill_value="missing")),        ("onehot", OneHotEncoder(handle_unknown="ignore")),    ])preprocess = ColumnTransformer(    transformers=[        ("num", numeric_transformer, numeric_features),        ("cat", categorical_transformer, categorical_features),    ])# 调用ColumnTransformer的fit方法来将所有转换器拟合到X、ypreprocessor = preprocess.fit(df_train_val)# 将处理器模型保存到磁盘joblib.dump(preprocess, os.path.join(model_dir, "preprocess.joblib"))```

接下来,为特征提取器模型创建一个自定义推断容器,我们构建一个包含nginx、gunicorn、flask包以及其他特征提取器模型所需依赖项的Docker镜像。

Nginx、gunicorn和Flask应用程序将作为Amazon SageMaker实时端点的模型服务堆栈。

当将自定义容器用于在Amazon SageMaker上托管时,我们需要确保推断脚本在容器内启动后执行以下任务:

  1. 模型加载:推断脚本(preprocessing.py)应该引用/opt/ml/model目录来加载容器中的模型。Amazon S3中的模型工件将被下载并挂载到容器的路径/opt/ml/model上。
  2. 环境变量:要将自定义环境变量传递给容器,在Model创建步骤或从训练作业创建Endpoint时,必须指定这些变量。
  3. API要求:推断脚本必须在Flask应用程序中实现/ping/invocations路由。其中,/ping API用于健康检查,/invocations API用于处理推断请求。
  4. 日志记录:推断脚本中的输出日志必须写入标准输出(stdout)和标准错误(stderr)流。然后,Amazon SageMaker将这些日志流式传输到Amazon CloudWatch

下面是preprocessing.py中的代码片段,展示了/ping/invocations的实现:

请参考featurizer文件夹下的preprocessing.py以获取完整的实现。

```pythondef load_model():    # 构建featurizer模型文件的路径    ft_model_path = os.path.join(MODEL_PATH, "preprocess.joblib")    featurizer = None    try:        # 打开模型文件并使用joblib加载featurizer        with open(ft_model_path, "rb") as f:            featurizer = joblib.load(f)            print("加载Featurizer模型成功", flush=True)    except FileNotFoundError:        print(f"错误:找不到featurizer模型文件:{ft_model_path}", flush=True)    except Exception as e:        print(f"加载featurizer模型时发生错误:{e}", flush=True)    # 返回加载的featurizer模型,如果发生错误则返回None    return featurizerdef transform_fn(request_body, request_content_type):    """    将请求体转换为可用于模型的numpy数组。    此函数将请求体和内容类型作为输入,    并返回一个转换后的numpy数组,可用作预测模型的输入。    参数:        request_body (str):包含输入数据的请求体。        request_content_type (str): 请求体的内容类型。    返回:        data (np.ndarray): 转换后的输入数据,以numpy数组形式返回。    """    # 定义输入数据的列名    feature_columns_names = [        "性别",        "长度",        "直径",        "高度",        "总重量",        "去壳重量",        "内脏重量",        "壳重量",    ]    label_column = "环数"    # 检查请求体的内容类型是否支持(text/csv)    if request_content_type == "text/csv":        # 加载featurizer模型        featurizer = load_model()        # 检查featurizer是否是ColumnTransformer        if isinstance(            featurizer, sklearn.compose._column_transformer.ColumnTransformer        ):            print(f"加载featurizer模型成功", flush=True)        # 从请求体中以CSV文件形式读取输入数据        df = pd.read_csv(StringIO(request_body), header=None)        # 根据输入数据的列数指定列名        if len(df.columns) == len(feature_columns_names) + 1:            # 这是一个带标签的例子,包含环数标签            df.columns = feature_columns_names + [label_column]        elif len(df.columns) == len(feature_columns_names):            # 这是一个无标签的例子            df.columns = feature_columns_names        # 使用featurizer对输入数据进行转换        data = featurizer.transform(df)        # 以numpy数组形式返回转换后的数据        return data    else:        # 如果内容类型不支持,则抛出错误        raise ValueError("不支持的内容类型:{}".format(request_content_type))@app.route("/ping", methods=["GET"])def ping():    # 检查是否可以加载模型,根据情况设置状态    featurizer = load_model()    status = 200 if featurizer is not None else 500    # 返回确定的状态码响应    return flask.Response(response="\n", status=status, mimetype="application/json")@app.route("/invocations", methods=["POST"])def invocations():    # 从JSON转换为字典    print(f"接收到的内容类型:{flask.request.content_type}")    if flask.request.content_type == "text/csv":        # 解码输入数据并进行转换        input = flask.request.data.decode("utf-8")        transformed_data = transform_fn(input, flask.request.content_type)        # 将transformed_data格式化为CSV字符串        csv_buffer = io.StringIO()        csv_writer = csv.writer(csv_buffer)        for row in transformed_data:            csv_writer.writerow(row)        csv_buffer.seek(0)        # 以响应中的CSV字符串形式返回转换后的数据        return flask.Response(response=csv_buffer, status=200, mimetype="text/csv")    else:        print(f"接收到的内容类型:{flask.request.content_type}", flush=True)        return flask.Response(            response="Transformer:该预测器只支持CSV数据",            status=415,            mimetype="text/plain",        )```

使用featurizer和模型服务栈构建Docker镜像

接下来我们将使用自定义基础映像构建Dockerfile,并安装所需的依赖项。

为此,我们使用python:3.9-slim-buster作为基础镜像。您可以根据您的用例更改为其他基础镜像。

然后,我们将nginx配置、gunicorn的Web服务器网关文件和推理脚本复制到容器中。我们还创建了一个名为serve的Python脚本,其中在后台启动nginx和gunicorn进程,并将推理脚本(即preprocessing.py Flask应用程序)设置为容器的入口点。

以下是托管特征提取模型的Dockerfile片段。有关完整实现,请参阅featurizer文件夹下的Dockerfile

```dockerFROM python:3.9-slim-buster…# 将requirements.txt复制到/opt/program文件夹COPY requirements.txt /opt/program/requirements.txt# 安装requirements.txt中列出的软件包RUN pip3 install --no-cache-dir -r /opt/program/requirements.txt# 将code/文件夹的内容复制到/opt/program文件夹COPY code/ /opt/program/# 设置工作目录为/opt/program,其中包含serve和inference.py脚本的路径WORKDIR /opt/program# 将端口8080暴露出来用于服务EXPOSE 8080ENTRYPOINT ["python"]# serve是code/目录下的一个python脚本,用于启动nginx和gunicorn进程CMD [ "serve" ]```

本地测试使用特征提取器的自定义推理图像

现在,使用Amazon SageMaker本地模式构建并测试带有特征提取器的自定义推理容器。本地模式非常适合在Amazon SageMaker上启动任何任务之前测试处理、训练和推理脚本。确认本地测试结果后,您可以轻松地对训练和推理脚本进行适应,以进行部署到Amazon SageMaker并进行最小更改。

要在本地测试特征提取器自定义镜像,请首先使用先前定义的Dockerfile构建镜像。然后,通过将包含特征提取器模型(preprocess.joblib)的目录挂载到容器内的/opt/ml/model目录中来启动容器。此外,将容器中的端口8080映射到主机。

启动后,您可以向http://localhost:8080/invocations发送推理请求。

要构建和启动容器,请打开终端并运行以下命令。

请注意,您应该根据以下代码中所示,将<IMAGE_NAME>替换为您容器的镜像名称。

以下命令还假设训练好的scikit-learn模型(preprocess.joblib)存在于一个名为models的目录下。

```shelldocker build -t <IMAGE_NAME> .``````shelldocker run –rm -v $(pwd)/models:/opt/ml/model -p 8080:8080 <IMAGE_NAME>```

容器运行后,我们可以使用curl命令测试/ping/invocations路径。

从终端运行下面的命令

```shell# 在本地端点上测试/ping路由curl http://localhost:8080/ping# 将原始的csv字符串发送到/invocations。端点应返回转换后的数据curl --data-raw 'I,0.365,0.295,0.095,0.25,0.1075,0.0545,0.08,9.0' -H 'Content-Type: text/csv' -v http://localhost:8080/invocations```

当将原始(未经转换的)数据发送到http://localhost:8080/invocations时,端点将返回转换后的数据。

您应该会看到类似以下内容的响应:

```shell* 正在尝试 127.0.0.1:8080...* 已连接到本地主机 (127.0.0.1) 端口 8080 (#0)> POST /invocations HTTP/1.1> 主机: localhost:8080> 用户代理: curl/7.87.0> 接受: */*> 内容类型: text/csv> 内容长度: 47>* 标记捆绑包不支持多用途> HTTP/1.1 200 OK> 服务器: nginx/1.14.2> 日期: Sun, 09 Apr 2023 20:47:48 GMT> 内容类型: text/csv; charset=utf-8> 内容长度: 150> 连接: keep-alive-1.3317586042173168, -1.1425409076053987, -1.0579488602777858, -1.177706547272754, -1.130662184748842,* 连接 #0 to host localhost left intact```

我们现在终止运行的容器,然后标记和推送本地自定义镜像到私有的Amazon Elastic Container Registry(Amazon ECR)存储库。

请参考以下命令登录到Amazon ECR,该命令会将本地镜像标记为完整的Amazon ECR镜像路径,然后将镜像推送到Amazon ECR。请确保将regionaccount变量替换为与您的环境匹配的值。

```shell# 使用您的凭证登录到 ECRaws ecr get-login-password - -region "${region}" |\docker login - -username AWS - -password-stdin ${account}".dkr.ecr."${region}".amazonaws.com# 将镜像标记并推送到私有的 Amazon ECRdocker tag ${image} ${fullname}docker push $ {fullname}```

有关更多信息,请参阅使用Amazon ECR仓库的创建仓库推送镜像到Amazon ECR AWS命令行界面(AWS CLI)命令。

可选步骤

您还可以通过使用Amazon ECR中的自定义Docker镜像将特征提取器模型部署到实时端点来执行实时测试。有关完整的创建、测试和推送自定义镜像到Amazon ECR的实现,请参阅featurizer.ipynb。笔记本。

Amazon SageMaker初始化推理端点并将模型工件复制到容器内的/opt/ml/model目录。请参阅SageMaker如何加载模型工件

构建自定义XGBoost预测容器

为构建XGBoost推理容器,我们按照构建特征提取器容器时的类似步骤进行操作:

  1. 从Amazon S3下载预训练的XGBoost模型。
  2. 创建inference.py脚本,该脚本加载预训练的XGBoost模型,将从特征提取器接收到的转换输入数据转换为XGBoost.DMatrix格式,在booster上运行predict,并以json格式返回预测结果。
  3. 构成模型服务堆栈的脚本和配置文件(即nginx.confwsgi.pyserve)保持不变,无需修改。
  4. 我们使用Ubuntu:18.04作为Dockerfile的基础映像。这不是必需的。我们使用Ubuntu基础映像来演示容器可以使用任何基础映像构建。
  5. 构建自定义docker镜像、在本地测试镜像以及将测试过的镜像推送到Amazon ECR的步骤与之前相同。

为了简洁起见,因为步骤与上面显示的类似;但是,我们仅展示以下示例中发生的更改的代码。

首先是inference.py脚本。以下是显示/ping/invocations实现的代码片段。有关此文件的完整实现,请参阅inference.py,位于predictor文件夹下。

```python@app.route("/ping", methods=["GET"])def ping():    """    检查模型服务器的健康状况,验证模型是否已加载。    如果模型成功加载,则返回状态码200;如果出现错误,则返回状态码500。    返回:        flask.Response: 包含状态码和MIME类型的响应对象。    """    status = 200 if model is not None else 500    return flask.Response(response="\n", status=status, mimetype="application/json")@app.route("/invocations", methods=["POST"])def invocations():    """    处理预测请求,对输入数据进行预处理,进行预测,并将预测结果返回为JSON对象。    此函数会检查请求的内容类型是否受支持(text/csv; charset=utf-8),    如果受支持,则解码输入数据,进行预处理、进行预测,并将预测结果作为JSON对象返回。    如果不支持所请求的内容类型,将返回状态码415。    返回:        flask.Response: 包含预测结果、状态码和MIME类型的响应对象。    """    print(f"Predictor: 接收到的内容类型: {flask.request.content_type}")    if flask.request.content_type == "text/csv; charset=utf-8":        input = flask.request.data.decode("utf-8")        transformed_data = preprocess(input, flask.request.content_type)        predictions = predict(transformed_data)        # 将预测结果作为JSON对象返回        return json.dumps({"result": predictions})    else:        print(f"接收到的内容类型: {flask.request.content_type}", flush=True)        return flask.Response(            response=f"XGBPredictor: 该预测器仅支持CSV数据;接收到的内容类型: {flask.request.content_type}",            status=415,            mimetype="text/plain",        )```

这是用于托管预测器模型的Dockerfile片段。完整的实现请参考 Dockerfile 中预测器文件夹下的代码。

```dockerFROM ubuntu:18.04…# 安装所需的依赖库,包括flask、gunicorn、xgboost等等RUN pip3 --no-cache-dir install  flask  gunicorn  gevent  numpy  pandas  xgboost# 将code/文件夹中的内容复制到/opt/program目录COPY code /opt/program# 设置工作目录为/opt/program目录,其中有serve和inference.py脚本WORKDIR /opt/program# 将端口8080暴露用于服务EXPOSE 8080ENTRYPOINT ["python"]# serve是code/目录下的一个Python脚本,用于启动nginx和gunicorn进程CMD ["serve"]```

接着,我们继续构建、测试并将自定义预测器镜像推送到Amazon ECR的私有仓库中。完整的构建、测试和推送自定义镜像至Amazon ECR的实现请参考 predictor.ipynb 笔记本。

部署串行推理流水线

在我们已经测试完特征工程器和预测器镜像,并将它们推送至Amazon ECR之后,我们现在将模型工件上传到Amazon S3存储桶中。

然后,我们创建两个模型对象:一个用于featurizer(即preprocess.joblib),另一个用于predictor(即xgboost-model),并指定之前构建的自定义镜像URI。

以下是代码片段示例,请参考 serial-inference-pipeline.ipynb 了解完整的实现。

```pythonsuffix = f"{str(uuid4())[:5]}-{datetime.now().strftime('%d%b%Y')}"# 特征工程器模型(SKLearn 模型)image_name = "<FEATURIZER_IMAGE_NAME>"sklearn_image_uri = f"{account_id}.dkr.ecr.{region}.amazonaws.com/{image_name}:latest"featurizer_model_name = f""<FEATURIZER_MODEL_NAME>-{suffix}"print(f"创建特征工程器模型: {featurizer_model_name}")sklearn_model = Model(    image_uri=featurizer_ecr_repo_uri,    name=featurizer_model_name,    model_data=featurizer_model_data,    role=role,)# ECR 仓库的完整名称predictor_image_name = "<PREDICTOR_IMAGE_NAME>"predictor_ecr_repo_uri= f"{account_id}.dkr.ecr.{region}.amazonaws.com/{predictor_image_name}:latest"# 预测器模型(XGBoost 模型)predictor_model_name = f"""<PREDICTOR_MODEL_NAME>-{suffix}"print(f"创建预测器模型: {predictor_model_name}")xgboost_model = Model(    image_uri=predictor_ecr_repo_uri,    name=predictor_model_name,    model_data=predictor_model_data,    role=role,)```

现在,要按顺序部署这些容器,我们首先创建一个PipelineModel对象,并按相同的顺序将featurizer模型和predictor模型传递给Python列表对象。

然后,我们在PipelineModel上调用.deploy()方法,指定实例类型和实例数量。

```pythonfrom sagemaker.pipeline import PipelineModelpipeline_model_name = f"Abalone-pipeline-{suffix}"pipeline_model = PipelineModel(    name=pipeline_model_name,    role=role,    models=[sklearn_model, xgboost_model],    sagemaker_session=sm_session,)print(f"正在部署管道模型 {pipeline_model_name}...")predictor = pipeline_model.deploy(    initial_instance_count=1,    instance_type="ml.m5.xlarge",)```

在此阶段,Amazon SageMaker将序列推理管道部署到实时端点。我们等待端点变为InService状态。

现在,我们可以通过向此实时端点发送一些推理请求来测试端点。

请参考serial-inference-pipeline.ipynb以获取完整的实现。

清理

在完成测试后,请按照笔记本清理部分的说明删除本文中提供的资源,以避免不必要的费用。有关推理实例费用的详细信息,请参阅Amazon SageMaker Pricing

```python# 删除端点和模型try:    print(f"正在删除模型:{pipeline_model_name}")    predictor.delete_model()except Exception as e:    print(f"删除模型时出错:{pipeline_model_name}\n{e}")    passtry:    print(f"正在删除端点:{endpoint_name}")    predictor.delete_endpoint()except Exception as e:    print(f"删除端点时出错:{endpoint_name}\n{e}")    pass```

结论

在本文中,我展示了如何使用自定义推理容器在Amazon SageMaker上构建和部署串行ML推理应用程序到实时端点。

此解决方案演示了客户如何以成本效益的方式将自定义容器带入Amazon SageMaker进行托管。通过BYOC选项,客户可以快速构建和调整ML应用程序,以部署到Amazon SageMaker上。

我们鼓励您尝试此解决方案,并使用与您业务关键绩效指标(KPI)相关的数据集。您可以在此GitHub存储库中找到整个解决方案。

参考资料

Leave a Reply

Your email address will not be published. Required fields are marked *