Press "Enter" to skip to content

使用PySpark UDF在合成表中创建多对一关系的列

利用简单方程在测试表格中生成相关列。

使用DALL-E 3生成的图像

最近,我一直在使用Databricks Labs Data Generator玩弄完全合成的数据集。作为这个过程的一部分,我研究了如何构建围绕不同的商店、员工和顾客的销售数据。因此,我想要创建我人为填充的列之间的关系,例如将员工和顾客映射到特定的商店。

通过使用PySpark UDF和一些逻辑,我们可以生成遵循多对一关系的相关列。通过一些魔法,我们甚至可以扩展这种映射的逻辑,使之具有一定的变化性,例如顾客通常在本地商店购买,但有时也会从其他商店购买。

使用Databricks Labs Data Generator生成我们的基本DataFrame

注意:如果不需要,请跳过此部分!

首先,我们需要创建一个带有我们第一个随机生成列的DataFrame。在我们的案例中,我们将从商店开始,因为逻辑上我们将有“多个员工在一个商店工作”和“许多顾客重复购物在一个商店”。

考虑到星型模式数据模型,我们要从我们的销售事实表开始,这是一个包含销售Id、商店Id、员工Id和顾客Id的关键值的事务表,还包括一些购买的日期时间数据。然后,我们可以在以后的维度表中填写有关商店、员工和顾客的具体信息。

我们可以先从小规模开始,1000个销售项足够了。现在我们需要决定如何在商店、员工和顾客之间分配这些销售项。让我们建议以下内容:

  • 商店数量=20
  • 员工数量=100
  • 顾客数量=700

我们还可以说销售将在最近一个月内记录:

  • 第一个销售日期=2023年11月1日
  • 最后一个销售日期=2023年11月30日

销售Id需要是独特的列,因此我们可以为此生成一个Id列。现在我们需要将这1000个销售项分配给这20个商店。为了简单起见,我们假设这是随机的。

使用Databricks Lab Generator,我们可以使用以下代码完成:

现在添加一些代码来记录销售的时间和金额。为了保持简单,我们将使销售的时间戳舍入到最近的小时。

要计算销售金额,我们可以使用withColumn表达式中的“expr”参数,允许我们生成一个随机数,并设置一些规则/边界。

在这种情况下,表达式非常简单:产生一个随机数(介于0和1之间),加上0.1(确保销售值不为0),然后乘以350。

现在我们已经有了DataFrame的基本结构,将它们组合起来:

我们可以创建一个快速的数据概述,查看列值的分布情况:

由作者生成的数据概述,使用Databricks生成

我们可以看到,商店ID的分布在这20个商店中相对均匀,没有缺失值,并且平均值在我们预期的中心附近。时间戳和金额的情况也是如此。

添加一个依赖的多对一列

现在我们可以将员工Id列添加到DataFrame中。我们现在已经完成了Databricks Lab Data Generator的使用,所以将只使用PySpark操作来添加列到DataFrame中。

从代码的角度来看,我们希望将其建模为以下陈述:

  • 有20个商店。
  • 每个商店都有多于1位员工。
  • 每位员工只在一个商店工作。

首先,我们需要将员工分配到各个商店。可以使用以下Python函数来完成此操作:

现在,我们已经为每个商店分配了员工,让我们开始分配ID!

employeesPerStore列表确保每个商店的员工ID不重叠。我们可以使用以下等式将员工ID随机分配给表中的销售:

这个函数目前只适用于单个值 – 我们需要将其放入PySpark DataFrame可以快速处理的东西中(在功能上)

我们可以将PySpark UDF传递给withColumn方法,所以让我们将此逻辑重新格式化为一个函数,并将其设置为UDF:

现在在DataFrame中调用此函数作为新列:

我们可以通过在Databricks中使用可视化工具来快速测试是否正确查看每个商店的雇员ID的不同计数。这是我偏好的方式,但如果需要,您也可以使用逻辑分组或其他绘图模块。

作者图像:每个商店的唯一计数雇员ID

重要提醒:此逻辑允许有可能从结果中漏掉雇员。这意味着雇员可能销售额为0,因此不会包含在DataFrame中。我们将在下一节中看如何确保所有客户都有销售记录。

添加客户列

客户列有所不同…虽然我们的使用案例表明客户有可能在同一家商店多次消费,但完全有可能他们有一天去了另一家商店。我们如何建模这个情况呢?

我们已经通过为员工列完成的工作获得了起点,因此可以将get_employees函数和UDF逻辑重复用于客户,如下所示:

在此处我们可能又漏掉了一些客户。以下是纠正此问题的几种方法:

  • 在while循环中重新计算,直到获得包含所有客户的DataFrame(效率低下,昂贵,可能无限运行)
  • 在while循环中随机更新客户ID,直到DataFrame中包含所有客户(需要逻辑仅覆盖相同商店,也可能无限运行)
  • 返回销售表中有多于1条记录的所有客户ID的列表,并随机覆盖,直到添加所有丢失的ID(还需要逻辑以覆盖相同商店中的客户,可能还需要while循环逻辑)
  • 反向进行此过程并从雇员开始,这样可以随机分配每个雇员到行,然后使用映射应用商店ID。

希望清楚为什么最后一种选择是计算量最小的选择 – 我们拥有所需的全部代码,只需要稍微调整一下。

我们的新脚本如下所示:

作者图像:新DataFrame的Databricks数据配置文件

为客户添加随机性

现在我们需要一些随机性,我们需要定义它。对于我们的示例,假设每个客户在通常的商店(”本地”商店)购物的概率为90%。如果不需要在结果集中返回所有客户,我们可以调整我们的customers_udf如下,并使用df2:

该逻辑涉及使用random.choices函数提供加权列表并返回单个值。

为了计算加权列表,我们有客户对应于“本地”商店的权重,这里是90%,因此需要将剩余的10%分配给其他商店,这里有19家商店。因此,每个其他商店被选择的概率将为10/19 = 0.526%。我们可以使用这些百分比填充一个数组,它看起来可能如下所示:[0.526,0.526,0.526,…,90,0.526,…0.526]

将其传递给random.choices,我们然后从带有相应权重的列表中随机选择一个商店ID,并将其作为customer_id变量的输入,就像之前那样。

注意:random.choices的输出返回一个列表(您可以请求k个结果),因此访问列表的第0个元素以获取store_id作为整数值。

如果我们需要将此逻辑与包含所有顾客的DataFrame结合起来,我们可以稍微调整一下这个过程。权重逻辑仍然有效,因此我们可以将其插入随机选择一个商店并将其作为结果返回:

作者图像:Databricks中最终DataFrame的示例

结论

我们成功了!通过严格和宽松的列映射,我们创建了一个合成的DataFrame。现在,您可以继续下一步,填充包含更多描述信息的相关表,例如商店名称、地址、员工名称、角色等维表。这也可以使用Databricks Labs Data Generator或您熟悉的任何其他工具/流程来完成。

在Databricks Labs Data Generator的GitHub Repo上有一些很好的示例GitHub Repo,以及相关的文档,请务必查看,如果您想了解更多。

您可以从以下GitHub Repo访问我所有的代码。

如果您对这个演示有任何想法、评论或替代方案,请在评论中提出。谢谢!

Leave a Reply

Your email address will not be published. Required fields are marked *