对初学者详细解释代码
我之前写过一个关于Apache Beam中一些常见转换函数的教程,其中涵盖了map、filter和combinePerKey()。本教程将介绍ParDo转换,它只是另一种执行Map操作的方式。但是不同之处在于,ParDo在每个PCollection中应用转换并将零个或多个元素返回到输出PCollection。而Map转换对于每个输入元素都会输出一个元素。因此,ParDo提供了很多灵活性。
ParDo转换的另一个重要方面是,它需要用户以DoFn形式提供代码。让我们看一些示例。
请随意下载此公共数据集并跟随操作:
样本销售数据 | Kaggle
我使用了Google Colab笔记本来编写此代码,所以安装非常简单。以下是安装的代码:
!pip install --quiet apache_beam
我创建了一个名为“data”的目录,用于存放我们将使用的CSV文件和我们今天的练习的输出。
mkdir -p data
首先,我将只处理数据集中的最简单的内容。读取数据集,并将每一行转换为列表,然后将它们输出到文本文件中。
在Beam管道中读取文本文件非常简单和直接。我们有一个CSV文件。因此,我们将为此定义一个CustomCoder()类,它首先将对象编码为字节字符串,然后将字节解码为其对象,并且最后指定编码器是否保证对值进行确定性编码。以下是编码器的文档。
from apache_beam.coders.coders import Coderclass CustomCoder(Coder): """用于读写UTF-8字符串的自定义编码器。""" def encode(self, value): return value.encode("utf-8", "replace") def decode(self, value): return value.decode("utf-8", "ignore") def is_deterministic(self): return True
还有一个SplitRow()类,它只是使用Python的.split()函数。
class SplitRow(beam.DoFn): def process(self, element)...