Press "Enter" to skip to content

Apache Beam中的ParDo和DoFn实现详解

ODISSEI在Unsplash上的照片

对初学者详细解释代码

我之前写过一个关于Apache Beam中一些常见转换函数的教程,其中涵盖了map、filter和combinePerKey()。本教程将介绍ParDo转换,它只是另一种执行Map操作的方式。但是不同之处在于,ParDo在每个PCollection中应用转换并将零个或多个元素返回到输出PCollection。而Map转换对于每个输入元素都会输出一个元素。因此,ParDo提供了很多灵活性。

ParDo转换的另一个重要方面是,它需要用户以DoFn形式提供代码。让我们看一些示例。

请随意下载此公共数据集并跟随操作:

样本销售数据 | Kaggle

我使用了Google Colab笔记本来编写此代码,所以安装非常简单。以下是安装的代码:

!pip install --quiet apache_beam

我创建了一个名为“data”的目录,用于存放我们将使用的CSV文件和我们今天的练习的输出。

mkdir -p data

首先,我将只处理数据集中的最简单的内容。读取数据集,并将每一行转换为列表,然后将它们输出到文本文件中。

在Beam管道中读取文本文件非常简单和直接。我们有一个CSV文件。因此,我们将为此定义一个CustomCoder()类,它首先将对象编码为字节字符串,然后将字节解码为其对象,并且最后指定编码器是否保证对值进行确定性编码。以下是编码器的文档。

from apache_beam.coders.coders import Coderclass CustomCoder(Coder):    """用于读写UTF-8字符串的自定义编码器。"""    def encode(self, value):        return value.encode("utf-8", "replace")    def decode(self, value):        return value.decode("utf-8", "ignore")    def is_deterministic(self):        return True

还有一个SplitRow()类,它只是使用Python的.split()函数。

class SplitRow(beam.DoFn):  def process(self, element)...
Leave a Reply

Your email address will not be published. Required fields are marked *