在本地Polars库之外实现高速度

介绍
Polars以其速度、内存效率和优美的API而闻名世界。如果您想知道它的强大之处,不妨看看DuckDB基准测试。而且这些甚至还没有使用最新版本的Polars。
尽管Polars能够完成所有令人惊叹的事情,但在完成您可能想要的所有计算方面,它并不一定是比Pandas更好的解决方案。有一些例外情况Polars还未超越。但随着最近发布的用于Rust的Polars插件系统,情况可能已经有所改变。
Polars插件
什么是Polars插件?它只是一种使用原生Rust创建自己的Polars表达式并将其暴露给使用自定义命名空间的表达式的方法。它允许您利用Rust的速度并将其应用于Polars DataFrame,以便以利用Polars提供的速度和内置工具进行计算。
让我们来看一些具体的例子。
顺序计算
Polars似乎在涉及需要了解DataFrame先前值的操作方面缺乏一些功能。用本地的Polars表达式编写顺序性计算并不总是非常容易或高效的。让我们来看一个具体的例子。
我们有以下算法来计算给定运行的数组的累积值,该运行被定义为具有相同符号的数字集。例如:
┌───────┬───────────┐│ value ┆ run_value ││ --- ┆ --- ││ i64 ┆ i64 │╞═══════╪═══════════╡│ 1 ┆ 1 │ # 运行开始于此│ 2 ┆ 3 ││ 3 ┆ 6 ││ -1 ┆ -1 │ # 运行在此处重置│ -2 ┆ -3 ││ 1 ┆ 1 │ # 运行在此处重置└───────┴───────────┘
因此,我们希望对一列进行累积求和,该列在值的符号从正变为负或从负变为正时会重置。
让我们先从用Pandas编写的基准版本开始。
def calculate_runs_pd(s: pd.Series) -> pd.Series: out = [] is_positive = True current_value = 0.0 for value in s: if value > 0: if is_positive: current_value += value else: current_value = value is_positive = True else: if is_positive: current_value = value is_positive = False else: current_value += value out.append(current_value) return pd.Series(out)
我们遍历一个Series,在每个位置计算运行的当前值,并返回一个新的Pandas Series。
基准测试
在继续之前,我们将进行一些基准测试。我们将使用pytest-benchmark和pytest-memray来测量执行速度和内存消耗。我们将设置问题,使我们有一个实体列、一个时间列和一个特征列。目标是计算数据中每个实体在时间上的运行值。我们将将实体数和时间戳数都设置为1,000,从而获得一个包含1,000,000行的DataFrame。
当我们使用Pandas的groupby apply功能运行我们的Pandas实现对比基准测试时,我们得到以下结果:


Polars Naive Implementation
好的,现在我们有了我们的基准测试。现在让我们看看如何在Polars中实现相同的功能。我们将从一个非常相似的版本开始,通过在Polars GroupBy对象上映射函数来应用它。
def calculate_runs_pl_apply(s: pl.Series) -> pl.DataFrame: out = [] is_positive = True current_value = 0.0 for value in s: if value is None: pass elif value > 0: if is_positive: current_value += value else: current_value = value is_positive = True else: if is_positive: current_value = value is_positive = False else: current_value += value out.append(current_value) return pl.DataFrame(pl.Series("run", out))
现在让我们看看这与我们原始的Pandas基准测试相比如何。


嗯,这个效果不太好。但这并不奇怪。Polars的作者已经明确表示,在Polars中,Pandas中非常常见的groupby apply方法不是一种高效计算方式。这里也可以看出来。速度和内存消耗都比我们原始的Pandas实现要差。
Polars表达式实现
现在让我们将这个相同的函数编写为本地Polars表达式。这是使用Polars的首选和优化方式。算法会有些不同。但这是我用来计算相同输出的方法。
def calculate_runs_pl_native(df: pl.LazyFrame, col: str, by: str) -> pl.LazyFrame: return ( df.with_columns((pl.col(col) > 0).alias("__is_positive")) .with_columns( (pl.col("__is_positive") != pl.col("__is_positive").shift(1)) .over(by) .fill_null(False) .alias("__change_sides") ) .with_columns(pl.col("__change_sides").cumsum().over(by).alias("__run_groups")) .with_columns(pl.col(col).cumsum().over(by, "__run_groups").alias("runs")) .select(~cs.starts_with("__")) )
这里对我们所做的事情做一个快速解释:
- 找到所有特征为正数的行
- 找到所有
__is_positive列与前一行不同的行。 - 对
__change_sides进行累积求和,以标记每个不同的运行。 - 对每个不同的运行中的值进行累积求和。
所以现在我们有了本地Polars函数。让我们再次进行基准测试。


不幸的是,我们没有看到函数执行速度的改善。这可能是由于我们必须进行的over语句的数量,以计算运行值。不过,我们确实看到了预期的内存减少。可能有一种更好的使用Polars表达式实现的方法,但现在我不会担心这个。
Polars插件
现在让我们来看看新的Polars插件。如果您想了解如何设置这些插件的教程,请查看此处的文档。在这里,我主要要展示一种特定的插件实现。首先,我们将在Rust中编写我们的算法。
use polars::prelude::*;use pyo3_polars::derive::polars_expr;#[polars_expr(output_type=Float64)]fn calculate_runs(inputs: &[Series]) -> PolarsResult<Series> { let values = inputs[0].f64()?; let mut run_values: Vec<f64> = Vec::with_capacity(values.len()); let mut current_run_value = 0.0; let mut run_is_positive = true; for value in values { match value { None => { run_values.push(current_run_value); } Some(value) => { if value > 0.0 { if run_is_positive { current_run_value += value; } else { current_run_value = value; run_is_positive = true; } } else if run_is_positive { current_run_value = value; run_is_positive = false; } else { current_run_value += value; } run_values.push(current_run_value); } } } Ok(Series::from_vec("runs", run_values))}
您会注意到这看起来与我们在Python中编写的算法非常相似。我们在这里没有使用任何奇技淫巧!我们使用polars提供的宏来表示输出类型,然后注册我们的新函数作为表达式。
from polars import selectors as csfrom polars.utils.udfs import _get_shared_lib_locationlib = _get_shared_lib_location(__file__)@pl.api.register_expr_namespace("runs")class RunNamespace: def __init__(self, expr: pl.Expr): self._expr = expr def calculate_runs( self, ) -> pl.Expr: return self._expr.register_plugin( lib=lib, symbol="calculate_runs", is_elementwise=False, cast_to_supertypes=True, )
然后,我们可以像这样运行它:
from polars_extentsion import RunNamespacedf.select( pl.col(feat_col).runs.calculate_runs().over(entity_col).alias("run_value")).collect()
好了,现在让我们来看看结果!


现在更像了!我们获得了14倍的速度提升,并且从大约57MiB的内存分配减少到了大约8MiB。
何时使用Polars插件
既然我已经展示了使用插件的强大之处,让我们讨论一下什么时候不应该使用它们。以下是我可能不使用插件的几个原因(每个原因都有自己的注意事项):
- 如果你可以轻松使用原生的Polars表达式编写一个非常快速的计算版本。 Polars的开发人员非常聪明。我不会赌钱,认为自己能写出比他们更快的函数。Polars提供了必要的工具。充分利用它们在哪些方面擅长!
- 如果你的计算没有自然的并行化。 例如,如果我们在单个实体上运行上面的问题,我们的加速可能会明显减少。我们在速度上受益于Rust的快速性,以及Polars能够一次对多个组应用我们的Rust函数的自然能力。
- 如果你不需要顶级的速度或内存性能。 许多人都会同意,写Rust比写Python要困难得多且耗费更多时间。因此,如果您不介意函数运行需要2秒而不是200毫秒,可能就不需要使用插件。
牢记上述事项,我现在列出了一些使我有时倾向于使用插件的要求:
- 速度和内存非常重要。 最近,我将数据流水线的很多功能重写为Polars插件,因为我们在Polars和其他工具之间频繁切换,内存分配变得过大。在我们希望使用的基础设施上以及我们想要的数据量上运行流水线变得困难。使用插件可以在更短的时间内在更小的机器上运行相同的流水线变得容易得多。
- 你有一个独特的用例。 Polars提供了许多内置函数。但它是一个广泛适用于许多问题的通用工具集。有时,这个工具集对于你要解决的问题可能不具体适用。在这种情况下,插件可能正是你想要的。我遇到的最常见的两个例子是更复杂的数学计算,例如应用横向线性回归,或者逐行计算,就像我们在这里展示的那样。
新的插件系统是对Polars已经支持的所有基于列的计算的完美补充。通过这个新功能,Polars允许对其功能进行美丽的扩展。除了编写自己的插件,还可以关注一些正在开发中的很棒的Polars插件包,可以扩展你的功能而无需自己编写插件!
Polars发展迅速,引起了轰动。请查看这个项目,开始使用它,留意它们将发布的其他令人赞叹的功能,也许在此过程中开始学习一些Rust吧!