Press "Enter" to skip to content

使用枚举和functools升级您的Pandas数据管道

编程

通过两个逐步示例查看更有效的数据处理编程

Image used with permission by my talented sister ohmintyartz

在处理原始数据时,您可能已经使用过Pandas创建了一个数据处理管道。编写过滤、分组和计算数据的代码只是构建数据管道和ETL过程的第一步。

大规模处理数据意味着我们应该编写功能齐全的代码,易于阅读和维护。

有很多不同的方法可以改进现有的数据管道,例如添加高效的日志记录,包括数据验证,甚至使用Pandas之外的新库,如PySpark和Polars。

此外,您还可以以不同的方式构造用于处理数据的实际代码。这意味着不一定是为了提高管道性能而做一些事情,而是专注于编写易于修改且随时间迭代的代码。

在本文中,让我们使用一些本机Python,特别是使用枚举functools,通过两个简单的示例来看看如何做到这一点。

可以在您选择的笔记本或IDE中随意跟随!您可以在此处从Kaggle下载数据集,以CC0 1.0通用(CC0 1.0)公共领域捐赠许可证免费使用。然后导入并运行以下内容,我们可以开始了!

import pandas as pdfrom pandas import DataFrameimport numpy as npfrom functools import reducefrom enum import EnumFILE_PATH = "/Updated_sales.csv"CHUNK_SIZE = 1000def read_raw_data(file_path: str, chunk_size: int=1000) -> DataFrame:    csv_reader = pd.read_csv(file_path, chunksize=chunk_size)    processed_chunks = []    # append the processed chunk to the list    for chunk in csv_reader:        chunk = chunk.loc[chunk["Order ID"] != "Order ID"].dropna()        processed_chunks.append(chunk)    # concatenate the processed chunks into a single DataFrame    return pd.concat(processed_chunks, axis=0)df = read_raw_data(file_path=FILE_PATH)

使用枚举更好地构造数据管道

枚举的简要介绍

您可能首先想知道“什么是枚举”?

枚举,缩写为枚举,是“一组符号名称(成员)绑定到唯一值”(Python文档,2023年)。从实际意义上讲,这意味着您可以在一个主要的“类”下定义和使用一组相关变量。

一个简单的例子是有一个枚举类“Color”,并且有像“Red”、“Green”和“Blue”这样的名称,您可以在需要引用特定颜色时使用。

接下来,您可能会想知道在数据处理管道中单独定义一些变量的枚举类的意义是什么,当您可以直接调用所需的名称时。

枚举有一些关键优点:

  • 定义枚举让您在一个(或多个)类中拥有相关的常量,这些常量可以作为您在管道中调用的维度、度量和其他常量的真实来源;
  • 使用枚举将允许您避免在数据管道中传递无效值,假设您正确地定义和维护枚举类;
  • 枚举允许用户使用标准化的数据点和常量,这在多个人基于一个主要数据源进行聚合或创建模型时很有帮助(有助于避免原始数据源中同一列的多个定义或别名)。

这听起来有点抽象,所以让我们看看在处理数据时如何实际应用枚举,在一个标准的预处理管道示例中。

在数据处理管道中使用枚举

我们已经有了初始的DataFrame,所以让我们开始创建一个函数,通过拆分购买地址来为我们的数据添加更多的列。

def split_purchase_address(df_to_process: DataFrame) -> DataFrame:    df_address_split = df_to_process["Purchase Address"].str.split(",", n=3, expand=True)    df_address_split.columns = ["街道名称", "城市", "州和邮政编码"]        df_state_postal_split = (        df_address_split["州和邮政编码"]        .str.strip()        .str.split(" ", n=2, expand=True)    )    df_state_postal_split.columns = ["州代码", "邮政编码"]        return pd.concat([df_to_process, df_address_split, df_state_postal_split], axis=1)

接下来,我们可以使用原生的Pandas pipe方法将其应用到现有的表中,我们在DataFrame上调用pipe并将函数名作为参数传递。

processed_df = df.pipe(split_purchase_address)

使用枚举和functools升级您的Pandas数据管道 数据科学 第2张

接下来,您会发现我们拥有的数据仍然处于非常粒度化的水平,其中订单ID是表的主键。当我们想要聚合数据进行进一步的分析时,我们可以使用Pandas中的groupby方法来做到这一点。

在Pandas中,您可能会看到一些代码,用于在一组列上对数据进行分组,然后对其中一个维度(在本例中我们将使用订单ID)进行聚合计数,如下所示:

# groupby normallygrouped_df = (    processed_df    .groupby(        ["产品", "订购数量", "街道名称", "城市", "州代码", "邮政编码"]    )    ["订单ID"]    .count()    .reset_index()    .sort_values("订单ID", ascending=False)    .rename({"订单ID": "订单ID计数"}, axis=1))

这将产生一个新的DataFrame,其外观如下:

使用枚举和functools升级您的Pandas数据管道 数据科学 第3张

在这个简单的例子中,仅按六个列进行分组并不太困难,我们可以将这些列的列表直接传递给groupby方法。然而,这有一些缺点:

  • 如果我们使用更大的数据集并想要按20个列进行分组怎么办?
  • 如果我们的最终用户有新的需求,并且我们需要调整要分组的特定列怎么办?
  • 如果底层表格发生变化,列的名称或别名发生了变化怎么办?

我们可以在枚举类中部分解决这些问题。特别是对于这种情况,我们可以在一个新的类SalesGroupByColumns中定义与我们的销售表有关的这些分组列,如下所示:

class SalesGroupByColumns(Enum):    产品 = "产品"    订购数量 = "订购数量"    街道名称 = "街道名称"    城市 = "城市"    州代码 = "州代码"    邮政编码 = "邮政编码"

我们所做的最终只是在一个新的枚举类中定义列作为常量(这是从from enum import Enum导入的)。

现在我们已经定义了这些新的枚举值,我们可以像这样访问枚举的各个成员:

SalesGroupByColumns.产品

使用枚举和functools升级您的Pandas数据管道 数据科学 第4张

SalesGroupByColumns.产品.value

使用枚举和functools升级您的Pandas数据管道 数据科学 第5张

仅调用枚举名称将返回枚举成员,调用目标枚举的value让我们直接访问枚举成员的字符串值。现在,要将枚举的所有成员获取为我们可以传递给groupby的列表,我们可以使用列表推导式,如下所示:

[column.value for column in SalesGroupByColumns]

使用枚举和functools升级您的Pandas数据管道 数据科学 第6张

把它们放进一个列表中后,我们可以将这个输出结果赋值给一个变量,然后将这个变量传递给我们的groupby方法,而不是直接传递原始字符串列表:

# groupby adjustedgroupby_columns = [column.value for column in SalesGroupByColumns]grouped_df = (    processed_df    .groupby(groupby_columns)    ["Order ID"]    .count()    .reset_index()    .sort_values("Order ID", ascending=False)    .rename({"Order ID": "Count of Order IDs"}, axis=1))grouped_df.head()

使用枚举和functools升级您的Pandas数据管道 数据科学 第7张

我们得到了和之前一样的表格,但代码看起来更加清晰。这对于可维护性的好处可以在您随着时间而进行的处理管道上看到。

例如,您可能会发现您想要添加一些新列,例如,如果您还想进行更多的特征工程并创建一个房屋编号和产品类别列,然后再添加到分组中。您可以像这样更新您的枚举类:

# what's the benefit? adding new columns!class SalesGroupByColumns(Enum):    PRODUCT = "Product"    QUANTITY_ORDERED = "Quantity Ordered"    STREET_NAME = "Street Name"    CITY = "City"    STATE_CODE = "State Code"    POSTAL_CODE = "Postal Code"    HOUSE_NUMBER = "House Number"    PRODUCT_CATEGORY = "Prouct Category"    # then you run the code same as before and it would still work

然后,您不需要更改现有的处理代码,因为列表推导式会自动抓取SalesGroupByColumns类中的所有值,并将其应用到聚合逻辑中。

这里需要注意的是,只有当您确切地知道您在枚举类中定义的是什么,并且仅按照预期使用它们时,所有这些才能正常工作。如果您在这里进行更改,并且您正在从几个不同的表格中获取所有这些列以进行分组,则重要的是这就是您打算做的。

否则,您可以将需要在特定表格中使用的枚举集合定义为单独的类,或者如果在列的单独列表中(因此您仍然避免将原始字符串列表传递给groupby方法)合理的话。

在Pandas中使用枚举进行数据聚合的另一个例子

例如,假设我们有一个不同的情况,我们通过更改一些列的数据类型并创建一个新的总成本列来对数据进行了一些额外的转换。我们可以像这样添加到我们之前的管道中:

def convert_numerical_column_types(df_to_process: DataFrame) -> DataFrame:    df_to_process["Quantity Ordered"] = df_to_process["Quantity Ordered"].astype(int)    df_to_process["Price Each"] = df_to_process["Price Each"].astype(float)    df_to_process["Order ID"] = df_to_process["Order ID"].astype(int)        return df_to_processdef calculate_total_order_cost(df_to_process: DataFrame) -> DataFrame:    df_to_process["Total Cost"] = df_to_process["Quantity Ordered"] * df_to_process["Price Each"]    return df_to_processprocessed_df = (    df    .pipe(split_purchase_address)    .pipe(convert_numerical_column_types)    .pipe(calculate_total_order_cost))

使用枚举和functools升级您的Pandas数据管道 数据科学 第8张

现在,我们的DataFrame在Order ID级别上进行了转换,接下来让我们在一组新的列上执行另一个分组,但这次是在一些不同的度量上进行聚合:

# let's say we have a file now "SalesColumns.py"# we can add to itimport numpy as npclass AddressColumns(Enum):    STREET_NAME = "Street Name"    CITY = "City"    STATE_CODE = "State Code"    POSTAL_CODE = "Postal Code"    class SalesMeasureColumns(Enum):    TOTAL_COST = "Total Cost"    QUANTITY_ORDERED = "Quantity Ordered"        # then separately we can do the groupbygroupby_columns = [column.value for column in AddressColumns]grouped_df = (    processed_df    .groupby(groupby_columns)    .agg(        Total_Cost=(SalesMeasureColumns.TOTAL_COST.value, np.sum),        Total_Quantity_Ordered=(SalesMeasureColumns.QUANTITY_ORDERED.value, np.sum)    )    .reset_index()    .sort_values("Total_Cost", ascending=False))

使用枚举和functools升级您的Pandas数据管道 数据科学 第9张

这里有几个要注意的关键点:

  • 我们定义了一组新的枚举类:AddressColumnsSalesMeasureColumns。现在对于另一个我们想要按地址字段进行分组的表,我们可以定义 groupby_columns 列表,包含那些列,然后将其传递给转换后的 DataFrame 上的 groupby 方法。
  • SalesMeasureColumns 类包含我们要进行聚合的度量。将原始表中的列名放入该类中意味着,如果其他人也想对成本和订购数量进行求和,他们可以调用正确的列。

我们还可以在之前的管道中添加链式管道和我们之前定义的功能,并将此代码放在新函数中以收集列列表并聚合表。然后最终代码变得更易于阅读,并且随着时间的推移,更易于调试和记录。

对于聚合,总成本和订购数量也可能针对不同的表、团队和最终用户定义不同。在 SalesMeasuresColumns 枚举中定义它意味着对于销售表和度量,所有用户都可以在这些列上使用相同的定义进行聚合。

利用 functools 中的 reduce 方法来清理数据过滤

现在,让我们看看如何使用 functools 中的 reduce 方法来改善我们过滤数据的方式。

在 Pandas 中,过滤原始数据的常见方法是使用 loc 方法。例如,让我们编写一些代码,以在包含“North”的街道上过滤我们的数据。

grouped_df.loc[grouped_df["Street Name"].str.contains("North")]

使用枚举和functools升级您的Pandas数据管道 数据科学 第10张

现在,我们的输出 DataFrame 仅包含符合过滤条件的列。通常,在进行特定问题的分析或为机器学习模型调整数据集时,您可能需要对 DataFrame 应用多个过滤器。您可以使用 loc 方法应用多个过滤器,例如:

grouped_df.loc[    (grouped_df["Street Name"].str.contains("North")) &    (grouped_df["Postal Code"].str.contains("940")) &    (grouped_df["Total_Cost"] < 1000)]

使用枚举和functools升级您的Pandas数据管道 数据科学 第11张

接下来,就像之前我们在枚举中优化代码一样,如果我们想要添加、编辑或删除某些过滤器以进行此过程怎么办?来自最终用户的更改要求或从某些探索性数据分析中发现的新见解可能意味着您需要随时间调整如何过滤数据。

与其直接在 loc 方法中继续添加更多代码行,不如在变量中定义一组过滤器,然后稍后将其传递给 loc

filter_conditions = [    grouped_df["Street Name"].str.contains("North"),    grouped_df["Postal Code"].str.contains("940"),    grouped_df["Total_Cost"] < 1000]

但是,我们不能直接将此列表传递到 loc。在 loc 中接受多个条件的格式是来自 DataFrame 的布尔掩码,这些掩码由 & 运算符分隔。

# 不起作用 -> 你不能只是把一个列表传递给 locdf.loc[FILTER_CONDITIONS]# 格式应该是这样的df.loc[condition_1 & condition_2 & condition_3]

这就是 functools.reduce 方法的作用。我们如何实现正确的格式可以在这里的示例中看到:

# functools reduce
reduce(lambda x, y: f"{x} & {y}", ["condition_1", "condition_2", "condition_3"])

使用枚举和functools升级您的Pandas数据管道 数据科学 第12张

在这个简单的例子中,我们只是将一堆字符串组合在一起,以输出我们后来需要在loc方法中编写的内容。

来自functools的reduce方法允许您将函数和可迭代对象作为参数传递。然后,reduce方法累积地将函数应用于可迭代对象中的元素。这意味着它将按顺序为可迭代对象中的元素集和组合执行函数。

在我们的例子中,我们有一个匿名的lambda函数,它采用两个参数xy,并使用f-string将它们组合起来,并用&运算符将它们分开。这意味着首先该函数将输出condition_1 & condition_2,然后它将把condition_3添加到第一个累积值中,结果为condition_1 & condition_2 & condition_3

现在,我们可以将此reduce方法与函数和过滤条件一起传递给loc方法,而不是逐个将所有原始过滤条件传递给loc

grouped_df.loc[reduce(lambda x, y: x & y, filter_conditions)]

使用枚举和functools升级您的Pandas数据管道 数据科学 第13张

我们应用了所有过滤器后,最终的DataFrame如下。现在,如果您想要包含另一个过滤器,则只需将它们添加到现有的filter_conditions变量中,而不更改其余代码。

本文的重点是介绍一些不同的代码结构方式,以改善阅读性和可维护性。虽然这些更改不一定会加速您的流水线或提高内存使用率,但重要的是要考虑您的代码易于使用的程度。

这可以有助于两个方面:

  1. 如果您有多个人随着时间的推移处理相同的原始数据,则在集中的位置定义列名称和过滤器意味着每个人都可以参考单个真实来源,并避免使用不同的名称和逻辑来引用最终相同的内容;
  2. 在处理包括变化要求的数据时,您只需要在一个地方修复列名称和逻辑,以加快开发时间。

我希望您会发现这些有关改进数据流程的提示有用。尝试使用这种结构来改进数据处理并查看它是否适合您!

如果您喜欢我的内容,请考虑关注我并使用下面的推荐链接注册成为小猪AI会员。它只需每月$5,您就可以无限制地访问小猪AI的所有内容。使用我的链接注册可以让我获得少量佣金。如果您已经注册关注我,非常感谢您的支持!

作为小猪AI会员,您的会费的一部分将支付给您阅读的作家,并且您可以完全访问每个故事…

byrondolon.medium.com

更多内容: – 3种有效的方式来按子字符串过滤Pandas DataFrame列 – 5个有用的提示,帮助有抱负的数据分析师 – 使用Python中的堆叠条形图改进数据可视化 – Pandas中的条件选择和分配 – 5(半)行代码,用于理解您的数据与Pandas

Leave a Reply

Your email address will not be published. Required fields are marked *