介绍
几个月前,我们介绍了时间序列变换器,它是将传统的Transformer(Vaswani等人,2017年)应用于预测,并展示了单变量概率预测任务的示例(即单独预测每个时间序列的一维分布)。在本文中,我们介绍了Informer模型(Zhou, Haoyi等人,2021年),这是AAAI21最佳论文,现在已经在🤗 Transformers中可用。我们将展示如何使用Informer模型进行多变量概率预测任务,即预测未来时间序列目标值的向量分布。需要注意的是,这也适用于传统的时间序列变换器模型。
多变量概率时间序列预测
就概率预测的建模而言,当处理多变量时间序列时,Transformer/Informer不需要进行任何更改。在单变量和多变量设置中,模型将接收一个向量序列,因此唯一的变化在于输出或发射方面。
对于高维数据的完整条件分布建模可能会导致计算开销过大,因此方法会采用一些分布的近似方法,最简单的方法是将数据建模为来自同一族分布的独立分布,或者对完整协方差进行低秩近似等。在这里,我们只采用独立(或对角)发射,这在我们实现的分布族中是支持的。
Informer – 内部原理
Informer基于传统的Transformer(Vaswani等人,2017年),引入了两个主要改进。为了理解这些改进,让我们回顾一下传统Transformer的缺点:
- 经典自注意力的二次计算:传统Transformer的计算复杂度为O(T^2D),其中T是时间序列长度,D是隐藏状态的维度。对于长序列时间序列预测(也称为LSTF问题),这可能会导致计算开销非常大。为了解决这个问题,Informer采用了一种称为ProbSparse注意力的新的自注意机制,其时间和空间复杂度为O(T log T)。
- 堆叠层时的内存瓶颈:当堆叠N个编码器/解码器层时,传统Transformer的内存使用量为O(NT^2),这限制了模型处理长序列的能力。Informer使用了一种称为Distilling操作的方法,将层之间的输入大小减小到其一半。通过这样做,可以将整体内存使用量减小为O(N⋅T log T)。
正如您所看到的,Informer模型的动机类似于Longformer(Beltagy等人,2020年),Sparse Transformer(Child等人,2019年)和其他自然语言处理论文,用于减少自注意机制的二次复杂度,特别是在输入序列较长时。现在,让我们深入了解ProbSparse注意力和Distilling操作,并附带代码示例。
ProbSparse注意力
ProbSparse的主要思想是经典自注意力分数形成了一个长尾分布,其中“活跃”的查询位于“头部”分数中,而“懒惰”的查询位于“尾部”区域中。通过“活跃”查询,我们指的是一个查询qi,使得点积⟨qi, ki⟩对主要注意力产生贡献,而“懒惰”的查询形成的点积生成的注意力是微不足道的。这里,qi和ki分别是Q和K注意力矩阵中的第i行。
在理解“活跃”和“懒惰”查询的思想之后,ProbSparse注意力选择“活跃”查询,并创建一个缩减的查询矩阵Qreduced,用于在O(T log T)的时间内计算注意力权重。让我们通过代码示例更详细地了解这一点。
回顾一下规范的自注意力公式:
注意力(Q,K,V)= softmax(QKT / dk)V
其中 Q ∈ R LQ × d,K ∈ R LK × d,V ∈ R LV × d。请注意,在实践中,查询和键的输入长度在自注意力计算中通常是相等的,即 LQ = LK = T,其中 T 是时间序列的长度。因此,QKT 乘法的计算复杂度为 O(T^2 · d)。在 ProbSparse 注意力中,我们的目标是创建一个新的 Qreduce 矩阵,并定义如下:
ProbSparseAttention(Q,K,V)= softmax(QreduceKT / dk)V
其中 Qreduce 矩阵仅选择 Top u 个“活跃”查询。这里,u = c · log LQ,c 被称为 ProbSparse 注意力的采样因子超参数。由于 Qreduce 仅选择了 Top u 个查询,其大小为 c · log LQ × d,因此乘法 QreduceKT 仅需要 O(LK log LQ) = O(T log T)。
这很好!但是我们如何选择这些“活跃”查询来创建 Qreduce 呢?让我们定义查询稀疏度量。
查询稀疏度量
查询稀疏度量 M(qi, K) 用于选择 Q 中的 Top u 个“活跃”查询 qi,以创建 Qreduce。理论上,主导的 ⟨qi, ki⟩ 对鼓励“活跃”查询 qi 的概率分布远离均匀分布,如下图所示。因此,实际查询分布与均匀分布之间的 KL 散度被用来定义稀疏度量。
在实践中,这个度量被定义为:
M(qi, K) = max_j (qikjT / √d – 1/Lk ∑_{j=1}^{Lk} qikjT / √d)
这里需要理解的重要事情是,当 M ( q i , K ) M(q_i, K) M ( q i , K ) 更大时,查询 q i q_i q i 应该在 Q r e d u c e Q_{reduce} Q r e d u c e 中,反之亦然。
但是我们如何以非二次时间计算术语 q i k j T q_ik_j^T q i k j T 呢?请记住,大多数点积 ⟨ q i , k i ⟩ \langle q_i,k_i \rangle ⟨ q i , k i ⟩ 生成的是平凡的注意力(即长尾分布属性),因此随机从 K K K 中抽样一部分键,将其称为代码中的 K_sample
即可。
现在,我们准备看一下 probsparse_attention
的代码:
from torch import nn
import math
def probsparse_attention(query_states, key_states, value_states, sampling_factor=5):
"""
计算稀疏自注意力。
输入形状:Batch x Time x Channel
注意额外的 `sampling_factor` 输入。
"""
# 使用对数获得输入大小
L_K = key_states.size(1)
L_Q = query_states.size(1)
log_L_K = np.ceil(np.log1p(L_K)).astype("int").item()
log_L_Q = np.ceil(np.log1p(L_Q)).astype("int").item()
# 计算要从 K 中切片的样本子集并创建 Q_K_sample
U_part = min(sampling_factor * L_Q * log_L_K, L_K)
# 创建 Q_K_sample(稀疏度测量中的 q_i * k_j^T 项)
index_sample = torch.randint(0, L_K, (U_part,))
K_sample = key_states[:, index_sample, :]
Q_K_sample = torch.bmm(query_states, K_sample.transpose(1, 2))
# 使用 Q_K_sample 计算查询的稀疏度测量
M = Q_K_sample.max(dim=-1)[0] - torch.div(Q_K_sample.sum(dim=-1), L_K)
# 计算 u 以找到稀疏度测量下的 Top-u 查询
u = min(sampling_factor * log_L_Q, L_Q)
M_top = M.topk(u, sorted=False)[1]
# 将查询_states[:, M_top] 计算为 Q_reduce
dim_for_slice = torch.arange(query_states.size(0)).unsqueeze(-1)
Q_reduce = query_states[dim_for_slice, M_top] # 大小:c*log_L_Q x channel
# 现在,与标准方法相同
d_k = query_states.size(-1)
attn_scores = torch.bmm(Q_reduce, key_states.transpose(-2, -1)) # Q_reduce x K^T
attn_scores = attn_scores / math.sqrt(d_k)
attn_probs = nn.functional.softmax(attn_scores, dim=-1)
attn_output = torch.bmm(attn_probs, value_states)
return attn_output, attn_scores
请注意,在实现中,U p a r t U_{part} U p a r t 在计算中包含了 L Q L_Q L Q ,用于稳定性问题(有关更多信息,请参见此讨论)。
我们做到了!请注意,这只是 probsparse_attention
的部分实现,完整的实现可以在🤗 Transformers 中找到。
蒸馏
由于 ProbSparse 自注意力,编码器的特征图中存在一些可以去除的冗余。因此,蒸馏操作用于将编码器层之间的输入大小减少到其一半的切片,从理论上去除这种冗余。实际上,Informer 的 “蒸馏” 操作只是在每个编码器层之间添加了 1D 卷积层和最大池化。假设 X n X_n X n 是第 n n n 个编码器层的输出,则蒸馏操作定义如下:
X n + 1 = MaxPool ( ELU ( Conv1d ( X n ) ) X_{n+1} = \textrm{MaxPool} ( \textrm{ELU}(\textrm{Conv1d}(X_n)) X n + 1 = MaxPool ( ELU ( Conv1d ( X n ) )
让我们来看看代码:
from torch import nn
# ConvLayer 是一个应用 ELU 和 MaxPool1d 的类的前向传递
def informer_encoder_forward(x_input, num_encoder_layers=3, distil=True):
# 初始化卷积层
if distil:
conv_layers = nn.ModuleList([ConvLayer() for _ in range(num_encoder_layers - 1)])
conv_layers.append(None)
else:
conv_layers = [None] * num_encoder_layers
# 在每个编码器层之间应用 conv_layer
for encoder_layer, conv_layer in zip(encoder_layers, conv_layers):
output = encoder_layer(x_input)
if conv_layer is not None:
output = conv_layer(loutput)
return output
通过将每层的输入减少一半,我们得到了一个内存使用量为 O(N⋅T logT) 而不是 O(N⋅T^2),其中 N 是编码器/解码器层数。这正是我们想要的!
现在,Informer 模型在 🤗 Transformers 库中可用,并简称为 InformerModel
。在下面的章节中,我们将展示如何在自定义的多变量时间序列数据集上训练该模型。
设置环境
首先,让我们安装必要的库:🤗 Transformers、🤗 Datasets、🤗 Evaluate、🤗 Accelerate 和 GluonTS。
正如我们将要展示的,GluonTS 将用于将数据转换为特征,并创建适当的训练、验证和测试批次。
!pip install -q transformers datasets evaluate accelerate gluonts ujson
加载数据集
在本博文中,我们将使用 Hugging Face Hub 上提供的 traffic_hourly
数据集。该数据集包含 Lai 等人 (2017) 使用的 San Francisco Traffic 数据集。它包含了从 2015 年到 2016 年在旧金山湾区高速公路上的 862 个小时级别时间序列,显示了道路占用率在 [0, 1] 范围内的情况。
该数据集是 Monash 时间序列预测库的一部分,该库包含了来自多个领域的时间序列数据集。它可以看作是时间序列预测的 GLUE 基准。
from datasets import load_dataset
dataset = load_dataset("monash_tsf", "traffic_hourly")
如您所见,数据集包含 3 个拆分:训练集、验证集和测试集。
dataset
>>> DatasetDict({
train: Dataset({
features: ['start', 'target', 'feat_static_cat', 'feat_dynamic_real', 'item_id'],
num_rows: 862
})
test: Dataset({
features: ['start', 'target', 'feat_static_cat', 'feat_dynamic_real', 'item_id'],
num_rows: 862
})
validation: Dataset({
features: ['start', 'target', 'feat_static_cat', 'feat_dynamic_real', 'item_id'],
num_rows: 862
})
})
每个示例包含几个关键字,其中 start
和 target
是最重要的。让我们看一下数据集中的第一个时间序列:
train_example = dataset["train"][0]
train_example.keys()
>>> dict_keys(['start', 'target', 'feat_static_cat', 'feat_dynamic_real', 'item_id'])
start
简单地表示时间序列的开始(作为日期时间),而 target
包含时间序列的实际值。
start
将用于将与时间相关的特征添加到时间序列值中,作为模型的额外输入(例如「年份的月份」)。由于我们知道数据的频率是「每小时」,我们知道例如第二个值具有时间戳 2015-01-01 01:00:01
、2015-01-01 02:00:01
等。
print(train_example["start"])
print(len(train_example["target"]))
>>> 2015-01-01 00:00:01
17448
验证集包含与训练集相同的数据,只是时间长度比训练集多一个 prediction_length
。这样我们就可以将模型的预测与真实值进行验证。
测试集与验证集相比,又多了一个 prediction_length
的数据长度(或者相对于训练集来说,多个滚动窗口的长度)。
validation_example = dataset["validation"][0]
validation_example.keys()
>>> dict_keys(['start', 'target', 'feat_static_cat', 'feat_dynamic_real', 'item_id'])
初始值与相应的训练示例完全相同。但是,与训练示例相比,该示例有prediction_length=48
(48小时或2天)额外的值。让我们进行验证。
freq = "1H"
prediction_length = 48
assert len(train_example["target"]) + prediction_length == len(
dataset["validation"][0]["target"]
)
让我们将其可视化:
import matplotlib.pyplot as plt
num_of_samples = 150
figure, axes = plt.subplots()
axes.plot(train_example["target"][-num_of_samples:], color="blue")
axes.plot(
validation_example["target"][-num_of_samples - prediction_length :],
color="red",
alpha=0.5,
)
plt.show()
让我们将数据拆分为:
train_dataset = dataset["train"]
test_dataset = dataset["test"]
将start
更新为pd.Period
我们首先要做的是使用数据的freq
将每个时间序列的start
特征转换为pandas的Period
索引:
from functools import lru_cache
import pandas as pd
import numpy as np
@lru_cache(10_000)
def convert_to_pandas_period(date, freq):
return pd.Period(date, freq)
def transform_start_field(batch, freq):
batch["start"] = [convert_to_pandas_period(date, freq) for date in batch["start"]]
return batch
现在,我们使用datasets
的set_transform
功能来实时地进行此转换:
from functools import partial
train_dataset.set_transform(partial(transform_start_field, freq=freq))
test_dataset.set_transform(partial(transform_start_field, freq=freq))
现在,让我们使用GluonTS中的MultivariateGrouper
将数据集转换为多变量时间序列。此grouper将把各个一维时间序列转换为单个二维矩阵。
from gluonts.dataset.multivariate_grouper import MultivariateGrouper
num_of_variates = len(train_dataset)
train_grouper = MultivariateGrouper(max_target_dim=num_of_variates)
test_grouper = MultivariateGrouper(
max_target_dim=num_of_variates,
num_test_dates=len(test_dataset) // num_of_variates, # 滚动测试窗口的数量
)
multi_variate_train_dataset = train_grouper(train_dataset)
multi_variate_test_dataset = test_grouper(test_dataset)
请注意,目标现在是二维的,第一维是变量的数量(时间序列的数量),第二维是时间序列的值(时间维度):
multi_variate_train_example = multi_variate_train_dataset[0]
print("multi_variate_train_example["target"].shape =", multi_variate_train_example["target"].shape)
>>> multi_variate_train_example["target"].shape = (862, 17448)
定义模型
接下来,让我们实例化一个模型。模型将从头开始训练,因此我们不会在这里使用from_pretrained
方法,而是从config
随机初始化模型。
我们对模型指定了一些额外的参数:
prediction_length
(在我们的例子中为48
小时):这是Informer解码器学习预测的时间范围;context_length
:如果没有指定context_length
,模型将将context_length
(编码器的输入)设置为prediction_length
;- 给定频率的
lags
:这些指定了一种有效的“回顾”机制,其中我们将过去的值与当前值连接起来作为附加特征,例如对于Daily
频率,我们可以考虑[1, 7, 30, ...]
的回顾,对于Minute
数据,我们可以考虑[1, 30, 60, 60*24, ...]
等等; - 时间特征的数量:在我们的例子中,将添加
HourOfDay
、DayOfWeek
等特征,因此为5
。
让我们检查GluonTS提供的默认滞后值(”hourly”):
from gluonts.time_feature import get_lags_for_frequency
lags_sequence = get_lags_for_frequency(freq)
print(lags_sequence)
>>> [1, 2, 3, 4, 5, 6, 7, 23, 24, 25, 47, 48, 49, 71, 72, 73, 95, 96, 97, 119, 120,
121, 143, 144, 145, 167, 168, 169, 335, 336, 337, 503, 504, 505, 671, 672, 673, 719, 720, 721]
这意味着每个时间步骤将回溯到721小时(约30天),作为额外的特征。然而,生成的特征向量的大小将为len(lags_sequence)*num_of_variates
,对于我们的情况将是34480!这是行不通的,所以我们将使用自己合理的滞后值。
让我们也检查GluonTS提供的默认时间特征:
from gluonts.time_feature import time_features_from_frequency_str
time_features = time_features_from_frequency_str(freq)
print(time_features)
>>> [<function hour_of_day at 0x7f3809539240>, <function day_of_week at 0x7f3809539360>, <function day_of_month at 0x7f3809539480>, <function day_of_year at 0x7f38095395a0>]
在这种情况下,有四个额外的特征,即”hour of day”,”day of week”,”day of month”和”day of year”。这意味着对于每个时间步骤,我们将将这些特征作为标量值添加进去。例如,考虑时间戳2015-01-01 01:00:01
。这四个额外的特征将是:
from pandas.core.arrays.period import period_array
timestamp = pd.Period("2015-01-01 01:00:01", freq=freq)
timestamp_as_index = pd.PeriodIndex(data=period_array([timestamp]))
additional_features = [
(time_feature.__name__, time_feature(timestamp_as_index))
for time_feature in time_features
]
print(dict(additional_features))
>>> {'hour_of_day': array([-0.45652174]), 'day_of_week': array([0.]), 'day_of_month': array([-0.5]), 'day_of_year': array([-0.5])}
请注意,小时和天被编码为GluonTS中的[-0.5, 0.5]
之间的值。有关time_features
的更多信息,请参见此处。除了这四个特征外,我们还将添加一个”age”特征,稍后我们将在数据转换中看到。
我们现在有了定义模型的所有内容:
from transformers import InformerConfig, InformerForPrediction
config = InformerConfig(
# 在多变量设置下,input_size是每个时间步长中时间序列的变量数
input_size=num_of_variates,
# 预测长度:
prediction_length=prediction_length,
# 上下文长度:
context_length=prediction_length * 2,
# 滞后值从1周前复制:
lags_sequence=[1, 24 * 7],
# 我们将添加5个时间特征("hour_of_day", ...和"age"):
num_time_features=len(time_features) + 1,
# informer参数:
dropout=0.1,
encoder_layers=6,
decoder_layers=4,
# 从num_of_variates*len(lags_sequence)+num_time_features将输入投影到:
d_model=64,
)
model = InformerForPrediction(config)
默认情况下,模型使用对角线学生t分布(但这是可配置的):
model.config.distribution_output
>>> 'student_t'
定义转换
接下来,我们定义数据的转换,特别是用于创建时间特征(基于数据集或通用特征)的转换。
再次,我们将使用GluonTS库。我们定义了一系列的转换(与torchvision.transforms.Compose用于图像的转换类似)。它允许我们将多个转换组合成一个单一的流水线。
from gluonts.time_feature import TimeFeature
from gluonts.dataset.field_names import FieldName
from gluonts.transform import (
AddAgeFeature,
AddObservedValuesIndicator,
AddTimeFeatures,
AsNumpyArray,
Chain,
ExpectedNumInstanceSampler,
InstanceSplitter,
RemoveFields,
SelectFields,
SetField,
TestSplitSampler,
Transformation,
ValidationSplitSampler,
VstackFeatures,
RenameFields,
)
下面的转换附有注释,以解释它们的功能。从高层次来看,我们将遍历数据集中的每个时间序列,并添加/删除字段或特征:
from transformers import PretrainedConfig
def create_transformation(freq: str, config: PretrainedConfig) -> Transformation:
# 创建要稍后删除的字段列表
remove_field_names = []
if config.num_static_real_features == 0:
remove_field_names.append(FieldName.FEAT_STATIC_REAL)
if config.num_dynamic_real_features == 0:
remove_field_names.append(FieldName.FEAT_DYNAMIC_REAL)
if config.num_static_categorical_features == 0:
remove_field_names.append(FieldName.FEAT_STATIC_CAT)
return Chain(
# 步骤1:如果未指定,则删除静态/动态字段
[RemoveFields(field_names=remove_field_names)]
# 步骤2:将数据转换为NumPy格式(可能不需要)
+ (
[
AsNumpyArray(
field=FieldName.FEAT_STATIC_CAT,
expected_ndim=1,
dtype=int,
)
]
if config.num_static_categorical_features > 0
else []
)
+ (
[
AsNumpyArray(
field=FieldName.FEAT_STATIC_REAL,
expected_ndim=1,
)
]
if config.num_static_real_features > 0
else []
)
+ [
AsNumpyArray(
field=FieldName.TARGET,
# 对于多变量情况,我们期望有一个额外的维度:
expected_ndim=1 if config.input_size == 1 else 2,
),
# 步骤3:通过将目标值填充为零来处理NaN值,并返回掩码
# 掩码表示观测到的值为true,NaN值为false
# 解码器使用此掩码(对于未观测到的值不会产生损失)
# 参见xxxForPrediction模型中的loss_weights
AddObservedValuesIndicator(
target_field=FieldName.TARGET,
output_field=FieldName.OBSERVED_VALUES,
),
# 步骤4:根据数据集的频率添加时间特征,作为位置编码
AddTimeFeatures(
start_field=FieldName.START,
target_field=FieldName.TARGET,
output_field=FieldName.FEAT_TIME,
time_features=time_features_from_frequency_str(freq),
pred_length=config.prediction_length,
),
# 步骤5:添加另一个时间特征(只是一个单独的数字)
# 告诉模型时间序列值在生命周期中的位置
# 类似于运行计数器
AddAgeFeature(
target_field=FieldName.TARGET,
output_field=FieldName.FEAT_AGE,
pred_length=config.prediction_length,
log_scale=True,
),
# 步骤6:将所有时间特征在FEAT_TIME键中垂直堆叠
VstackFeatures(
output_field=FieldName.FEAT_TIME,
input_fields=[FieldName.FEAT_TIME, FieldName.FEAT_AGE]
+ (
[FieldName.FEAT_DYNAMIC_REAL]
if config.num_dynamic_real_features > 0
else []
),
),
# 步骤7:重命名以匹配HuggingFace的名称
RenameFields(
mapping={
FieldName.FEAT_STATIC_CAT: "static_categorical_features",
FieldName.FEAT_STATIC_REAL: "static_real_features",
FieldName.FEAT_TIME: "time_features",
FieldName.TARGET: "values",
FieldName.OBSERVED_VALUES: "observed_mask",
}
),
]
)
定义InstanceSplitter
对于训练/验证/测试,我们接下来创建一个InstanceSplitter
,用于从数据集中采样窗口(因为由于时间和内存的限制,我们无法将整个历史值传递给模型)。
实例分割器从数据中随机抽样大小为context_length
的窗口和大小为prediction_length
的子窗口,并为各自窗口的时间键添加past_
或future_
键。这样可以确保将values
拆分为past_values
和后续的future_values
键,分别用作编码器和解码器的输入。对于time_series_fields
参数中的任何键,也会发生同样的情况:
from gluonts.transform.sampler import InstanceSampler
from typing import Optional
def create_instance_splitter(
config: PretrainedConfig,
mode: str,
train_sampler: Optional[InstanceSampler] = None,
validation_sampler: Optional[InstanceSampler] = None,
) -> Transformation:
assert mode in ["train", "validation", "test"]
instance_sampler = {
"train": train_sampler
or ExpectedNumInstanceSampler(
num_instances=1.0, min_future=config.prediction_length
),
"validation": validation_sampler
or ValidationSplitSampler(min_future=config.prediction_length),
"test": TestSplitSampler(),
}[mode]
return InstanceSplitter(
target_field="values",
is_pad_field=FieldName.IS_PAD,
start_field=FieldName.START,
forecast_start_field=FieldName.FORECAST_START,
instance_sampler=instance_sampler,
past_length=config.context_length + max(config.lags_sequence),
future_length=config.prediction_length,
time_series_fields=["time_features", "observed_mask"],
)
创建数据加载器
接下来,是时候创建数据加载器了,它允许我们拥有(输入,输出)对的批次,或者换句话说( past_values
, future_values
)。
from typing import Iterable
import torch
from gluonts.itertools import Cached, Cyclic
from gluonts.dataset.loader import as_stacked_batches
def create_train_dataloader(
config: PretrainedConfig,
freq,
data,
batch_size: int,
num_batches_per_epoch: int,
shuffle_buffer_length: Optional[int] = None,
cache_data: bool = True,
**kwargs,
) -> Iterable:
PREDICTION_INPUT_NAMES = [
"past_time_features",
"past_values",
"past_observed_mask",
"future_time_features",
]
if config.num_static_categorical_features > 0:
PREDICTION_INPUT_NAMES.append("static_categorical_features")
if config.num_static_real_features > 0:
PREDICTION_INPUT_NAMES.append("static_real_features")
TRAINING_INPUT_NAMES = PREDICTION_INPUT_NAMES + [
"future_values",
"future_observed_mask",
]
transformation = create_transformation(freq, config)
transformed_data = transformation.apply(data, is_train=True)
if cache_data:
transformed_data = Cached(transformed_data)
# 我们初始化一个Training实例
instance_splitter = create_instance_splitter(config, "train")
# 实例分割器将从目标时间序列中随机抽样一个长度为context length + lags + prediction length(在我们的情况下为1)的窗口
# 并返回一个迭代器。
stream = Cyclic(transformed_data).stream()
training_instances = instance_splitter.apply(
stream, is_train=True
)
return as_stacked_batches(
training_instances,
batch_size=batch_size,
shuffle_buffer_length=shuffle_buffer_length,
field_names=TRAINING_INPUT_NAMES,
output_type=torch.tensor,
num_batches_per_epoch=num_batches_per_epoch,
)
def create_test_dataloader(
config: PretrainedConfig,
freq,
data,
batch_size: int,
**kwargs,
):
PREDICTION_INPUT_NAMES = [
"past_time_features",
"past_values",
"past_observed_mask",
"future_time_features",
]
if config.num_static_categorical_features > 0:
PREDICTION_INPUT_NAMES.append("static_categorical_features")
if config.num_static_real_features > 0:
PREDICTION_INPUT_NAMES.append("static_real_features")
transformation = create_transformation(freq, config)
transformed_data = transformation.apply(data, is_train=False)
# 我们创建一个测试实例分割器,它将仅对编码器采样训练期间看到的最后一个上下文窗口。
instance_sampler = create_instance_splitter(config, "test")
# 我们在测试模式下应用变换
testing_instances = instance_sampler.apply(transformed_data, is_train=False)
return as_stacked_batches(
testing_instances,
batch_size=batch_size,
output_type=torch.tensor,
field_names=PREDICTION_INPUT_NAMES,
)
train_dataloader = create_train_dataloader(
config=config,
freq=freq,
data=multi_variate_train_dataset,
batch_size=256,
num_batches_per_epoch=100,
num_workers=2,
)
test_dataloader = create_test_dataloader(
config=config,
freq=freq,
data=multi_variate_test_dataset,
batch_size=32,
)
让我们检查第一批:
batch = next(iter(train_dataloader))
for k, v in batch.items():
print(k, v.shape, v.type())
>>> past_time_features torch.Size([256, 264, 5]) torch.FloatTensor
past_values torch.Size([256, 264, 862]) torch.FloatTensor
past_observed_mask torch.Size([256, 264, 862]) torch.FloatTensor
future_time_features torch.Size([256, 48, 5]) torch.FloatTensor
future_values torch.Size([256, 48, 862]) torch.FloatTensor
future_observed_mask torch.Size([256, 48, 862]) torch.FloatTensor
可以看到,我们没有将input_ids
和attention_mask
输入到编码器中(这对于自然语言处理模型来说是一种情况),而是使用past_values
,以及past_observed_mask
,past_time_features
和static_real_features
。
解码器的输入包括future_values
,future_observed_mask
和future_time_features
。可以将future_values
视为自然语言处理中的decoder_input_ids
的等价物。
有关每个输入的详细解释,请参阅文档。
前向传递
让我们使用刚刚创建的批次执行单次前向传递:
# 执行前向传递
outputs = model(
past_values=batch["past_values"],
past_time_features=batch["past_time_features"],
past_observed_mask=batch["past_observed_mask"],
static_categorical_features=batch["static_categorical_features"]
if config.num_static_categorical_features > 0
else None,
static_real_features=batch["static_real_features"]
if config.num_static_real_features > 0
else None,
future_values=batch["future_values"],
future_time_features=batch["future_time_features"],
future_observed_mask=batch["future_observed_mask"],
output_hidden_states=True,
)
print("损失:", outputs.loss.item())
>>> 损失: -1071.5718994140625
请注意,模型返回了损失。由于解码器自动将future_values
向右移动一个位置以获得标签,因此可以计算预测值和标签之间的损失。损失是预测分布与实际值之间的负对数似然,趋向于负无穷大。
还请注意,解码器使用因果掩码来防止查看未来,因为它需要预测的值在future_values
张量中。
训练模型
是时候训练模型了!我们将使用一个标准的PyTorch训练循环。
我们将在这里使用🤗 Accelerate库,它会自动将模型、优化器和数据加载器放置在适当的设备上。
from accelerate import Accelerator
from torch.optim import AdamW
epochs = 25
loss_history = []
accelerator = Accelerator()
device = accelerator.device
model.to(device)
optimizer = AdamW(model.parameters(), lr=6e-4, betas=(0.9, 0.95), weight_decay=1e-1)
model, optimizer, train_dataloader = accelerator.prepare(
model,
optimizer,
train_dataloader,
)
model.train()
for epoch in range(epochs):
for idx, batch in enumerate(train_dataloader):
optimizer.zero_grad()
outputs = model(
static_categorical_features=batch["static_categorical_features"].to(device)
if config.num_static_categorical_features > 0
else None,
static_real_features=batch["static_real_features"].to(device)
if config.num_static_real_features > 0
else None,
past_time_features=batch["past_time_features"].to(device),
past_values=batch["past_values"].to(device),
future_time_features=batch["future_time_features"].to(device),
future_values=batch["future_values"].to(device),
past_observed_mask=batch["past_observed_mask"].to(device),
future_observed_mask=batch["future_observed_mask"].to(device),
)
loss = outputs.loss
# 反向传播
accelerator.backward(loss)
optimizer.step()
loss_history.append(loss.item())
if idx % 100 == 0:
print(loss.item())
>>> -1081.978515625
...
-2877.723876953125
# 查看训练情况
loss_history = np.array(loss_history).reshape(-1)
x = range(loss_history.shape[0])
plt.figure(figsize=(10, 5))
plt.plot(x, loss_history, label="train")
plt.title("损失", fontsize=15)
plt.legend(loc="upper right")
plt.xlabel("迭代次数")
plt.ylabel("负对数似然")
plt.show()
推理
在推理时,建议使用generate()
方法进行自回归生成,类似于NLP模型。
预测涉及从测试实例采样器中获取数据,该采样器将从数据集中的每个时间序列中采样最后一个大小为context_length
的窗口值,并将其传递给模型。请注意,我们将已知的future_time_features
提前传递给解码器。
模型将自回归地从预测分布中采样出一定数量的值,并将其传递回解码器以返回预测输出:
model.eval()
forecasts_ = []
for batch in test_dataloader:
outputs = model.generate(
static_categorical_features=batch["static_categorical_features"].to(device)
if config.num_static_categorical_features > 0
else None,
static_real_features=batch["static_real_features"].to(device)
if config.num_static_real_features > 0
else None,
past_time_features=batch["past_time_features"].to(device),
past_values=batch["past_values"].to(device),
future_time_features=batch["future_time_features"].to(device),
past_observed_mask=batch["past_observed_mask"].to(device),
)
forecasts_.append(outputs.sequences.cpu().numpy())
模型输出一个形状为(batch_size
,number of samples
,prediction length
,input_size
)的张量。
在这种情况下,我们为每个862
个时间序列(对于批次中的每个示例,批次大小为1
,因为我们只有一个多元时间序列)获取下一个48
小时的100
个可能值:
forecasts_[0].shape
>>> (1, 100, 48, 862)
我们将它们垂直堆叠,以获得测试数据集中所有时间序列的预测(以防测试集中有更多时间序列):
forecasts = np.vstack(forecasts_)
print(forecasts.shape)
>>> (1, 100, 48, 862)
我们可以使用基准测试集中的真实样本值来评估生成的预测结果。为此,我们将使用🤗 Evaluate库,其中包括MASE和sMAPE指标。
我们为数据集中的每个时间序列变量计算两个指标:
from evaluate import load
from gluonts.time_feature import get_seasonality
mase_metric = load("evaluate-metric/mase")
smape_metric = load("evaluate-metric/smape")
forecast_median = np.median(forecasts, 1).squeeze(0).T
mase_metrics = []
smape_metrics = []
for item_id, ts in enumerate(test_dataset):
training_data = ts["target"][:-prediction_length]
ground_truth = ts["target"][-prediction_length:]
mase = mase_metric.compute(
predictions=forecast_median[item_id],
references=np.array(ground_truth),
training=np.array(training_data),
periodicity=get_seasonality(freq),
)
mase_metrics.append(mase["mase"])
smape = smape_metric.compute(
predictions=forecast_median[item_id],
references=np.array(ground_truth),
)
smape_metrics.append(smape["smape"])
print(f"MASE: {np.mean(mase_metrics)}")
>>> MASE: 1.1913437728068093
print(f"sMAPE: {np.mean(smape_metrics)}")
>>> sMAPE: 0.5322665081607634
plt.scatter(mase_metrics, smape_metrics, alpha=0.2)
plt.xlabel("MASE")
plt.ylabel("sMAPE")
plt.show()
为了绘制任何时间序列变量相对于基准测试数据的预测结果,我们定义以下辅助函数:
import matplotlib.dates as mdates
def plot(ts_index, mv_index):
fig, ax = plt.subplots()
index = pd.period_range(
start=multi_variate_test_dataset[ts_index][FieldName.START],
periods=len(multi_variate_test_dataset[ts_index][FieldName.TARGET]),
freq=multi_variate_test_dataset[ts_index][FieldName.START].freq,
).to_timestamp()
ax.xaxis.set_minor_locator(mdates.HourLocator())
ax.plot(
index[-2 * prediction_length :],
multi_variate_test_dataset[ts_index]["target"][mv_index, -2 * prediction_length :],
label="实际",
)
ax.plot(
index[-prediction_length:],
forecasts[ts_index, ..., mv_index].mean(axis=0),
label="平均",
)
ax.fill_between(
index[-prediction_length:],
forecasts[ts_index, ..., mv_index].mean(0)
- forecasts[ts_index, ..., mv_index].std(axis=0),
forecasts[ts_index, ..., mv_index].mean(0)
+ forecasts[ts_index, ..., mv_index].std(axis=0),
alpha=0.2,
interpolate=True,
label="+/- 1-std",
)
ax.legend()
fig.autofmt_xdate()
例如:
plot(0, 344)
结论
我们如何与其他模型进行比较?Monash时间序列库有一个测试集MASE指标的比较表,我们可以添加进去:
可以看出,也许对一些人来说令人惊讶的是,多变量预测通常比单变量预测更差,原因是难以估计跨系列的相关性/关系。估计值增加的额外方差往往会损害结果预测,或者模型学习到了虚假的相关性。我们推荐这篇论文进行深入阅读。当使用大量数据进行训练时,多变量模型通常效果良好。
所以,在这里,传统的Transformer表现最好!在未来,我们希望能够更好地在一个中心位置对这些模型进行基准测试,以便更轻松地复现几篇论文的结果。请继续关注更多内容!
资源
我们推荐查看Informer文档和本博客文章顶部链接的示例笔记本。