Press "Enter" to skip to content

使用 RAPIDS cuDF 利用 GPU 进行特征工程

通过在创建数据框和特征工程中使用cuDF替换Pandas,并与Google Colab集成,提高性能

在解决问题时,特定方法的成功可能不会在不同的规模上产生相同的结果。当距离改变时,鞋子也需要改变。

在机器学习中,数据和数据处理对于确保模型的成功至关重要,而特征工程是该过程的一部分。 当数据较小时,经典的 Pandas 库可以轻松处理 CPU 上的任何处理任务。 然而,在处理大数据时,Pandas 处理速度可能太慢。 提高数据处理和特征工程的速度和效率的一个解决方案是 RAPIDS。

“RAPIDS 是一组开源软件库,可在图形处理单元 (GPU) 上完全执行端到端的数据科学和分析流程。 RAPIDS 加速数据科学管道以创建更具生产力的工作流程。 [1]”

使用 RAPIDS cuDF 利用 GPU 进行特征工程 数据科学 第1张

RAPIDS 中一种有效地操作表格数据的特征工程和数据预处理工具是 cuDF。RAPIDS cuDF 可以创建 GPU 数据帧,并执行几种 Pandas 操作,例如索引、groupby、合并和字符串处理。正如 RAPIDS 网站所定义的:

“cuDF 是一个 Python GPU 数据框架库(构建在 Apache Arrow 列式内存格式上),可使用类似于 Pandas 的 DataFrame 样式 API 加载、连接、聚合、过滤和操纵表格数据。[2]”

本文试图解释如何使用 cuDF 在 GPU 上创建和操作数据框,并在真实数据集上应用特征工程。

我们的数据集属于 Kaggle 的 Optiver 实现波动率预测。 它包含与金融市场中交易的实际执行相关的股票市场数据,包括订单簿快照和已执行的交易[3]。

我们将在以下部分了解更多有关该数据的信息。 然后,我们将与 Kaggle 和 RAPIDS 集成 Google Colab。 在第三部分中,我们将看到如何使用 Pandas 和 cuDF 对此数据集进行特征工程。 这将为我们提供两个库的比较性能评估。 在最后一部分中,我们将绘制并评估结果。

数据

我们将使用的数据由两组文件[3]组成:

  1. book_[train/test].parquet:一个 parquet 文件,以 stock_id 为分区,提供最具竞争力的买卖订单簿数据,包含被动买卖意向更新。

book_[train/test].parquet 中的特征列:

  • stock_id – 股票的 ID 代码。加载时,Parquet 会将此列强制转换为分类数据类型。
  • time_id – 时间桶的 ID 代码。时间 ID 不一定是连续的,但在所有股票中保持一致。
  • seconds_in_bucket – 从桶开始的秒数,始终从 0 开始。
  • bid_price[1/2] – 最具竞争力的买入价位的规范化价格/第二位。
  • ask_price[1/2] – 最具竞争力的卖出价位的规范化价格/第二位。
  • bid_size[1/2] – 最具竞争力的买入价位上的股票数量/第二位。
  • ask_size[1/2] – 最具竞争力的卖出价位上的股票数量/第二位。

使用 RAPIDS cuDF 利用 GPU 进行特征工程 数据科学 第2张

该文件大小为 5.6 GB,包含超过 1.67 亿条目。有 112 个股票和 3830 个 10 分钟的时间窗口(time_id)。每个时间窗口(桶)最多有 600 秒。由于每秒可以在每个股票的每个时间窗口发生一个交易意图,所以上述数字的乘积可以解释为什么会有数百万条目。一个警告是,并非每秒都会发生交易意向,这意味着某些特定时间窗口中的某些秒数是缺失的。

  1. trade_[train/test].parquet:一个分区为 stock_id 的 parquet 文件,包含实际执行的交易数据。

trade_[train/test].parquet 中的特征列:

  • stock_id – 如上所述。
  • time_id – 如上所述。
  • seconds_in_bucket – 如上所述。请注意,由于交易和订单簿数据来自同一时间窗口,并且交易数据通常更为稀疏,因此该字段不一定从 0 开始。
  • price – 在一秒钟内发生的执行交易的平均价格。价格已经过规范化,平均值已经根据每次交易中交易的股票数量进行了加权。
  • size – 交易的股票总数。
  • order_count – 发生的独特交易订单数量。

使用 RAPIDS cuDF 利用 GPU 进行特征工程 数据科学 第3张

trade_[train/test].parquet文件的大小远小于book_[train/test].parquet文件。前者大小为512.5 MB,拥有超过3,800万条目。由于实际交易不必匹配意图,交易数据更为稀疏,因此条目更少。

目标是从相同的stock_id/time_id下的特征数据中预测在接下来的10分钟窗口中计算的实现股票价格波动率。该项目涉及大量的特征工程,应在大型数据集上执行。开发新功能还会增加数据的大小和计算复杂性。一种解决方法是使用cuDF而不是Pandas库。

在本博客中,我们将看到一些特征工程任务和数据框操作,尝试使用Pandas和cuDF来比较它们的性能。但是,我们不会使用所有数据,而是只使用单个股票的记录来看一个示例实现。可以查看笔记本电脑,以查看对整个数据完成的所有特征工程工作。

由于我们在Google Colab上执行代码,因此我们应该首先配置笔记本电脑以集成Kaggle和RAPIDS。

Google Colab笔记本电脑的配置

配置Colab笔记本电脑需要以下几个步骤:

  1. 在Kaggle帐户上创建API令牌,以使用Kaggle服务验证笔记本电脑。

使用 RAPIDS cuDF 利用 GPU 进行特征工程 数据科学 第4张

进入“设置”并单击“创建新令牌”。将下载一个名为“kaggle.json”的文件,其中包含用户名和API密钥。

  1. 在Google Colab上启动一个新笔记本电脑,并上传kaggle.json文件。

使用 RAPIDS cuDF 利用 GPU 进行特征工程 数据科学 第5张

通过单击“上传到会话存储”图标,在Google Colab中上传kaggle.json文件。

  1. 单击页面顶部的“Runtime”下拉菜单,然后单击“更改运行时类型”,并确认实例类型为GPU。
  2. 执行以下命令并检查输出,以确保已分配Tesla T4、P4或P100。
!nvidia-smi
  1. 获取RAPIDS-Colab安装文件并检查您的GPU:
!git clone https://github.com/rapidsai/rapidsai-csp-utils.git
!python rapidsai-csp-utils/colab/pip-install.py

如果此单元格的输出中显示您的Colab实例与RAPIDS兼容,则表示您的Colab配置已完成。

使用 RAPIDS cuDF 利用 GPU 进行特征工程 数据科学 第6张

  1. 检查RAPIDS库是否正确安装:
import cudf, cuml
cudf.__version__

如果设置没有错误,则我们已经完成了Google Colab的配置。现在,我们可以上传Kaggle数据集。

导入和上传Kaggle数据集

我们需要在Colab实例中进行一些安排,以从Kaggle导入数据集。

  1. 安装Kaggle库:
!pip install -q kaggle
  1. 创建名为“.kaggle”的目录:
!mkdir ~/.kaggle
  1. 将“kaggle.json”复制到此新目录中:
!cp kaggle.json ~/.kaggle/
  1. 为此文件分配所需的权限:
!chmod 600 ~/.kaggle/kaggle.json
  1. 从Kaggle下载数据集:
!kaggle competitions download optiver-realized-volatility-prediction
  1. 创建一个目录以存储解压后的数据:
!mkdir train
  1. 在新目录中解压数据:
!unzip optiver-realized-volatility-prediction.zip -d train
  1. 导入我们需要的所有其他库:
import glob
import numpy as np
import pandas as pd
from cudf import DataFrame
import matplotlib.pyplot as plt
from matplotlib import style
from collections import defaultdict
from IPython.display import display
import gc
import time
import warnings
%matplotlib inline
  1. 设置 Pandas 选项:
pd.set_option("display.max_colwidth", None)
pd.set_option("display.max_columns", None)
warnings.filterwarnings("ignore")

print("Threshold:", gc.get_threshold())
print("Count:", gc.get_count())
  1. 定义参数:
# 包含文件的数据目录
DIR = "/content/train/"

# 执行循环的次数
ROUNDS = 30
  1. 获取文件:
# 获取订单和交易书
order_files = glob.glob(DIR + "book_train.parquet" + "/*")
trade_files = glob.glob(DIR + "trade_train.parquet" + "/*")
print(order_files[:5])
print("\n")
print(trade_files[:5])
print("\n")

# 将 stock_ids 作为列表获取
stock_ids = sorted([int(file.split('=')[1]) for file in order_files])
print(f"{len(stock_ids)} 个股票:\n {stock_ids} \n")

现在,我们的笔记本已经准备好运行所有数据帧任务并进行特征工程。

特征工程

本节将讨论 Pandas 数据帧和 cuDF 上的 13 种典型工程操作。我们将看到这些操作需要多长时间以及使用多少内存。让我们首先加载数据。

1. 加载数据

def load_dataframe(files, dframe=0):

   print("加载数据帧", "\n")
  
   # 加载 Pandas 数据帧
   if dframe == 0:
     print("正在加载 Pandas 数据帧..", "\n")
     start = time.time()
     df_pandas = pd.read_parquet(files[0])
     end = time.time()
     elapsed_time = round(end-start, 3)
     print(f"对于 Pandas 数据帧:\n 开始时间:{start} \n 结束时间:{end} \n 经过的时间:{elapsed_time} \n")
     return df_pandas, elapsed_time

   # 加载 cuDF 数据帧
   else:
     print("正在加载 cuDF 数据帧..", "\n")
     start = time.time()
     df_cudf = cudf.read_parquet(files[0])
     end = time.time()
     elapsed_time = round(end-start, 3)
     print(f"对于 cuDF 数据帧:\n 开始时间:{start} \n 结束时间:{end} \n 经过的时间:{elapsed_time} \n ")

     return df_cudf, elapsed_time

当 dframe=0 时,数据将被加载为 Pandas 数据帧,否则为 cuDF。例如,

Pandas:

# 加载 pandas 订单数据帧并计算时间
df_pd_order, _ = load_dataframe(order_files, dframe=0)
display(df_pd_order.head())

这将返回 Order Book(book_[train/test].parquet)的前五条记录:

使用 RAPIDS cuDF 利用 GPU 进行特征工程 数据科学 第7张

cuDF:

# 加载 cuDF book 数据帧并计算时间
df_cudf_order, _ = load_dataframe(order_files, dframe=1)
display(df_cudf_order.head())

输出:

使用 RAPIDS cuDF 利用 GPU 进行特征工程 数据科学 第8张 图-7:将数据加载为 cuDF(作者提供)

让我们从 Pandas 版本中获取有关订单簿数据的信息:

# 订单数据帧信息
display(df_pd_order.info())

输出:

使用 RAPIDS cuDF 利用 GPU 进行特征工程 数据科学 第9张 图8:第一只股票的订单簿数据信息(作者提供的图像)

上述图像告诉我们,第一只股票有大约 140 万条记录,占用 47.8 MB 的内存空间。为了减少空间并提高速度,我们应该将数据类型转换为较小的格式,这是我们稍后要做的。

以类似的方式,我们在两个数据帧库中加载交易簿(trade_[train/test].parquet)数据,就像我们对订单簿数据所做的那样。数据及其信息如下所示:

使用 RAPIDS cuDF 利用 GPU 进行特征工程 数据科学 第10张 图9:第一只股票的交易簿数据和数据信息(作者提供的图像)

第一只股票的交易数据为 3.7 MB,有超过 276 千条记录。

在两个文件(订单簿和交易簿)中,并不是每个时间窗口都有 600 秒的交易点。换句话说,在 10 分钟的时间间隔内,特定的时间桶可能仅在某些秒数上有交易或出价。这让我们在两个文件中面对稀疏数据,其中某些秒数是缺失的。我们应该通过向前填充所有缺失秒的所有列来解决它。虽然 Pandas 允许我们向前填充,但 cuDF 没有这个功能。因此,我们将在 Pandas 中进行向前填充,并从填充后的 Pandas 数据帧重新创建 cuDF。我们对此感到懊悔,因为本博客的核心目标是展示 cuDF 如何优于 Pandas。我曾多次在过去的事情中进行了核查,但据我所知,我无法找到在 Pandas 中实现的 cuDF 方法。因此,我们可以按以下方式进行向前填充[4]:

# 前向填充数据
def ffill(df, df_name="order"):
  
   # 前向填充
   df_pandas = df.set_index(['time_id', 'seconds_in_bucket'])

   if df_name == "order":
     df_pandas = df_pandas.reindex(pd.MultiIndex.from_product([df_pandas.index.levels[0], np.arange(0,600)], names = ['time_id', 'seconds_in_bucket']), method='ffill')
     df_pandas = df_pandas.reset_index()
    
   else:
     df_pandas = df_pandas.reindex(pd.MultiIndex.from_product([df_pandas.index.levels[0], np.arange(0,600)], names = ['time_id', 'seconds_in_bucket']))
     # 用 0 填充 nan 值
     df_pandas = df_pandas.fillna(0)
     df_pandas = df_pandas.reset_index()   

   # 转换为 cudf 数据帧
   df_cudf = cudf.DataFrame.from_pandas(df_pandas)

   return df_pandas, df_cudf

让我们以订单数据为例,看看它是如何处理的:

# 前向填充订单数据帧
expanded_df_pd_order, expanded_df_cudf_order = ffill(df_pd_order, df_name="order")
display(expanded_df_cudf_order.head())

使用 RAPIDS cuDF 利用 GPU 进行特征工程 数据科学 第11张 图10:前向填充订单数据(作者提供的图像)

与展示 7 中的数据不同,展示 10 中的前向填充数据在时间桶“5”中具有所有 600 秒,从 0 到 599,含。我们也会对交易数据执行相同的操作。

2. 合并数据帧

我们有两个数据集,订单和交易,两者都是前向填充的。两个数据集都用 Pandas 和 cuDF 框架表示。接下来,我们将在 time_id 和 seconds_in_buckets 上合并订单和交易数据集。

def merge_dataframes(df1, df2, dframe=0):

   print("MERGING DATA FRAMES", "\n")
  
   if dframe == 0:
     df_type = "Pandas"
   else:
     df_type = "cuDF"

   # 合并数据帧
   print(f"合并 {df_type} 数据帧..", "\n")
   start = time.time()
   df = df1.merge(df2, how="left", on=["time_id", "seconds_in_bucket"], sort=True)
   end = time.time()
   elapsed_time = round(end-start, 3)
   print(f"对于 {df_type} 数据帧:\n 开始时间:{start} \n 结束时间:{end} \n 经过时间:{elapsed_time} \n")

   return df, elapsed_time

cuDF将执行以下命令:

# 合并cuDF订单和交易数据框
df_cudf, cudf_merge_time = merge_dataframes(expanded_df_cudf_order, expanded_df_cudf_trade, dframe=1)
display(df_cudf.head())

expanded_df_cudf_trade是前向填充的交易数据,与expanded_df_pd_order或expanded_df_cudf_order的获得方式相同。合并操作将创建一个组合数据框,如下所示:

使用 RAPIDS cuDF 利用 GPU 进行特征工程 数据科学 第12张 Exhibit-11: 合并数据框(作者提供的图片)

两个数据集的所有列都合并成一个。Pandas数据框也会重复合并操作。

使用 RAPIDS cuDF 利用 GPU 进行特征工程 数据科学 第13张

3. 更改数据类型

我们想要更改一些列的数据类型以减少内存空间并增加计算速度。

# 进行数据类型更改
def change_dtype(df, dframe=0):

   print("CHANGING DTYPES", "\n")

   convert_dict = {"time_id": "int16",
                   "seconds_in_bucket": "int16",
                   "bid_size1": "int16",
                   "ask_size1": "int16",
                   "bid_size2": "int16",
                   "ask_size2": "int16",
                   "size": "int16",
                   "order_count": "int16"
                   } 

   df = df.astype(convert_dict)

   return df, dframe

当我们执行以下命令时:

# 为cuDF数据框更改数据类型
df_cudf, _ = change_dtype(df_cudf)
display(df_cudf.info())

我们得到以下输出:

使用 RAPIDS cuDF 利用 GPU 进行特征工程 数据科学 第14张 Exhibit-12: 更改数据类型(作者提供的图片)

如果没有进行数据类型转换,展示 12 中的数据将使用更多的内存空间。它仍然有78.9 MB,但这是在前向填充和合并操作之后得到的,这导致了13个列和230万条记录。

我们对Pandas DF和cuDF的每个特征工程任务都进行了完善。这里,我们只展示了cuDF的一个示例。

4. 获取唯一的时间ID

在本节中,我们将使用unique方法提取time_ids。

# 获取time_id列中的唯一值并将它们放入列表中
def get_unique_timeids(df, dframe=0):

   global time_ids

   print("GETTING UNIQUE VALUES", "\n")

   # 获取唯一的time_ids
   if dframe == 0:
     print(f"从Pandas数据框中获取已排序的唯一time_ids..", "\n")
     start = time.time()
     time_ids = sorted(df['time_id'].unique().tolist())
     end = time.time()
     elapsed_time = round(end-start, 3)
     print(f"来自Pandas数据框的唯一time_ids: \n 开始时间: {start} \n 结束时间: {end} \n 经过时间: {elapsed_time} \n")

   else:
     print(f"从cuDF数据框中获取已排序的唯一time_ids..", "\n")
     start = time.time()
     time_ids = sorted(df['time_id'].unique().to_arrow().to_pylist())
     end = time.time()
     elapsed_time = round(end-start, 3)
     print(f"来自cuDF数据框的唯一time_ids: \n 开始时间: {start} \n 结束时间: {end} \n 经过时间: {elapsed_time} \n")

   print(f"{len(time_ids)} 时间桶: \n {time_ids[:10]}...")
   print("\n")

   return df, time_ids

以上代码将从Pandas DF和cuDF中获取唯一的time_ids。

# 从cuDF数据框中获取time_ids
time_ids = get_unique_timeids(df_cudf_order, dframe=1)

cuDF的输出如下:

使用 RAPIDS cuDF 利用 GPU 进行特征工程 数据科学 第15张 Exhibit-13: 获取唯一的时间ID(作者提供的图片)

5. 检查空值

接下来,我们将检查数据框中的空值。

# 检查 df 的空值
def check_null_values(df, dframe=0):

   print("检查空值", "\n")

   print("检查数据框的空值..", "\n")
   display(df.isna().values.any())
   display(df.isnull().sum())

   return df, dframe

以 cuDF 为例检查空值:

# 检查 cuDF 数据框的空值
df_cudf, _ = check_null_values(df_cudf, dframe=0)

输出结果为:

使用 RAPIDS cuDF 利用 GPU 进行特征工程 数据科学 第16张 Exhibit-14: 检查空值(作者提供的图片)

6. 添加列

我们希望创建更多的特征,因此添加一些列。

# 添加列
def add_column(df, dframe=0):

   print("添加列", "\n")

   # 计算 WAP
   df['wap1'] = (df['bid_price1'] * df['ask_size1'] + df['ask_price1'] * df['bid_size1']) / (df['bid_size1'] + df['ask_size1'])
   df['wap2'] = (df['bid_price2'] * df['ask_size2'] + df['ask_price2'] * df['bid_size2']) / (df['bid_size2'] + df['ask_size2'])

   # 计算订单量
   df['bid1_volume'] = df['bid_price1'] * df['bid_size1']
   df['bid2_volume'] = df['bid_price2'] * df['bid_size2']
   df['ask1_volume'] = df['ask_price1'] * df['ask_size1']
   df['ask2_volume'] = df['ask_price2'] * df['ask_size2']

   # 计算量失衡
   df['imbalance'] = np.absolute((df['ask_size1'] + df['ask_size2']) - (df['bid_size1'] + df['bid_size2']))

   # 计算交易量失衡
   df['volume_imbalance'] = np.absolute((df['bid_price1'] * df['bid_size1']) - (df['ask_price1'] * df['ask_size1']))

   return df, dframe

这将创建新的特征,例如加权平均价格(wap1 和 wap2)、订单量和量失衡。通过执行以下操作,将向数据框添加八列:

# 在 cuDF 数据框中添加一列
df_cudf, _ = add_column(df_cudf)
display(df_cudf.head())

结果如下:

使用 RAPIDS cuDF 利用 GPU 进行特征工程 数据科学 第17张 Exhibit-15: 添加列和特征(作者提供的图片)

7. 删除列

我们决定通过删除它们的列来摆脱两个特征 wap1 和 wap2:

# 删除列
def drop_column(df, dframe=0):

   print("删除列", "\n")

   df.drop(columns=['wap1', 'wap2'], inplace=True)

   return df, dframe

删除列的实现方式如下:

# 在 cuDF 数据框中删除列
df_cudf, _ = drop_column(df_cudf)
display(df_cudf.head())

这样,我们得到的数据框中 wap1 和 wap2 列就被删除了!

8. 按组计算统计量

接下来,我们按 time_id 计算某些特征的平均值、中位数、最大值、最小值、标准差和总和。为此,我们将使用 groupby 和 agg 方法。

# 计算所选特征的统计量
def calc_agg_stats(df, dframe=0):

   print("计算统计量", "\n")

   # 需要进行的统计计算
   operations = ["mean", "median", "max", "min", "std", "sum"]

   # 将进行统计计算的特征
   features_list = ["bid1_volume", "bid2_volume", "ask1_volume", "ask2_volume"]

   # 创建一个字典来存储特征-计算对
   stats_dict = defaultdict(list)
   for feature in features_list:
       stats_dict[feature].extend(operations)

   # 计算聚合统计量
   df_stats = df.groupby('time_id', as_index=False, sort=True).agg(stats_dict)

   return df, df_stats

我们创建了一个名为features_list的列表,用于指定将执行数学计算的特征。

#在cuDF数据框中选择特定特征进行统计计算
_, df_cudf_stats = calc_agg_stats(df_cudf)
display(df_cudf_stats.head())

作为回报,我们得到了以下输出:使用 RAPIDS cuDF 利用 GPU 进行特征工程 数据科学 第18张 Exhibit-16: 计算统计信息(作者提供的图像)

返回的表格是一个新的数据框。我们应该将其与原始数据框(df_cudf)合并。我们将通过Pandas完成:

#将数据框与统计信息合并
def merge_dataframes_2(df, dframe=0):

   if dframe == 0:
     df = df.merge(df_pd_stats, how="left", on="time_id", sort=True)
  
   else:
     df = df.to_pandas()
     df = df.merge(df_pd_stats, how="left", on="time_id", sort=True)
     df = cudf.DataFrame.from_pandas(df)

   return df, dframe


#合并cuDF数据框
df_cudf, _ = merge_dataframes_2(df_cudf, dframe=1)
display(df_cudf.head())

上面的代码将df_pd_stats和df_pd放入一个数据框中,并将其保存为df_cudf。

像往常一样,我们重复了Pandas的相同任务。

接下来的步骤是计算两个列之间的相关性:

#计算两个选定特征之间的相关性
def calc_corr(df, dframe=0):

 correlation = df[["bid1_volume", "ask1_volume"]].corr()
 print(f"Correlation between 'bid1_volume' and 'ask1_volume' is {correlation} \n")

 return df, correlation

此代码

#在cuDF数据框中计算相关性
_ = calc_corr(df_cudf)

将返回以下输出:

使用 RAPIDS cuDF 利用 GPU 进行特征工程 数据科学 第19张 Exhibit-17: 计算两个特征之间的相关性(作者提供的图像)

9. 重命名列

为了消除任何混淆,我们应该重命名两个列。

#重命名列
def rename_cols(df, dframe=0):

   print("重命名列", "\n")

   df = df.rename(columns={"imbalance": "volume_imbalance", "volume_imbalance": "trade_volume_imbalance"})

   return df, dframe

列imbalance和volume_imbalance将分别重命名为volume_imbalance和trade_volume_imbalance。

10. 列分箱

我们要进行的另一个数据操作是对bid1_volume进行分箱,并将分箱存储在一个新列中。

#对选定的列进行分箱
def bin_col(df, dframe=0):

   print("列分箱", "\n")

   if dframe == 0:
     df['bid1_volume_cut'] = pd.cut(df["bid1_volume"], bins=5, labels=["very high", "high", "average", "low", "very low"], ordered=True)

   else:
     df['bid1_volume_cut'] = cudf.cut(df["bid1_volume"], bins=5, labels=["very high", "high", "average", "low", "very low"], ordered=True)

   return df, dframe

通过运行以下行

#在cuDF数据框中对选定的列进行分箱
df_cudf, _ = bin_col(df_cudf, dframe=1)
display(df_cudf.head())

我们将得到一个数据框作为输出,我们可以看到其中的一部分如下所示:

使用 RAPIDS cuDF 利用 GPU 进行特征工程 数据科学 第20张 Exhibit-18: 列分箱(作者提供的图像)

11. 显示数据框

特征工程步骤完成后,我们可以呈现数据框。本节包括三个操作:显示数据框,获取关于它的信息并描述它。

# 显示数据框
def display_df(df, dframe=0):

   print("显示数据框", "\n")

   display(df.head())
   print("\n")

   return df, dframe


# 显示数据框信息
def display_info(df, dframe=0):

   print("显示数据框信息", "\n")

   display(df.info())
   print("\n")

   return df, dframe


# 描述数据框
def describe_df(df, dframe=0):

   print("描述数据框", "\n")

   display(df.describe())
   print("\n")

   return df, dframe

以下代码将完成这三个任务:

# 显示cuDF数据框和信息
_, _ = display_df(df_cudf, dframe=1)
_, _ = display_info(df_cudf, dframe=1)
_, _ = describe_df(df_cudf, dframe=1)

我们已经完成了特征工程。

单次运行执行

总的来说,我们的特征工程工作主要集中在以下几个任务上:

  1. 加载数据框
  2. 合并数据框
  3. 更改数据类型
  4. 获取唯一的time_ids
  5. 检查空值
  6. 添加列
  7. 删除列
  8. 计算统计量
  9. 计算相关性
  10. 重命名列
  11. 将列分成区间
  12. 显示数据框
  13. 显示数据信息
  14. 描述数据框

虽然总共有13个任务,但我们在这里将“计算相关性”作为一个单独的任务提出来。现在,我们想要依次在单次运行中运行这些任务,如下所示:

def run_and_report():

   # 创建一个字典来存储经过的时间
   time_dict = defaultdict(list)

   # 列出要执行的操作
   labels = ["更改数据类型",
             "获取唯一的time_ids",
             "检查空值",
             "添加列",
             "删除列",
             "计算聚合统计",
             "合并数据框",
             "重命名列",
             "将列分成区间",
             "计算相关性",
             "显示数据框",
             "显示数据信息",
             "描述数据框"]

   # 加载pandas order数据框并计算时间
   df_pd_order, pd_order_loading_time = load_dataframe(order_files, dframe=0)
   print("-"*150, "\n")

   # 加载cuDF book数据框并计算时间
   df_cudf_order, cudf_order_loading_time = load_dataframe(order_files, dframe=1)
   print("-"*150, "\n")

   # 加载pandas trade数据框并计算时间
   df_pd_trade, pd_trade_loading_time = load_dataframe(trade_files, dframe=0)
   print("-"*150, "\n")

   # 加载cuDF trade数据框并计算时间
   df_cudf_trade, cudf_trade_loading_time = load_dataframe(trade_files, dframe=1)
   print("-"*150, "\n")

   # 从Pandas数据框中获取time_ids
   _, time_ids = get_unique_timeids(df_pd_order, dframe=0)
   print("-"*150, "\n")

   # 从cuDF数据框中获取time_ids
   _, time_ids = get_unique_timeids(df_cudf_order, dframe=1)
   print("-"*150, "\n")

   # 存储加载时间
   time_dict["加载数据框"].extend([pd_order_loading_time, cudf_order_loading_time])

   # 对order数据框进行前向填充
   expanded_df_pd_order, expanded_df_cudf_order = ffill(df_pd_order, df_name="order")

   # 对trade数据框进行前向填充
   expanded_df_pd_trade, expanded_df_cudf_trade = ffill(df_pd_trade, df_name="trade")

   # 合并pandas order和trade数据框
   df_pd, pd_merge_time = merge_dataframes(expanded_df_pd_order, expanded_df_pd_trade, dframe=0)
   print("-"*150, "\n")

   # 合并cuDF order和trade数据框
   df_cudf, cudf_merge_time = merge_dataframes(expanded_df_cudf_order, expanded_df_cudf_trade, dframe=1)
   print("-"*150, "\n")

   # 存储合并时间
   time_dict["合并数据框"].extend([pd_merge_time, cudf_merge_time])

   # 应用函数
   functions = [change_dtype,
                get_unique_timeids,
                check_null_values,
                add_column,
                drop_column,
                calc_agg_stats,
                merge_dataframes_2,
                rename_cols,
                bin_col,
                calc_corr,
                display_df,
                display_info,
                describe_df]

   for label, function in enumerate(functions):

     # pandas函数
     start_pd = time.time()
     df_pd, x = function(df_pd, dframe=0)
     end_pd = time.time()
     elapsed_time_for_pd = round(end_pd-start_pd, 3)
     print(f"对于Pandas数据框:\n 开始时间:{start_pd} \n 结束时间:{end_pd} \n 经过时间:{elapsed_time_for_pd} \n")     

     # cuDF函数
     start_cudf = time.time()
     df_cudf, x = function(df_cudf, dframe=1)
     end_cudf = time.time()
     elapsed_time_for_cudf = round(end_cudf-start_cudf, 3)
     print(f"对于cuDF数据框:\n 开始时间:{start_cudf} \n 结束时间:{end_cudf} \n 经过时间:{elapsed_time_for_cudf} \n")
     print("-"*150, "\n")

     # 存储经过的时间
     time_dict[labels[label]].extend([elapsed_time_for_pd, elapsed_time_for_cudf])

   # 删除不需要的时间
   del time_dict["合并数据框"]
   labels.remove("合并数据框")
   labels.insert(0, "合并数据框")
   labels.insert(0, "加载数据框")

   print(time_dict)

   return time_dict, labels, df_pd, df_cudf

run_and_report函数将以单个执行命令的完整报告方式给出与以前相同的输出。它将在Pandas和cuDF上执行14个任务,并记录它们对于两个数据框所需的时间。

time_dict,labels,df_pd,df_cudf = run_and_report()

我们可能需要运行多个周期以更明显地看到两个数据库之间的相对性能差异。

最终评估

如果我们多次运行run_and_report,例如在轮次中,我们可以更好地了解Pandas和cuDF之间性能差异。因此,我们将轮次设置为30。然后,我们记录每个操作,轮次和数据库的所有时间持续时间,最后评估结果:

def calc_exec_times():

   exec_times_by_round = {}

   # 计算每轮操作的执行时间
   for round_no in range(1, ROUNDS+1):
     # cycle_no += 1
     time_dict, labels, df_pd, df_cudf = run_and_report()
     exec_times_by_round[round_no] = time_dict

   print("exec_times_by_round: ", exec_times_by_round)

   # 按数据框获取每个操作的持续时间
   pd_summary, cudf_summary = get_statistics(exec_times_by_round, labels)

   # 按数据框获取每轮的持续时间
   round_total = get_total(exec_times_by_round)
   print("\n"*3)

   # 绘制持续时间
   plt.style.use('dark_background')
   X_axis = np.arange(len(labels))

   # 绘制操作的平均持续时间
   plot_avg_by_df(pd_summary, cudf_summary, labels, X_axis)
   print("\n"*3)

   # 绘制持续时间总数和差异
   plot_diff_by_df(pd_summary, cudf_summary, labels)
   print("\n"*3)

   # 绘制持续时间总数和差异
   plot_total_by_df(round_total)
   print("\n"*3)

calc_exec_times函数执行一些任务。首先,它调用get_statistics来获取每个数据库在30轮中的“操作的平均和总时间持续时间”。

def get_statistics(exec_times_by_round, labels):

   # 按数据框分离并存储持续时间统计信息
   pd_performance = defaultdict(list)
   cudf_performance = defaultdict(list)

   # 获取并存储每个操作的持续时间
   for label in labels:
     for key, values in exec_times_by_round.items():

       pd_performance[label].append(values[label][0])
       cudf_performance[label].append(values[label][1])

   print("pd_performance: ", pd_performance)
   print("cudf_performance: ", cudf_performance)

   # 计算每个数据框的每个操作的平均和总持续时间
   pd_summary = {key: [round(sum(value), 3), round(np.average(value), 3)] for key, value in pd_performance.items()}
   cudf_summary = {key: [round(sum(value), 3), round(np.average(value), 3)] for key, value in cudf_performance.items()}

   print("pd_summary: ", pd_summary)
   print("cudf_summary: ", cudf_summary) 

   return pd_summary, cudf_summary

接下来,它计算每个数据框的“每轮总持续时间”。

def get_total(exec_times_by_round):

   def get_round_total(stat_list):

     # 获取每个数据框每轮的总持续时间
     pd_round_total = round(sum([x[0] for x in stat_list]), 3)
     cudf_round_total = round(sum([x[1] for x in stat_list]), 3)

     return pd_round_total, cudf_round_total

   # 收集每轮的总持续时间
   for key, value in exec_times_by_round.items():
     round_total = {key: get_round_total(list(value.values())) for key, value in exec_times_by_round.items()}

   print("round_total", round_total)

   return round_total

最后,它绘制结果。这里,第一个图是两个库的“按操作的平均持续时间”。

def plot_avg_by_df(pd_summary, cudf_summary, labels, X_axis):

   # 图形大小
   fig = plt.subplots(figsize =(10, 4))

   # 每个数据框的操作平均持续时间
   pd_avg = [value[1] for key, value in pd_summary.items()]
   cudf_avg = [value[1] for key, value in cudf_summary.items()]

   plt.bar(X_axis - 0.2, pd_avg, 0.4, color = '#5A5AAF', label = 'pandas', align='center')
   plt.bar(X_axis + 0.2, cudf_avg, 0.4, color = '#C8C8FF', label = 'cuDF', align='center')

   plt.xticks(X_axis, labels, fontsize=9, rotation=90)
   plt.yticks(fontsize=9)
   plt.xlabel("操作", fontsize=10)
   plt.ylabel("平均持续时间(秒)", fontsize=10)
   plt.grid(axis='y', color="#E4E4E4", alpha=0.5)
   plt.title("按数据框绘制操作的平均持续时间", fontsize=12)
   plt.legend()
   plt.show()

使用 RAPIDS cuDF 利用 GPU 进行特征工程 数据科学 第21张 图19:Pandas数据框和cuDF的操作平均持续时间(作者提供的图片)

第二个图是“总持续时间按操作”,显示每个任务在所有30个回合中所花费的总时间。

def plot_diff_by_df(pd_summary, cudf_summary, labels):

   # 图像大小
   fig = plt.subplots(figsize =(12, 6))

   # 每个数据框操作的总持续时间
   pd_total = [value[0] for key, value in pd_summary.items()]
   cudf_total = [value[0] for key, value in cudf_summary.items()]

   # 每个数据框操作的持续时间差
   diff = [x[0]-x[1] for x in zip(pd_total, cudf_total)]

   # 设置条形图的宽度
   barWidth = 0.25

   # 设置条形图在X轴上的位置
   br1 = np.arange(len(labels))
   br2 = [x + barWidth for x in br1]
   br3 = [x + barWidth for x in br2]

   plt.bar(br1, pd_total, barWidth, color = '#5A5AAF', label = 'pandas', align='center')
   plt.bar(br2, cudf_total, barWidth, color = '#C8C8FF', label = 'cuDF', align='center')
   plt.bar(br3, diff, barWidth, color = '#AA1E1E', label = '差异', align='center')

   plt.xticks([r + barWidth for r in range(len(labels))], labels, fontsize=9, rotation=90)
   plt.yticks(fontsize=9)
   plt.xlabel("操作", fontsize=10)
   plt.ylabel("总持续时间(秒)", fontsize=10)
   plt.grid(axis='y', color="#E4E4E4", alpha=0.5)
   plt.title("数据框的操作总持续时间", fontsize=12)
   plt.legend()
   plt.show()

使用 RAPIDS cuDF 利用 GPU 进行特征工程 数据科学 第22张 图20:Pandas数据框和cuDF的操作在30轮中的总持续时间(作者提供的图片)

最后一个图是“每轮的总持续时间”,显示所有操作在每一轮中一起花费的总时间。

def plot_total_by_df(round_total):

    # 图像大小
   fig = plt.subplots(figsize =(10, 6))

   X_axis = np.arange(1, ROUNDS+1)

   # 每个数据框操作的每轮总持续时间
   pd_round_total = [value[0] for key, value in round_total.items()]
   cudf_round_total = [value[1] for key, value in round_total.items()]

   # 每个数据框操作的每轮持续时间差
   diff = [x[0]-x[1] for x in zip(pd_round_total, cudf_round_total)]

   plt.plot(X_axis, pd_round_total, linestyle="-", linewidth=3, color = '#5A5AAF', label = "pandas")
   plt.plot(X_axis, cudf_round_total, linestyle="-", linewidth=3, color = '#B0B05A', label = "cuDF")
   plt.plot(X_axis, diff, linestyle="--", linewidth=3, color = '#AA1E1E', label = "差异")

   plt.xticks(X_axis, fontsize=9)
   plt.yticks(fontsize=9)
   plt.xlabel("轮数", fontsize=10)
   plt.ylabel("总持续时间(秒)", fontsize=10)
   plt.grid(axis='y', color="#E4E4E4", alpha=0.5)
   plt.title("每轮的总持续时间", fontsize=12)
   plt.legend()
   plt.show()

使用 RAPIDS cuDF 利用 GPU 进行特征工程 数据科学 第23张 图21:Pandas数据框和cuDF每轮的所有操作的总持续时间(作者提供的图片)

尽管我们没有涵盖数据集上执行的所有特征工程任务,但它们与我们展示的任务相同或相似。通过单独解释14个操作,我们试图记录Pandas数据框和cuDF的相对性能,并实现可重复性。

除了相关性计算和数据框显示之外,cuDF在所有情况下都胜过Pandas。在诸如groupby、merge、agg和describe等复杂任务中,这种性能优势变得更加显著。另一个点是,随着回合数的增加,Pandas DF变得疲劳,而cuDF则遵循更稳定的模式。

回想一下,我们只是以一个股票为例进行了回顾。如果我们处理所有112支股票,我们可能会看到一个更大的性能差距,有利于使用cuDF。如果股票人口增加到数百个,cuDF的性能甚至可以更加显著。在大数据的情况下,如果可以执行并行任务,那么一种分布式框架,例如Dask-cuDF,它将并行计算扩展到cuDF GPU DataFrames,可能是正确的工具。

参考资料

[1] RAPIDS 定义,https://www.heavy.ai/technical-glossary/rapids

[2] 10分钟了解cuDF和Dask-cuDF,https://docs.rapids.ai/api/cudf/stable/user_guide/10min/

[3] Optiver 实现波动率预测,https://www.kaggle.com/competitions/optiver-realized-volatility-prediction/data

[4] 前向填充图书数据,https://www.kaggle.com/competitions/optiver-realized-volatility-prediction/discussion/251277 Hasan Serdar Altan 是数据科学家和AWS云架构师助理。

Leave a Reply

Your email address will not be published. Required fields are marked *