黏贴到ML工程中的一个重要组成部分是构建可靠且可扩展的程序,以从文件存储或数据库中提取数据、转换数据、丰富数据并加载数据。这是数据科学家和ML工程师最常合作的组件之一。通常,数据科学家提出了数据集应该是什么样子的粗略版本。理想情况下,不是在Jupyter笔记本上。然后,ML工程师加入这个任务,支持使代码更易读、高效和可靠。
ML ETL可以由多个子ETL或任务组成。它们可以采用非常不同的形式实现。以下是一些常见的例子:
- 基于Scala的Spark作业,在S3中以Parquet文件的形式读取和处理事件日志数据,并通过Airflow每周定期调度。
- Python进程通过定期调度的AWS Lambda函数执行Redshift SQL查询。
- 通过使用EventBridge触发器,通过Sagemaker处理作业执行复杂的以pandas为主的处理。
ETL中的实体
我们可以在这些类型的ETL中识别出不同的实体,我们有来源(原始数据所在的地方),目标(最终数据存储的位置),数据处理(数据获取、处理和加载的方式)和触发器(ETL的启动方式)。
- 在来源中,我们可以有AWS Redshift、AWS S3、Cassandra、Redis或外部API等存储。目标也是一样的。
- 数据处理通常在临时Docker容器中运行。我们可以使用Kubernetes或其他AWS托管服务(如AWS ECS或AWS Fargate)来添加另一层抽象。或者使用SageMaker Pipelines或处理作业。您可以通过利用特定的数据处理引擎(如Spark、Dask、Hive、Redshift SQL引擎)在集群中运行这些过程。另外,您可以使用Python进程和Pandas进行简单的单实例处理。除此之外,还有一些其他有趣的框架,如Polars、Vaex、Ray或Modin,可以用来解决中间问题。
- 最受欢迎的触发工具是Airflow。其他可以使用的工具有Prefect、Dagster、Argo Workflows或Mage。
我应该使用框架吗?
框架是一组抽象、约定和开箱即用的工具,可以用于在解决具体问题时创建更统一的代码库。框架对于ETL非常方便。正如我们之前所描述的,存在非常通用的实体,可能可以通过抽象或封装来生成全面的工作流。
我建议构建内部数据处理框架的步骤如下:
- 首先,构建一个连接器库,用于连接到不同的来源和目标。根据您在不同项目中的需要逐步实现它们。这是避免YAGNI的最佳方式。
- 创建一个简单且自动化的开发工作流,以便快速迭代代码库。例如,配置CI/CD工作流以自动测试、lint和发布您的软件包。
- 创建实用程序,如读取SQL脚本、启动Spark会话、日期格式化函数、元数据生成器、日志记录工具、用于获取凭据和连接参数的函数以及报警工具等。
- 选择是构建用于编写工作流的内部框架还是使用现有框架。考虑到内部开发的复杂性范围广泛。您可以从构建一些简单的工作流常规开始,最终构建基于DAG的带有通用类(如Luigi或Metaflow)的库。这些是您可以使用的流行框架。
构建“Utils”库
这是您的数据代码库的关键和核心部分。您的所有过程都将使用此库将数据从一个来源移动到另一个目标。一个坚实而周密的初始软件设计至关重要。
但是,为什么我们想要这样做呢? 好吧,主要原因是:
- 可重用性:在不同的软件项目中使用相同的软件组件可以提高生产力。 只需要开发一次软件组件。 然后,可以将其集成到其他软件项目中。 但是,这个想法并不新鲜。 我们可以在 1968年的一次会议上找到相关参考资料,该会议旨在解决所谓的软件危机。
- 封装:不需要将库内部的所有连接器的内部显示给最终用户。 因此,提供一个易于理解的接口就足够了。 例如,如果我们有一个连接器连接到数据库,我们不希望连接字符串作为连接器类的公共属性暴露出来。 通过使用库,我们可以确保对数据源的安全访问。
- 更高质量的代码库:我们只需开发测试一次。 因此,开发人员可以依赖库,因为它包含一个测试套件(理想情况下,具有非常高的测试覆盖率)。 当调试错误或问题时,如果我们对测试套件有信心,可以在第一次通过时忽略问题在库中的问题。
- 标准化 / “意见”:拥有连接器库在某种程度上决定了您开发ETL的方式。 这是很好的,因为组织中的ETL将具有在不同数据源中提取或写入数据的相同方式。 标准化促进更好的沟通,更高的生产力以及更好的预测和计划。
在构建此类型的库时,团队承诺随着时间的推移对其进行维护,并承担在需要时执行复杂重构的风险。 需要进行这些重构的原因可能有:
- 组织迁移到不同的公共云。
- 数据仓库引擎更改。
- 新的依赖版本破坏接口。
- 需要进行更多的安全权限检查。
- 新的团队以对库设计有不同意见。
接口类
如果要使您的ETL与来源或目的地无关,那么创建基本实体的接口类是一个不错的决策。 接口用作模板定义。
例如,您可以为定义DatabaseConnector所需方法和属性的抽象类创建抽象类。 让我们展示一个简化的示例,说明此类可能的外观:
从abc中导入ABC类数据库连接器(ABC): def __init__(self,connection_string:str): self.connection_string = connection_string @abc.abstractmethod def connect(self): pass @abc.abstractmethod def execute(self,sql:str): pass
其他开发人员可以从DatabaseConnector中派生并创建新的具体实现。 例如,可以以这种方式实现 MySqlConnector 或 CassandraDbConnector 。 这将帮助最终用户快速了解如何使用从DatabaseConnector派生的任何连接器,因为它们都具有相同的接口(相同的方法)。
mysql = MySqlConnector(connection_string)mysql.connect()mysql.execute(“SELECT * FROM public.table”)cassandra = CassandraDbConnector(connection_string)cassandra.connect()cassandra.execute(“SELECT * FROM public.table”)
具有以良好命名的方法的简单接口非常强大,可以提高生产力。 所以我的建议是花费时间思考它。
正确的文档
文档不仅指代码中的文档字符串和行内注释。 它还指您在库中给出的周围解释。 写出关于软件包的最终目标和贡献要求和指南的精确说明很重要。
例如:
"此工具库将在所有机器学习数据流水线和特征工程作业中使用,以提供对组织中不同系统的简单可靠的连接器。"
或者
"该库包含一组用于特征工程的方法、转换和算法,可以使用简单的接口直接使用,并且可以像 scikit-learn 的流水线一样进行链式调用。"
明确工具库的使命可以为贡献者提供正确的解释。这就是为什么开源库(如:pandas、scikit-learn等)在最近几年中变得非常流行的原因。贡献者们拥抱了这个库的目标,并致力于遵循规定的标准。我们在组织中应该做类似的事情。
在声明了使命之后,我们应该开发基础软件架构。我们希望接口看起来如何?我们应该通过接口方法提供更灵活的功能(例如:更多的参数导致不同的行为)还是更细粒度的方法(例如:每个方法具有非常具体的功能)?
之后是编码规范。确定首选模块层次结构、所需的文档深度、如何发布PR、覆盖率要求等。
关于代码中的文档,文档字符串需要对函数行为进行足够描述,但我们不应该只是复制函数名称。有时,函数名称本身已经足够有表达力,不需要再解释其行为。要简明准确。让我们提供一个简单的例子:
❌不好!
class NeptuneDbConnector: ... def close(): """This function checks if the connection to the database is opened. If it is, it closes it and if it doesn’t, it does nothing. """
✅好!
class NeptuneDbConnector: ... def close(): """关闭与数据库的连接。"""
谈到内联注释,我总是喜欢使用它们来解释一些看起来奇怪或不规则的事情。此外,如果我必须使用复杂逻辑或特殊语法,最好在代码片段之上写清晰的解释。
# 获取列表中的最大整数
l = [23, 49, 6, 32]
reduce((lambda x, y: x if x > y else y), l)
此外,您还可以包含指向Github问题或Stackoverflow答案的链接。这对您来说非常有用,特别是如果您不得不编写奇怪的逻辑来解决已知的依赖问题。如果您从Stackoverflow中获取了优化技巧,这是非常方便的。
这两种方式:接口类和清晰的文档,是我认为保持共享库长期使用的最佳方法。它将适应那些懒惰和保守的新开发人员,同时也适应充满活力、激进和拥有高度主观意见的人。变更、改进或革命性的重构将会很顺利。
将软件设计模式应用于ETL
从代码的角度来看,ETL应该具有三个明确定义的高级函数。每个函数与以下步骤之一相关联:提取、转换、加载。这是ETL代码的最简要求之一。
def extract(source: str) -> pd.DataFrame: ...def transform(data: pd.DataFrame) -> pd.DataFrame: ...def load(transformed_data: pd.DataFrame): ...
当然,命名这些函数并不是强制的,但是这将增加可读性,因为它们是广泛接受的术语。
DRY(不要重复你自己)
这是一个很好的设计模式,可以解释为什么需要一个连接器库。您只需编写一次,然后在不同的步骤或项目中重用它。
函数式编程
这是一种旨在使函数“纯净”或无副作用的编程风格。输入必须是不可变的,并且给定这些输入,输出始终相同。这些函数在隔离测试和调试时更容易。因此,对于数据流水线来说,提供了更好的可重现性。
应用函数式编程到ETL,我们应该能够提供幂等性。这意味着每次运行(或重新运行)管道时,它应该返回相同的输出。有了这个特性,我们能够自信地操作ETL,并确保重复运行不会生成重复数据。有多少次你不得不创建一个奇怪的SQL查询来从错误的ETL运行中删除插入的行?确保幂等性有助于避免这些情况。Maxime Beauchemin,Apache Airflow和Superset的创建者,是一个广受赞誉的函数式数据工程的支持者。
SOLID
我们将使用类定义的引用,但这部分也可以应用于一级函数。我们将使用大量面向对象编程来解释这些概念,但这并不意味着这是开发ETL的最佳方式。没有具体的共识,每个公司都有自己的方式。
关于单一责任原则,您必须创建只有一个变更原因的实体。例如,将职责分离到两个对象中,如SalesAggregator和SalesDataCleaner类。后者可能包含清理销售数据的特定业务规则,前者则专注于从不同系统中提取销售数据。两个类的代码可能因不同原因而发生变化。
对于开闭原则,实体应该是可扩展的,以添加新功能,但不能被修改。想象一下,SalesAggregator接收一个StoresSalesCollector作为组件,用于从实体店中提取销售数据。如果公司开始在线销售,我们想要获取那些数据,我们会声明如果它可以使用具有兼容接口的另一个OnlineSalesCollector,那么SalesCollector对于扩展是开放的。
from abc import ABC, abstractmethodclass BaseCollector(ABC): @abstractmethod def extract_sales() -> List[Sale]: passclass SalesAggregator: def __init__(self, collectors: List[BaseCollector]): self.collectors = collectors def get_sales(self) -> List[Sale]: sales = [] for collector in self.collectors: sales.extend(collector.extract_sales()) return salesclass StoreSalesCollector: def extract_sales() -> List[Sale]: # 从实体店提取销售数据class OnlineSalesCollector: def extract_sales() -> List[Sale]: # 从在线销售提取数据if __name__ == "__main__": sales_aggregator = SalesAggregator( collectors = [ StoreSalesCollector(), OnlineSalesCollector() ] sales = sales_aggregator.get_sales()
里士科夫替换原则,或者行为子类型化,在ETL设计中并不那么直观,但在之前提到的实用库中是如此。该原则尝试为子类型设定规则。在使用父类型的给定程序中,可以潜在地将其替换为一个子类型,而不会改变程序的行为。
from abc import ABC, abstractmethodclass DatabaseConnector(ABC): def __init__(self, connection_string: str): self.connection_string = connection_string @abstractmethod def connect(): pass @abstractmethod def execute_(query: str) -> pd.DataFrame: passclass RedshiftConnector(DatabaseConnector): def connect(): # Redshift连接实现 def execute(query: str) -> pd.DataFrame: # Redshift连接实现class BigQueryConnector(DatabaseConnector): def connect(): # BigQuery连接实现 def execute(query: str) -> pd.DataFrame: # BigQuery连接实现class ETLQueryManager: def __init__(self, connector: DatabaseConnector, connection_string: str): self.connector = connector(connection_string=connection_string).connect() def run(self, sql_queries: List[str]): for query in sql_queries: self.connector.execute(query=query)
我们可以看到下面的示例中,任何DatabaseConnector的子类型都符合里士科夫替换原则,因为ETLManager类中可以使用其任何子类型。
现在,让我们谈谈接口隔离原则。它声明客户端不应依赖于它们不使用的接口。这个原则对于DatabaseConnector设计非常有用。如果您正在实现一个DatabaseConnector,不要用不会在ETL上下文中使用的方法重载接口类。例如,您不需要grant_permissions或check_log_errors等方法。这些方法与数据库的管理使用相关,而不是与ETL相关。
首先介绍依赖倒置原则。这个原则说高层模块不应该依赖低层模块,而是应该依赖抽象层。上面的SalesAggregator清楚地说明了这种行为。请注意,它的__init__方法不依赖于StoreSalesCollector或OnlineSalesCollector的具体实现,而是依赖于BaseCollector接口。
一个优秀的ML ETL是什么样的?
在上面的示例中,我们大量使用面向对象的类来展示如何将SOLID原则应用于ETL作业。然而,关于构建ETL时应遵循的最佳代码格式和标准并没有普遍一致的观点。它可以采用非常不同的形式,更多的是一个内部有着良好文档化以及有着明确意见框架的问题,正如前面讨论的那样,而不是试图在整个行业中达成全局标准。
因此,在本节中,我将试图重点解释一些使ETL代码更易读、更安全和更可靠的特点。
命令行应用程序
所有您可以想到的数据处理过程基本上都是命令行应用程序。在使用Python开发ETL时,始终提供一个参数化的命令行接口,以便可以从任何地方执行它(例如,可以在Kubernetes集群下运行的Docker容器中)。有多种工具可用于构建命令行参数解析,例如argparse、click、typer、yaspin或docopt。Typer可能是最灵活、易于使用且与现有代码最不冲突的工具。它是由著名的Python网络服务库FastApi的创建者开发的,其GitHub贡献者持续增加。文档很好,正在越来越成为行业标准。
from typer import Typerapp = Typer()@app.command()def run_etl( environment: str, start_date: str, end_date: str, threshold: int): ...
要运行上述命令,只需执行:
python {file_name}.py run-etl --environment dev --start-date 2023/01/01 --end-date 2023/01/31 --threshold 10
进程 vs 数据库引擎计算权衡
在基于数据仓库的ETL上构建时,通常建议尽可能多地将计算处理推送到数据仓库中。如果您有一个能根据需求自动扩展的数据仓库引擎,那么这样做是可以的。但并非每个公司、情况或团队都是如此。某些ML查询可能非常耗时,并且很容易使集群超载。通常需要从非常不同的表中聚合数据,在多年的数据中进行回溯,执行时点子句等。因此,将所有内容都推送到集群并不总是最佳选择。在某些情况下,将计算隔离到进程实例的内存中可能更安全。这是一种无风险的做法,因为它不会对集群造成影响,也不会破坏或延迟业务关键查询。这对于Spark用户来说是一个显而易见的情况,因为由于其需要的大规模,所有的计算和数据都是分布在执行器中的。但是,如果您正在使用Redshift或BigQuery集群,请始终注意可以委托给它们多少计算。
跟踪输出结果
ML ETL会生成不同类型的输出工件。这些工件可以是HDFS中的Parquet文件、S3中的CSV文件、数据仓库中的表、映射文件、报告等。这些文件以后可以用于训练模型、在生产中进行数据增强、在线获取特征等多种选项。
这非常有助于通过工件标识符将数据集构建作业与训练作业关联起来。例如,在使用Neptune track_files()方法时,您可以跟踪此类文件。这里有一个非常清晰的示例可供参考。
实现自动回填
想象一下你有一个每日ETL过程,获取昨天的数据来计算用于训练模型的特征。如果由于任何原因你的ETL在某一天未能运行,那么在下一天运行时,你将会丢失先前计算的昨天的数据。
为解决此问题,一种好的做法是查看目标表或文件中的最后一个注册时间戳。然后,ETL可以针对滞后的两天执行。
开发松耦合组件
代码很容易改变,而依赖数据的流程更加如此。构建表的事件可能会演变,列可能会改变,大小可能会增加等等。当你的ETL依赖于不同的信息源时,将它们隔离在代码中是很好的做法。这是因为如果有任何时候你必须将这两个组件分开作为两个不同的任务(例如:一个需要更大的实例类型来运行处理,因为数据已经增长),如果代码不会成为一团乱麻,那么这样做会更容易!
使您的ETL可幂等
通常会多次运行相同的过程,以防源表或过程本身存在问题。为了避免生成重复的数据输出或半填充的表格,ETL应该是可幂等的。也就是说,如果意外地使用相同条件两次运行相同的ETL,那么第一次运行的输出或副作用不应受到影响(参考)。您可以通过应用“删除写入”模式来确保ETL遵守这一点,即在写入新数据之前,管道将首先删除现有数据。
保持您的ETL代码简洁
我总是喜欢在实际实现代码和业务/逻辑层之间做出明确的分离。当我构建ETL时,第一层应该被读作一系列明确说明数据处理情况的步骤(函数或方法)。抽象层次多样并不是坏事。如果您需要多年维护ETL,这将非常有帮助。
始终将高层次和低层次函数相互隔离。发现如下所示的情况非常奇怪:
from config import CONVERSION_FACTORSdef transform_data(data: pd.DataFrame) -> pd.DataFrame: data = remove_duplicates(data=data) data = encode_categorical_columns(data=data) data["price_dollars"] = data["price_euros"] * CONVERSION_FACTORS["dollar-euro"] data["price_pounds"] = data["price_euros"] * CONVERSION_FACTORS["pound-euro"] return data
在上面的示例中,我们使用了“remove_duplicates”和“encode_categorical_columns”等高层函数,但同时我们明确显示了一个实施操作,用于通过转换因子将价格转换。将那两行代码移除并替换成“convert_prices”函数,会不会更好呢?
from config import CONVERSION_FACTORdef transform_data(data: pd.DataFrame) -> pd.DataFrame: data = remove_duplicates(data=data) data = encode_categorical_columns(data=data) data = convert_prices(data=data) return data
在这个例子中,可读性不成问题,但想象一下,如果你将一个包含5行代码的groupby操作嵌入“transform_data”中,同时又有“remove_duplicates”和“encode_categorical_columns”。在两种情况下,你都将高层和低层函数混合在一起。强烈推荐保持代码的内聚性分层。有时候保持函数或模块100%的内聚性分层是不可避免和过度设计,但追求这个目标是非常有益的。
使用纯函数
不要让副作用或全局状态复杂化您的ETL。纯函数在传入相同参数的情况下返回相同的结果。
❌下面的函数不是纯函数。您正在传递一个与另一个函数连接的数据框,后者从外部源读取。这意味着表格可能会变化,因此每次以相同参数调用函数时,可能会返回不同的数据框。
def transform_data(data: pd.DataFrame) -> pd.DataFrame: reference_data = read_reference_data(table="public.references") data = data.join(reference_data, on="ref_id") return data
为了使这个函数变得纯净,您需要做以下操作:
def transform_data(data: pd.DataFrame, reference_data: pd.DataFrame) -> pd.DataFrame: data = data.join(reference_data, on="ref_id") return data
现在,当传递相同的“data”和“reference_data”参数时,该函数将产生相同的结果。
这只是一个简单的示例,但我们都见证过更糟糕的情况。函数依赖于全局状态变量的修改;根据某些条件改变类属性的状态的方法,可能会改变ETL中其他即将到来的方法的行为等等。
尽可能多地使用纯函数将导致更加功能强大的ETL。正如我们在前面的几点中已经讨论过的那样,这带来了巨大的好处。
尽可能多地参数化
ETL会发生变化。这是我们必须接受的。源表定义发生变化,业务规则发生变化,期望的结果发生变化,实验不断改进,机器学习模型需要更复杂的特征等等。
为了使我们的ETL具有一定程度的灵活性,我们需要仔细评估在哪些地方投入大量工作以提供参数化的ETL执行。参数化是通过简单界面更改参数即可改变过程行为的特征。界面可以是YAML文件,类初始化方法,函数参数,甚至是CLI参数。
一个简单直接的参数化方法是定义ETL的“环境”或“阶段”。在将ETL运行到生产环境中会影响下游处理和系统之前,有一个“测试”、“集成”或“开发”隔离环境是很好的,这样我们就可以测试我们的ETL。这个环境可能涉及不同程度的隔离。可以从执行基础设施(开发实例与生产实例隔离)、对象存储、数据仓库、数据源等方面进行。
这是一个明显的参数,可能是最重要的参数。但我们还可以将参数化扩展到与业务相关的参数。我们可以将窗口日期参数化为运行ETL的日期,可以变化或精炼的列名、数据类型、过滤值等。
精确适量的日志记录
这是ETL最被低估的属性之一。日志对于检测生产运行时的异常或隐含错误或解释数据集非常有用。记录有关提取数据的属性始终是有用的。除了在代码中进行验证以确保不同的ETL步骤成功运行之外,我们还可以记录:
- 对源表、API或目标路径的引用(例如:“从`item_clicks`表获取数据”)
- 预期模式的更改(例如:“`promotion`表中有一列”)
- 提取的行数(例如:“从`item_clicks`表提取了145234093行”)
- 关键列中的空值数量(例如:“在源列中找到了125个空值”)
- 数据的简单统计信息(例如均值、标准差等)(例如:“CTR均值:0.13,CTR标准差:0.40”)
- 分类列的唯一值(例如:“Country列包括:’Spain’、’France’和’Italy’”)
- 消除重复行的数量(例如:“删除了1400行重复行”)
- 计算密集型操作的执行时间(例如:“聚合操作花费了560秒”)
- ETL不同阶段的完成检查点(例如:“富集过程成功完成”)
Manuel Martín 是一位具有超过6年数据科学专业知识的工程经理。他之前曾担任过数据科学家和机器学习工程师的职位,现在领导Busuu的ML/AI实践。
[Manuel Martín](https://www.linkedin.com/in/manuelmart%C3%ADn11/) 是一位具有超过6年数据科学专业知识的工程经理。他曾在数据科学和机器学习工程师领域工作,并且现在在Busuu负责机器学习和人工智能的领域。