Press "Enter" to skip to content

在AWS上利用PySpark处理大数据的机器学习

在AWS上利用PySpark处理大数据的机器学习 四海 第1张

编辑注:Suman Debnath是ODSC APAC 2023年8月22日至23日的演讲嘉宾。一定要查看他的演讲,“在AWS上使用Spark构建分类和回归模型”!

在不断变化的数据科学领域中,明智地选择和应用合适的工具可以极大地影响您的机器学习项目的结果。向所有数据科学爱好者致以热忱的问候!我很幸运有机会在即将到来的ODSC APAC会议上演讲,该会议定于2023年8月22日举行。我的演讲将重点介绍如何使用PySpark在AWS上开发分类和回归模型。

了解本次会议

在这个引人入胜且互动的会议中,我们将深入研究PySpark MLlib,这是机器学习领域中不可或缺的资源,并探索如何使用AWS Glue/EMR作为我们的平台来实现各种分类算法。

我们的重点将是实践操作,强调对基本机器学习概念的实际应用和理解。参与者将介绍各种机器学习算法,重点关注逻辑回归,这是一种强大的监督学习技术,用于解决二分类问题。

但是,本次会议不仅仅涉及概念和算法。我们还将介绍关键的数据预处理技术,这些技术对于创建有效的机器学习模型至关重要。通过本次会议,参与者将掌握处理缺失值、修改列数据类型以及将数据划分为训练集和测试集的技能。所有这些实践经验将在多功能的AWS Glue/EMR环境中进行。

你将获得什么?

本次会议旨在帮助参与者深入了解以下内容:

  • PySpark MLlib
  • 无监督学习技术
  • 各种类型的分类算法
  • 逻辑回归分类器的实现
  • 使用PySpark在AWS上进行数据预处理,使用AWS Glue和Amazon EMR
  • 在AWS上使用PySpark进行模型构建

如果您是数据工程师、数据科学家或机器学习爱好者,并希望开始使用Apache Spark在AWS上进行机器学习,那么本次会议非常适合您。

现在,让我们为您展示一下即将呈现的内容(GitHub代码存储库可在此处找到)。

我们选择了一个数据集,包含20,057个菜名,每个菜名都详细描述了680个列,这些列描述了成分列表、营养含量和菜品的类别。我们的共同目标是预测一个菜品是否是甜点。这是一个简单和大部分明确的问题-通过阅读菜名,我们大多数人都可以简单地将一个菜品分类为甜点或非甜点,这使其成为一个简单的机器学习模型的优秀候选。

步骤1:导入必要的库

第一步是导入必要的库,包括PySpark SQL函数和类型

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.ml.feature import Imputer, MinMaxScaler, VectorAssembler

步骤2:数据预处理和EDA(探索性数据分析)

我们使用Spark的read.csv函数加载食谱CSV数据集。inferSchema参数设置为True,以推断列的数据类型,header设置为True,使用第一行作为标题。

# 加载数据
dataset = 's3://fcc-spark-example/dataset/2023/recipes_dataset/epi_r.csv'
food = (
          spark
              .read
              .csv(dataset, inferSchema=True, header=True)
      )
     
# 清理列名
def sanitize_column_name(name):
  answer = name
  for i, j in ((" ", "_"), ("-", "_"), ("/", "_"), ("&", "and")):
      answer = answer.replace(i, j)
  return "".join(
      [
          char
          for char in answer
          if char.isalpha() or char.isdigit() or char == "_"
      ]
  )
food = food.toDF(*[sanitize_column_name(name) for name in food.columns])

脚本的这部分通过替换空格、破折号、斜杠和和号为下划线来清理列名。它还删除非字母数字字符。

# 过滤数据
food = food.where(
  (
      F.col("cakeweek").isin([0.0, 1.0])
      | F.col("cakeweek").isNull()
  )
  & (
      F.col("wasteless").isin([0.0, 1.0])
      | F.col("wasteless").isNull()
  )
)

在这里,我们过滤数据,只保留cakeweek和wasteless列的值为0.0或1.0,或者为空的行。

# 定义标识符、连续变量、目标变量和二进制变量列
IDENTIFIERS = ["title"]
CONTINUOUS_COLUMNS = [
  "rating",
  "calories",
  "protein",
  "fat",
  "sodium",
]
TARGET_COLUMN = ["dessert"]
BINARY_COLUMNS = [
  x
  for x in food.columns
  if x not in CONTINUOUS_COLUMNS
  and x not in TARGET_COLUMN
  and x not in IDENTIFIERS
]

在这一部分,我们定义了哪些列是标识符、连续变量、目标变量和二进制变量。

# 处理缺失值
food = food.dropna(
  how="all",
  subset=[x for x in food.columns if x not in IDENTIFIERS],
)
food = food.dropna(subset=TARGET_COLUMN)
food = food.fillna(0.0, subset=BINARY_COLUMNS)

我们通过删除所有列都为空(不包括标识符列)的行,删除目标列中为空的行,并将二进制列中的空值填充为0.0来处理缺失值。

# 将字符串数字转换为浮点数并限制连续变量
from typing import Optional

@F.udf(T.BooleanType())
def is_a_number(value: Optional[str]) -> bool:
  if not value:
      return True
  try:
      _ = float(value)
  except ValueError:
      return False
  return True
for column in ["rating", "calories"]:
  food = food.where(is_a_number(F.col(column)))
  food = food.withColumn(column, F.col(column).cast(T.DoubleType()))
maximum = {
  "calories": 3203.0,
  "protein": 173.0,
  "fat": 207.0,
  "sodium": 5661.0,
}
for k, v in maximum.items():
  food = food.withColumn(
      k,
      F.when(F.isnull(F.col(k)), F.col(k)).otherwise(
          F.least(F.col(k), F.lit(v))
      ),
  )

在这部分中,我们创建了一个用户定义函数is_a_number来检查字符串是否可以转换为浮点数。我们使用这个函数来过滤掉“rating”和“calories”列中的非数字值,然后将它们转换为双精度类型。

然后,我们将连续变量“calories”、“protein”、“fat”和“sodium”的值限制在指定的最大值以处理可能的异常值。

# 计算每个二进制列的总和
inst_sum_of_binary_columns = [
  F.sum(F.col(x)).alias(x) for x in BINARY_COLUMNS
]
# 选择二进制列的总和并将结果转换为字典
sum_of_binary_columns = (
  food.select(*inst_sum_of_binary_columns).head().asDict()
)
# 计算总行数
num_rows = food.count()
# 识别稀有特征
too_rare_features = [
  k
  for k, v in sum_of_binary_columns.items()
  if v < 10 or v > (num_rows - 10)
]
# 从二进制列中排除稀有特征
BINARY_COLUMNS = list(set(BINARY_COLUMNS) - set(too_rare_features))

接下来,我们计算每个二进制列的总和,并将结果转换为字典。然后,我们识别“稀有”特征——那些在不到10次为True或在所有实例中为True的实例中出现的特征,并将它们从我们的二进制列中移除。

# 创建新的特征
food = food.withColumn(
  "protein_ratio", F.col("protein") * 4 / F.col("calories")
).withColumn(
  "fat_ratio", F.col("fat") * 9 / F.col("calories")
)
# 处理新特征中的缺失值
food = food.fillna(0.0, subset=["protein_ratio", "fat_ratio"])
# 将新特征添加到连续变量列中
CONTINUOUS_COLUMNS += ["protein_ratio", "fat_ratio"]

在这里,我们创建了新的特征“protein_ratio”和“fat_ratio”,分别表示蛋白质和脂肪相对于卡路里的比例。我们用0.0填充这些新特征中的缺失值,并将它们添加到我们的连续列中。

# 在连续列中填充缺失值
OLD_COLS = ["calories", "protein", "fat", "sodium"]
NEW_COLS = ["calories_i", "protein_i", "fat_i", "sodium_i"]
imputer = Imputer(
  strategy="mean",
  inputCols=OLD_COLS,
  outputCols=NEW_COLS,
)
imputer_model = imputer.fit(food)
# 更新连续列
CONTINUOUS_COLUMNS = (
  list(set(CONTINUOUS_COLUMNS) - set(OLD_COLS)) + NEW_COLS
)
# 将imputer模型应用于数据
food = imputer_model.transform(food)

在这个部分,我们使用Spark的Imputer将“calories”、“protein”、“fat”和“sodium”列中的缺失值用其平均值填充。然后,我们更新连续列的列表,包括填充的列。

# 将连续特征组合成一个向量
CONTINUOUS_NB = [x for x in CONTINUOUS_COLUMNS if "ratio" not in x]
continuous_assembler = VectorAssembler(
  inputCols=CONTINUOUS_NB, outputCol="continuous"
)
food_features = continuous_assembler.transform(food)

接下来,我们使用VectorAssembler将连续特征组合成一个单独的向量列“continuous”。

# 缩放连续特征
continuous_scaler = MinMaxScaler(
  inputCol="continuous",
  outputCol="continuous_scaled",
)
food_features = continuous_scaler.fit(food_features).transform(
  food_features
)

最后,我们使用MinMaxScaler将连续特征缩放到[0, 1]范围内,将其拟合到我们的数据并转换我们的数据。此时,我们的数据集已经准备好进行机器学习任务!

现在我们准备进行机器学习训练任务。

步骤3:训练、测试和评估模型

一旦数据经过处理和转换,我们就可以将其拆分为训练集和测试集。在训练模型之后,我们可以使用各种指标评估其性能。在本节中,我们使用我们用于甜点预测特征准备程序的估计器构建了一个ML管道,并将建模步骤加入其中。

from pyspark.ml import Pipeline
import pyspark.ml.feature as MF
imputer = MF.Imputer(
                              strategy="mean",
                              inputCols=["calories", "protein", "fat", "sodium"],
                              outputCols=["calories_i", "protein_i", "fat_i", "sodium_i"],
                            )
continuous_assembler = MF.VectorAssembler(
                                              inputCols=["rating", "calories_i", "protein_i", "fat_i", "sodium_i"],
                                              outputCol="continuous",
                                          )
continuous_scaler = MF.MinMaxScaler(
                                        inputCol="continuous",
                                        outputCol="continuous_scaled",
                                    )
food_pipeline = Pipeline(
                                        stages=[imputer, continuous_assembler, continuous_scaler]
                                    )

我们可以使用向量列类型组装最终的数据集。

preml_assembler = MF.VectorAssembler(
                                              inputCols=BINARY_COLUMNS
                                              + ["continuous_scaled"]
                                              + ["protein_ratio", "fat_ratio"],
                                              outputCol="features",
                                          )
food_pipeline.setStages(
                                        [imputer, continuous_assembler, continuous_scaler, preml_assembler]
                                    )
food_pipeline_model = food_pipeline.fit(food)
food_features = food_pipeline_model.transform(food)

我们的数据框已经准备好进行机器学习!我们有许多记录,每个记录都有:

  • 一个目标(或标签)列dessert,包含一个二进制输入(如果该食谱是甜点,则为1.0,否则为0.0)
  • 一个特征向量features,包含我们想要用于训练机器学习模型的所有信息

我们可以显示预测的结果:

food_features.select("title", "dessert", "features").show(30, truncate=30)

现在让我们使用LogisticRegression分类器来训练一个机器学习模型:

from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(
                          featuresCol="features", labelCol="dessert", predictionCol="prediction"
                      )
food_pipeline.setStages(
  [
      imputer,
      continuous_assembler,
      continuous_scaler,
      preml_assembler,
      lr,
  ]
)
# 将数据框拆分为训练集和测试集
train, test = food.randomSplit([0.7, 0.3], 13)
train.cache()
food_pipeline_model = food_pipeline.fit(train)
results = food_pipeline_model.transform(test)
现在让我们评估模型,并查看混淆矩阵
results.select("prediction", "rawPrediction", "probability").show(3, False)
# 使用pivot()创建我们的模型的混淆矩阵
results.groupby("dessert").pivot("prediction").count().show()
最后,我们可以计算模型的精确度和召回率:
lr_model = food_pipeline_model.stages[-1]
metrics = lr_model.evaluate(results.select("title", "dessert", "features"))
print(f"模型精确度:{metrics.precisionByLabel[1]}")
print(f"模型召回率:{metrics.recallByLabel[1]}")

请注意,完整的脚本已经为本教程的目的进行了简化。要全面了解实际应用程序,包括从数据准备到模型部署的详细代码演示,请加入我们的ODSC APAC 2023会议。

这个简短的教程给你展示了ODSC会议中将要涵盖的内容。通过参加会议,您将更深入地探索这些主题,了解PySpark MLlib的复杂性。主要目标是让数据科学爱好者和专业人士能够充分利用Spark MLlib在他们的机器学习项目中的潜力。

请记住,掌握任何技能的关键在于持续学习和实际实施。所以,准备好深入挖掘Spark在AWS上与机器学习的迷人世界,参加ODSC会议吧。期待在那里见到你!

关于作者:

在AWS上利用PySpark处理大数据的机器学习 四海 第2张在AWS上利用PySpark处理大数据的机器学习 四海 第3张Suman Debnath是亚马逊网络服务的首席开发者倡导者(数据工程),主要关注数据工程、数据分析和机器学习。他对大规模分布式系统充满激情,是Python的忠实粉丝。他的背景是存储性能和工具开发,他开发了各种性能基准测试和监控工具。

Leave a Reply

Your email address will not be published. Required fields are marked *