Press "Enter" to skip to content

使用Mage在数据流水线中实现行为驱动开发

最大化数据流水线的质量和生产力

Unsplash上的Nick Fewings的照片

在我之前的文章中,我讨论了数据流水线中测试的重要性以及如何分别创建数据测试和单元测试。尽管测试起着至关重要的作用,但它可能并不总是开发周期中最令人兴奋的部分。因此,许多现代数据堆栈引入了框架或插件以加速数据测试的实施。此外,Python中的单元测试框架(如Pytest、unittest)早已存在,并帮助工程师高效地为数据流水线和任何Python应用程序创建单元测试。

在本文中,我想介绍一个使用两种现代技术的设置:行为驱动开发(BDD)-一种面向业务的测试框架,以及Mage-一种现代数据流水线工具。通过结合这两种技术,目标是在具有无缝开发者体验的同时为数据流水线创建高质量的单元测试。

什么是行为驱动开发(BDD)?

在构建业务数据流水线时,很可能会遇到复杂而棘手的业务逻辑。一个例子是根据年龄、收入和过往购买情况的组合定义客户细分。以下示例仅表示业务逻辑可能涉及的一小部分复杂性。随着每个属性中的属性数量和细粒度增加,它可能变得越来越复杂。想想你日常工作中的一个例子!

1. 年龄在19到60之间且过去购买金额高的人是“高级”。2. 年龄在19到60之间且收入高的人是“高级”。3. 年龄超过60且收入高且过去购买金额高的人是“高级”。4. 其他人是“基本”。

所以问题是业务规则应该在哪里进行文档记录,以及如何确保文档与代码之间的同步。一种常见的方法是在代码旁边包含注释,或者努力编写自说明和易于理解的代码。但仍然存在着注释过时或难以理解的挑战性代码的风险。

最终,我们要寻找的是一种“文档即代码”的解决方案,可以使工程师和业务利益相关者都受益,而这正是BDD可以提供的。如果您熟悉“数据合同”的概念,BDD可以被视为一种数据合同形式,但侧重于利益相关者而不是数据源。 这对于具有复杂业务逻辑的数据流水线特别有益,它有助于防止关于“功能还是错误”的争论。

BDD本质上是一种软件开发方法,强调利益相关者和开发人员之间的协作和沟通,以确保软件达到期望的业务结果。行为在展示预期输入和输出的场景中进行描述。每个场景都以特定的“给定-当-然后”格式编写,其中每个步骤描述了特定的条件或操作。

让我们看看客户细分示例的场景可能是什么样子。由于特性文件是用英语编写的,因此业务利益相关者可以很好地理解它,甚至可以为其做出贡献。它的工作方式类似于利益相关者和工程师之间的合同,其中工程师负责准确实现需求,利益相关者则需要提供所有必要的信息。

在利益相关者和工程师之间建立明确的合同有助于正确分类数据问题,区分因实现错误而导致的“软件错误”和因缺少需求而导致的“功能请求”。

特性文件(由作者创建)

下一步是从特性生成测试代码,这就是连接发生的地方。Pytest代码充当文档和实施代码之间的桥梁。当它们之间存在任何不一致时,测试将失败,突显了文档和实施之间的同步需求。

测试代码充当桥梁(由作者创建)

以下是测试代码的样子。为了使示例简短,我只实现了第一个场景的测试代码。给定步骤设置了场景的初始上下文,本例中从示例中获取了客户年龄、收入和过去购买数据。当步骤触发了被测试的行为,即get_user_segment函数。然后步骤中,我们将来自当步骤的结果与场景示例中的预期输出进行比较。

测试代码的第一个场景在特性文件中(作者创建)

想象一下,在第一个场景中,年龄范围发生了变化,添加了一个62岁的例子,但没有更新代码。在这种情况下,测试会立即失败,因为代码存在冲突的预期。

Mage是什么?

到目前为止,我们已经看到了BDD的潜力,并学习了如何使用Python来实现它。现在,是时候将BDD纳入我们的数据流水线中了。当涉及到数据编排时,Airflow作为第一个基于Python的编排器并具有Web界面的工具已成为执行数据流水线的最常用工具。

但是,它肯定不是完美的。例如,在生产环境之外测试流水线,特别是在使用KubernetesOperator等运算符时,可能会具有挑战性。此外,DAG文件可能会混杂着样板代码和复杂的配置,这使得很难区分每个任务的目的,无论是摄取、转换还是导出。此外,Airflow并不专注于成为一个数据驱动的编排工具,因为它更关心任务的成功执行,而不是最终数据资产的质量。

随着数据工程社区的不断发展,出现了许多填补Airflow中的空白的替代工具。Mage是其中一个不断发展的数据流水线工具,被视为Airflow的现代替代品。它的四个设计理念使Mage与Airflow区别开来,我们可以从开发周期的一开始就感受到这种差异。

Mage的设计原则(作者创建)

Mage具有非常直观的用户界面,使工程师能够轻松高效地编辑和测试流水线。

每个流水线由多种类型的块组成:@data_loader、@transformer、@data_exporter等,具有明确的目的。这是我最喜欢的功能之一,因为我可以立即理解每个任务的目标,而不是陷入样板代码的困境。

Mage用户界面(作者创建)

BDD + Mage

一个常规的数据流水线有三个主要步骤:摄取、转换和导出。转换是实现所有复杂业务逻辑的地方,往往会有多个转换步骤。

清晰地将摄取任务和转换任务分开使得将BDD应用于转换逻辑变得非常简单直观。事实上,它感觉就像测试一个普通的Python函数一样,忽略了它是数据流水线的一部分。

让我们回到用户分段示例。业务规则应该位于@transformer块中,与加载器和导出器解耦。

数据流水线中的@transformer块(作者创建)

只要加载器返回一个pandas数据帧,相同的@transformer块可以插入到多个流水线中。要运行测试,我们只需要在终端或CI/CD流水线中运行pytest命令。触发器等流水线配置在单独的文件中,使得主要的流水线文件尽可能简洁。

让我们想象一下,如果我们在Airflow中实现这个功能会发生什么。由于这不是一个复杂的例子,Airflow肯定可以很好地处理它。但是有一些细节让我在从Mage切换到Airflow时感到“嗯嗯”。

  1. DAG文件变得混乱,因为每个DAG都有一个大的代码块来定义其元数据。在Mage中,配置被移动到一个yaml文件中,因此流水线文件保持简洁。
@dag(    dag_id="user_segment",    schedule_interval="0 0 * * *",    start_date=pendulum.datetime(2023, 1, 1, tz="UTC"),    catchup=False,    dagrun_timeout=datetime.timedelta(minutes=60),)

2. Airflow中的数据传递需要小心处理。在Airflow中,使用XCOM来在任务之间传递数据。然而,不建议直接通过XCOM传递大型数据集,例如数据框。为了解决这个问题,我们需要首先将数据持久化存储,这似乎是一种不必要的工程努力。而Mage可以自然地处理数据传递,我们不需要担心数据集的大小。

3. 从技术上讲,Airflow支持多个版本的Python包,但代价很大。KubernetesPodOperator和PythonVirtualenvOperator允许您在隔离的环境中运行任务。但您将失去Airflow提供的所有便利性,比如使用其他操作符。相比之下,Mage通过使用一个集中的requirements.txt来解决了这个挑战,确保所有任务都可以访问Mage的所有原生功能。

结论

在本文中,我将两种技术结合在一起,旨在提高测试质量和开发者体验。BDD旨在通过在代码库中直接嵌入特性文件的方式,创建一种合同,以增强利益相关者和工程师之间的协作。另一方面,Mage是一个优秀的数据流水线工具,将开发者体验作为他们的首要任务,并真正将数据视为一等公民。

我希望这篇文章能给你带来启发,并激励你在日常工作中探索和融入至少一种技术。正确选择工具肯定可以提升团队的生产力。我很想知道你的想法,请在评论中告诉我。谢谢!

Leave a Reply

Your email address will not be published. Required fields are marked *