Amazon Redshift 是最受欢迎的云数据仓库,每天被数以万计的客户用于分析以 exabytes 为单位的数据。许多从业人员正在使用 Amazon SageMaker 进行机器学习(ML),以离线代码方式或低代码/无代码方式扩展这些 Redshift 数据集,并将特征数据存储在 Amazon Redshift 中,在生产环境中实现大规模处理。
在本文中,我们将向您展示在 SageMaker 中以大规模准备 Redshift 源数据的三个选项,包括从 Amazon Redshift 加载数据、执行特征工程,并将特征导入 Amazon SageMaker 特征存储:
- 选项 A – 在 Amazon SageMaker Studio 上使用 AWS Glue 交互会话(在开发环境中)和 AWS Glue 作业(在生产环境中)与 Spark
- 选项 B – 使用 Amazon SageMaker Processing 作业与 Redshift 数据集定义,或使用 SageMaker 特征处理在 SageMaker 特征存储中运行 SageMaker 训练作业
- 选项 C – 以低代码/无代码方式使用 Amazon SageMaker 数据整理器
如果您是 AWS Glue 用户并且希望以交互方式执行该过程,请考虑选项 A。如果您熟悉 SageMaker 并且能编写 Spark 代码,则可以选择选项 B。如果您想以低代码/无代码方式执行该过程,可以按照选项 C 进行操作。
Amazon Redshift 使用 SQL 分析结构化和半结构化数据,跨数据仓库、操作性数据库和数据湖,使用 AWS 设计的硬件和 ML 提供任何规模的最佳性价比。
SageMaker Studio 是第一个完全集成的机器学习开发环境(IDE)。它提供一个单一的基于 Web 的可视界面,您可以在其中执行所有的机器学习开发步骤,包括准备数据、构建、训练和部署模型。
AWS Glue 是一个无服务器数据集成服务,可以轻松发现、准备和组合数据,用于分析、机器学习和应用程序开发。AWS Glue 可以通过各种功能(包括内置转换)无缝地收集、转换、清洗和准备数据,以便存储在数据湖和数据管道中。
解决方案概述
以下图表说明了每个选项的解决方案架构。
先决条件
要继续使用本文中的示例,您需要创建所需的 AWS 资源。为此,我们提供了一个 AWS CloudFormation 模板来创建包含这些资源的堆栈。当您创建堆栈时,AWS 会在您的账户中创建一些资源:
- 一个 SageMaker 域,其中包括一个关联的 Amazon Elastic File System(Amazon EFS)卷
- 一组授权用户和各种安全、应用程序、策略和 Amazon Virtual Private Cloud(Amazon VPC)配置
- 一个 Redshift 集群
- 一个 Redshift 密钥
- 用于 Amazon Redshift 的 AWS Glue 连接
- 一个 AWS Lambda 函数,用于设置所需的资源、执行角色和策略
请确保您在运行 CloudFormation 模板的区域中没有已经存在两个 SageMaker Studio 域。这是每个受支持区域中允许的最大域数。
部署 CloudFormation 模板
执行以下步骤来部署 CloudFormation 模板:
- 将 CloudFormation 模板 sm-redshift-demo-vpc-cfn-v1.yaml 保存在本地。
- 在 AWS CloudFormation 控制台上,选择 创建堆栈。
- 对于 准备模板,选择 模板已准备好。
- 对于 模板来源,选择 上传模板文件。
- 选择 选择文件,然后导航到计算机上下载 CloudFormation 模板的位置,选择该文件。
- 输入堆栈名称,例如
Demo-Redshift
。 - 在 配置堆栈选项 页面上,保留所有内容默认值,并选择 下一步。
- 在 审核 页面上,选中 我确认 AWS CloudFormation 可能会创建带有自定义名称的 IAM 资源,然后选择 创建堆栈。
您应该可以看到一个名为Demo-Redshift
的新 CloudFormation 堆栈正在创建。请等待堆栈的状态为CREATE_COMPLETE(大约 7 分钟)后再继续。您可以导航到堆栈的资源选项卡来检查创建了哪些 AWS 资源。
启动 SageMaker Studio
完成以下步骤以启动 SageMaker Studio 域:
- 在 SageMaker 控制台中,选择导航窗格中的域。
- 选择您作为 CloudFormation 堆栈的一部分创建的域(
SageMakerDemoDomain
)。 - 选择启动和Studio。
当您首次访问 SageMaker Studio 时,该页面可能需要 1-2 分钟的加载时间,之后您将被重定向到一个主页选项卡。
下载 GitHub 存储库
完成以下步骤以下载 GitHub 存储库:
- 在 SageMaker 笔记本中,选择文件菜单,然后选择新建和终端。
- 在终端中,输入以下命令:
git clone https://github.com/aws-samples/amazon-sagemaker-featurestore-redshift-integration.git
您现在可以在 SageMaker Studio 的导航窗格中看到amazon-sagemaker-featurestore-redshift-integration
文件夹。
使用 Spark 连接器设置批量摄取
完成以下步骤以设置批量摄取:
- 在 SageMaker Studio 中,打开
amazon-sagemaker-featurestore-redshift-integration
文件夹下的笔记本 1-uploadJar.ipynb。 - 如果提示选择内核,请选择数据科学作为镜像和Python 3作为内核,然后选择选择。
- 对于以下的笔记本,请选择相同的镜像和内核,除了 AWS Glue 交互会话笔记本 (4a)。
- 通过按下Shift+Enter在每个单元格中运行代码。
当代码运行时,方括号之间会出现一个星号(*)。当代码运行完成时,星号将被数字替换。这个操作对于所有其他笔记本也有效。
设置模式并将数据加载到 Amazon Redshift
下一步是设置模式并从 Amazon Simple Storage Service (Amazon S3) 加载数据到 Amazon Redshift。要执行此操作,请运行笔记本 2-loadredshiftdata.ipynb。
在 SageMaker 特征商店中创建特征存储
要创建您的特征存储,请运行笔记本 3-createFeatureStore.ipynb。
执行特征工程并将特征摄入 SageMaker 特征商店
在本节中,我们介绍了三种选项的步骤,以执行特征工程并将处理后的特征摄入 SageMaker 特征商店。
选项 A:使用带有无服务器 AWS Glue 交互会话的 SageMaker Studio
完成以下步骤以进行选项 A:
- 在 SageMaker Studio 中,打开笔记本 4a-glue-int-session.ipynb。
- 如果提示选择内核,请选择SparkAnalytics 2.0作为镜像和Glue Python [PySpark and Ray]作为内核,然后选择选择。
环境准备过程可能需要一些时间才能完成。
选项 B:使用带有 Spark 的 SageMaker 处理作业
在此选项中,我们使用一个带有 Spark 脚本的 SageMaker 处理作业,从 Amazon Redshift 加载原始数据集,进行特征工程,并将数据导入 SageMaker 特征存储。要执行此操作,请在 SageMaker Studio 环境中打开笔记本 4b-processing-rs-to-fs.ipynb。
在这里,我们使用 RedshiftDatasetDefinition
从 Redshift 集群检索数据集。 RedshiftDatasetDefinition
是处理作业的一种输入类型,它提供了一个简单的接口,供实践者配置与 Redshift 连接相关的参数,如标识符、数据库、表、查询字符串等。您可以使用 RedshiftDatasetDefinition
轻松建立 Redshift 连接,而无需全天候维护连接。我们还在处理作业中使用 SageMaker 特征存储 Spark 连接器库来连接到分布式环境中的 SageMaker 特征存储。使用此 Spark 连接器,您可以轻松地将数据从 Spark DataFrame 吞入特征组的在线和离线存储。此外,此连接器包含自动加载特征定义的功能,有助于创建特征组。总之,此解决方案为您提供了一种从 Amazon Redshift 到 SageMaker 的原生 Spark 方式实现端到端数据流水线的方法。您可以在 Spark 上下文中执行任何特征工程,并将最终特征仅通过一个 Spark 项目吞入 SageMaker 特征存储。
要使用 SageMaker 特征存储 Spark 连接器,我们需要扩展一个预构建的带有 sagemaker-feature-store-pyspark
安装的 SageMaker Spark 容器。在 Spark 脚本中,使用系统可执行命令运行 pip install
,在本地环境中安装此库,并获取 JAR 文件依赖项的本地路径。在处理作业 API 中,将此路径提供给处理作业创建的 Spark 集群节点的 submit_jars
参数。
在处理作业的 Spark 脚本中,我们首先从 Amazon S3 中读取原始数据集文件,Amazon S3 临时存储了从 Amazon Redshift 卸载的数据集作为 VoAGI。然后,我们以 Spark 方式执行特征工程,并使用 feature_store_pyspark
将数据吞入离线特征存储。
对于处理作业,我们提供一个带有 redshift_dataset_definition
的 ProcessingInput
。在此处,我们根据接口构建一个结构,提供与 Redshift 连接相关的配置。您可以使用 query_string
根据 SQL 过滤数据集并将其卸载到 Amazon S3。请参阅以下代码:
rdd_input = ProcessingInput(
input_name="redshift_dataset_definition",
app_managed=True,
dataset_definition=DatasetDefinition(
local_path="/opt/ml/processing/input/rdd",
data_distribution_type="FullyReplicated",
input_mode="File",
redshift_dataset_definition=RedshiftDatasetDefinition(
cluster_id=_cluster_id,
database=_dbname,
db_user=_username,
query_string=_query_string,
cluster_role_arn=_redshift_role_arn,
output_s3_uri=_s3_rdd_output,
output_format="PARQUET"
),
),
)
每个处理作业(包括 USER
、PLACE
和 RATING
数据集)需要等待 6-7 分钟。
有关 SageMaker 处理作业的更多详细信息,请参阅数据处理。
对于来自 Amazon Redshift 的特征处理的 SageMaker 本地解决方案,您还可以使用 SageMaker 特征存储中的特征处理,该解决方案用于包括提供计算环境、创建和维护 SageMaker 流水线以加载和吞入数据在内的基础设施。您只需关注包含转换函数的特征处理器定义、Amazon Redshift 的源和 SageMaker 特征存储的接收端。调度、作业管理和其他生产工作负载由 SageMaker 管理。特征处理器流水线是 SageMaker 流水线,因此标准的监控机制和集成都可用。
选项 C:使用 SageMaker Data Wrangler
SageMaker Data Wrangler 允许您从包括 Amazon Redshift 在内的各种数据源导入数据,以便以低代码/无代码方式准备、转换和生成特征。完成数据准备后,您可以使用 SageMaker Data Wrangler 将特征导出到 SageMaker 特征存储。
有一些 AWS Identity and Access Management(IAM)设置允许 SageMaker Data Wrangler 连接到 Amazon Redshift。首先,创建一个包含 Amazon S3 访问策略的 IAM 角色(例如,redshift-s3-dw-connect
)。对于本文,我们将 AmazonS3FullAccess
策略附加到 IAM 角色。如果您有访问指定 S3 存储桶的限制,可以在 Amazon S3 访问策略中定义。我们将 IAM 角色附加到之前创建的 Redshift 集群上。接下来,为了让 SageMaker 能够通过获取群集凭据访问 Amazon Redshift,创建一个策略,并将其附加到 SageMaker 的 IAM 角色。策略的代码如下:
{
"Version": "2012-10-17",
"Statement": [
{
"Action": "redshift:getclustercredentials",
"Effect": "Allow",
"Resource": [
"*"
]
}
]
}
配置完成后,SageMaker Data Wrangler允许您查询Amazon Redshift并将结果输出到S3存储桶中。有关连接到Redshift集群并从Amazon Redshift导入数据到SageMaker Data Wrangler的说明,请参阅从Amazon Redshift导入数据。
SageMaker Data Wrangler提供了超过300个预构建的数据转换选项,用于常见的用例,例如删除重复行、填充缺失数据、独热编码和处理时间序列数据。您还可以使用pandas或PySpark添加自定义转换。在我们的示例中,我们对数据应用了一些转换,例如删除列、数据类型强制和序数编码。
当您的数据流程完成后,您可以将其导出到SageMaker Feature Store。此时,您需要创建一个特征组:为特征组命名,选择在线和离线存储,提供用于离线存储的S3存储桶名称,并提供具有SageMaker Feature Store访问权限的角色。最后,您可以创建一个作业,该作业会创建一个SageMaker Processing作业,从Redshift数据源中运行SageMaker Data Wrangler流程,将特征导入到您的特征组中。
以下是PLACE特征工程场景中的端到端数据流程。
使用SageMaker特征存储进行模型训练和预测
要使用SageMaker特征存储进行模型训练和预测,请打开笔记本5-classification-using-feature-groups.ipynb。
在将Redshift数据转换为特征并导入SageMaker特征存储后,这些特征可供跨多个独立ML模型和用例负责的数据科学团队进行搜索和发现。这些团队可以在建模时使用这些特征,而无需重新构建或重新运行特征工程流程。特征组可以独立管理和扩展,并且可以在上游数据源不受限制的情况下重复使用和连接。
下一步是使用从一个或多个特征组中选择的特征构建ML模型。您可以自行决定要为模型使用哪些特征组。有两种选项可以从特征组创建ML数据集,两者都使用SageMaker Python SDK:
- 使用SageMaker特征存储DatasetBuilder API – SageMaker特征存储
DatasetBuilder
API允许数据科学家从离线存储中的一个或多个特征组创建ML数据集。您可以使用该API从单个或多个特征组创建数据集,并将其输出为CSV文件或pandas DataFrame。请参阅以下示例代码:
from sagemaker.feature_store.dataset_builder import DatasetBuilder
fact_rating_dataset = DatasetBuilder(
sagemaker_session = sagemaker_session,
base = fact_rating_feature_group,
output_path = f"s3://{s3_bucket_name}/{prefix}",
record_identifier_feature_name = 'ratingid',
event_time_identifier_feature_name = 'timestamp',
).to_dataframe()[0]
- 使用FeatureGroup API中的athena_query函数运行SQL查询 – 另一种选项是使用FeatureGroup API的自动构建的AWS Glue数据目录。FeatureGroup API包括一个
Athena_query
函数,用于创建一个AthenaQuery实例以运行用户定义的SQL查询字符串。然后,您可以运行Athena查询并将查询结果组织成pandas DataFrame。此选项允许您指定更复杂的SQL查询,以从特征组中提取信息。请参阅以下示例代码:
dim_user_query = dim_user_feature_group.athena_query()
dim_user_table = dim_user_query.table_name
dim_user_query_string = (
'SELECT * FROM "'
+ dim_user_table
+ '"'
)
dim_user_query.run(
query_string = dim_user_query_string,
output_location = f"s3://{s3_bucket_name}/{prefix}",
)
dim_user_query.wait()
dim_user_dataset = dim_user_query.as_dataframe()
接下来,我们可以将来自不同特征组的查询数据合并到最终的数据集中,用于模型的训练和测试。在本文中,我们使用批量转换进行模型推理。批量转换允许您在Amazon S3上对大量数据进行模型推理,并将推理结果存储在Amazon S3中。有关模型训练和推理的详细信息,请参阅笔记本5-classification-using-feature-groups.ipynb。
在Amazon Redshift上运行联接查询以获取预测结果
最后,我们查询推理结果,并将其与Amazon Redshift中的原始用户配置文件进行联接。为此,我们使用Amazon Redshift Spectrum将Amazon S3中的批量预测结果与原始的Redshift数据进行联接。有关详细信息,请参阅笔记本run 6-read-results-in-redshift.ipynb。
清理资源
在本部分,我们提供了清理本文中创建的资源以避免持续收费的步骤。
关闭SageMaker应用
完成以下步骤来关闭您的资源:
- 在SageMaker Studio中,选择文件菜单,然后选择关闭。
- 在关闭确认对话框中,选择全部关闭以继续。
- 在收到“服务器已停止”消息后,您可以关闭此选项卡。
删除应用
完成以下步骤来删除您的应用:
- 在SageMaker控制台中,导航到域。
- 在域页面上,选择
SageMakerDemoDomain
。 - 在域详细信息页面中,在用户配置文件下,选择用户
sagemakerdemouser
。 - 在应用程序部分,对于任何活动应用程序,在操作列中选择删除应用程序。
- 确保所有应用程序的状态列都显示为已删除。
删除与您的SageMaker域关联的EFS存储卷
在SageMaker控制台上找到您的EFS卷并删除它。有关说明,请参阅SageMaker Studio中管理Amazon EFS存储卷。
删除SageMaker的默认S3存储桶
如果您不使用该区域的SageMaker,请删除默认的S3存储桶(sagemaker-<region-code>-<acct-id>
)。
删除CloudFormation堆栈
删除您的AWS账户中的CloudFormation堆栈,以清理所有相关资源。
结论
在本文中,我们演示了从Redshift数据仓库到SageMaker的端到端数据和机器学习流程。您可以轻松使用AWS原生集成的专用引擎进行无缝的数据流程。请访问AWS博客了解更多关于从现代数据仓库构建ML特征的实践。