数据管理
使用VDK执行批量数据处理的教程
Versatile Data Kit (VDK)是一个开源的数据导入和处理框架,旨在简化数据管理的复杂性。虽然VDK可以处理各种数据集成任务,包括实时流处理,本文将重点介绍如何在批量数据处理中使用VDK。
本文内容包括:
- 介绍批量数据处理
- 在VDK中创建和管理批量处理流水线
- 监控VDK中的批量数据处理
1 介绍批量数据处理
批量数据处理是一种在指定间隔内处理大量数据的方法。批量数据必须满足以下要求:
- 与时间无关:数据不需要立即处理,通常对实时要求不敏感。与需要即时处理的流数据不同,批量数据可以在预定间隔或资源可用时进行处理。
- 可分割成块:批量数据可以被分割成更小、更易管理的段,而不是在单个资源密集型操作中处理整个数据集。这些段可以顺序或并行处理,具体取决于数据处理系统的能力。
此外,批量数据可以离线处理,这意味着它不需要与数据源或外部服务保持持续连接。当数据源可能间歇性或暂时不可用时,这一特性非常有价值。
ELT(提取、加载、转换)是批量数据处理的典型用例。ELT由三个主要阶段组成:
- 提取(E):从多个不同格式的源中提取数据,包括结构化和非结构化数据。
- 加载(L):将数据加载到目标位置,例如数据仓库。
- 转换(T):提取的数据通常需要预处理,例如清洗、统一化和转换为通用格式。
现在您已经了解了批量数据处理是什么,让我们继续下一步:在VDK中创建和管理批量处理流水线。
2 在VDK中创建和管理批量处理流水线
VDK采用基于组件的方法,使您能够快速构建数据处理流水线。关于VDK的介绍,请参考我的之前的文章Versatile Data Kit概述。本文假设您已经在计算机上安装了VDK。
为了解释在VDK中批量处理流水线的工作原理,我们考虑一个您必须执行ELT任务的场景。
假设您想要在VDK中导入和处理文森特·梵高(Vincent Van Gogh)在Europeana上可用的绘画作品,Europeana是一个知名的欧洲文化遗产聚合器。Europeana通过其公共REST API提供所有文化遗产对象。关于文森特·梵高,Europeana提供了700多件作品。
以下图示展示了在此场景中进行批量数据处理的步骤。

让我们逐个点进行说明。您可以在VDK GitHub仓库中找到实现此场景的完整代码。
2.1 提取和加载
该阶段包括VDK作业调用Europeana REST API来提取原始数据。具体而言,它定义了三个作业:
- job1 — 删除现有表(如果有)
- job2 — 创建新表
- job3 — 直接从REST API中摄取表值。
此示例需要一个有效的互联网连接才能正确工作,以访问Europeana REST API。该操作是一个批处理过程,因为它只下载数据一次,不需要流化。
我们将提取的数据存储在一个表中。这个任务的困难在于在job3中建立REST API之间的映射。
编写job3只需编写执行这个映射的Python代码,但不将提取的文件保存到本地文件,而是调用VDK函数(job_input.send_tabular_data_for_ingestion
)将文件保存到VDK,如下面的代码片段所示:
import inspectimport loggingimport osimport pandas as pdimport requestsfrom vdk.api.job_input import IJobInputdef run(job_input: IJobInput): """ Download datasets required by the scenario and put them in the data lake. """ log.info(f"Starting job step {__name__}") api_key = job_input.get_property("api_key") start = 1 rows = 100 basic_url = f"https://api.europeana.eu/record/v2/search.json?wskey={api_key}&query=who:%22Vincent%20Van%20Gogh%22" url = f"{basic_url}&rows={rows}&start={start}" response = requests.get(url) response.raise_for_status() payload = response.json() n_items = int(payload["totalResults"]) while start < n_items: if start > n_items - rows: rows = n_items - start + 1 url = f"{basic_url}&rows={rows}&start={start}" response = requests.get(url) response.raise_for_status() payload = response.json()["items"] df = pd.DataFrame(payload) job_input.send_tabular_data_for_ingestion( df.itertuples(index=False), destination_table="assets", column_names=df.columns.tolist(), ) start = start + rows
完整代码请参考GitHub上的示例。请注意,您需要一个免费的API密钥才能从Europeana下载数据。
在提取阶段产生的输出是一个包含原始值的表。
2.2 转换
这个阶段涉及数据清洗和提取只有相关信息。我们可以通过两个作业在VDK中实现相关的作业:
- job4 — 删除现有表(如果有)
- job5 — 创建清洗表。
Job5仅需编写一个SQL查询,如下面的代码片段所示:
CREATE TABLE cleaned_assets AS ( SELECT SUBSTRING(country, 3, LENGTH(country)-4) AS country, SUBSTRING(edmPreview, 3, LENGTH(edmPreview)-4) AS edmPreview, SUBSTRING(provider, 3, LENGTH(provider)-4) AS provider, SUBSTRING(title, 3, LENGTH(title)-4) AS title, SUBSTRING(rights, 3, LENGTH(rights)-4) AS rights FROM assets)
在VDK中运行此作业将产生另一个名为cleaned_asset
的表,其中包含处理过的值。最后,我们可以在某个地方使用经过清洗的数据。在我们的情况下,我们可以构建一个显示提取的绘画作品的 Web 应用程序。您可以在VDK GitHub存储库中找到执行此任务的完整代码。
3 在VDK中监控批处理数据处理
VDK提供了VDK UI,一个用于监控数据作业的图形用户界面。要安装VDK UI,请参考官方VDK视频此链接。下图显示了VDK UI的快照。

这里有两个主要页面:
- 探索:这个页面可以让您探索数据工作,比如作业执行成功率、过去24小时执行失败的作业以及过去24小时内最多执行失败的作业。
- 管理:这个页面提供更多作业详情。您可以通过列来排序作业,搜索多个参数,按照某些列进行过滤,查看特定作业的源代码,添加其他列等等。
观看以下官方VDK视频,了解如何使用VDK UI。
总结
恭喜!您刚刚学会了如何在VDK中实现批量数据处理!它只需要摄入原始数据,进行处理,最后可以根据自己的目的使用它!您可以在VDK GitHub仓库中找到许多其他示例。
保持与< a href=”https://github.com/vmware/versatile-data-kit/tree/main”>VDK最新的数据处理发展和最佳实践保持同步。继续探索和完善您的专业知识!
您可能感兴趣的其他文章…
Versatile Data Kit概览
入门Versatile Data Kit,一个可以使数据工程师工作更有效的框架
towardsdatascience.com
处理Versatile Data Kit中的缺失值
使用VDK处理缺失值的数据流程构建教程
towardsdatascience.com
从原始数据到清洁的数据库:深入了解Versatile Data Kit
使用Versatile Data Kit(VMware最近发布的框架)和Trino DB的完整示例
towardsdatascience.com