从数据处理到快速洞察,强大的数据管道对于任何机器学习系统来说都是必不可少的。通常,数据团队,包括数据和机器学习工程师,需要构建这个基础设施,而这个经验可能是痛苦的。然而,在机器学习中高效使用ETL数据管道可以帮助他们的工作更加轻松。
本文探讨了ETL数据管道在机器学习中的重要性,以一个常用工具构建ETL数据管道的实际示例为例,并提出了数据工程师增强和维持管道的最佳方法。我们还讨论了不同类型的机器学习ETL数据管道,并提供了实际应用示例,以帮助数据工程师选择合适的管道。在深入技术细节之前,让我们回顾一些基本概念。
什么是机器学习中的ETL数据管道?
ETL数据管道是一组工具和活动,用于执行所需数据的提取(Extract),转换(Transform)和加载(Load)。

这些活动涉及从一个系统中提取数据,对其进行转换,然后将其处理到另一个目标系统中,可以在其中存储和管理。
机器学习在很大程度上依赖于ETL数据管道,因为模型的准确性和效果直接受到训练数据质量的影响。这些管道通过确保数据清洁、正确格式化和准备好用于机器学习任务,帮助数据科学家节省时间和精力。
此外,ETL数据管道在打破数据孤岛和建立单一真相源方面起着至关重要的作用。让我们详细了解ETL数据管道的重要性。
为什么我们需要机器学习中的ETL管道?
ETL管道的重要性在于它们使组织能够从大型和复杂的数据集中获取有价值的洞察。以下是它们重要的一些具体原因:
- 数据集成:组织可以使用ETL管道集成来自各种来源的数据。这为数据科学家提供了数据的统一视图,并帮助他们决定如何训练模型、超参数的值等。
- 数据质量检查:随着数据通过整合步骤流动,ETL管道可以通过标准化、清洁和验证来提高数据质量。这确保用于机器学习的数据是准确、可靠和一致的。
- 节省时间:由于ETL管道自动化了提取、转换和加载这三个主要步骤的过程,这有助于节省大量时间,并减少人为错误的可能性。这使得数据科学家可以专注于模型的创建或持续改进。
- 可扩展性:现代ETL管道具有可扩展性,即可以根据需要处理的数据量进行扩展或缩减。基本上,它具有根据业务需求进行任何更改的灵活性和敏捷性。
ETL与数据管道有何区别?
数据管道是一个涵盖将数据在不同系统之间移动的类别的总称,而ETL数据管道则是数据管道的一种类型。
— Xoriant
常常会将ETL数据管道和数据管道混用。尽管这两个术语都指的是将数据从各种来源传递到单一存储库的功能和过程,但它们并不相同。让我们探讨为什么我们不应该将它们视为同义词。
比较 |
ETL管道 |
数据管道 |
术语 |
正如缩写所示,ETL涉及一系列的过程,提取数据,转换数据,最后将数据加载到目标源中。 |
数据管道也涉及将数据从一个源移动到另一个源,但不一定需要经过数据转换。 |
重点领域 |
ETL帮助将原始数据转换为结构化格式,便于数据科学家创建模型并为任何数据驱动的决策进行解释。 |
数据管道的创建重点是将数据从各种来源传输到数据仓库中。随后的进程或工作流可以轻松利用这些数据来创建商业智能和分析解决方案。 |
操作 |
ETL管道按计划运行,例如每天、每周或每月。基本的ETL管道是批处理的,数据按照指定的计划以块的形式移动。 |
数据管道通常进行实时处理。数据持续更新,支持实时报告和分析。 |
总结一下,ETL流水线是一种专门设计用于从多个来源提取数据、将其转换为公共格式并加载到数据仓库或其他存储系统的数据流水线类型。虽然数据流水线可以包括各种类型的流水线,但ETL流水线是数据流水线的一个特定子集。
我们介绍了ETL流水线的基本架构,并了解了每个步骤可以为不同目的执行,并且我们可以从各种工具中选择完成每个步骤。ELT架构及其类型因组织而异,因为它们具有不同的技术堆栈、数据源和业务需求。
机器学习中的不同类型的ETL流水线有哪些?
ETL流水线可以根据正在处理的数据类型以及处理方式进行分类。以下是一些类型:
- 批量ETL流水线:这是一种传统的ETL方法,它涉及一次性批量处理大量数据。数据从一个或多个来源提取,转换为所需格式,并加载到目标系统(如数据仓库)。批量ETL对于在历史数据上训练模型或运行定期批处理作业特别有用。
- 实时ETL流水线:这种流水线在数据到达时以接近实时或实时的方式进行处理;连续处理数据意味着任何时候都需要较小的处理能力,并且可以避免使用量的激增。流式/实时ETL对于诸如欺诈检测等需要实时处理的应用程序特别有用。实时ETL流水线需要流处理引擎和消息传递系统等工具和技术。
- 增量ETL流水线:这些流水线仅提取和处理自上次运行以来发生更改的数据,而不是处理整个数据集。它们适用于源数据经常更改,但目标系统只需要最新数据的情况,例如推荐系统等应用程序,其中数据经常更改但不是实时更改。
- 云ETL流水线:云端的机器学习ETL流水线涉及使用基于云的服务从数据中提取、转换和加载数据,以进行训练和部署。云供应商如AWS、Microsoft Azure和GCP提供了一系列可用于构建这些流水线的工具和服务。例如,AWS提供了AWS Glue用于ETL、Amazon S3用于数据存储,以及Amazon SageMaker用于机器学习训练和部署。
- 混合ETL流水线:这些流水线结合了批量和实时处理,利用了两种方法的优势。混合ETL流水线可以按预定间隔处理大批量数据,并在数据到达时捕获实时更新。混合ETL对于需要实时和历史数据的应用程序(例如预测维护)特别有用。
ETL流水线工具
要创建ETL流水线,如上一节所讨论的,我们需要工具,提供以下基本ETL架构步骤功能的工具。市场上有几种可用的工具,以下是其中一些流行的工具以及它们提供的功能。
工具 |
云端 |
预建连接器 |
无服务器 |
预建转换选项 |
API支持 |
完全托管 |
Hevo Data |
|
|
|
|
|
|
AWS Glue |
|
|
|
|
|
|
GCP Cloud Data Fusion |
|
|
|
|
|
|
Apache Spark |
|
|
|
|
|
|
Talend |
|
|
|
|
|
|
Apache Airflow |
|
|
|
|
|
|
如何构建一个机器学习ETL流水线?
在前一节中,我们简要探讨了一些基本的ETL概念和工具,在本节中,我们将讨论如何利用它们来构建一个ETL流水线。首先,让我们来看一下它的架构。
ETL架构
ETL架构的显著特点是数据在到达仓库之前需要经过所有必需的准备过程。因此,最终的存储库包含了干净、完整和可信任的数据,可以在不进行修改的情况下进一步使用。
— Coupler
ETL架构通常包括上面的示意图,概述了ETL流水线中从数据源到最终目的地的信息流。它由三个主要区域组成:落地区、暂存区和数据仓库区。
- 落地区是数据从源位置提取后的第一个目的地。在将数据通过ETL流水线移动之前,它可以存储多个数据批次。
- 暂存区是执行ETL转换的中间位置。
- 数据仓库区是ETL流水线中数据的最终目的地。它用于分析数据以获取有价值的见解,并做出更好的业务决策。
ETL数据流水线架构是分层的。每个子系统都是必不可少的,并且依次,每个子系统都会向下一个子系统提供数据,直到数据到达目的地。

- 数据发现:数据可以来自各种类型的系统,例如数据库、文件系统、API或流式数据源。我们还需要进行数据概要分析,即数据发现,以了解数据是否适合进行ETL。这涉及查看数据结构、关系和内容。
- 数据提取:您可以从各种数据源中提取数据到暂存区或数据湖中。可以使用各种技术进行提取,例如API、直接数据库连接或文件传输。数据可以一次性提取(从数据库中提取)或增量提取(使用API进行提取),或在有变化时提取(从云存储(如S3)上的触发器中提取数据)。
- 数据转换:这个阶段涉及对数据进行清洗、丰富和整理,以适应目标系统的要求。可以使用各种技术来操作数据,例如过滤、聚合、连接或应用复杂的业务规则。在操作数据之前,我们还需要清洗数据,这需要消除任何重复的条目,删除不相关的数据,并识别错误的数据。这有助于提高机器学习算法的数据准确性和可靠性。
- 数据存储:将转换后的数据以适合机器学习模型使用的格式存储起来。存储系统可以是数据库、数据仓库或基于云的对象存储。数据可以以结构化或非结构化的格式存储,具体取决于系统的要求。
- 特征工程:特征工程涉及选择、转换和组合原始数据,以创建可以用于机器学习模型的有意义的特征。它直接影响模型的准确性和可解释性。有效的特征工程需要领域知识、创造力和迭代实验,以确定适用于特定问题的最佳特征集。
现在让我们使用讨论过的工具之一来构建自己的ETL流水线!
使用AirFlow构建ETL流水线
假设我们想创建一个能够将花朵分类为三个不同类别(Setosa、Versicolour、Virginica)的机器学习分类模型。我们将使用一个每周更新的数据集。这听起来像是批处理ETL数据流水线的工作。
为了设置一个批处理ETL数据流水线,我们将使用Apache Airflow,它是一个开源的工作流管理系统,提供了一种简单的方式来编写、调度和监控ETL工作流程。按照下面的步骤来设置您自己的批处理ETL流水线。
以下是我们可以遵循的通用步骤来在AirFlow中创建ETL工作流程:
- 设置Airflow环境:在您的系统上安装和配置Airflow。您可以参考这里的安装步骤。
- 定义DAG并配置工作流程:在Airflow中定义一个有向无环图(DAG),以编排我们的ML分类器的ETL流水线。DAG将包含一组具有依赖关系的任务。对于这个练习,我们使用一个Python运算符来定义任务,并且将DAG的调度设置为“None”,因为我们将手动运行流水线。
创建一个DAG文件 – airflow_classification_ml_pipeline.py,代码如下:
from datetime import timedelta # DAG对象;我们需要它来实例化一个DAG from airflow import DAG from airflow.operators.python import PythonOperator from airflow.operators.empty import EmptyOperator from airflow.operators.bash import BashOperator from airflow.utils.dates import days_ago from python_functions import download_dataset from python_functions import data_preprocessing from python_functions import ml_training_classification
with DAG( dag_id='airflow_classification_ml_pipeline', ## DAG运行的名称 default_args=args, description='分类机器学习流程', schedule = None, ) as dag:
# 任务1 - 下载数据集 task_download_dataset = PythonOperator( task_id='download_dataset', python_callable=download_dataset ) # 任务2 - 数据预处理 task_data_preprocessing = PythonOperator( task_id='data_preprocessing', python_callable=data_preprocessing ) # 任务3 - 训练机器学习模型 task_ml_training_classification = PythonOperator( task_id='ml_training_classification', python_callable=ml_training_classification ) # 定义工作流程 task_download_dataset >> task_data_preprocessing >> task_ml_training_classification
- 实现ETL任务:实现DAG中定义的每个任务。这些任务将包括从scikit-learn数据包加载iris数据集,数据转换,并使用优化后的数据框架创建机器学习模型。
创建一个包含所有ETL任务的Python函数文件 – etl_functions.py。
from sklearn.datasets import load_iris from sklearn.model_selection import train_test_split from sklearn.ensemble import RandomForestClassifier from sklearn.metrics import confusion_matrix from sklearn.metrics import accuracy_score import pandas as pd import numpy as np def download_dataset(): iris = load_iris() iris_df = pd.DataFrame( data = np.c_[iris['data'], iris['target']], columns = iris['feature_names'] + ['target']) pd.DataFrame(iris_df).to_csv("iris_dataset.csv") def data_preprocessing(): iris_transform_df = pd.read_csv("iris_dataset.csv",index_col=0) cols = ["sepal length (cm)","sepal width (cm)","petal length (cm)","petal width (cm)"] iris_transform_df[cols] = iris_transform_df[cols].fillna( iris_transform_df[cols].mean()) iris_transform_df.to_csv("clean_iris_dataset.csv")
- 监控和管理流水线:现在DAG和工作流程代码都准备好了,我们可以在Airflow服务器上监控整个ETL流程。
1. 在Airflow服务器中获取DAG列表。
Airflow服务器中列出的DAG | 来源:作者 2. 检查工作流程图并运行流水线(触发DAG):
工作流程图 | 来源:作者 3. 监控和查看日志:在触发DAG后,您可以在用户界面上监控DAG的进度。下面的图像显示了所有3个步骤都成功完成。
在用户界面上监控DAG的进度 | 来源:作者 您可以使用用户界面中的甘特图来查看每个任务花费了多少时间:
使用甘特图查看每个任务所花费的时间 | 来源:作者 在这个练习中,我们使用DAG创建了一个ETL工作流程,并没有设置任何调度,但是您可以尝试根据您的喜好设置调度并监控流水线。您还可以尝试使用经常更新的数据集,并根据此来决定是否设置调度。
您还可以通过尝试不同的操作器和执行器来扩展Airflow编排。如果您有兴趣探索实时ETL数据流水线,请按照本教程进行。
构建ML中ETL流水线的最佳实践
对于数据驱动型组织,强大的ETL流水线至关重要。这涉及以下内容:
-
1
有效管理数据源 -
2
确保数据质量和准确性 -
3
优化数据流以实现高效处理
将机器学习模型与数据分析集成,赋予组织以提高准确性的先进能力来预测需求。
构建机器学习(ML)应用程序的ETL(提取、转换、加载)流水线有几个最佳实践。以下是其中一些最重要的实践:
- 从明确理解需求开始。确定支持机器学习模型所需的数据源。确保您使用适当的数据类型。这有助于确认数据格式正确,这对于ML算法有效地处理数据非常重要。从数据的子集开始,并逐渐扩大规模,这有助于对进一步的任务/流程进行检查。
- 纠正或删除数据中的不准确性和不一致性。这很重要,因为ML算法对数据中的不一致性和异常值敏感。
- 保护您的数据,实施访问控制以确保基于角色的数据访问。
- 在可能的情况下,使用分布式文件系统、并行处理、暂存表或缓存技术。这可以加速数据处理并帮助优化您的流水线。这最终有助于提高ML模型的性能。
- 安排或自动化数据驱动的工作流程,以在各个源之间移动和转换数据。
- 监控和记录您的ETL数据,这将被您的机器学习模型使用。例如,您希望跟踪可能影响您的ML模型性能的任何数据漂移。
- 维护您的ETL代码库的版本控制。这有助于跟踪任何更改,与其他开发人员合作,并确保流水线以预期的方式运行,不会影响您的模型性能。
- 如果您使用任何基于云的服务,请使用它们的ETL模板来节省创建一切的时间。
结论
在本文中,我们介绍了ML中ETL数据流水线的不同方面。
-
1
ETL流水线对于创建一个良好的机器学习模型非常重要。 -
2
根据数据和需求设置ETL架构以及使用不同类型的ETL数据流水线。 -
3
使用Airflow构建批处理ETL流水线,我们可以自动化ETL流程。我们还可以记录和监控工作流程,以对发生的一切进行监视。 -
4
如何创建可扩展和高效的ETL数据流水线。
希望本文对您有所帮助。通过参考本文和创建批处理流水线的实践练习,您应该能够自己创建一个流水线。您可以选择本文中提到的任何工具,并开始您的旅程。
祝学习愉快!
参考资料
- https://neptune.ai/blog/deployment-of-data-and-ml-pipelines-crypto-industry
- https://towardsdatascience.com/building-etl-pipelines-for-beginners-17c3492d29d2
- https://www.analyticsvidhya.com/blog/2022/06/a-complete-guide-on-building-an-etl-pipeline-for-beginners/
- https://www.projectpro.io/article/how-to-build-etl-pipeline-example/526
- https://hevodata.com/learn/steps-to-build-etl-pipeline/