Press "Enter" to skip to content

训练你的第一个决策变形器

在以前的帖子中,我们宣布在transformers库中推出了Decision Transformers。这种新技术使用Transformer作为决策模型越来越受欢迎。

所以今天,您将学习如何从头开始训练第一个离线Decision Transformer模型,使半猎豹奔跑。我们将直接在Google Colab上进行训练,您可以在这里找到:👉 https://github.com/huggingface/blog/blob/main/notebooks/101_train-decision-transformers.ipynb

*在Gym HalfCheetah环境中使用离线RL学习的“专家”Decision Transformers模型。

听起来很令人兴奋吗?让我们开始吧!

  • 什么是Decision Transformers?
  • 训练Decision Transformers
    • 加载数据集并构建自定义数据整理器
    • 使用🤗 transformers Trainer训练Decision Transformer模型
  • 结论
  • 接下来是什么?
  • 参考文献

什么是Decision Transformers?

Decision Transformer模型是由陈L.等人在“Decision Transformer: Reinforcement Learning via Sequence Modeling”一文中介绍的。它将强化学习抽象为一个条件序列建模问题

其主要思想是,我们不再使用RL方法(如拟合值函数来确定最大化回报的动作),而是使用一个序列建模算法(Transformer),给定期望的回报、过去的状态和动作,生成未来的动作以实现期望的回报。它是一个自回归模型,以期望的回报、过去的状态和动作为条件生成未来的动作,以实现期望的回报。

这是强化学习范式的彻底转变,因为我们使用生成轨迹建模(建模状态、动作和奖励序列的联合分布)来替代传统的RL算法。这意味着在Decision Transformers中,我们不是最大化回报,而是生成一系列未来的动作以实现期望的回报。

整个过程如下:

  1. 我们将最近的K个时间步作为输入提供给Decision Transformer,输入包括:
    • 目标回报
    • 状态
    • 动作
  2. 输入被嵌入,如果状态是向量,则使用线性层进行嵌入,如果是帧,则使用CNN编码器进行嵌入。
  3. 输入通过GPT-2模型进行处理,该模型通过自回归建模预测未来的动作。

训练你的第一个决策变形器 四海 第1张

Decision Transformer架构。状态、动作和回报通过模态特定的线性嵌入进行输入,然后添加一个位置序列编码。令牌通过GPT架构进行输入,该架构使用因果自注意掩码自回归地预测动作。图片来自[1]。

有不同类型的Decision Transformers,但今天我们将训练一个离线Decision Transformer,这意味着我们只使用从其他代理或人类演示中收集到的数据。代理不与环境交互。如果您想了解离线和在线强化学习之间的区别,请查看本文。

现在我们了解了离线Decision Transformers背后的理论,让我们看看如何在实践中训练一个。

训练Decision Transformers

在之前的帖子中,我们演示了如何使用transformers Decision Transformer模型并从🤗 hub加载预训练权重。

在这部分中,我们将使用🤗 Trainer和自定义的数据整理器从头开始训练一个Decision Transformer模型,使用在🤗 hub上托管的离线RL数据集。您可以在此Colab笔记本中找到此教程的代码。

我们将执行离线RL来学习在mujoco halfcheetah环境中的以下行为。

*在Gym HalfCheetah环境中使用离线RL学习的“专家”Decision Transformers模型。

加载数据集并构建自定义数据整理器

我们在hub上托管了一些离线RL数据集。今天我们将使用halfcheetah“专家”数据集,在此处托管。

首先,我们需要从🤗数据集包中导入load_dataset函数,并将数据集下载到我们的机器上。

from datasets import load_dataset
dataset = load_dataset("edbeeching/decision_transformer_gym_replay", "halfcheetah-expert-v2")

虽然大多数Hub上的数据集都可以直接使用,但有时我们希望对数据集进行一些额外的处理或修改。在这种情况下,我们希望与作者的实现相匹配,也就是说我们需要:

  • 通过减去平均值并除以标准差来对每个特征进行归一化。
  • 为每个轨迹预先计算折扣回报。
  • 将奖励和回报乘以1000。
  • 增加数据集采样分布以考虑专家代理的轨迹长度。

为了执行这种数据集预处理,我们将使用自定义的🤗数据集整理器。

现在让我们开始离线强化学习的自定义数据集整理器。

@dataclass
class DecisionTransformerGymDataCollator:
    return_tensors: str = "pt"
    max_len: int = 20 # 用于训练的每个子集的长度
    state_dim: int = 17  # 状态空间的大小
    act_dim: int = 6  # 行动空间的大小
    max_ep_len: int = 1000 # 数据集中的最大轨迹长度
    scale: float = 1000.0  # 奖励/回报的标准化
    state_mean: np.array = None  # 存储状态均值
    state_std: np.array = None  # 存储状态标准差
    p_sample: np.array = None  # 采样分布以考虑轨迹长度
    n_traj: int = 0 # 存储数据集中的轨迹数量

    def __init__(self, dataset) -> None:
        self.act_dim = len(dataset[0]["actions"][0])
        self.state_dim = len(dataset[0]["observations"][0])
        self.dataset = dataset
        # 计算用于状态归一化的数据集统计数据
        states = []
        traj_lens = []
        for obs in dataset["observations"]:
            states.extend(obs)
            traj_lens.append(len(obs))
        self.n_traj = len(traj_lens)
        states = np.vstack(states)
        self.state_mean, self.state_std = np.mean(states, axis=0), np.std(states, axis=0) + 1e-6
        
        traj_lens = np.array(traj_lens)
        self.p_sample = traj_lens / sum(traj_lens)

    def _discount_cumsum(self, x, gamma):
        discount_cumsum = np.zeros_like(x)
        discount_cumsum[-1] = x[-1]
        for t in reversed(range(x.shape[0] - 1)):
            discount_cumsum[t] = x[t] + gamma * discount_cumsum[t + 1]
        return discount_cumsum

    def __call__(self, features):
        batch_size = len(features)
        # 这是一个小技巧,以便能够从非均匀分布中进行采样
        batch_inds = np.random.choice(
            np.arange(self.n_traj),
            size=batch_size,
            replace=True,
            p=self.p_sample,  # 重新加权,使得采样按照时间步骤进行
        )
        # 数据集特征的一个批次
        s, a, r, d, rtg, timesteps, mask = [], [], [], [], [], [], []
        
        for ind in batch_inds:
            # 对于 features 中的每个特征:
            feature = self.dataset[int(ind)]
            si = random.randint(0, len(feature["rewards"]) - 1)

            # 从数据集中获取序列
            s.append(np.array(feature["observations"][si : si + self.max_len]).reshape(1, -1, self.state_dim))
            a.append(np.array(feature["actions"][si : si + self.max_len]).reshape(1, -1, self.act_dim))
            r.append(np.array(feature["rewards"][si : si + self.max_len]).reshape(1, -1, 1))

            d.append(np.array(feature["dones"][si : si + self.max_len]).reshape(1, -1))
            timesteps.append(np.arange(si, si + s[-1].shape[1]).reshape(1, -1))
            timesteps[-1][timesteps[-1] >= self.max_ep_len] = self.max_ep_len - 1  # 填充截断
            rtg.append(
                self._discount_cumsum(np.array(feature["rewards"][si:]), gamma=1.0)[
                    : s[-1].shape[1]   # TODO 检查此处删除的 +1
                ].reshape(1, -1, 1)
            )
            if rtg[-1].shape[1] < s[-1].shape[1]:
                print("if true")
                rtg[-1] = np.concatenate([rtg[-1], np.zeros((1, 1, 1))], axis=1)

            # 填充和状态 + 奖励归一化
            tlen = s[-1].shape[1]
            s[-1] = np.concatenate([np.zeros((1, self.max_len - tlen, self.state_dim)), s[-1]], axis=1)
            s[-1] = (s[-1] - self.state_mean) / self.state_std
            a[-1] = np.concatenate(
                [np.ones((1, self.max_len - tlen, self.act_dim)) * -10.0, a[-1]],
                axis=1,
            )
            r[-1] = np.concatenate([np.zeros((1, self.max_len - tlen, 1)), r[-1]], axis=1)
            d[-1] = np.concatenate([np.ones((1, self.max_len - tlen)) * 2, d[-1]], axis=1)
            rtg[-1] = np.concatenate([np.zeros((1, self.max_len - tlen, 1)), rtg[-1]], axis=1) / self.scale
            timesteps[-1] = np.concatenate([np.zeros((1, self.max_len - tlen)), timesteps[-1]], axis=1)
            mask.append(np.concatenate([np.zeros((1, self.max_len - tlen)), np.ones((1, tlen))], axis=1))

        s = torch.from_numpy(np.concatenate(s, axis=0)).float()
        a = torch.from_numpy(np.concatenate(a, axis=0)).float()
        r = torch.from_numpy(np.concatenate(r, axis=0)).float()
        d = torch.from_numpy(np.concatenate(d, axis=0))
        rtg = torch.from_numpy(np.concatenate(rtg, axis=0)).float()
        timesteps = torch.from_numpy(np.concatenate(timesteps, axis=0)).long()
        mask = torch.from_numpy(np.concatenate(mask, axis=0)).float()

        return {
            "states": s,
            "actions": a,
            "rewards": r,
            "returns_to_go": rtg,
            "timesteps": timesteps,
            "attention_mask": mask,
        }

这是很多的代码,简而言之,我们定义了一个类,该类接受我们的数据集,执行所需的预处理,并将返回我们的状态动作奖励回报时间步长掩码的批次。这些批次可以直接用于使用🤗 transformers Trainer训练一个决策Transformer模型。

使用🤗 transformers Trainer训练决策Transformer模型。

为了使用🤗 Trainer类对模型进行训练,我们首先需要确保它返回的字典包含一个损失,即模型动作预测和目标的L-2范数。我们通过创建一个TrainableDT类来实现这一点,该类继承自决策Transformer模型。

class TrainableDT(DecisionTransformerModel):
    def __init__(self, config):
        super().__init__(config)

    def forward(self, **kwargs):
        output = super().forward(**kwargs)
        # 添加DT损失
        action_preds = output[1]
        action_targets = kwargs["actions"]
        attention_mask = kwargs["attention_mask"]
        act_dim = action_preds.shape[2]
        action_preds = action_preds.reshape(-1, act_dim)[attention_mask.reshape(-1) > 0]
        action_targets = action_targets.reshape(-1, act_dim)[attention_mask.reshape(-1) > 0]
        
        loss = torch.mean((action_preds - action_targets) ** 2)

        return {"loss": loss}

    def original_forward(self, **kwargs):
        return super().forward(**kwargs)

transformers Trainer类需要一些参数,这些参数在TrainingArguments类中定义。我们使用与作者原始实现中相同的超参数,但迭代次数较少。在Colab笔记本中,训练大约需要40分钟,所以在等待时喝杯咖啡或阅读🤗 Annotated Diffusion博文。作者训练大约需要3小时,所以我们在这里得到的结果可能不如他们的好。

training_args = TrainingArguments(
    output_dir="output/",
    remove_unused_columns=False,
    num_train_epochs=120,
    per_device_train_batch_size=64,
    learning_rate=1e-4,
    weight_decay=1e-4,
    warmup_ratio=0.1,
    optim="adamw_torch",
    max_grad_norm=0.25,
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=dataset["train"],
    data_collator=collator,
)

trainer.train()

现在我们已经解释了决策Transformer、Trainer的理论以及如何训练它。你已经准备好从头开始训练你的第一个离线决策Transformer模型,让半猎豹跑起来👉 https://github.com/huggingface/blog/blob/main/notebooks/101_train-decision-transformers.ipynb Colab包括训练模型的可视化,以及如何在🤗 hub上保存模型的方法。

结论

本文演示了如何在🤗 datasets上使用离线RL数据集训练决策Transformer。我们使用了🤗 transformers Trainer和自定义数据整理器。

除了决策Transformer外,我们希望支持深度强化学习社区的更多用例和工具。因此,我们很想听到关于决策Transformer模型的反馈,以及任何我们可以与您一起构建的对RL有用的内容。请随时与我们联系

接下来是什么?

在接下来的几周和几个月中,我们计划支持生态系统中的其他工具

  • 使用在线环境进行训练或微调的决策Transformer模型的仓库扩展[2]
  • 集成Sample Factory 2.0版本

保持联系的最佳方式是加入我们的Discord服务器,与我们和社区交流。

参考资料

[1] Chen, Lili, 等人。”Decision transformer: Reinforcement learning via sequence modeling.” Advances in neural information processing systems 34 (2021)。

[2] 郑庆庆、张艾米、格罗弗,阿迪塔《在线决策变换器》(arXiv预印本,2022年)

Leave a Reply

Your email address will not be published. Required fields are marked *