本文介绍了 Meerschaum Compose,一种使用 YAML 定义 ETL 管道并作为数据工程框架 Meerschaum 的插件的工具。
Docker 是一个改变游戏规则的产品,它革新了我们设计、构建和运行云应用的方式。然而,开发人员很早就意识到其灵活性使得协作变得困难,因此 docker-compose
成为管理环境和多容器项目的工具选择。
类似地,ETL 框架 Meerschaum 也出现了一致性环境的问题。随着越来越多的数据工程师使用该平台构建其管道,管道的动态性意味着需要提供项目级别的隔离解决方案。
受 Docker Compose 的启发,这个解决方案以 Meerschaum Compose 的形式出现。我在工作中和个人项目中每天使用 Compose 来构建和管理我的数据管道,今天我想展示如何使用 Compose 构建 ETL 项目。
如何使用 Compose 管理引擎
伴随着巨大的力量而来的是巨大的责任,在 Docker 的情况下,这个责任由一个称为 docker-compose.yml
的清单文件处理,它描述了应用程序中的服务应如何运行。该文件作为活的文档,促进了原型设计,并描述了期望的环境给 CloudOps。通过一个简单的标准,Docker Compose 通过提供一种方便的方式来标准化和共享多服务项目的环境,填补了开发过程中的空白。
Meerschaum Compose 有着相似的作用:在 mrsm-compose.yml
文件中,您可以指定项目中可能需要的所有内容:期望的环境、插件、管道和连接器。
命令
当您遇到一个新的 Compose 项目(使用下面截图中的 Tech Slam ’N Eggs 演示项目)时,可以尝试以下命令以了解情况:
注意:如果您想在 Docker 容器中运行 Compose,请参阅此存储库。
mrsm compose run
注册管道并逐个同步它们。这对于确保执行顺序和使用数据库更新管道参数非常有用。一个常见的模式是将各个阶段链在一起,作为更大的 ETL 过程的一部分:
mrsm compose run --file mrsm-compose-00-extract.yaml && \mrsm compose run --file mrsm-compose-01-transform.yaml && \mrsm compose run --file mrsm-compose-02-load.yaml
注意:命令
mrsm compose sync pipes
是并行的,最好用于 compose 文件中的管道是独立的情况。
mrsm compose explain
解析 Compose 文件并打印定义管道的当前环境和状态。这在故障排除和理解项目结构时非常有用。
mrsm compose up --dry
注册并更新远程管道的参数。这隐含地作为 mrsm compose run
的一部分运行,并应在像 mrsm compose sync pipes
这样的标准 Meerschaum 操作之前运行。 --dry
标志可以防止同步作业运行。
注意:Compose 会为管道打上项目名称的标记。如果在项目中使用多个 compose 文件,请设置键
project_name
。
mrsm compose down -v
停止作业并删除管道。这类似于 docker compose down -v
(即 -v
表示“卷”)。
mrsm compose <action>
从项目环境中执行标准的 Meerschaum 操作(例如同步管道、删除管道、自定义操作)。我经常运行 mrsm compose python
,以在项目环境中进入 REPL。
每当您使用 mrsm compose
运行操作时,标志 --tags {project_name}
会附加(除非被覆盖),以确保您仅与项目内的管道交互。
Pipes
谦虚的管道是 Meerschaum 用于增量 ETL 的抽象。管道具有输入和输出连接器,并存储参数以配置其同步过程的行为。这可能仅仅是一个 SQL 查询,也可能包括用于插件的自定义键。
由于管道的元数据与其表存储在一起,因此它们易于编辑(无论是通过 edit pipes
还是在 Web UI 上),这有助于原型制作。但是,这种动态性引入了本文开头描述的相同问题:为了扩展开发,需要定义 Compose 文件以定义项目组件,以便轻松进行版本控制。
根据 Meerschaum Compose 规范,管道在 sync:pipes
键下的列表中定义。每个项目定义了构建管道所需的键和参数,就像蓝图一样,您可以预期数据库中的管道反映出来。
例如,以下片段将定义一个管道,该管道将从远程 PostgreSQL 数据库(在此项目中定义为 sql:source
)同步表 weather
到本地 SQLite 文件(在此项目中为 sql:dest
)。
sync: pipes: - connector: "sql:source" metric: "weather" target: "weather" columns: datetime: "timestamp" station: "station" parameters: fetch: backtrack_minutes: 1440 query: |- SELECT timestamp, station, temperature FROM weatherconfig: meerschaum: instance: "sql:dest" connectors: sql: source: "postgresql://user:pass@host:5432/db" dest: "sqlite:////tmp/dest.db"
这个示例将使用 timestamp
作为 datetime 轴进行增量更新(回溯 1 天),而此列加上 ID 列 station
一起组成用于去重的复合主键。
URI 仅以字面形式作为示例编写;如果要提交 Compose 文件,请引用环境变量(例如
$SECRET_URI
)或您的主机 Meerschaum 配置(例如MRSM{meerschaum:connectors:sql:source}
)。
连接器
首先,让我们快速复习一下 Meerschaum 连接器:您可以通过多种方式定义连接器,最流行的方式是通过环境变量。假设您在环境文件中定义了连接密钥:
export MRSM_SQL_REMOTE='postgresql://user:pass@host:5432/db'export MRSM_FOO_BAR='{ "user": "abc", "password": "def"}'
第一个环境变量 MRSM_SQL_REMOTE
将定义连接器sql:remote
。如果您引用此文件,可以使用命令mrsm show connectors sql:remote
验证此连接器。
第二个变量是定义自定义FooConnector
的示例,可以使用插件中的@make_connector
装饰器创建。自定义连接器是一个强大的工具,但现在,这是基本的结构:
from meerschaum.connectors import make_connector, Connector@make_connectorclass FooConnector(Connector): REQUIRED_ATTRIBUTES = ['username', 'password'] def fetch(pipe, **kwargs): docs = [] return docs
所以,我们刚刚回顾了如何在我们的主机环境中定义连接器。让我们看看如何在Meerschaum项目中使这些主机连接器可用。在compose文件中,我们需要的所有连接器都在config:meerschaum:connectors
下定义。使用MRSM{}
语法引用主机环境的键,并将它们传递到项目中。
config: meerschaum: instance: "sql:app" connectors: sql: app: MRSM{meerschaum:connectors:sql:remote} foo: bar: MRSM{meerschaum:connectors:foo:bar}
插件
Meerschaum可以通过插件轻松扩展,插件是Python模块。插件可以获取数据,实现自定义连接器和/或扩展Meerschaum(例如自定义操作、标志、API端点等)。
Meerschaum支持多个插件目录(通过
MRSM_PLUGINS_DIR
),可以在mrsm-compose.yaml
中的plugins_dir
键下设置(默认目录为plugins
)。
将插件存储在Compose项目中可以清楚地说明您希望如何使用插件。例如,MongoDBConnector项目中的Compose文件演示了如何将自定义连接器用作连接器和实例。
包管理
当您首次开始使用Meerschaum Compose时,您会注意到它将开始安装大量Python包。不用担心您的环境――一切都安装在您的项目的root
子目录中的虚拟环境中(有点讽刺,对吧?)。您可以使用mrsm compose init
安装插件的依赖项。
要在项目之间共享软件包,请将mrsm-compose.yml
中的root_dir
键设置为新路径。删除此root
目录将有效地卸载Compose下载的所有软件包,从而保持您的主机环境完好无损。
结论
Meerschaum Compose已成为我构建小猪AI-scale ETL项目的首选工具。它具有与Meltano或dbt类似的工作流程,但具有较低的入门门槛和对ETL过程的更动态的控制。它是一个整洁的工作流程,可以以团队友好的方式组织插件、连接器和管道。
您可以使用Meerschaum Compose模板存储库快速启动新项目――请参阅MongoDBConnector插件或Tech Slam’N Eggs演示以获取实用示例。
请随意将您的项目添加到Awesome Meerschaum列表中!