Press "Enter" to skip to content

部署自定义ML模型作为SageMaker端点

Photo by Ricardo Gomez Angel on Unsplash

SageMaker端点部署

为您的模型创建AWS SageMaker端点的快速简易指南

开发机器学习(ML)模型涉及一系列关键步骤,从数据收集到模型部署。在通过测试完善算法并确保性能后,最关键的最后一步是部署。这个阶段将创新转化为实用工具,让他人受益于模型的预测能力。部署的ML模型将开发和现实世界的影响联系起来,为用户和利益相关者提供切实的好处。

本指南涵盖了开发自定义ML作为SageMaker端点所需的基本步骤。在此阶段,我假设您已经有一个可用的模型,并希望通过端点将其暴露给全世界。本指南将指导您部署一个基于PyTorch的模型,该模型旨在预测视频剪辑中的异常情况。该模型,称为AI VAD,基于论文“基于属性的表示以实现准确可解释的视频异常检测”,其实现可以在anomalib GitHub存储库中找到,由OpenVINO提供支持。要阅读更多关于这种有趣方法的信息,请向下滚动到博客的附录部分。

在此阶段,我想强调的是,在这种情况下,我们不能使用特别用于部署PyTorch模型的PyTorchModel抽象,原因有两个。第一是我们有anomalib包作为附加依赖项,该附加依赖项不包含在预构建的PyTorch Sagemaker映像中。第二个原因是该模型需要额外的有关训练步骤中学到的信息,而这些信息不是PyTorch模型权重的一部分。

以下是实现此目标的步骤:

  1. 编写Sagemaker模型服务脚本
  2. 将模型上传到S3
  3. 将自定义Docker映像上传到AWS ECR
  4. 在SageMaker中创建模型
  5. 创建端点配置
  6. 创建端点
  7. 调用端点

编写Sagemaker模型服务脚本

Sagemaker模型服务脚本(inference.py)是创建Sagemaker模型时的重要组成部分。它在机器学习模型和现实世界数据之间架起桥梁。它处理传入的请求,运行模型预测,并返回结果,从而影响应用程序的决策过程。

inference.py脚本由几个关键方法组成,每个方法都有不同的目的,共同促进模型服务过程。以下列出了其中的四个主要方法。

  1. model_fn方法负责加载训练好的模型。它读取已保存的模型工件,并返回一个可用于预测的模型对象。该方法仅在启动SageMaker模型服务器时调用一次。
  2. input_fn方法将请求数据格式化为适合进行预测的形式。例如,在下面的代码中,此函数根据数据来源(图像字节或S3 URI列表)以及是否应将帧列表视为一个视频剪辑而以不同的方式格式化数据。
  3. predict_fn方法使用格式化的请求数据执行推理操作,并针对加载的模型进行预测。
  4. 最后,output_fn方法用于将预测结果格式化为响应消息。例如,将其打包为JSON对象。

以下是Sagemaker模型服务脚本的代码。

import osimport jsonimport joblibimport torchfrom PIL import Imageimport numpy as npimport ioimport boto3from enum import Enumfrom urllib.parse import urlsplitfrom omegaconf import OmegaConffrom anomalib.data.utils import read_image, InputNormalizationMethod, get_transformsfrom anomalib.models.ai_vad.torch_model import AiVadModeldevice = "cuda"class PredictMode(Enum):    frame = 1    batch = 2    clip = 3    def model_fn(model_dir):    """    此函数是在预测请求时首次执行的函数,它从磁盘中加载模型并返回稍后用于推理的模型对象。    """    # 加载配置文件    config = OmegaConf.load(os.path.join(model_dir, "ai_vad_config.yaml"))    config_model = config.model    # 加载模型    model = AiVadModel(            box_score_thresh=config_model.box_score_thresh,            persons_only=config_model.persons_only,            min_bbox_area=config_model.min_bbox_area,            max_bbox_overlap=config_model.max_bbox_overlap,            enable_foreground_detections=config_model.enable_foreground_detections,            foreground_kernel_size=config_model.foreground_kernel_size,            foreground_binary_threshold=config_model.foreground_binary_threshold,            n_velocity_bins=config_model.n_velocity_bins,            use_velocity_features=config_model.use_velocity_features,            use_pose_features=config_model.use_pose_features,            use_deep_features=config_model.use_deep_features,            n_components_velocity=config_model.n_components_velocity,            n_neighbors_pose=config_model.n_neighbors_pose,            n_neighbors_deep=config_model.n_neighbors_deep,        )    # 加载模型权重    model.load_state_dict(torch.load(os.path.join(model_dir, "ai_vad_weights.pth"), map_location=device), strict=False)    # 加载内存库    velocity_estimator_memory_bank, pose_estimator_memory_bank, appearance_estimator_memory_bank = joblib.load(os.path.join(model_dir, "ai_vad_banks.joblib"))     if velocity_estimator_memory_bank is not None:        model.density_estimator.velocity_estimator.memory_bank = velocity_estimator_memory_bank    if pose_estimator_memory_bank is not None:        model.density_estimator.pose_estimator.memory_bank = pose_estimator_memory_bank    if appearance_estimator_memory_bank is not None:        model.density_estimator.appearance_estimator.memory_bank = appearance_estimator_memory_bank    model.density_estimator.fit()    # 将整个模型移至指定设备    model = model.to(device)    # 获取转换器    transform_config = config.dataset.transform_config.eval if "transform_config" in config.dataset.keys() else None    image_size = (config.dataset.image_size[0], config.dataset.image_size[1])    center_crop = config.dataset.get("center_crop")    center_crop = tuple(center_crop) if center_crop is not None else None    normalization = InputNormalizationMethod(config.dataset.normalization)    transform = get_transforms(config=transform_config, image_size=image_size, center_crop=center_crop, normalization=normalization)    return model, transformdef input_fn(request_body, request_content_type):    """    request_body由SageMaker传入,content type由客户端(或调用方)通过HTTP头传入。    """    print("input_fn-----------------------")    if request_content_type in ("application/x-image", "image/x-image"):        image = Image.open(io.BytesIO(request_body)).convert("RGB")        numpy_array = np.array(image)        print("numpy_array.shape", numpy_array.shape)        print("input_fn-----------------------")        return [numpy_array], PredictMode.frame    elif request_content_type == "application/json":        request_body_json = json.loads(request_body)        s3_uris = request_body_json.get("images", [])        if len(s3_uris) == 0:            raise ValueError(f"Images是必需的键,且应至少包含一个S3 URI的列表")        s3 = boto3.client("s3")        frame_paths = []        for s3_uri in s3_uris:            parsed_url = urlsplit(s3_uri)            bucket_name = parsed_url.netloc            object_key = parsed_url.path.lstrip('/')            local_frame_path = f"/tmp/{s3_uri.replace('/', '_')}"            # 从S3下载帧            s3.download_file(bucket_name, object_key, local_frame_path)            frame_paths.append(local_frame_path)        frames = np.stack([torch.Tensor(read_image(frame_path)) for frame_path in frame_paths], axis=0)                predict_mode = PredictMode.clip if request_body_json.get("clip", False) else PredictMode.batch                print("frames.shape", frames.shape)        print("predict_mode", predict_mode)        print("input_fn-----------------------")        return frames, predict_mode    # 如果request_content_type不符合预期,则引发异常    raise ValueError(f"不支持的内容类型 {request_content_type}")def predict_fn(input_data, model):    """    此函数对输入数据和model_fn返回的模型进行处理,它在model_fn之后执行,并将其输出作为API响应返回。    """    print("predict_fn-----------------------")    model, transform = model        frames, predict_mode = input_data    processed_data = {}    processed_data["image"] = [transform(image=frame)["image"] for frame in frames]    processed_data["image"] = torch.stack(processed_data["image"])    image = processed_data["image"].to(device)        # 为批次大小为1的clip添加一个维度    if predict_mode == PredictMode.clip:        image = image.unsqueeze(0)    print("image.shape", image.shape)    model.eval()    with torch.no_grad():        boxes, anomaly_scores, image_scores = model(image)    print("boxes_len", [len(b) for b in boxes])    processed_data["pred_boxes"] = [box.int() for box in boxes]    processed_data["box_scores"] = [score.to(device) for score in anomaly_scores]    processed_data["pred_scores"] = torch.Tensor(image_scores).to(device)    print("predict_fn-----------------------")    return processed_datadef output_fn(prediction, accept):    """    模型预测的后处理函数。在predict_fn之后执行。    """    print("output_fn-----------------------")    # 检查accept类型是否为JSON    if accept != "application/json":        raise ValueError(f"不支持的Accept类型 {accept}")    # 将PyTorch张量转换为列表,以便可以JSON序列化    for key in prediction:        # 如果是torch.Tensor,则将其转换为列表        if isinstance(prediction[key], torch.Tensor):            prediction[key] = prediction[key].tolist()        # 如果是列表,则转换列表中的每个张量        elif isinstance(prediction[key], list):            prediction[key] = [tensor.tolist() if isinstance(tensor, torch.Tensor) else tensor for tensor in prediction[key]]    print("output_fn-----------------------")    return json.dumps(prediction), accept

顺便说一句,在进行下一步之前强烈建议测试模型服务脚本。可以通过以下代码模拟调用管道来轻松完成。

import jsonfrom inference import model_fn, predict_fn, input_fn, output_fnresponse, accept = output_fn(    predict_fn(        input_fn(payload, "application/x-image"),        model_fn("../")    ),    "application/json")json.loads(response).keys()

将模型上传到 S3

为了创建一个加载 AI VAD PyTorch 模型的 SageMaker 端点,我们需要以下文件:

  • AI VAD PyTorch 模型的权重(即 state_dict)
  • 密度估计器的内存银行(不是模型的权重的一部分)
  • PyTorch 模型的超参数配置文件
  • Sagemaker 模型服务脚本(inference.py

下面的代码演示了如何将所有必需的文件组织在一个目录中。

顺便说一句,我重写了内置的 PyTorch ModelCheckpoint 回调函数,以确保这些内存银行作为检查点保存的一部分被保存(实现代码可以在这里找到)。

import torchimport joblibimport shutilcheckpoint = "results/ai_vad/ucsd/run/weights/lightning/model.ckpt"config_path = "results/ai_vad/ucsd/run/config.yaml"model_weights = torch.load(checkpoint)model_state_dict = model_weights["state_dict"]torch.save(model_state_dict, "../ai_vad_weights.pth")velocity_estimator_memory_bank = Nonepose_estimator_memory_bank = Noneappearance_estimator_memory_bank = Noneif "velocity_estimator_memory_bank" in model_weights:    velocity_estimator_memory_bank = model_weights["velocity_estimator_memory_bank"]if "pose_estimator_memory_bank" in model_weights:    pose_estimator_memory_bank = model_weights["pose_estimator_memory_bank"]if "appearance_estimator_memory_bank" in model_weights:    appearance_estimator_memory_bank = model_weights["appearance_estimator_memory_bank"]banks = (velocity_estimator_memory_bank, pose_estimator_memory_bank, appearance_estimator_memory_bank)joblib.dump(banks, "../ai_vad_banks.joblib")shutil.copyfile(config_path, "../ai_vad_config.yaml")

然后,使用下面的命令将这四个文件压缩成tar.gz格式。

tar -czvf ../ai_vad_model.tar.gz -C ../ ai_vad_weights.pth ai_vad_banks.joblib ai_vad_config.yaml inference.py

最后,使用 boto3 将文件上传到 S3。

import boto3from datetime import datetimecurrent_datetime = datetime.now().strftime('%Y-%m-%d-%H-%M-%S')s3 = boto3.resource('s3')s3.meta.client.upload_file("../ai_vad_model.tar.gz", "ai-vad", f"{current_datetime}/ai_vad_model.tar.gz")

将自定义的 Docker 镜像上传到 AWS ECR

如上所述,由于我们有一个额外的依赖项没有包含在预构建的 PyTorch Sagemaker 镜像中(即 anomalib 包),因此我们为此创建了一个新的 Docker 镜像。在构建自定义 Docker 镜像之前,需要对 Amazon ECR 仓库进行身份验证。

REGION=<my_aws_region>ACCOUNT=<my_aws_account> # 认证将 Docker 配置到 Amazon ECR 注册表aws ecr get-login-password --region $REGION | docker login --username AWS --password-stdin <docker_registry_url>.dkr.ecr.$REGION.amazonaws.com# 登录到您的私有 Amazon ECR 注册表aws ecr get-login-password --region $REGION | docker login --username AWS --password-stdin $ACCOUNT.dkr.ecr.$REGION.amazonaws.com

下面是 Dockerfile,不同的 Docker 注册表路径可以在此处找到。根据模型的需求(CPU/GPU,Python 版本等)和您的 AWS 区域选择正确的注册表路径。例如,如果区域是us-east-1,完整的 Docker 注册表路径应该类似于:763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-inference:2.0.0-gpu-py310

# 将SageMaker PyTorch镜像作为基本镜像
FROM .dkr.ecr..amazonaws.com/pytorch-inference:2.0.0-gpu-py310
# 安装额外依赖项
RUN pip install "git+https://github.com/hairozen/anomalib.git@ai-vad-inference-improvements"

现在,我们可以运行经典的Docker构建命令来构建这个自定义镜像。
“`docker build -t ai-vad-image .“`
下一步是为我们构建的新镜像创建AWS ECR存储库,对其进行标记,并将镜像推送到AWS ECR存储库。
“`# 创建AWS ECR存储库
aws ecr create-repository –repository-name ai-vad-image
# 对镜像进行标记
docker tag ai-vad-image:latest $ACCOUNT.dkr.ecr.$REGION.amazonaws.com/ai-vad-image:latest
# 将标记的镜像推送到AWS ECR存储库
docker push $ACCOUNT.dkr.ecr.$REGION.amazonaws.com/ai-vad-image:latest“`

在SageMaker中创建模型

这一步非常简单。以下是代码。
“`import boto3
import sagemaker

# 创建SageMaker客户端
sagemaker_client = boto3.client(service_name=”sagemaker”)
# 获取执行角色
role = sagemaker.get_execution_role()
# 设置模型名称
model_name = f”ai-vad-model-{current_datetime}”
# 设置主要容器
primary_container = {
“Image”: f”{my_aws_account}.dkr.ecr.{my_aws_region}.amazonaws.com/ai-vad-image:latest”,
“ModelDataUrl”: f”s3://ai-vad/{current_datetime}/ai_vad_model.tar.gz”
}
# 创建模型
create_model_response = sagemaker_client.create_model(
ModelName=model_name,
ExecutionRoleArn=role,
PrimaryContainer=primary_container)“`

创建端点配置

下一步是创建端点配置。下面是一个基本示例。
“`# 设置端点配置名称
endpoint_config_name = f”ai-vad-model-config-{current_datetime}”
# 创建端点配置
sagemaker_client.create_endpoint_config(
EndpointConfigName=endpoint_config_name,
ProductionVariants=[
{
“InstanceType”: “ml.g5.xlarge”,
“InitialVariantWeight”: 1,
“InitialInstanceCount”: 1,
“ModelName”: model_name,
“VariantName”: “AllTraffic”
}
])“`

创建端点

现在,我们已经准备好创建端点本身了。
“`# 设置端点名称
endpoint_name = f”ai-vad-model-endpoint-{current_datetime}”
# 创建端点
sagemaker_client.create_endpoint(
EndpointName=endpoint_name,
EndpointConfigName=endpoint_config_name)“`
请注意,可能需要几分钟的时间才能将端点的状态从“Creating”更改为“InService”。可以使用下面显示的方式检查当前状态。
“`response = sagemaker_client.describe_endpoint(EndpointName=endpoint_name)
response[“EndpointStatus”]“`

调用端点

关键时刻到了。现在是时候调用端点来测试一切是否按预期工作了。
“`with open(file_name, “rb”) as f:
payload = f.read()
predictor = sagemaker.predictor.Predictor(endpoint_name=endpoint_name)
predictor.serializer = DataSerializer(content_type=”image/x-image”)
predictor.predict(payload)“`
这是一个很好的检查,但请注意,predictor.predict函数不会运行包含SageMaker发布脚本中的完整调用流程:
“`output_fn(predict_fn(input_fn(input_data, model_fn(model_dir)),accept)“`
为了测试这一点,我们可以使用API调用来调用模型。
“`with open(file_name, “rb”) as f:
payload = f.read()
sagemaker_runtime = boto3.client(“runtime.sagemaker”)
response = sagemaker_runtime.invoke_endpoint(
EndpointName=endpoint_name,
ContentType=”image/x-image”,
Body=payload)
response = json.loads(response[“Body”].read().decode())“`
使用anomalib提供的出色可视化工具,我们可以为UCSDped2数据集的给定帧绘制框和标签。

作者提供的图片。该图片使用anomalib软件包根据UCSD异常检测数据集生成。绿色框表示这些行人的行走没有异常,而自行车手的红色框可能表示AI VAD模型的速度和姿势特征存在异常。

结论

好的,让我们快速总结一下我们在这里涵盖的内容。部署SageMaker模型需要一系列步骤。

首先,必须编写Sagemaker模型服务脚本来定义模型的功能和行为。

然后,将模型上传到Amazon S3进行存储和检索。此外,还需要上传自定义的Docker镜像到AWS Elastic Container Registry(ECR)以容器化模型及其依赖项。接下来的步骤涉及在SageMaker中创建一个模型,将存储在S3中的模型构件与存储在ECR中的Docker镜像关联起来。

然后创建一个端点配置,定义用于托管模型的实例数量和类型。

最后,创建一个端点以建立部署模型和客户端应用程序之间的实时连接,使它们能够调用端点并进行实时预测。

通过这些步骤,部署SageMaker模型变得简化且确保高效可靠地提供模型。

附录

Reiss等人于2023年发表的“基于属性的准确可解释视频异常检测方法”的论文提出了一种简单而高效的视频异常检测(VAD)方法,使用基于属性的表示。

该论文指出,传统的VAD方法往往依赖于深度学习,往往难以解释,导致用户难以理解系统为什么将某些帧或对象标记为异常。

为了解决这个问题,作者提出了一种方法,通过速度、姿势和深度来表示视频中的每个对象。这些属性易于理解和解释,并可用于使用基于密度的方法计算异常分数。

论文显示,这种简单的表示足以在多个具有挑战性的VAD数据集上实现最先进的性能,包括ShanghaiTech,这是最大且最复杂的VAD数据集。

除了准确性之外,作者还表明他们的方法是可解释的。例如,他们可以向用户提供视频中对异常分数做出最大贡献的对象列表,以及它们的速度、姿势和深度信息。这可以帮助用户理解系统为何将视频标记为异常。

总的来说,本论文对VAD领域是一项重要的贡献。它提出了一种简单、准确、可解释的VAD方法,可以应用于各种应用。

Leave a Reply

Your email address will not be published. Required fields are marked *