Press "Enter" to skip to content

在Apache Spark中优化输出文件大小

管理分区、重新分区和合并分区操作的综合指南

照片由Unsplash上的zhao chen提供

想象一下你在大型Spark数据处理操作中的位置。在Spark优化讨论中,有一个经常提到的原则是,为了获得最佳的I/O性能和增强的并行性,每个数据文件的大小应该大约为128Mb,这是读取文件时的默认分区大小[1]。

把你的文件想象成航行在数据处理海洋中的船只。如果船只太小,就会浪费很多时间停靠和再次起航,这是执行引擎花费额外时间打开文件、列出目录、获取对象元数据、设置数据传输和读取文件的隐喻。相反,如果你的船太大,而且你没有使用港口的许多码头,它们就必须等待一个单独的漫长的装卸过程,这是查询处理等待直到一个单独的读取器完成读取整个文件的隐喻,这减少了并行性[图1]。

图1 - 图片由作者提供

为了生动地说明文件大小优化的重要性,请参考下图。在这个具体的示例中,每个表都包含8 GB的数据。

在Apache Spark中优化输出文件大小 四海 第3张

然而,在处理大型批处理作业时,要在这种微妙的平衡中航行并不容易。你可能会感到对输出文件的数量失去了控制。本指南将帮助你重新获得控制。

理解的关键:分区

当执行写操作时,保存到磁盘的输出文件数量等于Spark执行器中的分区数。然而,在执行写操作之前,估计分区的数量可能会很棘手。

当读取一个表时,Spark默认读取最大大小为128Mb的块(尽管你可以使用sql.files.maxPartitionBytes来更改)。因此,分区的数量取决于输入的大小。但实际上,分区的数量很可能等于sql.shuffle.partitions参数。这个数字的默认值为200,但对于较大的工作负载,这很少够用。查看这个视频,了解如何设置理想的shuffle分区数。

如果ETL中应用了至少一个宽转换,Spark执行器中的分区数等于sql.shuffle.partitions。如果只应用了窄转换,分区的数量将与读取文件时创建的数量相匹配。

当处理非分区表时,设置shuffle分区的数量只能给我们高级控制总分区的能力。一旦我们进入分区表的领域,改变sql.shuffle.partitions参数将不容易控制每个数据文件的大小。

方向盘:重新分区和合并

我们有两种主要的方法来在运行时管理分区的数量:repartition()coalesce()。以下是一个快速概述:

  • Repartitionrepartition(partitionCols, n_partitions)是一个惰性转换,有两个参数 – 分区数和分区列。当执行时,Spark根据分区列在集群中重新洗牌分区。然而,一旦表被保存,关于重新分区的信息就会丢失。因此,在读取文件时不会使用这个有用的信息。
df = df.repartition("column_name", n_partitions)
  • Coalescecoalesce(num_partitions)也是一种惰性转换,但它只接受一个参数-分区的数量。重要的是,coalesce操作不会在集群中传输数据,因此比 repartition更快。此外,coalesce只能减少分区的数量,如果试图增加分区的数量,它将不起作用。
df = df.coalesce(num_partitions)

这里要记住的主要见解是,使用coalesce方法通常更有益处。这并不是说repartitioning没有用处;当我们需要在运行时调整数据框的分区数时,它确实有用。

根据我在ETL过程中的经验,在处理多个大小不同的表并进行复杂的转换和连接时,我发现sql.shuffle.partitions无法提供我所需的精确控制。例如,在同一个ETL中使用相同数量的shuffle分区来连接两个小表和两个大表将是低效的-导致小表的分区过多或大表的分区不足。重新分区还可以帮助我避免倾斜连接和倾斜数据的问题[2]。

尽管如此,重新分区在将表写入磁盘之前不太合适,在大多数情况下,可以用coalesce替代。在写入磁盘之前,coalesce在以下几个方面占优势:

  1. 它防止在集群中不必要地重新分区数据。
  2. 它允许根据逻辑推断对数据进行排序。在写入之前使用repartition方法时,数据会在集群中进行重新分区,导致数据的顺序丢失。另一方面,使用coalesce保留顺序,因为数据是聚集在一起而不是重新分配。

让我们看看为什么对数据进行排序很重要。

地平线上的顺序:数据排序的重要性

我们上面提到,当我们应用repartition方法时,Spark不会将分区信息保存在表的元数据中。然而,在处理大数据时,这是两个原因的关键信息:

  1. 它允许在查询时更快地扫描表。
  2. 它允许更好的压缩-如果处理可压缩格式(如parquet,CSV,Json等)。这是一篇很好的文章,可以了解为什么。

关键是在保存之前对数据进行排序。该信息将保留在元数据中,并且在查询时使用,使查询速度更快。

现在让我们探讨保存到非分区表和分区表之间的差异,以及为什么保存到分区表需要一些额外的调整。

管理分区表中的文件大小

在处理非分区表时,管理保存操作期间的文件数量是一个直接的过程。在保存之前使用coalesce方法将完成任务,无论数据是否排序。

# 在保存非分区表之前使用coalesce方法的示例df.coalesce(10).write.format("parquet").save("/path/to/output")

然而,在处理分区表时,除非数据在合并之前进行排序,否则此方法是无效的。要理解为什么会发生这种情况,我们需要深入研究数据排序和不排序时Spark执行器中发生的操作[图2]。

图2-作者提供的图像

因此,保存数据到分区表的标准过程应该是:

# 在对数据进行排序之后,在分区表中使用coalesce方法的示例df.orderBy("columnName").coalesce(10).write.format("parquet").save("/path/to/output_partitioned")

其他导航辅助工具

repartitioncoalesce之外,您可能会发现maxnumberofrecords很有帮助。这是一个方便的方法,可以防止文件过大,并可与上述方法同时使用。

df.write.option("maxRecordsPerFile", 50000).save("file_path")

最后的思考

精通在Spark作业中控制文件大小往往需要经过试验和错误。在存储空间廉价且处理能力仅需一键之遥的时代,很容易忽视优化。但随着处理上TB和PB级数据成为常态,忽视这些简单的优化技术可能会带来显著的经济、时间和环境成本。

我希望本文能使您能够对ETL流程进行高效的调整。就像一位经验丰富的船长一样,愿您能够自信而清晰地驾驭Spark的海洋。

Leave a Reply

Your email address will not be published. Required fields are marked *