Press "Enter" to skip to content

在8分钟内使用Hamilton简化Airflow DAG的创建和维护

如何使用Hamilton编写更易维护的Airflow DAGs

Airflow和Hamilton的抽象关系图。Airflow帮助整合所有内容,而Hamilton帮助管理内部细节。图片来源:Pixabay。

本文与Thierry Jean合作撰写,最初发表于此处。

本文介绍了两个开源项目Hamilton和Airflow以及它们的有向无环图(DAGs)如何协同工作的好处。从高层次来看,Airflow负责编排(宏观),而Hamilton帮助编写清晰且易于维护的数据转换代码(微观)。

对于不熟悉Hamilton的人,我们推荐您到tryhamilton.dev上的交互式概述页面,或者参考我们的其他文章,例如这篇文章。否则,我们将在高层次上介绍Hamilton,并提供更多详细的参考文档。值得一提的是,我是Hamilton的共同创作者之一。

对于那些仍然在努力理解这两者如何共同运行的人来说,你可以在Airflow中运行Hamilton的原因是Hamilton只是一个具有较小依赖关系的库,因此可以在Airflow设置中轻松添加Hamilton!

简要回顾一下,Airflow是编排数据流水线的行业标准。它支持各种数据项目,包括ETL、机器学习流水线和商业智能。自2014年成立以来,Airflow用户在编写和维护数据流水线方面遇到了一些问题:

  1. 如何可维护地管理工作流的演变;简单的工作流程往往变得复杂。
  2. 如何编写模块化、可重用和可测试的代码,以在Airflow任务中运行。
  3. 如何跟踪Airflow DAG生成的代码和数据产物的血缘关系。

这就是我们认为Hamilton可以帮助的地方!Hamilton是一个用于编写数据转换的Python微框架。简而言之,通过“声明式”方式编写Python函数,Hamilton会根据名称、参数和类型注解解析并连接它们,形成一个图。可以请求特定的输出,Hamilton将执行所需的函数路径以生成它们。因为Hamilton不提供宏观编排能力,所以它与Airflow搭配使用,帮助数据专业人士编写更干净、更可重用的Airflow DAG代码。

Hamilton范式的示例图。此示例展示了如何将过程化的pandas代码映射到定义DAG的Hamilton函数中。注意:Hamilton可用于任何Python对象类型,不仅限于Pandas。作者:作者。

编写易维护的Airflow DAGs

Airflow常用于辅助机器学习/数据科学工作。在生产环境中运行此类工作负载通常需要复杂的工作流程。在Airflow中,必须做出的一个设计决策是如何将工作流程分解为Airflow任务。如果任务过多,会增加调度和执行开销(例如移动大量数据);如果任务过少,会存在运行时间较长的整体任务,但可能更高效。在Airflow DAG复杂性与每个任务内代码复杂性之间存在权衡。这使得调试和理解工作流程更加困难,尤其是如果您不是最初编写Airflow DAG的人。往往情况下,Airflow DAG的初始任务结构变得固定不变,因为重构任务代码变得很困难!

虽然像A->B->C这样的简单DAG是理想的,但结构的简单性与每个任务的代码量之间存在固有的紧张关系。每个任务的代码越多,越难以识别失败点,这是以可能的计算效率为代价的,但在失败的情况下,重试的成本会随着任务的“大小”增加。

Airflow DAG结构选择:有多少个任务?每个任务有多少代码?作者提供的图片。

相反,如果您能够在Airflow任务中同时处理复杂性,而不管其中的代码量有多大,并且能够以最小的努力轻松更改Airflow DAG的形状,那将会怎样?这就是Hamilton的用武之地。

使用Hamilton,您可以将每个Airflow任务中的代码替换为Hamilton DAG,其中Hamilton处理任务中代码的“微”编排。注意:Hamilton实际上使您能够逻辑地模拟您希望Airflow DAG执行的所有操作。下面将详细介绍。

要使用Hamilton,您需要加载一个包含Hamilton函数的Python模块,实例化一个Hamilton Driver,并在几行代码中在Airflow任务中执行Hamilton DAG。通过使用Hamilton,您可以以任意粒度编写数据转换,从而更详细地检查每个Airflow任务的操作。

具体代码的工作原理如下:

  1. 导入您的函数模块
  2. 将它们传递给Hamilton driver以构建DAG。
  3. 然后,使用您定义的DAG调用Driver.execute()以执行所需的输出。

让我们看一些代码,创建一个单节点Airflow DAG,但使用Hamilton来训练和评估机器学习模型:

现在,我们没有在这里展示Hamilton代码,但这种方法的好处是:

  1. 单元和集成测试。通过命名和类型注释要求,Hamilton推动开发人员编写模块化的Python代码。这导致Python模块非常适合进行单元测试。一旦您的Python代码通过了单元测试,您可以开发集成测试以确保它在Airflow任务中正常工作。相比之下,在Airflow任务中测试代码不太容易,特别是在CI/CD环境中,因为它需要访问Airflow环境。
  2. 重用数据转换。这种方法将数据转换代码保存在Python模块中,与Airflow DAG文件分开。这意味着这段代码也可以在Airflow之外运行!如果您来自分析领域,这应该感觉类似于在外部的.sql文件中开发和测试SQL查询,然后将其加载到Airflow的Postgres操作符中。
  3. 轻松重新组织Airflow DAG。现在,更改Airflow DAG所需的工作量大大降低。如果您在Hamilton中逻辑地建模了所有内容,例如端到端的机器学习流水线,只需确定每个Airflow任务中需要计算多少个Hamilton DAG的部分即可。例如,您可以将任务数量从一个庞大的Airflow任务更改为几个或许多个,而无需更改Hamilton DAG的任何内容!

使用Hamilton和Airflow进行迭代开发

在大多数数据科学项目中,从第一天开始编写最终系统的DAG是不可能的,因为需求会发生变化。例如,数据科学团队可能希望尝试不同的特征集来训练模型。在列表设置和最终确定之前,将特征集放在源代码中并进行版本控制可能是不可取的;最好使用配置文件。

Airflow支持默认和运行时DAG配置,并记录这些设置以使每个DAG运行可重现。然而,添加可配置的行为将需要在Airflow任务代码中添加条件语句和复杂性。这段代码可能在项目中变得过时,或者只在特定场景下有用,从而降低了DAG的可读性。

相比之下,Hamilton可以使用Airflow的运行时配置来在函数图上执行不同的数据转换。这种分层的方法可以大大增加Airflow DAG的表达能力,同时保持结构的简单性。另外,Airflow还可以根据配置动态生成新的DAG,但这可能会降低可观察性,而且其中一些功能仍然处于实验阶段。

Airflow DAG运行配置的用户界面。作者提供的图片。

如果您使用交接模型工作,这种方法可以在Airflow生产系统的数据工程师和负责编写Hamilton代码的数据科学家之间实现责任分离。这种分离还可以提高数据一致性并减少代码重复。例如,可以使用单个Airflow DAG和不同的Hamilton模块重用来创建不同的模型。同样地,可以在不同的Airflow DAG中重用相同的Hamilton数据转换来支持仪表盘、API、应用程序等。

下面是两张图片。第一张是高级的Airflow DAG,包含两个节点。第二张显示了导入到Airflow任务train_and_evaluate_model中的Python模块evaluate_model的低级Hamilton DAG。

1. Airflow UI: 缺勤的Airflow DAG
2. Hamilton驱动可视化: evaluate_model.py的函数图

处理数据工件

数据科学项目会产生大量的数据工件,包括数据集、性能评估、图表、训练模型等。在项目生命周期中(数据探索、模型优化、生产调试等),所需的工件会发生变化。对于Airflow来说,这是一个问题,因为从DAG中删除任务将删除其元数据历史记录并破坏工件的血统关系。在某些情况下,产生不必要或冗余的数据工件可能会导致重大的计算和存储成本。

Hamilton可以通过其数据保存器API为数据工件生成提供所需的灵活性。通过使用@save_to.*修饰的函数可以添加存储其输出的功能,只需通过Driver.execute()请求此功能即可。在下面的代码中,调用validation_predictions_table将返回表格,而调用save_validation_predictionsoutput_name_值将返回表格并将其保存为.csv文件。

这种灵活性使用户能够轻松切换生成的工件,而且可以直接通过Airflow运行时配置进行操作,而无需编辑Airflow DAG或Hamilton模块。

此外,细粒度的Hamilton函数图允许进行精确的数据血统和溯源。实用函数what_is_downstream_of()what_is_upstream_of()帮助可视化和以编程方式探索数据依赖关系。我们建议对此感兴趣的读者查阅更多详细信息。

结束语和一个示例以入门

希望到现在为止我们已经向您展示了将Hamilton与Airflow结合使用可以帮助您解决Airflow DAG创建和可维护性方面的挑战。由于这是一篇简短的文章,为了总结,让我们转向Hamilton存储库中为您提供的代码。

为了帮助您快速上手,我们提供了一个使用Hamilton与Airflow的示例。它应该涵盖您开始所需的所有基础知识。README中包含了如何使用Docker设置Airflow的说明,这样您就不需要担心安装依赖项,只需使用示例即可。

至于示例中的代码,它包含两个Airflow DAG,一个展示了使用Hamilton创建“特征”来训练模型的基本操作,另一个是更完整的机器学习项目示例,它执行完整的端到端流程,包括创建特征、拟合和评估模型。对于这两个示例,您将在插件文件夹中找到Hamilton代码。

在Airflow示例中您应该看到的内容。图片由作者提供。

如果您有问题或需要帮助,请加入我们的Slack。否则,要了解更多关于Hamilton的功能和功能,请参阅Hamilton的文档。

参考资料和进一步阅读

感谢您查看本文。如果您想深入了解,或者想了解更多关于Hamilton的信息,我们为您提供以下链接供您浏览!

  • Hamilton + Airflow 代码示例
  • Hamilton 文档
  • tryhamilton.dev — 一个交互式学习更多关于Hamilton的方式
  • 如果您想了解与Hamilton集成的另一个编排系统,您可以查看 Hamilton + Metaflow。
  • Hamilton Slack 社区
  • 10分钟内了解Lineage + Hamilton
  • 介绍Hamilton(背景故事和介绍)
  • 5分钟内了解Hamilton + Pandas
  • 如何在Notebook环境中使用Hamilton
Leave a Reply

Your email address will not be published. Required fields are marked *