Press "Enter" to skip to content

现代数据工程

平台专属工具和高级技术

来自Unsplash的Christopher Burns的照片

现代数据生态系统不断演变,新的数据工具时常出现。在这篇文章中,我想讨论那些影响数据工程师的关键因素。我们将讨论如何运用这些知识来支持先进的分析流程和运营卓越。

  • 现代数据工程(DE)。它是什么?
  • 你的DE是否足够好以支持先进的数据流程和商业智能(BI)?
  • 你的数据流程是否高效?
  • 从技术角度来看,实现运营卓越需要哪些条件?

去年十月,我写过关于数据工程师的崛起、角色、挑战、责任、日常工作以及如何在这个领域取得成功的文章。数据工程必然不断变化,但主要趋势似乎保持不变。

如何成为数据工程师

2024年初学者的捷径

towardsdatascience.com

作为一名数据工程师,我几乎每天都要设计高效的数据流程。因此,以下这些考虑因素可能对我们回答这些问题有所帮助。

  • ETL vs ELT
  • 简化数据连接和API集成
  • ETL框架的爆炸性增长
  • 数据基础架构即代码
  • 数据网格和分散式数据管理
  • 通过人工智能实现商业智能流水线的民主化
  • 关注数据素养

ELT vs ETL

DataformDBT这样的流行SQL数据转换工具在ELT方法的普及方面做出了重要贡献[1]。在数据存储的地方执行所需的数据转换,如清洗、丰富和提取,这样做是有道理的。通常,这是我们基础架构中的数据仓库解决方案(DWH)。云平台领导者使得DWH(Snowflake、BigQuery、Redshift、Firebolt)的基础架构管理变得非常简单,在许多情况下,它们在成本效益和速度方面将超过专门的内部基础架构管理团队。

数据仓库示例。作者:author

这也可能是一个位于中心位置的数据湖,这取决于我们的数据平台类型和所使用的工具。在这种情况下,SQL在许多情况下将不再是一个选择,这使得那些对编程不熟悉的用户难以查询数据。Databricks、Tabular和Galaxy等工具试图解决这个问题,确实感觉像是未来的趋势。事实上,数据湖可以存储包括非结构化数据在内的所有类型的数据,我们仍然需要能够分析这些数据集。

数据湖示例。作者:author。

想象一下具有事务一致性、可进行时刻快照隔离的数据湖表。

我以前在一篇关于Apache Iceberg表格格式的文章中介绍过这个问题[2]。

介绍Apache Iceberg表格

选择 Apache Iceberg 作为数据湖的几个引人注目的理由

towardsdatascience.com

简化数据集成

FivetranStitc 这样的托管解决方案专门用于轻松管理第三方 API 的集成。这些日子里,许多公司选择这种方法来简化与外部数据源的数据交互。这对于那些不熟悉编码的数据分析团队来说是正确的选择。

实际上,如果已经存在并在云端进行管理,我们为什么要从头开始构建数据连接器呢?

这种方法的缺点是其定价模式。

在企业级的数据摄取,即大数据管道方面,往往是基于行的,可能会变得非常昂贵。这就是开源替代方案发挥作用的地方。像 AirbyteMeltano 这样的框架可能是一种简单快捷的解决方案,可以部署一个数据源集成微服务。

如果你没有时间学习一个新的 ETL 框架,你可以自己创建一个简单的数据连接器。如果你懂一点 Python,这将是一项微不足道的任务。在我以前的一篇文章中,我写了如何轻松创建一个从NASA API中获取数据的微服务 [3]:

面向数据工程师的 Python

初学者的高级ETL技术

towardsdatascience.com

考虑一下 app.py 的这段代码:

import requestssession = requests.Session()url="https://api.nasa.gov/neo/rest/v1/feed"apiKey="your_api_key"requestParams = {    'api_key': apiKey,    'start_date': '2023-04-20',    'end_date': '2023-04-21'}response = session.get(url, params = requestParams, stream=True)print(response.status_code)

它可以部署在任何云供应商平台,并按照所需的频率进行调度运行。使用像 Terraform 这样的工具部署我们的数据管道应用程序始终是一种良好的实践。

ETL框架的爆炸式增长

我们可以目睹各种数据提取和转换的各种ETL框架的“寒武纪爆炸”。毫无疑问,其中许多都是开源的且基于Python的。

Luigi [8] 是其中之一,它可以帮助创建ETL流水线。它是由Spotify创建的,用于管理庞大的数据处理工作负载。它有一个命令行界面和很棒的可视化功能。然而,即使是基本的ETL流水线也需要一定水平的Python编程技巧。从我的经验来看,我可以告诉你,在严格而直接的流水线方面,它非常出色。我发现使用Luigi实现复杂的分支逻辑特别困难,但在许多情况下它效果很好。

Python ETL (PETL) [9] 是最广泛使用的用于简单数据转换的开源ETL框架之一。它在处理表格、从外部数据源提取数据并执行基本的ETL方面非常有用。在许多方面,它类似于 Pandas,但后者在底层拥有更多的分析功能。PETL非常适合聚合和基于行的ETL。

Bonobo [10] 是另一个轻量级的开源数据处理工具,非常适合快速开发、自动化和批处理数据流水线的并行执行。我喜欢它的一个优点是它使得与各种数据文件格式(如SQL、XML、XLS、CSV和JSON)一起工作变得非常容易。对于那些拥有最基本Python知识的人来说,它将是一个很好的工具。除此之外,我还喜欢它在处理半复杂数据模式时的表现。它非常适合简单ETL,并且可以在Docker容器中运行(它有一个Docker扩展)。

Pandas 是数据领域的绝对强者,在这个故事中没有必要涵盖它的功能。值得一提的是,它的数据框转换已被包括在许多现代数据仓库的基本数据加载方法中。将数据加载到BigQuery数据仓库解决方案中时,考虑以下数据加载示例:

从google.cloud导入bigqueryfrom google.oauth2导入service_account...#验证BigQuery客户端:service_acount_str = config.get('BigQuery')#使用configcredentials = service_account.Credentials.from_service_account_info(service_acount_str)client = bigquery.Client(credentials=credentials, project=credentials.project_id)...def load_table_from_dataframe(table_schema, table_name, dataset_id):    #!源数据文件格式必须是外部数组JSON:    """    [    {"id":"1"},    {"id":"2"}    ]    """    双引号= """            [    {"id":"1","first_name":"John","last_name":"Doe","dob":"1968-01-22","addresses":[{"status":"current","address":"123 First Avenue","city":"Seattle","state":"WA","zip":"11111","numberOfYears":"1"},{"status":"previous","address":"456 Main Street","city":"Portland","state":"OR","zip":"22222","numberOfYears":"5"}]},    {"id":"2","first_name":"John","last_name":"Doe","dob":"1968-01-22","addresses":[{"status":"current","address":"123 First Avenue","city":"Seattle","state":"WA","zip":"11111","numberOfYears":"1"},{"status":"previous","address":"456 Main Street","city":"Portland","state":"OR","zip":"22222","numberOfYears":"5"}]}    ]    """    body = json.loads(blob)    print(pandas.__version__)    table_id = client.dataset(dataset_id).table(table_name)    job_config = bigquery.LoadJobConfig()    schema = create_schema_from_yaml(table_schema)     job_config.schema = schema    df = pandas.DataFrame(    body,    #在加载的表中,列的顺序反映了DataFrame中列的顺序。    columns=["id", "first_name","last_name","dob","addresses"],    )    df['addresses'] = df.addresses.astype(str)    df = df[['id','first_name','last_name','dob','addresses']]    print(df)    load_job = client.load_table_from_dataframe(        df,        table_id,        job_config=job_config,    )    load_job.result()    print("Job finished.")

Apache Airflow,例如,并不是一个ETL工具,但它可以将我们的ETL流程组织成一个漂亮的可视化依赖图(DAGs),以描述任务之间的关系。典型的Airflow架构包括基于元数据的调度器、执行器、工作者和任务。

例如,我们可以在将数据导出到云存储(bq_export_op)之后运行ml_engine_training_op,并将此工作流程设为每天或每周运行。

ML model training using Airflow. Image by author.

考虑下面的例子。

它创建了一个简单的数据流水线图,将数据导出到云存储桶,然后使用MLEngineTrainingOperator训练ML模型。

"""推荐_bespoke模型训练的DAG定义。"""import airflowfrom airflow import DAGfrom airflow.contrib.operators.bigquery_operator import BigQueryOperatorfrom airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperatorfrom airflow.hooks.base_hook import BaseHookfrom airflow.operators.app_engine_admin_plugin import AppEngineVersionOperatorfrom airflow.operators.ml_engine_plugin import MLEngineTrainingOperatorimport datetimedef _get_project_id():  """从默认的GCP连接获取项目ID。"""  extras = BaseHook.get_connection('google_cloud_default').extra_dejson  key = 'extra__google_cloud_platform__project'  if key in extras:    project_id = extras[key]  else:    raise('必须在Airflow Console中配置google_cloud_default连接中的project_id')  return project_idPROJECT_ID = _get_project_id()#数据集常量,用于BigQuery任务。  您可以将这些数据更改为符合您的数据的规范。DATASET = 'staging' #'analytics'TABLE_NAME = 'recommendation_bespoke'#GCS桶名称和区域,也可以更改。BUCKET = 'gs://rec_wals_eu'REGION = 'us-central1' #'europe-west2' #'us-east1'JOB_DIR = BUCKET + '/jobs'default_args = {    'owner': 'airflow',    'depends_on_past': False,    'start_date': airflow.utils.dates.days_ago(2),    'email': ['mike.shakhomirov@gmail.com'],    'email_on_failure': True,    'email_on_retry': False,    'retries': 5,    'retry_delay': datetime.timedelta(minutes=5)}#默认的定时间隔使用cronjob语法-可以在这里自定义#或在Airflow控制台中进行自定义。schedule_interval = '00 21 * * *'dag = DAG('recommendations_training_v6', default_args=default_args,          schedule_interval=schedule_interval)dag.doc_md = __doc__###任务定义###向GCS导出大型查询培训数据的BigQueryt1 = BigQueryToCloudStorageOperator(    task_id='bq_export_op',    source_project_dataset_table='%s.recommendation_bespoke' % DATASET,    destination_cloud_storage_uris=[training_file],    export_format='CSV',    dag=dag)# ML引擎训练jobtraining_file = BUCKET + '/data/recommendations_small.csv'job_id = 'recserve_{0}'.format(datetime.datetime.now().strftime('%Y%m%d%H%M'))job_dir = BUCKET + '/jobs/' + job_idoutput_dir = BUCKETdelimiter=','data_type='user_groups'master_image_uri='gcr.io/my-project/recommendation_bespoke_container:tf_rec_latest'training_args = ['--job-dir', job_dir,                 '--train-file', training_file,                 '--output-dir', output_dir,                 '--data-type', data_type]master_config = {"imageUri": master_image_uri,}t3 = MLEngineTrainingOperator(    task_id='ml_engine_training_op',    project_id=PROJECT_ID,    job_id=job_id,    training_args=training_args,    region=REGION,    scale_tier='CUSTOM',    master_type='complex_model_m_gpu',    master_config=master_config,    dag=dag)t3.set_upstream(t1)

泡泡 [11] 是 Python 世界中另一个开源的 ETL 工具。它非常适合快速开发,我喜欢它通过元数据描述数据流水线的方式。Bubbles 的创建者称之为“抽象框架”,并表示它可以从许多其他编程语言中使用,不仅仅限于 Python。

还有许多其他具有更具体应用的工具,例如从网页中提取数据的工具(PyQuery、BeautifulSoup 等)和并行数据处理。这可以是另一个故事的话题,但我以前写过其中一些工具,例如joblib库 [12]

基于代码的数据基础设施

基础设施即代码(IaC)是一个流行且非常实用的管理数据平台资源的方法。即使是对于数据而言,这现在几乎是一种标准,并且向潜在雇主展示您熟悉 DevOps 标准肯定是一个极好的亮点。使用类似 Terraform(跨平台)和 CloudFormation 的工具,我们可以轻松地将开发工作和部署(运维)集成起来。

通常,我们希望为数据流水线拥有演练和生产数据环境。这有助于测试我们的流水线并促进团队之间的协作。

请看下面的图表。它解释了数据环境的工作原理。

数据环境。来自作者的图片

通常情况下,我们可能需要一个额外的沙盒用于测试目的,或者在我们的 ETL 服务触发 CI/CD 工作流时运行数据转换单元测试。我以前在这里写过:

适用于初学者的基础设施即代码

使用这些模板专业地部署数据流水线

levelup.gitconnected.com

使用 AWS CloudFormation 模板文件,我们可以描述所需的资源及其依赖关系,以便我们可以一起启动和配置它们。

如果您是一个数据专业人员,这种方法无疑将有助于在不同的数据环境中工作,并更快、更一致地复制数据平台资源,避免出现错误。

问题是,许多数据从业人员不熟悉 IaC,并且在开发过程中常常出现许多错误。

数据网格和分散式数据管理

在过去的十年中,数据空间发展得非常快,现在我们有许多数据工具和框架。 数据网格 定义了当我们拥有不同的数据领域(公司部门)与各自团队和共享数据资源时的状态。每个团队都有自己的目标、关键绩效指标、数据角色和责任。

很长一段时间以来,数据官僚主义一直是许多公司的痛点。

这种数据平台类型 [4] 可能看起来有些混乱,但它被设计成一个成功高效的选择,使得不同的团队可以访问跨领域的数据集并独立运行分析或 ETL 任务。

确实,如果您是一个数据分析师并且不熟悉 Spark,Snowflake 可能是您最喜欢的数据仓库解决方案。然而,当您希望在没有数据工程师帮助的情况下读取数据湖中的数据时,通常情况下这都是一个微不足道的问题。在这种情况下,对数据集的大量元数据记录可能非常有用,这就是数据网格如此成功的原因。

它使了解数据及其来源、以及其他团队如何更好地利用以前未知的数据集的用户能够掌握这些信息。

有时候,数据集和数据源连接变得非常复杂,因此将所有元数据和数据集描述放在一个单一的真实数据存储区或仓库中,总是一个好的做法。

在我以前的一些故事中 [5],我讨论了 SQL 作为团队和数据的统一查询语言的角色。确实,它在分析上有优势、自描述且具备动态性,这使得它成为所有数据用户的完美工具。

通常一切变成了一团糟

这一事实使得像 DBT、Jinja 和 Dataform 这样基于 SQL 的模板引擎非常受欢迎。 想象一下,您拥有一个类似于 SQL 的平台,其中所有数据集及其转换都被详细描述和定义 [6]。

Dataform的依赖图和元数据。图片作者:author.

理解数据团队与数据源和模式之间的关系可能是一个巨大的挑战。往往这一切都纠缠在数据集依赖和ETL转换的团团乱麻中。数据工程在辅导、改善数据素养和让公司其他部门运用先进的数据处理技术和最佳实践方面发挥着至关重要的作用。

通过人工智能实现商业智能流程的民主化

提高数据可访问性一直是数据领域中的热门话题,但有趣的是,整个数据流程设计过程变得越来越易于为之前不熟悉数据的团队所掌握。现在几乎每个部门都可以利用内置的人工智能能力在数据上进行复杂的商业智能转换。

他们只需要用自己的话描述他们希望在商业智能方面获得的内容

例如,像Thoughspot这样的商业智能工具使用具有直观的“类似于Google搜索界面”的人工智能,从任何现代数据仓库解决方案(如Google Big Query,Redshift,Snowflake或Databricks)中获取数据洞察。

现代数据堆栈包括帮助进行数据建模和可视化的商业智能工具。其中许多工具已经具备了内置的人工智能能力,可以根据用户行为更快地获得数据洞察。

我相信将GPT(生成预训练模型)和商业智能进行整合是一项相当容易的任务。在接下来的几年中,我们将看到许多使用这项技术的新产品。

GPT可以预处理文本数据,生成理解您意图并回答您问题的SQL查询。

结论

在本文中,我试图对当前影响数据工程角色的主要数据趋势进行了高层次的概述。数据网格和模板化SQL以及依赖图是使整个分析过程民主化的关键。带有复杂ETL技术和转换的先进数据流程现在可以为组织中的每个人透明化。数据流程对其他团队来说越来越易于访问,他们不需要掌握编程知识就可以学习和理解ETL的复杂性。数据网格和元数据有助于解决这个问题。根据我的经验,我发现越来越多的人学习SQL来贡献于转换层。在“先进数据分析”时代出生的公司可以轻松访问云供应商产品及其托管服务,这无疑有助于获得所需的数据技能并改进它们以获得竞争优势。

[1] https://medium.com/towards-data-science/data-pipeline-design-patterns-100afa4b93e3

[2] https://towardsdatascience.com/introduction-to-apache-iceberg-tables-a791f1758009

[3] https://towardsdatascience.com/python-for-data-engineers-f3d5db59b6dd

[4] https://medium.com/towards-data-science/data-platform-architecture-types-f255ac6e0b7

[5] https://medium.com/towards-data-science/advanced-sql-techniques-for-beginners-211851a28488

[6] https://medium.com/towards-data-science/easy-way-to-create-live-and-staging-environments-for-your-data-e4f03eb73365

[7] https://docs.thoughtspot.com/cloud/latest/search-sage

[8] https://github.com/spotify/luigi

[9] https://petl.readthedocs.io/en/stable/

[10] https://www.bonobo-project.org

[11] http://bubbles.databrewery.org/

[12] https://medium.com/towards-data-science/how-to-become-a-data-engineer-c0319cb226c2

Leave a Reply

Your email address will not be published. Required fields are marked *