Press "Enter" to skip to content

掌握多功能数据工具包(VDK)的批量数据处理

数据管理

使用VDK执行批量数据处理的教程

Mika Baumeister在Unsplash上的照片

Versatile Data Kit (VDK)是一个开源的数据导入和处理框架,旨在简化数据管理的复杂性。虽然VDK可以处理各种数据集成任务,包括实时流处理,本文将重点介绍如何在批量数据处理中使用VDK。

本文内容包括:

  • 介绍批量数据处理
  • 在VDK中创建和管理批量处理流水线
  • 监控VDK中的批量数据处理

1 介绍批量数据处理

批量数据处理是一种在指定间隔内处理大量数据的方法。批量数据必须满足以下要求:

  • 与时间无关:数据不需要立即处理,通常对实时要求不敏感。与需要即时处理的流数据不同,批量数据可以在预定间隔或资源可用时进行处理。
  • 可分割成块:批量数据可以被分割成更小、更易管理的段,而不是在单个资源密集型操作中处理整个数据集。这些段可以顺序或并行处理,具体取决于数据处理系统的能力。

此外,批量数据可以离线处理,这意味着它不需要与数据源或外部服务保持持续连接。当数据源可能间歇性或暂时不可用时,这一特性非常有价值。

ELT(提取、加载、转换)是批量数据处理的典型用例。ELT由三个主要阶段组成:

  • 提取(E):从多个不同格式的源中提取数据,包括结构化和非结构化数据。
  • 加载(L):将数据加载到目标位置,例如数据仓库。
  • 转换(T):提取的数据通常需要预处理,例如清洗、统一化和转换为通用格式。

现在您已经了解了批量数据处理是什么,让我们继续下一步:在VDK中创建和管理批量处理流水线。

2 在VDK中创建和管理批量处理流水线

VDK采用基于组件的方法,使您能够快速构建数据处理流水线。关于VDK的介绍,请参考我的之前的文章Versatile Data Kit概述。本文假设您已经在计算机上安装了VDK。

为了解释在VDK中批量处理流水线的工作原理,我们考虑一个您必须执行ELT任务的场景。

假设您想要在VDK中导入和处理文森特·梵高(Vincent Van Gogh)在Europeana上可用的绘画作品,Europeana是一个知名的欧洲文化遗产聚合器。Europeana通过其公共REST API提供所有文化遗产对象。关于文森特·梵高,Europeana提供了700多件作品。

以下图示展示了在此场景中进行批量数据处理的步骤。

作者提供的图片

让我们逐个点进行说明。您可以在VDK GitHub仓库中找到实现此场景的完整代码。

2.1 提取和加载

该阶段包括VDK作业调用Europeana REST API来提取原始数据。具体而言,它定义了三个作业:

  • job1 — 删除现有表(如果有)
  • job2 — 创建新表
  • job3 — 直接从REST API中摄取表值。

此示例需要一个有效的互联网连接才能正确工作,以访问Europeana REST API。该操作是一个批处理过程,因为它只下载数据一次,不需要流化。

我们将提取的数据存储在一个表中。这个任务的困难在于在job3中建立REST API之间的映射。

编写job3只需编写执行这个映射的Python代码,但不将提取的文件保存到本地文件,而是调用VDK函数(job_input.send_tabular_data_for_ingestion)将文件保存到VDK,如下面的代码片段所示:

import inspectimport loggingimport osimport pandas as pdimport requestsfrom vdk.api.job_input import IJobInputdef run(job_input: IJobInput):    """    Download datasets required by the scenario and put them in the data lake.    """    log.info(f"Starting job step {__name__}")    api_key = job_input.get_property("api_key")    start = 1    rows = 100    basic_url = f"https://api.europeana.eu/record/v2/search.json?wskey={api_key}&query=who:%22Vincent%20Van%20Gogh%22"    url = f"{basic_url}&rows={rows}&start={start}"    response = requests.get(url)    response.raise_for_status()    payload = response.json()    n_items = int(payload["totalResults"])    while start < n_items:        if start > n_items - rows:            rows = n_items - start + 1        url = f"{basic_url}&rows={rows}&start={start}"        response = requests.get(url)        response.raise_for_status()        payload = response.json()["items"]        df = pd.DataFrame(payload)        job_input.send_tabular_data_for_ingestion(            df.itertuples(index=False),            destination_table="assets",            column_names=df.columns.tolist(),        )        start = start + rows

完整代码请参考GitHub上的示例。请注意,您需要一个免费的API密钥才能从Europeana下载数据。

在提取阶段产生的输出是一个包含原始值的表。

2.2 转换

这个阶段涉及数据清洗和提取只有相关信息。我们可以通过两个作业在VDK中实现相关的作业:

  • job4 — 删除现有表(如果有)
  • job5 — 创建清洗表。

Job5仅需编写一个SQL查询,如下面的代码片段所示:

CREATE TABLE cleaned_assets AS (    SELECT        SUBSTRING(country, 3, LENGTH(country)-4) AS country,        SUBSTRING(edmPreview, 3, LENGTH(edmPreview)-4) AS edmPreview,        SUBSTRING(provider, 3, LENGTH(provider)-4) AS provider,        SUBSTRING(title, 3, LENGTH(title)-4) AS title,        SUBSTRING(rights, 3, LENGTH(rights)-4) AS rights    FROM assets)

在VDK中运行此作业将产生另一个名为cleaned_asset的表,其中包含处理过的值。最后,我们可以在某个地方使用经过清洗的数据。在我们的情况下,我们可以构建一个显示提取的绘画作品的 Web 应用程序。您可以在VDK GitHub存储库中找到执行此任务的完整代码。

3 在VDK中监控批处理数据处理

VDK提供了VDK UI,一个用于监控数据作业的图形用户界面。要安装VDK UI,请参考官方VDK视频此链接。下图显示了VDK UI的快照。

图片由作者提供

这里有两个主要页面:

  • 探索:这个页面可以让您探索数据工作,比如作业执行成功率、过去24小时执行失败的作业以及过去24小时内最多执行失败的作业。
  • 管理:这个页面提供更多作业详情。您可以通过列来排序作业,搜索多个参数,按照某些列进行过滤,查看特定作业的源代码,添加其他列等等。

观看以下官方VDK视频,了解如何使用VDK UI。

总结

恭喜!您刚刚学会了如何在VDK中实现批量数据处理!它只需要摄入原始数据,进行处理,最后可以根据自己的目的使用它!您可以在VDK GitHub仓库中找到许多其他示例。

保持与< a href=”https://github.com/vmware/versatile-data-kit/tree/main”>VDK最新的数据处理发展和最佳实践保持同步。继续探索和完善您的专业知识!

Versatile Data Kit概览

入门Versatile Data Kit,一个可以使数据工程师工作更有效的框架

towardsdatascience.com

处理Versatile Data Kit中的缺失值

使用VDK处理缺失值的数据流程构建教程

towardsdatascience.com

从原始数据到清洁的数据库:深入了解Versatile Data Kit

使用Versatile Data Kit(VMware最近发布的框架)和Trino DB的完整示例

towardsdatascience.com

Leave a Reply

Your email address will not be published. Required fields are marked *