Press "Enter" to skip to content

多变量概率时间序列预测与Informer

多变量概率时间序列预测与Informer 四海 第1张

介绍

几个月前,我们介绍了时间序列变换器,它是将传统的Transformer(Vaswani等人,2017年)应用于预测,并展示了单变量概率预测任务的示例(即单独预测每个时间序列的一维分布)。在本文中,我们介绍了Informer模型(Zhou, Haoyi等人,2021年),这是AAAI21最佳论文,现在已经在🤗 Transformers中可用。我们将展示如何使用Informer模型进行多变量概率预测任务,即预测未来时间序列目标值的向量分布。需要注意的是,这也适用于传统的时间序列变换器模型。

多变量概率时间序列预测

就概率预测的建模而言,当处理多变量时间序列时,Transformer/Informer不需要进行任何更改。在单变量和多变量设置中,模型将接收一个向量序列,因此唯一的变化在于输出或发射方面。

对于高维数据的完整条件分布建模可能会导致计算开销过大,因此方法会采用一些分布的近似方法,最简单的方法是将数据建模为来自同一族分布的独立分布,或者对完整协方差进行低秩近似等。在这里,我们只采用独立(或对角)发射,这在我们实现的分布族中是支持的。

Informer – 内部原理

Informer基于传统的Transformer(Vaswani等人,2017年),引入了两个主要改进。为了理解这些改进,让我们回顾一下传统Transformer的缺点:

  1. 经典自注意力的二次计算:传统Transformer的计算复杂度为O(T^2D),其中T是时间序列长度,D是隐藏状态的维度。对于长序列时间序列预测(也称为LSTF问题),这可能会导致计算开销非常大。为了解决这个问题,Informer采用了一种称为ProbSparse注意力的新的自注意机制,其时间和空间复杂度为O(T log T)。
  2. 堆叠层时的内存瓶颈:当堆叠N个编码器/解码器层时,传统Transformer的内存使用量为O(NT^2),这限制了模型处理长序列的能力。Informer使用了一种称为Distilling操作的方法,将层之间的输入大小减小到其一半。通过这样做,可以将整体内存使用量减小为O(N⋅T log T)。

正如您所看到的,Informer模型的动机类似于Longformer(Beltagy等人,2020年),Sparse Transformer(Child等人,2019年)和其他自然语言处理论文,用于减少自注意机制的二次复杂度,特别是在输入序列较长时。现在,让我们深入了解ProbSparse注意力和Distilling操作,并附带代码示例。

ProbSparse注意力

ProbSparse的主要思想是经典自注意力分数形成了一个长尾分布,其中“活跃”的查询位于“头部”分数中,而“懒惰”的查询位于“尾部”区域中。通过“活跃”查询,我们指的是一个查询qi,使得点积⟨qi, ki⟩对主要注意力产生贡献,而“懒惰”的查询形成的点积生成的注意力是微不足道的。这里,qi和ki分别是Q和K注意力矩阵中的第i行。

在理解“活跃”和“懒惰”查询的思想之后,ProbSparse注意力选择“活跃”查询,并创建一个缩减的查询矩阵Qreduced,用于在O(T log T)的时间内计算注意力权重。让我们通过代码示例更详细地了解这一点。

回顾一下规范的自注意力公式:

注意力(Q,K,V)= softmax(QKT / dk)V

其中 Q ∈ R LQ × d,K ∈ R LK × d,V ∈ R LV × d。请注意,在实践中,查询和键的输入长度在自注意力计算中通常是相等的,即 LQ = LK = T,其中 T 是时间序列的长度。因此,QKT 乘法的计算复杂度为 O(T^2 · d)。在 ProbSparse 注意力中,我们的目标是创建一个新的 Qreduce 矩阵,并定义如下:

ProbSparseAttention(Q,K,V)= softmax(QreduceKT / dk)V

其中 Qreduce 矩阵仅选择 Top u 个“活跃”查询。这里,u = c · log LQ,c 被称为 ProbSparse 注意力的采样因子超参数。由于 Qreduce 仅选择了 Top u 个查询,其大小为 c · log LQ × d,因此乘法 QreduceKT 仅需要 O(LK log LQ) = O(T log T)。

这很好!但是我们如何选择这些“活跃”查询来创建 Qreduce 呢?让我们定义查询稀疏度量。

查询稀疏度量

查询稀疏度量 M(qi, K) 用于选择 Q 中的 Top u 个“活跃”查询 qi,以创建 Qreduce。理论上,主导的 ⟨qi, ki⟩ 对鼓励“活跃”查询 qi 的概率分布远离均匀分布,如下图所示。因此,实际查询分布与均匀分布之间的 KL 散度被用来定义稀疏度量。

在实践中,这个度量被定义为:

M(qi, K) = max_j (qikjT / √d – 1/Lk ∑_{j=1}^{Lk} qikjT / √d)

这里需要理解的重要事情是,当 M ( q i , K ) M(q_i, K) M ( q i ​ , K ) 更大时,查询 q i q_i q i ​ 应该在 Q r e d u c e Q_{reduce} Q r e d u c e ​ 中,反之亦然。

但是我们如何以非二次时间计算术语 q i k j T q_ik_j^T q i ​ k j T ​ 呢?请记住,大多数点积 ⟨ q i , k i ⟩ \langle q_i,k_i \rangle ⟨ q i ​ , k i ​ ⟩ 生成的是平凡的注意力(即长尾分布属性),因此随机从 K K K 中抽样一部分键,将其称为代码中的 K_sample 即可。

现在,我们准备看一下 probsparse_attention 的代码:

from torch import nn
import math


def probsparse_attention(query_states, key_states, value_states, sampling_factor=5):
    """
    计算稀疏自注意力。
    输入形状:Batch x Time x Channel

    注意额外的 `sampling_factor` 输入。
    """
    # 使用对数获得输入大小
    L_K = key_states.size(1)
    L_Q = query_states.size(1)
    log_L_K = np.ceil(np.log1p(L_K)).astype("int").item()
    log_L_Q = np.ceil(np.log1p(L_Q)).astype("int").item()

    # 计算要从 K 中切片的样本子集并创建 Q_K_sample
    U_part = min(sampling_factor * L_Q * log_L_K, L_K)

    # 创建 Q_K_sample(稀疏度测量中的 q_i * k_j^T 项)
    index_sample = torch.randint(0, L_K, (U_part,))
    K_sample = key_states[:, index_sample, :]
    Q_K_sample = torch.bmm(query_states, K_sample.transpose(1, 2))

    # 使用 Q_K_sample 计算查询的稀疏度测量
    M = Q_K_sample.max(dim=-1)[0] - torch.div(Q_K_sample.sum(dim=-1), L_K)

    # 计算 u 以找到稀疏度测量下的 Top-u 查询
    u = min(sampling_factor * log_L_Q, L_Q)
    M_top = M.topk(u, sorted=False)[1]

    # 将查询_states[:, M_top] 计算为 Q_reduce
    dim_for_slice = torch.arange(query_states.size(0)).unsqueeze(-1)
    Q_reduce = query_states[dim_for_slice, M_top]  # 大小:c*log_L_Q x channel

    # 现在,与标准方法相同
    d_k = query_states.size(-1)
    attn_scores = torch.bmm(Q_reduce, key_states.transpose(-2, -1))  # Q_reduce x K^T
    attn_scores = attn_scores / math.sqrt(d_k)
    attn_probs = nn.functional.softmax(attn_scores, dim=-1)
    attn_output = torch.bmm(attn_probs, value_states)

    return attn_output, attn_scores

请注意,在实现中,U p a r t U_{part} U p a r t ​ 在计算中包含了 L Q L_Q L Q ​,用于稳定性问题(有关更多信息,请参见此讨论)。

我们做到了!请注意,这只是 probsparse_attention 的部分实现,完整的实现可以在🤗 Transformers 中找到。

蒸馏

由于 ProbSparse 自注意力,编码器的特征图中存在一些可以去除的冗余。因此,蒸馏操作用于将编码器层之间的输入大小减少到其一半的切片,从理论上去除这种冗余。实际上,Informer 的 “蒸馏” 操作只是在每个编码器层之间添加了 1D 卷积层和最大池化。假设 X n X_n X n ​ 是第 n n n 个编码器层的输出,则蒸馏操作定义如下:

X n + 1 = MaxPool ( ELU ( Conv1d ( X n ) ) X_{n+1} = \textrm{MaxPool} ( \textrm{ELU}(\textrm{Conv1d}(X_n)) X n + 1 ​ = MaxPool ( ELU ( Conv1d ( X n ​ ) )

让我们来看看代码:

from torch import nn

# ConvLayer 是一个应用 ELU 和 MaxPool1d 的类的前向传递
def informer_encoder_forward(x_input, num_encoder_layers=3, distil=True):
    # 初始化卷积层
    if distil:
        conv_layers = nn.ModuleList([ConvLayer() for _ in range(num_encoder_layers - 1)])
        conv_layers.append(None)
    else:
        conv_layers = [None] * num_encoder_layers
    
    # 在每个编码器层之间应用 conv_layer
    for encoder_layer, conv_layer in zip(encoder_layers, conv_layers):
        output = encoder_layer(x_input)
        if conv_layer is not None:
            output = conv_layer(loutput)
    
    return output

通过将每层的输入减少一半,我们得到了一个内存使用量为 O(N⋅T logT) 而不是 O(N⋅T^2),其中 N 是编码器/解码器层数。这正是我们想要的!

现在,Informer 模型在 🤗 Transformers 库中可用,并简称为 InformerModel。在下面的章节中,我们将展示如何在自定义的多变量时间序列数据集上训练该模型。

设置环境

首先,让我们安装必要的库:🤗 Transformers、🤗 Datasets、🤗 Evaluate、🤗 Accelerate 和 GluonTS。

正如我们将要展示的,GluonTS 将用于将数据转换为特征,并创建适当的训练、验证和测试批次。

!pip install -q transformers datasets evaluate accelerate gluonts ujson

加载数据集

在本博文中,我们将使用 Hugging Face Hub 上提供的 traffic_hourly 数据集。该数据集包含 Lai 等人 (2017) 使用的 San Francisco Traffic 数据集。它包含了从 2015 年到 2016 年在旧金山湾区高速公路上的 862 个小时级别时间序列,显示了道路占用率在 [0, 1] 范围内的情况。

该数据集是 Monash 时间序列预测库的一部分,该库包含了来自多个领域的时间序列数据集。它可以看作是时间序列预测的 GLUE 基准。

from datasets import load_dataset

dataset = load_dataset("monash_tsf", "traffic_hourly")

如您所见,数据集包含 3 个拆分:训练集、验证集和测试集。

dataset

>>> DatasetDict({
        train: Dataset({
            features: ['start', 'target', 'feat_static_cat', 'feat_dynamic_real', 'item_id'],
            num_rows: 862
        })
        test: Dataset({
            features: ['start', 'target', 'feat_static_cat', 'feat_dynamic_real', 'item_id'],
            num_rows: 862
        })
        validation: Dataset({
            features: ['start', 'target', 'feat_static_cat', 'feat_dynamic_real', 'item_id'],
            num_rows: 862
        })
    })

每个示例包含几个关键字,其中 starttarget 是最重要的。让我们看一下数据集中的第一个时间序列:

train_example = dataset["train"][0]
train_example.keys()

>>> dict_keys(['start', 'target', 'feat_static_cat', 'feat_dynamic_real', 'item_id'])

start 简单地表示时间序列的开始(作为日期时间),而 target 包含时间序列的实际值。

start 将用于将与时间相关的特征添加到时间序列值中,作为模型的额外输入(例如「年份的月份」)。由于我们知道数据的频率是「每小时」,我们知道例如第二个值具有时间戳 2015-01-01 01:00:012015-01-01 02:00:01 等。

print(train_example["start"])
print(len(train_example["target"]))

>>> 2015-01-01 00:00:01
    17448

验证集包含与训练集相同的数据,只是时间长度比训练集多一个 prediction_length。这样我们就可以将模型的预测与真实值进行验证。

测试集与验证集相比,又多了一个 prediction_length 的数据长度(或者相对于训练集来说,多个滚动窗口的长度)。

validation_example = dataset["validation"][0]
validation_example.keys()

>>> dict_keys(['start', 'target', 'feat_static_cat', 'feat_dynamic_real', 'item_id'])

初始值与相应的训练示例完全相同。但是,与训练示例相比,该示例有prediction_length=48(48小时或2天)额外的值。让我们进行验证。

freq = "1H"
prediction_length = 48

assert len(train_example["target"]) + prediction_length == len(
    dataset["validation"][0]["target"]
)

让我们将其可视化:

import matplotlib.pyplot as plt

num_of_samples = 150

figure, axes = plt.subplots()
axes.plot(train_example["target"][-num_of_samples:], color="blue")
axes.plot(
    validation_example["target"][-num_of_samples - prediction_length :],
    color="red",
    alpha=0.5,
)

plt.show()

多变量概率时间序列预测与Informer 四海 第2张

让我们将数据拆分为:

train_dataset = dataset["train"]
test_dataset = dataset["test"]

start更新为pd.Period

我们首先要做的是使用数据的freq将每个时间序列的start特征转换为pandas的Period索引:

from functools import lru_cache

import pandas as pd
import numpy as np


@lru_cache(10_000)
def convert_to_pandas_period(date, freq):
    return pd.Period(date, freq)


def transform_start_field(batch, freq):
    batch["start"] = [convert_to_pandas_period(date, freq) for date in batch["start"]]
    return batch

现在,我们使用datasetsset_transform功能来实时地进行此转换:

from functools import partial

train_dataset.set_transform(partial(transform_start_field, freq=freq))
test_dataset.set_transform(partial(transform_start_field, freq=freq))

现在,让我们使用GluonTS中的MultivariateGrouper将数据集转换为多变量时间序列。此grouper将把各个一维时间序列转换为单个二维矩阵。

from gluonts.dataset.multivariate_grouper import MultivariateGrouper

num_of_variates = len(train_dataset)

train_grouper = MultivariateGrouper(max_target_dim=num_of_variates)
test_grouper = MultivariateGrouper(
    max_target_dim=num_of_variates,
    num_test_dates=len(test_dataset) // num_of_variates, # 滚动测试窗口的数量
)

multi_variate_train_dataset = train_grouper(train_dataset)
multi_variate_test_dataset = test_grouper(test_dataset)

请注意,目标现在是二维的,第一维是变量的数量(时间序列的数量),第二维是时间序列的值(时间维度):

multi_variate_train_example = multi_variate_train_dataset[0]
print("multi_variate_train_example["target"].shape =", multi_variate_train_example["target"].shape)

>>> multi_variate_train_example["target"].shape = (862, 17448)

定义模型

接下来,让我们实例化一个模型。模型将从头开始训练,因此我们不会在这里使用from_pretrained方法,而是从config随机初始化模型。

我们对模型指定了一些额外的参数:

  • prediction_length(在我们的例子中为48小时):这是Informer解码器学习预测的时间范围;
  • context_length:如果没有指定context_length,模型将将context_length(编码器的输入)设置为prediction_length
  • 给定频率的lags:这些指定了一种有效的“回顾”机制,其中我们将过去的值与当前值连接起来作为附加特征,例如对于Daily频率,我们可以考虑[1, 7, 30, ...]的回顾,对于Minute数据,我们可以考虑[1, 30, 60, 60*24, ...]等等;
  • 时间特征的数量:在我们的例子中,将添加HourOfDayDayOfWeek等特征,因此为5

让我们检查GluonTS提供的默认滞后值(”hourly”):

from gluonts.time_feature import get_lags_for_frequency

lags_sequence = get_lags_for_frequency(freq)
print(lags_sequence)

>>> [1, 2, 3, 4, 5, 6, 7, 23, 24, 25, 47, 48, 49, 71, 72, 73, 95, 96, 97, 119, 120, 
     121, 143, 144, 145, 167, 168, 169, 335, 336, 337, 503, 504, 505, 671, 672, 673, 719, 720, 721]

这意味着每个时间步骤将回溯到721小时(约30天),作为额外的特征。然而,生成的特征向量的大小将为len(lags_sequence)*num_of_variates,对于我们的情况将是34480!这是行不通的,所以我们将使用自己合理的滞后值。

让我们也检查GluonTS提供的默认时间特征:

from gluonts.time_feature import time_features_from_frequency_str

time_features = time_features_from_frequency_str(freq)
print(time_features)

>>> [<function hour_of_day at 0x7f3809539240>, <function day_of_week at 0x7f3809539360>, <function day_of_month at 0x7f3809539480>, <function day_of_year at 0x7f38095395a0>]

在这种情况下,有四个额外的特征,即”hour of day”,”day of week”,”day of month”和”day of year”。这意味着对于每个时间步骤,我们将将这些特征作为标量值添加进去。例如,考虑时间戳2015-01-01 01:00:01。这四个额外的特征将是:

from pandas.core.arrays.period import period_array

timestamp = pd.Period("2015-01-01 01:00:01", freq=freq)
timestamp_as_index = pd.PeriodIndex(data=period_array([timestamp]))
additional_features = [
    (time_feature.__name__, time_feature(timestamp_as_index))
    for time_feature in time_features
]
print(dict(additional_features))

>>> {'hour_of_day': array([-0.45652174]), 'day_of_week': array([0.]), 'day_of_month': array([-0.5]), 'day_of_year': array([-0.5])}

请注意,小时和天被编码为GluonTS中的[-0.5, 0.5]之间的值。有关time_features的更多信息,请参见此处。除了这四个特征外,我们还将添加一个”age”特征,稍后我们将在数据转换中看到。

我们现在有了定义模型的所有内容:

from transformers import InformerConfig, InformerForPrediction

config = InformerConfig(
    # 在多变量设置下,input_size是每个时间步长中时间序列的变量数
    input_size=num_of_variates,
    # 预测长度:
    prediction_length=prediction_length,
    # 上下文长度:
    context_length=prediction_length * 2,
    # 滞后值从1周前复制:
    lags_sequence=[1, 24 * 7],
    # 我们将添加5个时间特征("hour_of_day", ...和"age"):
    num_time_features=len(time_features) + 1,
    
    # informer参数:
    dropout=0.1,
    encoder_layers=6,
    decoder_layers=4,
    # 从num_of_variates*len(lags_sequence)+num_time_features将输入投影到:
    d_model=64,
)

model = InformerForPrediction(config)

默认情况下,模型使用对角线学生t分布(但这是可配置的):

model.config.distribution_output

>>> 'student_t'

定义转换

接下来,我们定义数据的转换,特别是用于创建时间特征(基于数据集或通用特征)的转换。

再次,我们将使用GluonTS库。我们定义了一系列的转换(与torchvision.transforms.Compose用于图像的转换类似)。它允许我们将多个转换组合成一个单一的流水线。

from gluonts.time_feature import TimeFeature
from gluonts.dataset.field_names import FieldName
from gluonts.transform import (
    AddAgeFeature,
    AddObservedValuesIndicator,
    AddTimeFeatures,
    AsNumpyArray,
    Chain,
    ExpectedNumInstanceSampler,
    InstanceSplitter,
    RemoveFields,
    SelectFields,
    SetField,
    TestSplitSampler,
    Transformation,
    ValidationSplitSampler,
    VstackFeatures,
    RenameFields,
)

下面的转换附有注释,以解释它们的功能。从高层次来看,我们将遍历数据集中的每个时间序列,并添加/删除字段或特征:

from transformers import PretrainedConfig


def create_transformation(freq: str, config: PretrainedConfig) -> Transformation:
    # 创建要稍后删除的字段列表
    remove_field_names = []
    if config.num_static_real_features == 0:
        remove_field_names.append(FieldName.FEAT_STATIC_REAL)
    if config.num_dynamic_real_features == 0:
        remove_field_names.append(FieldName.FEAT_DYNAMIC_REAL)
    if config.num_static_categorical_features == 0:
        remove_field_names.append(FieldName.FEAT_STATIC_CAT)

    return Chain(
        # 步骤1:如果未指定,则删除静态/动态字段
        [RemoveFields(field_names=remove_field_names)]
        # 步骤2:将数据转换为NumPy格式(可能不需要)
        + (
            [
                AsNumpyArray(
                    field=FieldName.FEAT_STATIC_CAT,
                    expected_ndim=1,
                    dtype=int,
                )
            ]
            if config.num_static_categorical_features > 0
            else []
        )
        + (
            [
                AsNumpyArray(
                    field=FieldName.FEAT_STATIC_REAL,
                    expected_ndim=1,
                )
            ]
            if config.num_static_real_features > 0
            else []
        )
        + [
            AsNumpyArray(
                field=FieldName.TARGET,
                # 对于多变量情况,我们期望有一个额外的维度:
                expected_ndim=1 if config.input_size == 1 else 2,
            ),
            # 步骤3:通过将目标值填充为零来处理NaN值,并返回掩码
            # 掩码表示观测到的值为true,NaN值为false
            # 解码器使用此掩码(对于未观测到的值不会产生损失)
            # 参见xxxForPrediction模型中的loss_weights
            AddObservedValuesIndicator(
                target_field=FieldName.TARGET,
                output_field=FieldName.OBSERVED_VALUES,
            ),
            # 步骤4:根据数据集的频率添加时间特征,作为位置编码
            AddTimeFeatures(
                start_field=FieldName.START,
                target_field=FieldName.TARGET,
                output_field=FieldName.FEAT_TIME,
                time_features=time_features_from_frequency_str(freq),
                pred_length=config.prediction_length,
            ),
            # 步骤5:添加另一个时间特征(只是一个单独的数字)
            # 告诉模型时间序列值在生命周期中的位置
            # 类似于运行计数器
            AddAgeFeature(
                target_field=FieldName.TARGET,
                output_field=FieldName.FEAT_AGE,
                pred_length=config.prediction_length,
                log_scale=True,
            ),
            # 步骤6:将所有时间特征在FEAT_TIME键中垂直堆叠
            VstackFeatures(
                output_field=FieldName.FEAT_TIME,
                input_fields=[FieldName.FEAT_TIME, FieldName.FEAT_AGE]
                + (
                    [FieldName.FEAT_DYNAMIC_REAL]
                    if config.num_dynamic_real_features > 0
                    else []
                ),
            ),
            # 步骤7:重命名以匹配HuggingFace的名称
            RenameFields(
                mapping={
                    FieldName.FEAT_STATIC_CAT: "static_categorical_features",
                    FieldName.FEAT_STATIC_REAL: "static_real_features",
                    FieldName.FEAT_TIME: "time_features",
                    FieldName.TARGET: "values",
                    FieldName.OBSERVED_VALUES: "observed_mask",
                }
            ),
        ]
    )

定义InstanceSplitter

对于训练/验证/测试,我们接下来创建一个InstanceSplitter,用于从数据集中采样窗口(因为由于时间和内存的限制,我们无法将整个历史值传递给模型)。

实例分割器从数据中随机抽样大小为context_length的窗口和大小为prediction_length的子窗口,并为各自窗口的时间键添加past_future_键。这样可以确保将values拆分为past_values和后续的future_values键,分别用作编码器和解码器的输入。对于time_series_fields参数中的任何键,也会发生同样的情况:

from gluonts.transform.sampler import InstanceSampler
from typing import Optional


def create_instance_splitter(
    config: PretrainedConfig,
    mode: str,
    train_sampler: Optional[InstanceSampler] = None,
    validation_sampler: Optional[InstanceSampler] = None,
) -> Transformation:
    assert mode in ["train", "validation", "test"]

    instance_sampler = {
        "train": train_sampler
        or ExpectedNumInstanceSampler(
            num_instances=1.0, min_future=config.prediction_length
        ),
        "validation": validation_sampler
        or ValidationSplitSampler(min_future=config.prediction_length),
        "test": TestSplitSampler(),
    }[mode]

    return InstanceSplitter(
        target_field="values",
        is_pad_field=FieldName.IS_PAD,
        start_field=FieldName.START,
        forecast_start_field=FieldName.FORECAST_START,
        instance_sampler=instance_sampler,
        past_length=config.context_length + max(config.lags_sequence),
        future_length=config.prediction_length,
        time_series_fields=["time_features", "observed_mask"],
    )

创建数据加载器

接下来,是时候创建数据加载器了,它允许我们拥有(输入,输出)对的批次,或者换句话说( past_values , future_values )。

from typing import Iterable

import torch
from gluonts.itertools import Cached, Cyclic
from gluonts.dataset.loader import as_stacked_batches


def create_train_dataloader(
    config: PretrainedConfig,
    freq,
    data,
    batch_size: int,
    num_batches_per_epoch: int,
    shuffle_buffer_length: Optional[int] = None,
    cache_data: bool = True,
    **kwargs,
) -> Iterable:
    PREDICTION_INPUT_NAMES = [
        "past_time_features",
        "past_values",
        "past_observed_mask",
        "future_time_features",
    ]
    if config.num_static_categorical_features > 0:
        PREDICTION_INPUT_NAMES.append("static_categorical_features")

    if config.num_static_real_features > 0:
        PREDICTION_INPUT_NAMES.append("static_real_features")

    TRAINING_INPUT_NAMES = PREDICTION_INPUT_NAMES + [
        "future_values",
        "future_observed_mask",
    ]

    transformation = create_transformation(freq, config)
    transformed_data = transformation.apply(data, is_train=True)
    if cache_data:
        transformed_data = Cached(transformed_data)

    # 我们初始化一个Training实例
    instance_splitter = create_instance_splitter(config, "train")

    # 实例分割器将从目标时间序列中随机抽样一个长度为context length + lags + prediction length(在我们的情况下为1)的窗口
    # 并返回一个迭代器。
    stream = Cyclic(transformed_data).stream()
    training_instances = instance_splitter.apply(
        stream, is_train=True
    )
    
    return as_stacked_batches(
        training_instances,
        batch_size=batch_size,
        shuffle_buffer_length=shuffle_buffer_length,
        field_names=TRAINING_INPUT_NAMES,
        output_type=torch.tensor,
        num_batches_per_epoch=num_batches_per_epoch,
    )

def create_test_dataloader(
    config: PretrainedConfig,
    freq,
    data,
    batch_size: int,
    **kwargs,
):
    PREDICTION_INPUT_NAMES = [
        "past_time_features",
        "past_values",
        "past_observed_mask",
        "future_time_features",
    ]
    if config.num_static_categorical_features > 0:
        PREDICTION_INPUT_NAMES.append("static_categorical_features")

    if config.num_static_real_features > 0:
        PREDICTION_INPUT_NAMES.append("static_real_features")

    transformation = create_transformation(freq, config)
    transformed_data = transformation.apply(data, is_train=False)

    # 我们创建一个测试实例分割器,它将仅对编码器采样训练期间看到的最后一个上下文窗口。
    instance_sampler = create_instance_splitter(config, "test")

    # 我们在测试模式下应用变换
    testing_instances = instance_sampler.apply(transformed_data, is_train=False)
    
    return as_stacked_batches(
        testing_instances,
        batch_size=batch_size,
        output_type=torch.tensor,
        field_names=PREDICTION_INPUT_NAMES,
    )

train_dataloader = create_train_dataloader(
    config=config,
    freq=freq,
    data=multi_variate_train_dataset,
    batch_size=256,
    num_batches_per_epoch=100,
    num_workers=2,
)

test_dataloader = create_test_dataloader(
    config=config,
    freq=freq,
    data=multi_variate_test_dataset,
    batch_size=32,
)

让我们检查第一批:

batch = next(iter(train_dataloader))
for k, v in batch.items():
    print(k, v.shape, v.type())

>>> past_time_features torch.Size([256, 264, 5]) torch.FloatTensor
    past_values torch.Size([256, 264, 862]) torch.FloatTensor
    past_observed_mask torch.Size([256, 264, 862]) torch.FloatTensor
    future_time_features torch.Size([256, 48, 5]) torch.FloatTensor
    future_values torch.Size([256, 48, 862]) torch.FloatTensor
    future_observed_mask torch.Size([256, 48, 862]) torch.FloatTensor

可以看到,我们没有将input_idsattention_mask输入到编码器中(这对于自然语言处理模型来说是一种情况),而是使用past_values,以及past_observed_maskpast_time_featuresstatic_real_features

解码器的输入包括future_valuesfuture_observed_maskfuture_time_features。可以将future_values视为自然语言处理中的decoder_input_ids的等价物。

有关每个输入的详细解释,请参阅文档。

前向传递

让我们使用刚刚创建的批次执行单次前向传递:

# 执行前向传递
outputs = model(
    past_values=batch["past_values"],
    past_time_features=batch["past_time_features"],
    past_observed_mask=batch["past_observed_mask"],
    static_categorical_features=batch["static_categorical_features"]
    if config.num_static_categorical_features > 0
    else None,
    static_real_features=batch["static_real_features"]
    if config.num_static_real_features > 0
    else None,
    future_values=batch["future_values"],
    future_time_features=batch["future_time_features"],
    future_observed_mask=batch["future_observed_mask"],
    output_hidden_states=True,
)

print("损失:", outputs.loss.item())

>>> 损失: -1071.5718994140625

请注意,模型返回了损失。由于解码器自动将future_values向右移动一个位置以获得标签,因此可以计算预测值和标签之间的损失。损失是预测分布与实际值之间的负对数似然,趋向于负无穷大。

还请注意,解码器使用因果掩码来防止查看未来,因为它需要预测的值在future_values张量中。

训练模型

是时候训练模型了!我们将使用一个标准的PyTorch训练循环。

我们将在这里使用🤗 Accelerate库,它会自动将模型、优化器和数据加载器放置在适当的设备上。

from accelerate import Accelerator
from torch.optim import AdamW

epochs = 25
loss_history = []

accelerator = Accelerator()
device = accelerator.device

model.to(device)
optimizer = AdamW(model.parameters(), lr=6e-4, betas=(0.9, 0.95), weight_decay=1e-1)

model, optimizer, train_dataloader = accelerator.prepare(
    model,
    optimizer,
    train_dataloader,
)

model.train()
for epoch in range(epochs):
    for idx, batch in enumerate(train_dataloader):
        optimizer.zero_grad()
        outputs = model(
            static_categorical_features=batch["static_categorical_features"].to(device)
            if config.num_static_categorical_features > 0
            else None,
            static_real_features=batch["static_real_features"].to(device)
            if config.num_static_real_features > 0
            else None,
            past_time_features=batch["past_time_features"].to(device),
            past_values=batch["past_values"].to(device),
            future_time_features=batch["future_time_features"].to(device),
            future_values=batch["future_values"].to(device),
            past_observed_mask=batch["past_observed_mask"].to(device),
            future_observed_mask=batch["future_observed_mask"].to(device),
        )
        loss = outputs.loss

        # 反向传播
        accelerator.backward(loss)
        optimizer.step()

        loss_history.append(loss.item())
        if idx % 100 == 0:
            print(loss.item())

>>> -1081.978515625
    ...
    -2877.723876953125

# 查看训练情况
loss_history = np.array(loss_history).reshape(-1)
x = range(loss_history.shape[0])
plt.figure(figsize=(10, 5))
plt.plot(x, loss_history, label="train")
plt.title("损失", fontsize=15)
plt.legend(loc="upper right")
plt.xlabel("迭代次数")
plt.ylabel("负对数似然")
plt.show()

多变量概率时间序列预测与Informer 四海 第3张

推理

在推理时,建议使用generate()方法进行自回归生成,类似于NLP模型。

预测涉及从测试实例采样器中获取数据,该采样器将从数据集中的每个时间序列中采样最后一个大小为context_length的窗口值,并将其传递给模型。请注意,我们将已知的future_time_features提前传递给解码器。

模型将自回归地从预测分布中采样出一定数量的值,并将其传递回解码器以返回预测输出:

model.eval()

forecasts_ = []

for batch in test_dataloader:
    outputs = model.generate(
        static_categorical_features=batch["static_categorical_features"].to(device)
        if config.num_static_categorical_features > 0
        else None,
        static_real_features=batch["static_real_features"].to(device)
        if config.num_static_real_features > 0
        else None,
        past_time_features=batch["past_time_features"].to(device),
        past_values=batch["past_values"].to(device),
        future_time_features=batch["future_time_features"].to(device),
        past_observed_mask=batch["past_observed_mask"].to(device),
    )
    forecasts_.append(outputs.sequences.cpu().numpy())

模型输出一个形状为(batch_sizenumber of samplesprediction lengthinput_size)的张量。

在这种情况下,我们为每个862个时间序列(对于批次中的每个示例,批次大小为1,因为我们只有一个多元时间序列)获取下一个48小时的100个可能值:

forecasts_[0].shape

>>> (1, 100, 48, 862)

我们将它们垂直堆叠,以获得测试数据集中所有时间序列的预测(以防测试集中有更多时间序列):

forecasts = np.vstack(forecasts_)
print(forecasts.shape)

>>> (1, 100, 48, 862)

我们可以使用基准测试集中的真实样本值来评估生成的预测结果。为此,我们将使用🤗 Evaluate库,其中包括MASE和sMAPE指标。

我们为数据集中的每个时间序列变量计算两个指标:

from evaluate import load
from gluonts.time_feature import get_seasonality

mase_metric = load("evaluate-metric/mase")
smape_metric = load("evaluate-metric/smape")

forecast_median = np.median(forecasts, 1).squeeze(0).T

mase_metrics = []
smape_metrics = []

for item_id, ts in enumerate(test_dataset):
    training_data = ts["target"][:-prediction_length]
    ground_truth = ts["target"][-prediction_length:]
    mase = mase_metric.compute(
        predictions=forecast_median[item_id],
        references=np.array(ground_truth),
        training=np.array(training_data),
        periodicity=get_seasonality(freq),
    )
    mase_metrics.append(mase["mase"])

    smape = smape_metric.compute(
        predictions=forecast_median[item_id],
        references=np.array(ground_truth),
    )
    smape_metrics.append(smape["smape"])

print(f"MASE: {np.mean(mase_metrics)}")

>>> MASE: 1.1913437728068093

print(f"sMAPE: {np.mean(smape_metrics)}")

>>> sMAPE: 0.5322665081607634

plt.scatter(mase_metrics, smape_metrics, alpha=0.2)
plt.xlabel("MASE")
plt.ylabel("sMAPE")
plt.show()

多变量概率时间序列预测与Informer 四海 第4张

为了绘制任何时间序列变量相对于基准测试数据的预测结果,我们定义以下辅助函数:

import matplotlib.dates as mdates

def plot(ts_index, mv_index):
    fig, ax = plt.subplots()

    index = pd.period_range(
        start=multi_variate_test_dataset[ts_index][FieldName.START],
        periods=len(multi_variate_test_dataset[ts_index][FieldName.TARGET]),
        freq=multi_variate_test_dataset[ts_index][FieldName.START].freq,
    ).to_timestamp()

    ax.xaxis.set_minor_locator(mdates.HourLocator())

    ax.plot(
        index[-2 * prediction_length :],
        multi_variate_test_dataset[ts_index]["target"][mv_index, -2 * prediction_length :],
        label="实际",
    )

    ax.plot(
        index[-prediction_length:],
        forecasts[ts_index, ..., mv_index].mean(axis=0),
        label="平均",
    )
    ax.fill_between(
        index[-prediction_length:],
        forecasts[ts_index, ..., mv_index].mean(0)
        - forecasts[ts_index, ..., mv_index].std(axis=0),
        forecasts[ts_index, ..., mv_index].mean(0)
        + forecasts[ts_index, ..., mv_index].std(axis=0),
        alpha=0.2,
        interpolate=True,
        label="+/- 1-std",
    )
    ax.legend()
    fig.autofmt_xdate()

例如:

plot(0, 344)

多变量概率时间序列预测与Informer 四海 第5张

结论

我们如何与其他模型进行比较?Monash时间序列库有一个测试集MASE指标的比较表,我们可以添加进去:

可以看出,也许对一些人来说令人惊讶的是,多变量预测通常比单变量预测更差,原因是难以估计跨系列的相关性/关系。估计值增加的额外方差往往会损害结果预测,或者模型学习到了虚假的相关性。我们推荐这篇论文进行深入阅读。当使用大量数据进行训练时,多变量模型通常效果良好。

所以,在这里,传统的Transformer表现最好!在未来,我们希望能够更好地在一个中心位置对这些模型进行基准测试,以便更轻松地复现几篇论文的结果。请继续关注更多内容!

资源

我们推荐查看Informer文档和本博客文章顶部链接的示例笔记本。

Leave a Reply

Your email address will not be published. Required fields are marked *