潜入人工智能的世界-从零开始构建一个深度强化学习的健身房。
目录
如果您已经掌握了强化学习和深度Q-learning的概念,请随时跳转到逐步教程。在那里,您将拥有构建深度强化学习健身房所需的所有资源和代码,包括环境,代理和训练协议。
介绍
为什么选择强化学习?你将获得什么什么是强化学习?深度Q-learning
逐步教程
1. 初始设置2. 大局观3. 环境:初始基础4. 实施代理:神经架构与策略5. 影响环境:结束6. 从经验中学习:经验回放7. 定义代理的学习过程:拟合NN8. 执行训练循环:将其整合在一起9. 结束10. 附加:优化状态表示
为什么选择强化学习?
最近广泛采用了先进的AI系统,如ChatGPT、Bard、Midjourney、Stable Diffusion等,这引发了对人工智能、机器学习和神经网络领域的兴趣,但由于实施这些系统的技术性质,往往无法满足人们的好奇心。
对于那些想要开始进入人工智能领域(或继续深入研究的)的人来说,使用深度Q-learning构建一个强化学习健身房是一个很好的开始,因为它不需要高级知识来实现,可以轻松扩展解决复杂问题,并可以立即洞察人工智能是如何变得“智能”的。
你将获得什么
假设您对Python有基本的了解,在深度强化学习的介绍结束时,您将没有使用高级强化学习框架开发自己的健身房,以训练一个代理解决一个简单的问题-将自己从起点移动到目标点!
这并不是非常光彩,但您将亲自体验建立环境,定义奖励结构和基本的神经架构,调整环境参数以观察不同的学习行为,并在决策过程中找到探索和利用的平衡。
然后,您将拥有实施自己更复杂的环境和系统所需的所有工具,并且将有充分准备深入探讨神经网络和强化学习中的高级优化策略等主题。
您还将获得自信和理解,能够有效利用诸如OpenAI Gym这样的预构建工具,因为系统的每个组件都是从头开始实现的并得到了解密。这使您能够无缝地将这些强大的资源整合到自己的AI项目中。
什么是强化学习?
强化学习(Reinforcement Learning,RL)是机器学习(Machine Learning,ML)的一个子领域,专门研究智能体(做出决策的实体)如何在环境中采取行动以完成目标。
其实现包括:
- 游戏
- 自动驾驶车辆
- 机器人
- 金融(算法交易)
- 自然语言处理
- 等等。
强化学习的理念基于行为心理学的基本原理,即动物或人通过行动的后果来学习。如果一种行动导致了良好的结果,那么智能体将得到奖励;如果不是,则受到惩罚或没有奖励。
在继续之前,了解一些常用术语很重要:
- 环境:这是智能体操作的世界。它设定了智能体必须导航的规则、边界和奖励。
- 智能体:环境中的决策者。智能体根据自身对当前状态的理解来采取行动。
- 状态:智能体在环境中的当前情况的详细快照,包括用于决策的相关度量或感官信息。
- 行动:智能体与环境交互的具体措施,例如移动、收集物品或发起交互。
- 奖励:作为智能体行动结果的环境反馈,可以是积极的、消极的或中立的,引导学习过程。
- 状态/行动空间:智能体可能遇到的所有状态和可以在环境中采取的所有行动的组合。这定义了智能体必须学会导航的决策和情境范围。
基本上,在程序的每一步(回合)中,智能体接收到来自环境的状态,选择一个行动,接收奖励或惩罚,然后更新环境或完成回合。每一步之后收到的信息被保存为用于后续训练的“经验”。
为了更具体的例子,想象一下你在下国际象棋。棋盘即为环境,您是智能体。在每一步(或回合)中,您查看棋盘的状态,从所有可能移动组成的行动空间中选择具有最高未来奖励的行动。移动结束后,您评估这是否是一个好的行动,并学习以在下次表现更好。
一开始可能感觉有很多信息,但在构建自己的项目时,这些术语会变得非常自然。
深度Q学习
Q学习是机器学习中使用的一种算法,”Q”代表”质量”,即智能体可以采取的行动的价值。它通过创建一个Q值表,记录了行动和其对应的质量,来估计在给定状态下采取某个行动的预期未来奖励。
智能体接收到环境的状态,检查表格以查看是否遇到过这个状态,然后选择具有最高奖励值的行动。
然而,Q-Learning有一些缺点。为了获得良好的结果,必须探索每个状态和动作对。如果状态和动作空间(所有可能的状态和动作的集合)太大,那么将它们存储在表中是不可行的。
这就是深度Q-Learning(DQL)作为Q-Learning的演进的地方。DQL利用深度神经网络(NN)来近似Q值函数,而不是将它们保存在表中。这使得能够处理具有高维状态空间(如来自相机的图像输入)的环境对于传统的Q-Learning来说是不实际的。
神经网络能够对相似的状态和动作进行泛化,即使它没有在确切情况下进行训练,也可以选择一个理想的移动方式,从而消除了大表的需要。
神经网络实现这一点的方式超出了本教程的范围。幸运的是,没有必要深入理解才能有效地实施深度Q-Learning。
构建强化学习环境
1. 初始设置
在我们开始编写AI代理的代码之前,建议您对Python中的面向对象编程(OOP)原则有深入的了解。
如果您还没有安装Python,以下是作者Bhargav Bachina提供的一个简单教程,供您入门学习。我将使用的版本是3.11.6。
安装和开始学习Python
新手指南和任何想要开始学习Python的人
VoAGI.com
您唯一需要的依赖库是TensorFlow,这是由Google开发的开源机器学习库,我们将使用它来构建和训练我们的神经网络。这可以通过终端中的pip进行安装。我的版本是2.14.0。
pip install tensorflow
如果不起作用:
pip3 install tensorflow
您还需要NumPy库,但这应该已包含在TensorFlow中。如果遇到问题,请运行pip install numpy
。
还建议为每个类别创建一个新文件(例如environment.py)。这样做可以避免过多,便于排除任何可能遇到的错误。
供您参考,这是包含完整代码的GitHub存储库:https://github.com/HestonCV/rl-gym-from-scratch。请随意克隆、探索和以此作为参考!
2. 整体构架
要真正理解这些概念而不只是复制代码,理解我们将要构建的不同部分以及它们是如何相互配合的至关重要。这样,每个部分都有在整体图景中的位置。
以下是一个训练循环的代码,包含5000个episode。Episode实质上是代理与环境之间的一次完整交互,从开始到结束。
此时不应该实施或完全理解这段代码。在构建每个部分时,如果想要查看特定类或方法将如何使用,请参考此代码。
from environment import Environment
from agent import Agent
from experience_replay import ExperienceReplay
import time
if __name__ == '__main__':
grid_size = 5
environment = Environment(grid_size=grid_size, render_on=True)
agent = Agent(grid_size=grid_size, epsilon=1, epsilon_decay=0.998, epsilon_end=0.01)
# agent.load(f'models/model_{grid_size}.h5')
experience_replay = ExperienceReplay(capacity=10000, batch_size=32)
# Number of episodes to run before training stops
episodes = 5000
# Max number of steps in each episode
max_steps = 200
for episode in range(episodes):
# Get the initial state of the environment and set done to False
state = environment.reset()
# Loop until the episode finishes
for step in range(max_steps):
print('Episode:', episode)
print('Step:', step)
print('Epsilon:', agent.epsilon)
# Get the action choice from the agents policy
action = agent.get_action(state)
# Take a step in the environment and save the experience
reward, next_state, done = environment.step(action)
experience_replay.add_experience(state, action, reward, next_state, done)
# If the experience replay has enough memory to provide a sample, train the agent
if experience_replay.can_provide_sample():
experiences = experience_replay.sample_batch()
agent.learn(experiences)
# Set the state to the next_state
state = next_state
if done:
break
# time.sleep(0.5)
agent.save(f'models/model_{grid_size}.h5')
每个内循环都被认为是一个步骤。
在每一步中:
- 从环境中获取状态。
- 代理根据该状态选择一个动作。
- 对环境进行操作,返回奖励、执行动作后的结果状态以及是否完成该情节。
- 然后将初始的
state
、action
、reward
、next_state
和done
存储到experience_replay
中,作为一种类似长期记忆(经验)的方式。 - 然后,代理会对这些经验的随机样本进行训练。
在每个情节结束时,或者您喜欢的频率上,模型的权重会被保存到 models 文件夹中。这样以后就可以预加载它们,以避免每次都从头开始训练。然后在下一个情节开始时重置环境。
这种基本结构基本上就足够创建一个智能代理来解决各种各样的问题!
如介绍中所述,我们为代理设定了一个非常简单的问题:从网格的初始位置到达指定的目标位置。
3. 环境:初步基础
开发这个系统最明显的起点就是环境。
要有一个可以正常工作的强化学习环境,环境需要完成几个任务:
- 维护世界的当前状态。
- 跟踪目标和代理。
- 允许代理对世界进行更改。
- 以模型可以理解的形式返回状态。
- 以我们可以理解的方式呈现,观察代理。
这将成为代理度过其整个生命周期的地方。我们将把环境定义为一个简单的正方形矩阵/2D 数组,或者在 Python 中是一个列表的列表。
这个环境将有一个离散的状态空间,也就是代理可能遇到的状态是独立且可计数的。每个状态是环境中的一种特定情况或场景,不同于连续状态空间,其中状态可以以无限、流动的方式变化 — 认为是下棋和控制汽车的区别。
DQL 是专门设计用于离散动作空间(有限数量的动作)的方法 — 这是我们将重点关注的。对于连续动作空间,会使用其他方法。
在网格中,空的空间将用 0 表示,代理将用 1 表示,目标将用 -1 表示。环境的大小可以任意选择,但是随着环境变得更大,所有可能状态(状态空间)的集合会呈指数增长。这会显著降低训练时间。
当呈现时,网格看起来像这样:
[0, 1, 0, 0, 0][0, 0, 0, 0, 0][0, 0, 0, 0, 0][0, 0, 0, -1, 0][0, 0, 0, 0, 0]
构建 Environment
类和 reset
方法我们将首先实现 Environment
类,并且为初始化环境提供一种方法。目前,它将接受一个整数 grid_size
,但我们很快会进行扩展。
import numpy as npclass Environment: def __init__(self, grid_size): self.grid_size = grid_size self.grid = [] def reset(self): # 使用 0 的二维列表初始化空的网格 self.grid = np.zeros((self.grid_size, self.grid_size))
创建新实例时,Environment
会保存 grid_size
并初始化一个空的网格。
reset
方法使用 np.zeros((self.grid_size, self.grid_size))
来填充网格,该方法接受一个元组形状,然后输出一个由该形状组成的仅包含零的二维 NumPy 数组。
NumPy数组是一种类似于Python列表的网格状数据结构,它能够高效地存储和操作数值数据。它支持向量化操作,意味着操作会自动应用于数组中的所有元素,无需显式循环。
与标准的Python列表相比,这使得对大型数据集的计算速度更快、更高效。而且,这种数据结构也是我们的代理人神经网络架构所期望的!
为什么叫做reset(重置)?好吧,这个方法将被调用来重置环境,并最终返回网格的初始状态。
添加代理人和目标接下来,我们将构建用于向网格添加代理人和目标的方法。
import randomdef add_agent(self): # 选择一个随机位置 location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1)) # 代理人用1表示 self.grid[location[0]][location[1]] = 1 return locationdef add_goal(self): # 选择一个随机位置 location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1)) # 获取一个随机位置,直到它未被占用 while self.grid[location[0]][location[1]] == 1: location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1)) # 目标用-1表示 self.grid[location[0]][location[1]] = -1 return location
代理人和目标的位置将由一个元组(x,y)表示。这两个方法都在网格的边界内随机选择值,并返回位置。主要区别在于add_goal
确保它不选择已被代理人占用的位置。
我们将代理人和目标放置在随机的起始位置,以增加每个回合的变化性。这有助于代理人学习从不同的起始点导航环境,而不是记住一个路径。
最后,我们将添加一个方法,在控制台中渲染世界,以便我们能够看到代理人与环境的交互。
def render(self): # 将元素转换为整数列表以改善格式 grid = self.grid.astype(int).tolist() for row in grid: print(row) print('') # 为每个步骤添加一些空间
render
做了三件事:将self.grid
的元素转换为int类型,将其转换为Python列表,并逐行打印。
我们之所以不直接打印NumPy数组的每一行,只是因为它看起来不那么漂亮。
将所有内容综合起来..
import numpy as npimport randomclass Environment: def __init__(self, grid_size): self.grid_size = grid_size self.grid = [] def reset(self): # 将空网格初始化为一个二维数组,元素均为0 self.grid = np.zeros((self.grid_size, self.grid_size)) def add_agent(self): # 选择一个随机位置 location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1)) # 代理人用1表示 self.grid[location[0]][location[1]] = 1 return location def add_goal(self): # 选择一个随机位置 location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1)) # 获取一个随机位置,直到它未被占用 while self.grid[location[0]][location[1]] == 1: location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1)) # 目标用-1表示 self.grid[location[0]][location[1]] = -1 return location def render(self): # 将元素转换为整数列表以改善格式 grid = self.grid.astype(int).tolist() for row in grid: print(row) print('') # 为每个步骤添加一些空间# 测试环境env = Environment(5)env.reset()agent_location = env.add_agent()goal_location = env.add_goal()env.render()print(f'代理人位置: {agent_location}')print(f'目标位置: {goal_location}')
当查看位置时,可能会出现一些错误,但它们应该从左上到右下作为(row, column)来阅读。同时,要记住坐标是从零开始的。
好的,环境已经定义好了。接下来呢?
扩展reset
让我们编辑reset方法来处理为我们放置代理和目标。顺便说一句,我们也将对render进行自动化。
class Environment: def __init__(self, grid_size, render_on=False): self.grid_size = grid_size self.grid = [] # 确保添加新属性 self.render_on = render_on self.agent_location = None self.goal_location = None def reset(self): # 初始化空的网格,作为2D数组的0 self.grid = np.zeros((self.grid_size, self.grid_size)) # 向网格添加代理和目标 self.agent_location = self.add_agent() self.goal_location = self.add_goal() if self.render_on: self.render()
现在,当调用reset
时,代理和目标将被添加到网格中,它们的初始位置将被保存,如果render_on
设置为true,它将渲染网格。
...# 测试环境env = Environment(5, render_on=True)env.reset()# 现在要访问代理和目标位置,您可以使用环境的属性print(f'代理位置:{env.agent_location}')print(f'目标位置:{env.goal_location}')
定义环境的状态目前,我们将实现的最后一个方法是get_state
。乍一看,状态似乎简单地可以是网格本身,但是这种方法的问题是它不是神经网络所期望的。
神经网络通常需要一维输入,而不是目前以二维形状表示的网格。我们可以使用NumPy的内置flatten
方法来解决这个问题。这将把每一行放入同一个数组中。
def get_state(self): # 将网格从2D展平为1D state = self.grid.flatten() return state
这将进行转换:
[0, 0, 0, 0, 0][0, 0, 0, 1, 0][0, 0, 0, 0, 0][0, 0, 0, 0, -1][0, 0, 0, 0, 0]
转换为:
[0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, 0, 0, 0, 0, 0]
如您所见,哪些单元格是哪些并不是立即明显的,但这对于深度神经网络来说不是什么问题。
现在,我们可以更新reset
在grid
被填充后立即返回状态。其他的不会改变。
def reset(self): ... # 返回网格的初始状态 return self.get_state()
迄今为止的完整代码..
import randomclass Environment: def __init__(self, grid_size, render_on=False): self.grid_size = grid_size self.grid = [] self.render_on = render_on self.agent_location = None self.goal_location = None def reset(self): # 初始化空的网格,作为2D数组的0 self.grid = np.zeros((self.grid_size, self.grid_size)) # 向网格添加代理和目标 self.agent_location = self.add_agent() self.goal_location = self.add_goal() if self.render_on: self.render() # 返回网格的初始状态 return self.get_state() def add_agent(self): # 选择一个随机位置 location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1)) # 代理通过1来表示 self.grid[location[0]][location[1]] = 1 return location def add_goal(self): # 选择一个随机位置 location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1)) # 获取随机位置,直到它不被占用 while self.grid[location[0]][location[1]] == 1: location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1)) # 目标通过-1来表示 self.grid[location[0]][location[1]] = -1 return location def render(self): # 转换为int型列表以便改善格式 grid = self.grid.astype(int).tolist() for row in grid: print(row) print('') # 为每一步渲染之间添加一些间距 def get_state(self): # 将网格从2D展平为1D state = self.grid.flatten() return state
您已成功实现了环境的基础!尽管如此,如果您还没有注意到,我们还不能与之交互。代理程序被困在原地。
待Agent
类提供更好的上下文后,我们将在之后返回解决这个问题。
4. 实现代理的神经结构和策略
如先前所述,代理是一个实体,它接收环境的状态,本例中是世界网格的扁平化版本,并根据行动空间决定采取何种行动。
再次重申,行动空间是所有可能行动的集合,在这种情况下,代理可以向上、向下、向左和向右移动,因此行动空间的大小为4。
状态空间是所有可能状态的集合。这取决于环境和代理的视角,可能是一个巨大的数字。在我们的例子中,如果世界是一个5×5的网格,存在600个可能的状态,但如果世界是一个25×25的网格,则存在390,000个状态,进而极大地增加了训练时间。
要使代理有效地学习完成目标,需要以下几个要素:
- 神经网络以近似Q值(对行动的预测未来回报总量)在DQL中的情况。
- 策略或代理程序用于选择行动的方法。
- 来自环境的奖励信号,告诉代理表现如何。
- 能够根据过去的经验进行训练。
可以实施两种不同的策略:
- 贪婪策略:选择当前状态中具有最高Q值的行动。
- Epsilon-Greedy策略:选择当前状态中具有最高Q值的行动,但也有一定概率(通常用ϵ表示)选择随机行动。如果ϵ = 0.02,则有2%的机会选择随机行动。
我们将实现的是Epsilon-Greedy策略。
为什么随机行动有助于代理学习?探索。
当代理开始时,它可能会学习到达目标的次优路径,并继续做出这个选择,而不会改变或学习新的路径。
从一个较大的epsilon值开始,并逐渐降低它,允许代理在利用学到的策略之前彻底探索环境。随着时间的推移降低epsilon的量称为epsilon decay,这很快就会变得更加清晰。
就像我们在环境中所做的那样,我们将使用一个类来表示代理。
现在,在我们实现策略之前,我们需要一种方法来获得Q值。这就是我们代理的大脑——或神经网络——发挥作用的地方。
神经网络在这里不要跑题,神经网络只是一个巨大的函数。值进去,经过每个层并进行转换,最终出现一些不同的值。就只有如此而已。神奇的地方在于训练开始时。
这个想法是给NN大量带标签的数据,比如“这是一个输入,这是你应该输出的结果”。它通过每个训练步骤中的神经元之间调整值,缓慢地找到与给定输出尽可能接近的值,找到数据中的模式,并帮助我们预测神经网络从未见过的输入。
代理类和定义神经结构现在我们将使用TensorFlow定义神经结构,并关注数据的“前向传递”。
from tensorflow.keras.layers import Densefrom tensorflow.keras.models import Sequentialclass Agent: def __init__(self, grid_size): self.grid_size = grid_size self.model = self.build_model() def build_model(self): # 创建一个具有3个层的串行模型 model = Sequential([ # 输入层预期是一个扁平化的网格,因此输入形状是网格大小的平方 Dense(128, activation='relu', input_shape=(self.grid_size**2,)), Dense(64, activation='relu'), # 输出层具有4个单元,对应可能的行动(向上、向下、向左、向右) Dense(4, activation='linear') ]) model.compile(optimizer='adam', loss='mse') return model
如果你对神经网络不熟悉,不要太纠结于这一部分。虽然我们在模型中使用了”relu”和”linear”等激活函数,但详细探讨激活函数超出了本文的范围。
你只需要知道的是模型以状态作为输入,每一层中的值都经过变换,然后输出对应于每个动作的四个Q值。
在构建代理的神经网络时,我们从一个处理网格状态的输入层开始,网格以一维数组(大小为grid_size²
)的形式表示。这是因为我们对网格进行了平铺,以简化输入。该层本身是我们的输入,不需要在架构中定义,因为它不需要输入。
接下来,我们有两个隐藏层。虽然我们看不见这些值,但在我们的模型学习过程中,它们对于获得Q值函数的更近似是重要的:
- 第一个隐藏层有128个神经元,
Dense(128, activation='relu')
,它以平铺的网格作为输入。 - 第二个隐藏层由64个神经元组成,
Dense(64, activation='relu')
,进一步处理信息。
最后,输出层Dense(4, activation='linear')
由4个神经元组成,对应于四个可能的动作(上、下、左、右)。该层输出Q值——每个动作的未来奖励的估计。
通常来说,您需要解决的问题越复杂,就需要更多的隐藏层和神经元。对于我们的简单用例来说,两个隐藏层应该已经足够了。
神经元和隐藏层数量可以进行实验,以找到速度和结果之间的平衡,每个隐藏层和神经元的增加都会增加网络捕捉和学习数据细微差别的能力。和状态空间一样,神经网络越大,训练速度越慢。
贪婪策略使用这个神经网络,我们现在能够得到一个Q值的预测,虽然还不是很好,但可以做出一个决策。
import numpy as np def get_action(self, state): # 为输入的状态添加一个额外的维度,创建一个包含一个实例的批次 state = np.expand_dims(state, axis=0) # 使用模型预测给定状态的Q值(动作值) q_values = self.model.predict(state, verbose=0) # 选择并返回具有最高Q值的动作 action = np.argmax(q_values[0]) # 从第一个(也是唯一的)项中获取动作 return action
TensorFlow神经网络架构要求输入状态以批次形式。当你有大量的输入并且希望获得完整批次的输出时,这非常有用,但是当你只有一个输入要预测时,可能会有一点困惑。
state = np.expand_dims(state, axis=0)
我们可以通过使用NumPy的expand_dims
方法并指定axis=0
来解决这个问题。这样做的效果就是将它变成一个包含一个输入的批次。例如,大小为5×5的网格状态:
[0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, 0, 0, 0, 0, 0]
变成:
[[0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, 0, 0, 0, 0, 0]]
在训练模型时,通常会使用批次大小为32或更大的批次。它看起来像这样:
[[0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, 0, 0, 0, 0, 0], [0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, -1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, -1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0], ... [0, 0, 0, 0, 0, 0, 0, 0,
。然后,如果设置< code> render_on 为true,则呈现网格。现在,我们已经按照正确的格式准备好了模型的输入,我们可以预测每个动作的Q值并选择最高的值。
...# 使用模型预测给定状态的Q值(动作值)q_values = self.model.predict(state, verbose=0)# 选择并返回具有最高Q值的动作action = np.argmax(q_values[0]) # 从第一(也是唯一)个条目中选择动作...
我们只需将状态提供给模型,它就会输出一批预测结果。请记住,由于我们给网络输入了一个批次大小为1的样本,所以它会返回一个批次大小为1的结果。此外,
verbose=0
确保每次调用预测函数时控制台不会显示常规的调试消息。最后,我们使用
np.argmax
来选择并返回批次中具有最高值的动作的索引,这个批次目前只有一个条目。在我们的例子中,索引0、1、2和3将分别对应向上、向下、向左和向右。
贪婪策略总是选择具有当前Q值的最高奖励的动作,但这并不总是导致最佳的长期结果。
ε-贪婪策略我们已经实现了贪婪策略,但我们想要的是ε-贪婪策略。这种策略在代理选择动作时引入了随机性,以便探索状态空间。
简单回顾一下,ε是随机选择一个动作的概率。我们还希望随着代理的学习,以某种方式逐渐减小ε的值,从而利用其学到的策略。如前所述,这被称为ε衰减。
ε衰减值应该被设置为小于1的小数,它用于在代理采取每个步骤之后逐渐减小ε的值。
通常ε将从1开始,ε衰减将是一个非常接近1的值,比如0.998。在训练过程中的每一步都要将ε乘以ε衰减。
为了说明这一点,以下是ε在训练过程中的变化情况。
初始化值:ε = 1ε衰减 = 0.998-----------------第1步:ε = 1ε = 1 * 0.998 = 0.998-----------------第2步:ε = 0.998ε = 0.998 * 0.998 = 0.996-----------------第3步:ε = 0.996ε = 0.996 * 0.998 = 0.994-----------------第4步:ε = 0.994ε = 0.994 * 0.998 = 0.992-----------------...-----------------第1000步:ε = 1 * (0.998)^1000 = 0.135-----------------...以此类推
如您所见,ε会随每一步逐渐接近零。在第1000步,随机选择一个动作的概率为13.5%。ε衰减是一个需要根据状态空间调整的值。对于较大的状态空间,可能需要更多的探索或更高的ε衰减。
即使代理已经受过良好训练,保持一个小的ε值也是有益的。我们应该定义一个停止点,ε不再减小,即ε结尾。这个值可以是0.1、0.01,甚至是0.001,具体取决于使用情况和任务的复杂性。
在上图中,您会注意到ε在0.1处停止减小,这是预定义的ε结尾。
让我们更新我们的代理类以融入ε。
import numpy as npclass Agent: def __init__(self, grid_size, epsilon=1, epsilon_decay=0.998, epsilon_end=0.01): self.grid_size = grid_size self.epsilon = epsilon self.epsilon_decay = epsilon_decay self.epsilon_end = epsilon_end ... ... def get_action(self, state): # rand()返回一个介于0和1之间的随机值 if np.random.rand() <= self.epsilon: # 探索:随机动作 action = np.random.randint(0, 4) else: # 在状态上添加一个额外的维度,以创建一个批次只有一个实例 state = np.expand_dims(state, axis=0) # 使用模型预测给定状态的Q值(动作值) q_values = self.model.predict(state, verbose=0) # 选择并返回具有最高Q值的动作 action = np.argmax(q_values[0]) # 从第一(也是唯一)个条目中选择动作 # 衰减ε值以减少探索次数 if self.epsilon > self.epsilon_end: self.epsilon *= self.epsilon_decay return action
我们为
epsilon
、epsilon_decay
和epsilon_end
设置了默认值分别为 1、0.998 和 0.01。请记住 epsilon 及其相关值是超参数,用于控制学习过程的参数。通过实验可以调整它们以获得最佳结果。
方法
get_action
已更新以纳入 epsilon。如果由np.random.rand
得到的随机值小于或等于epsilon
,则选择随机动作。否则,过程与之前相同。最后,如果
epsilon
尚未达到epsilon_end
,我们通过将其乘以epsilon_decay
来更新它,如下所示 —self.epsilon *= self.epsilon_decay
。
Agent
到目前为止:from tensorflow.keras.layers import Densefrom tensorflow.keras.models import Sequentialimport numpy as npclass Agent: def __init__(self, grid_size, epsilon=1, epsilon_decay=0.998, epsilon_end=0.01): self.grid_size = grid_size self.epsilon = epsilon self.epsilon_decay = epsilon_decay self.epsilon_end = epsilon_end self.model = self.build_model() def build_model(self): # 创建一个具有 3 层的顺序模型 model = Sequential([ # 输入层期望一个展平的网格,因此输入形状是 grid_size 的平方 Dense(128, activation='relu', input_shape=(self.grid_size**2,)), Dense(64, activation='relu'), # 输出层有 4 个单位,对应可能的动作(上、下、左、右) Dense(4, activation='linear') ]) model.compile(optimizer='adam', loss='mse') return model def get_action(self, state): # rand() 返回一个介于 0 和 1 之间的随机值 if np.random.rand() <= self.epsilon: # 探索:随机动作 action = np.random.randint(0, 4) else: # 为状态添加一个额外的维度,以创建一个包含一个实例的批次 state = np.expand_dims(state, axis=0) # 使用模型预测给定状态的 Q 值(动作值) q_values = self.model.predict(state, verbose=0) # 选择并返回具有最高 Q 值的动作 action = np.argmax(q_values[0]) # 从第一个(也是唯一的)条目中取出动作 # 通过乘以 epsilon_decay 来衰减 epsilon 值,以减少随时间的探索 if self.epsilon > self.epsilon_end: self.epsilon *= self.epsilon_decay return action
我们已有效实现了ε-贪婪策略,现在几乎可以开始让代理学习了!
5. 作用于环境:完成
Environment
目前拥有重置网格、添加代理和目标、提供当前状态以及在控制台上打印网格等方法。为了使环境完整,我们不仅需要使代理能够对其产生影响,还需要以奖励的形式提供反馈信息。
定义奖励结构设计良好的奖励结构是强化学习的主要挑战。即使您的问题完全符合模型的能力,如果奖励结构设置不正确,它也可能永远无法学习。
奖励的目标是鼓励特定的行为。在我们的情况下,我们希望将代理引导到由 -1 定义的目标单元。
与网络中的层和神经元、epsilon 及其相关值类似,定义奖励结构可以有很多正确(和错误)的方式。
奖励结构主要分为两种类型:
- 稀疏:奖励只在少数状态下给出。
- 密集:奖励在整个状态空间中普遍存在。
对于稀疏奖励,代理几乎没有反馈来指导它。这就像为每一步简单地给予一组惩罚,如果代理达到目标,则给予一个较大的奖励。
代理肯定可以学会到达目标,但是根据状态空间的大小,可能需要很长时间,而且可能会陷入次优策略。
这与密集奖励结构相反,后者允许代理更快地进行训练并更可预测地行为。
稠密的奖励结构或者
- 拥有多个目标。
- 在整个过程中给出提示。
这样一来,智能体就有更多机会学习到所期望的行为。
例如,假设你正在训练一个智能体利用身体行走,而唯一的奖励是到达目标。智能体可能会学会通过在地面上不断蠕动或滚动来达到目标,或者根本不学习。
相反,如果你奖励智能体朝着目标前进、保持站立、一步一脚地向前迈步并保持挺直,你将得到一种更加自然有趣的步态,并且还能提高学习效果。
允许智能体影响环境要获得奖励,必须允许智能体与其世界进行互动。让我们重新审视一下
Environment
类以定义这种互动。...def move_agent(self, action): # 将智能体的动作映射到正确的移动上 移动 = { 0: (-1, 0), # 上 1: (1, 0), # 下 2: (0, -1), # 左 3: (0, 1) # 右 } 上一个位置 = self.agent_location # 应用动作后,确定新位置 移动值 = 移动[action] 新位置 = (上一个位置[0] + 移动值[0], 上一个位置[1] + 移动值[1]) # 检查移动是否有效 if self.is_valid_location(新位置): # 将智能体从旧位置移除 self.grid[上一个位置[0]][上一个位置[1]] = 0 # 将智能体加入新位置 self.grid[新位置[0]][新位置[1]] = 1 # 更新智能体的位置 self.agent_location = 新位置 def is_valid_location(self, location): # 检查位置是否在网格边界内 if (0 <= location[0] < self.grid_size) and (0 <= location[1] < self.grid_size): return True else: return False
上面的代码首先定义了每个动作值相关联的坐标变化。如果选择动作0,那么坐标会减小1。
请记住,在这种情况下,坐标被解释为(行,列)。如果行减1,智能体向上移动一格;如果列减1,智能体向左移动一格。
然后根据移动计算新的位置。如果新位置有效,更新
agent_location
。否则,agent_location
保持不变。此外,
is_valid_location
只是检查新位置是否在网格边界内。这很简单,但我们缺了什么?反馈!
提供反馈环境需要提供适当的奖励以及该回合是否完成。
让我们首先加入
done
标志,以指示回合是否结束。...def move_agent(self, action): ... done = False # 默认情况下回合未结束 # 检查移动是否有效 if self.is_valid_location(新位置): # 将智能体从旧位置移除 self.grid[上一个位置[0]][上一个位置[1]] = 0 # 将智能体加入新位置 self.grid[新位置[0]][新位置[1]] = 1 # 更新智能体的位置 self.agent_location = 新位置 # 检查新位置是否为奖励位置 if self.agent_location == self.goal_location: # 回合结束 done = True return done...
我们将
done
默认设置为false。如果新的agent_location
与goal_location
相同,那么将done
设置为true。最后,我们返回该值。我们已经准备好奖励结构了。首先,我将展示稀疏奖励结构的实现。这对于大约5x5的网格是可满足的,但我们将更新它以适应更大的环境。
稀疏奖励实现稀疏奖励非常简单。我们主要需要给达到目标的移动提供奖励。
让我们还给每个未达到目标的步骤一个小的负奖励,并给撞到边界的步骤一个更大的负奖励。这将鼓励我们的智能体优先选择最短路径。
...def move_agent(self, action): ... done = False # 默认情况下,该回合未结束 reward = 0 # 初始化奖励 # 检查是否为有效移动 if self.is_valid_location(new_location): # 从旧位置移除代理 self.grid[previous_location[0]][previous_location[1]] = 0 # 将代理添加到新位置 self.grid[new_location[0]][new_location[1]] = 1 # 更新代理位置 self.agent_location = new_location # 检查新位置是否为目标位置 if self.agent_location == self.goal_location: # 达到目标的奖励 reward = 100 # 回合结束 done = True else: # 对未达到目标的有效移动施加小的惩罚 reward = -1 else: # 对无效移动施加较大的惩罚 reward = -3 return reward, done...
确保初始化
reward
变量,这样可以在 if 块之后访问。同时,仔细检查每种情况:有效移动并达到目标、有效移动但未达到目标,以及无效移动。密集化奖励我们实施密集化奖励系统仍然非常简单,只需要更频繁地提供反馈。
哪种方式是逐步奖励代理向目标移动的好方法?
第一种方式是返回曼哈顿距离的负值。曼哈顿距离是指在行方向上的距离加上在列方向上的距离,而不是直线距离。以下是代码演示:
reward = -(np.abs(self.goal_location[0] - new_location[0]) + \ np.abs(self.goal_location[1] - new_location[1]))
因此,行方向上的步骤数加上列方向上的步骤数,取负值。
另一种方式是根据代理移动的方向提供奖励:如果它远离目标,则提供负奖励;如果它朝向目标,则提供正奖励。
我们可以通过将新曼哈顿距离减去先前的曼哈顿距离来计算这个奖励。它的结果只能是 1 或 -1,因为代理每步只能移动一个单元格。
在我们的情况下,选择第二个选项是最合理的。这应该能提供更好的结果,因为它会根据每一步提供即时反馈,而不是更一般的奖励。
这个选项的代码如下:
...def move_agent(self, action): ... if self.agent_location == self.goal_location: ... else: # 在移动之前计算距离 previous_distance = np.abs(self.goal_location[0] - previous_location[0]) + \ np.abs(self.goal_location[1] - previous_location[1]) # 在移动之后计算距离 new_distance = np.abs(self.goal_location[0] - new_location[0]) + \ np.abs(self.goal_location[1] - new_location[1]) # 如果新位置更接近目标,奖励为 1,如果更远,奖励为 -1 reward = (previous_distance - new_distance) ...
如您所见,如果代理未达到目标,我们会计算
previous_distance
、new_distance
,然后将reward
定义为它们的差值。根据表现情况,可能适合对其及系统中的任何奖励进行缩放。您可以简单地乘以一个数字(例如 0.01、2、100),如果需要更高的奖励。它们的比例需要有效地指导代理达到目标。例如,为向目标更靠近移动奖励 1,为目标本身奖励 0.1 并不合理。
奖励是成比例的。如果将每个正奖励和负奖励乘以相同的因子,通常不会影响训练,除非是非常大或非常小的值。
总结一下,如果代理距离目标还有 10 步,并且它移到一个距离为 11 步的空间中,那么
reward
将为 -1。以下是更新的
move_agent
。def move_agent(self, action): # 将代理的动作映射到正确的移动上 moves = { 0: (-1, 0), # 上方移动 1: (1, 0), # 下方移动 2: (0, -1), # 左侧移动 3: (0, 1) # 右侧移动 } previous_location = self.agent_location # 根据动作确定应用动作之后的新位置 move = moves[action] new_location = (previous_location[0] + move[0], previous_location[1] + move[1]) done = False # 默认情况下,该回合未结束 reward = 0 # 初始化奖励 # 检查是否为有效移动 if self.is_valid_location(new_location): # 从旧位置移除代理 self.grid[previous_location[0]][previous_location[1]] = 0 # 将代理添加到新位置 self.grid[new_location[0]][new_location[1]] = 1 # 更新代理位置 self.agent_location = new_location # 检查新位置是否为目标位置 if self.agent_location == self.goal_location: # 达到目标的奖励 reward = 100 # 回合结束 done = True else: # 在移动之前计算距离 previous_distance = np.abs(self.goal_location[0] - previous_location[0]) + \ np.abs(self.goal_location[1] - previous_location[1]) # 在移动之后计算距离 new_distance = np.abs(self.goal_location[0] - new_location[0]) + \ np.abs(self.goal_location[1] - new_location[1]) # 如果新位置更接近目标,奖励为 1,如果更远,奖励为 -1 reward = (previous_distance - new_distance) else: # 对无效移动施加较大的惩罚 reward = -3 return reward, done
完成目标并尝试无效移动的奖励应该保持与该结构相同。
步骤惩罚我们只缺一件事。
目前代理人不会因为到达目标所需的时间而受到惩罚。我们实施的奖励结构有许多净中性循环。它可能在两个位置之间来回移动,无限累积任何惩罚。我们可以通过每一步减去一个小值来修复这个问题,导致远离的惩罚大于靠近的奖励。这个示例应该更清楚。
想象一下代理人从最左边的节点开始并必须做出决策。如果没有步骤惩罚,它可以选择前进,然后根据需要返回多少次,并在最后移动到目标前总共获得1的奖励。
因此,在数学上,循环1000次然后移动到目标与直接移动到目标一样有效。
试着在任一情况下想象循环并看看如何累积惩罚(或不累积惩罚)。
让我们实施这个。
...# 如果新位置更接近目标,奖励= 0.9,如果更远,奖励= -1.1reward =(previous_distance - new_distance)- 0.1...
就是这样。现在应该激励代理者采取最短路径,防止循环行为。
好了,但有什么意义?此时,您可能会认为定义奖励系统并训练代理执行可以使用更简单的算法完成的任务是浪费时间。
你是正确的。
我们这样做的原因是要学习如何思考如何引导代理人达到目标。在这种情况下,这似乎是微不足道的,但是如果代理人的环境包括物品拾取,敌人战斗,通过障碍物等等呢?
或者在现实世界中有几十个传感器和电机需要协调以顺序导航复杂多样的环境的机器人?
使用传统编程来设计执行这些任务的系统将会非常困难,并且几乎肯定无法像使用RL和良好的奖励结构一样有机地运作或普遍地鼓励代理人学习最佳策略。
强化学习在定义完成任务所需的确切步骤序列由于环境的复杂性和多变性而困难或不可能的应用程序中最有用。让RL工作所需的唯一条件是能够定义什么是有用的行为以及应该避免的行为。
最终环境方法 -
step
。使用Environment
的每个组件,我们现在可以定义代理和环境之间的交互的核心部分。好消息是,这非常简单。
def step(self, action): # 将操作应用于环境,并记录观察结果 reward,done = self.move_agent(action) next_state = self.get_state() # 在每个步骤渲染网格 if self.render_on: self.render() 返回奖励,next_state,done
step
首先在环境中移动代理并记录reward
和done
。然后获取立即在此交互之后的状态< code> next_state最后,
step
返回记录的值< code> reward ,< code> next_state 和< code> done 。这些对于构建我们的代理将从中学习的经验至关重要。
恭喜!您已正式完成了为您的DRL健身房构建环境。
下面是已完成的
Environment
< strong>类。import randomimport numpy as npclass Environment: def __init__(self, grid_size, render_on=False): self.grid_size = grid_size self.render_on = render_on self.grid = [] self.agent_location = None self.goal_location = None def reset(self): # 将空网格初始化为2d 0s的数组 self.grid = np.zeros((self.grid_size, self.grid_size)) # 向网格添加代理和目标 self.agent_location = self.add_agent() self.goal_location = self.add_goal() # 渲染初始网格 if self.render_on: self.render() # 返回初始状态 return self.get_state() def add_agent(self): # 选择一个随机位置 location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1)) # 代理由1表示 self.grid [location[0]] [location[1]] = 1 return location def add_goal(self): # 选择一个随机位置 location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1)) #获取随机位置,直到不占用位置 while self.grid [location[0]] [location[1]] == 1: location = (random.randint(0, self.grid_size - 1), random.randint(0, self.grid_size - 1)) #目标由-1表示 self.grid [location[0]] [location[1]] = -1 return location def move_agent(self, action): # 将代理动作映射到正确的移动 moves = { 0: (-1, 0), #向上 1: (1, 0 ), #向下 2: (0,-1), #向左 3: (0, 1) #向右 } previous_location = self.agent_location # 在应用操作之后确定新位置 move = moves[action] new_location = (previous_location[0] + move[0], previous_location[1] + move[1]) done = False # 默认情况下,该集还未完成 reward = 0 #初始化奖励 # 检查有效移动 if self.is_valid_location(new_location): # 从旧位置移除代理 self.grid[previous_location[0]][previous_location[1]] = 0 # 将代理添加到新位置 self.grid[new_location[0]][new_location[1]] = 1 # 更新代理位置 self.agent_location = new_location # 检查新位置是否为奖励位置 if self.agent_location == self.goal_location: #获取目标的奖励 reward = 100 # 集已完成 done = True else: # 计算移动之前的距离 previous_distance = np.abs(self.goal_location[0] - previous_location[0]) + \ np.abs(self.goal_location[1] - previous_location[1]) # 计算移动后的距离 new_distance
我们现在已经经历了很多。在继续之前,可能有必要回到开始的时候,重新评估每个部分是如何相互作用的,利用你的新知识。
6. 从经验中学习:经验重现
智能体的模型和策略,以及环境的奖励结构和步骤执行机制都已完成,但我们需要一种记住过去的方式,以便智能体能够从中学习。
这可以通过保存经验来实现。
每个经验由一些内容组成:
- 状态:在执行动作之前的状态。
- 动作:在这个状态下采取了什么动作。
- 奖励:智能体根据动作从环境中获得的正面或负面反馈。
- 下一个状态:紧接在动作之后的状态,使智能体能够不仅基于当前状态的结果,而且基于多个状态的结果来行动。
- 完成:表示经验结束,让智能体知道任务是否已完成。在每个步骤中,它可以是真或假。
这些术语对你来说应该不陌生,但再次看到它们也没有坏处!
每个经验与智能体的一步相关联。这将为训练提供所需的所有上下文。
ExperienceReplay 类为了在需要时跟踪和提供这些经验,我们将定义最后一个类 ExperienceReplay。
from collections import deque, namedtupleclass ExperienceReplay: def __init__(self, capacity, batch_size): # 内存使用双端队列存储经验,所以如果容量超过限制,它会高效地移除最旧的项目 self.memory = deque(maxlen=capacity) # 批次大小指定一次采样的经验数量 self.batch_size = batch_size # 经验是一个命名元组,它存储了训练所需的相关信息 self.Experience = namedtuple('Experience', ['state', 'action', 'reward', 'next_state', 'done'])
该类将接受一个整数值
capacity
,定义一次保存的经验的最大数量,和一个整数值batch_size
,确定每次训练采样的经验数量。批量处理经验如果你还记得,在 Agent 类中的神经网络采用输入的批量。虽然我们只使用了大小为一的批次进行预测,但这对于训练来说非常低效。通常,批次大小为 32 或更高。
将输入进行批量处理进行训练有两个好处:
- 提高效率,因为它允许同时处理多个数据点,减少计算负担,更好地利用 GPU 或 CPU 资源。
- 帮助模型更加稳定地学习,因为它同时从多个示例中学习,可以使其更好地处理新的未见过的数据。
内存
memory
将使用双端队列(deque)实现。这允许我们将新的经验添加到队列的前面,并且当达到由capacity
定义的最大长度时,队列会自动移除最旧的经验,而不需要像使用 Python 列表那样移动每个元素。当capacity
设为 10,000 或更大时,这样做可以大大提高速度。经验每个经验将定义为一个
namedtuple
。虽然其他许多数据结构也可以工作,但这样做可以提高可读性,因为我们可以按需提取每个部分进行训练。
add_experience
和sample_batch
的实现添加新的经验和采样批次都是相当简单的。import randomdef add_experience(self, state, action, reward, next_state, done): # 创建一个新的经验并将其存储在内存中 experience = self.Experience(state, action, reward, next_state, done) self.memory.append(experience)def sample_batch(self): # 批次将是内存中随机样本的大小为 batch_size 的样本 batch = random.sample(self.memory, self.batch_size) return batch
方法
add_experience
创建一个namedtuple
,其中包含经验的每个部分:state
、action
、reward
、next_state
和done
,并将其添加到memory
中。
sample_batch
同样简单。它从memory
中获取并返回一个大小为batch_size
的随机样本。最后一个所需的方法 -
can_provide_sample
最后,在尝试获取用于训练的批次之前,可以检查memory
中是否包含足够的经验来提供一个完整的样本。def can_provide_sample(self): # 判断memory的长度是否超过了batch_size return len(self.memory) >= self.batch_size
完成的ExperienceReplay类...
import random from collections import deque, namedtuple class ExperienceReplay: def __init__(self, capacity, batch_size): # Memory中以deque的形式存储经验,如果超过capacity,则会高效地删除最旧的项 self.memory = deque(maxlen=capacity) # batch_size指定一次性采样的经验量 self.batch_size = batch_size # Experience是一个namedtuple,其中存储了用于训练的相关信息 self.Experience = namedtuple('Experience', ['state', 'action', 'reward', 'next_state', 'done']) def add_experience(self, state, action, reward, next_state, done): # 创建一个新的经验并将其存储在memory中 experience = self.Experience(state, action, reward, next_state, done) self.memory.append(experience) def sample_batch(self): # batch将是从memory中随机采样的大小为batch_size的经验集合 batch = random.sample(self.memory, self.batch_size) return batch def can_provide_sample(self): # 判断memory的长度是否超过了batch_size return len(self.memory) >= self.batch_size
通过保存每个经验并从中采样的机制已经就位,我们可以回到
Agent
类,最终使其能够学习。7. 定义Agent的学习过程:拟合NN
在训练神经网络时,目标是使其产生的Q值准确表示每个选择提供的未来奖励。
基本上,我们希望网络学会预测每个决策的价值,不仅考虑即时奖励,还考虑未来可能带来的奖励。
整合未来奖励为了实现这一点,我们将次态的Q值纳入训练过程中。
当Agent采取一个动作并移动到一个新的状态时,我们查看这个新状态中的Q值,以帮助确定之前动作的价值。换句话说,潜在的未来奖励影响当前选择的感知价值。
learn方法
import numpy as np def learn(self, experiences): states = np.array([experience.state for experience in experiences]) actions = np.array([experience.action for experience in experiences]) rewards = np.array([experience.reward for experience in experiences]) next_states = np.array([experience.next_state for experience in experiences]) dones = np.array([experience.done for experience in experiences]) # 预测给定状态批次的Q值(动作值) current_q_values = self.model.predict(states, verbose=0) # 预测下一个状态批次的Q值 next_q_values = self.model.predict(next_states, verbose=0) ...
使用提供的批次
experiences
,我们将使用列表推导和之前在ExperienceReplay
中定义的namedtuple
值提取每个部分。然后,我们将每个部分转换为NumPy数组,以提高效率并与模型预期的方式保持一致,如前面所解释的。最后,我们使用模型来预测采取行动时的当前状态和紧随其后的状态的Q值。
在继续使用
learn
方法之前,我需要解释一下一个称为折扣因子的东西。未来奖励折现 - γ的作用直观上,我们知道当所有其他条件相等时,即时奖励通常是优先考虑的。(你是想要今天的薪水还是下周的薪水?)
数学上表示这点可能会显得不太直观。在考虑未来时,我们不希望未来与现在具有相等的重要性(权重)。我们通过折扣未来的幅度,或者降低其对每个决策的影响,来定义γ(通常用希腊字母γ表示)。
γ可以调整,较大的值鼓励规划,较小的值鼓励更加短视的行为。我们将使用默认值0.99。
折扣因子几乎总是介于0和1之间。折扣因子大于1,将未来优先于现在,会引入不稳定的行为,并且几乎没有实际应用。
实施γ并定义目标Q值回想一下,在训练神经网络的上下文中,这个过程依赖于两个关键要素:我们提供的输入数据和我们希望网络学习预测的相应输出。
我们需要为网络提供一些目标Q值,这些值基于环境在此特定状态和动作下给出的奖励以及下一个状态的最佳动作的折扣(由γ定义)预测奖励的更新。
我知道这听起来很复杂,但通过实现和示例来解释会更好理解。
import numpy as np...class Agent: def __init__(self, grid_size, epsilon=1, epsilon_decay=0.995, epsilon_end=0.01, gamma=0.99): ... self.gamma = gamma ... ... def learn(self, experiences): ... # 初始化目标Q值为当前Q值 target_q_values = current_q_values.copy() # 遍历批次中的每个经验 for i in range(len(experiences)): if dones[i]: # 如果该回合结束,没有下一个Q值 # [i, actions[i]]相当于[i][actions[i]] target_q_values[i, actions[i]] = rewards[i] else: # 更新后的Q值是奖励加上下一个状态的折扣最大Q值 # [i, actions[i]]相当于[i][actions[i]] target_q_values[i, actions[i]] = rewards[i] + self.gamma * np.max(next_q_values[i]) ...
我们定义了类属性
gamma
,默认值为0.99。然后,在上面实现的获取
state
和next_state
的预测值之后,我们将target_q_values
初始化为当前Q值。这些值将在下面的循环中进行更新。更新
target_q_values
:
- 如果该回合结束,该行为的
target_q_value
就是给定的奖励,因为没有相关的next_q_value
。- 否则,该回合未结束,该行为的
target_q_value
变为给定的奖励加上next_q_values
中预测的下一个动作的折扣Q值。当
done
为真时进行更新:target_q_values[i, actions[i]] = rewards[i]
当
done
为假时进行更新:target_q_values[i, actions[i]] = rewards[i] + self.gamma * np.max(next_q_values[i])
这里的语法
target_q_values[i, actions[i]]
可能看起来令人困惑,但实质上是第i个经验的Q值,对应动作actions[i]
。批次中的经验 环境给出的奖励 v vtarget_q_values[i, actions[i]] = rewards[i] ^ 选择的动作的索引
这是NumPy中等价于Python列表中的
[i][actions[i]]
。记住每个动作是一个索引(0到3)。如何更新
target_q_values
只是为了更清楚地说明,我将展示target_q_values
如何更接近于我们在训练过程中给予的实际奖励。记住我们正在处理一个批次,这将是一个简化的三个示例值的批次。此外,请确保您理解
experiences
中的条目是独立的。意思是这不是一系列步骤,而是从一系列独立经验中的一个随机样本。假设
actions
,rewards
,dones
,current_q_values
和next_q_values
的值如下所示。gamma = 0.99actions = [1, 2, 2] # (向下,向左,向左)rewards = [1, -1, 100] # 环境对动作给出的奖励dones = [False, False, True] # 表示该情节是否完整current_q_values = [ [2, 5, -2, -3], # 在此状态下,动作2(索引1)迄今为止是最佳的 [1, 3, 4, -1], # 这里,动作3(索引2)当前受欢迎 [-3, 2, 6, 1] # 在此状态中,动作3(索引2)具有最高的Q值]next_q_values = [ [1, 4, -1, -2], # 从第一个状态中进行每个动作后的未来Q值 [2, 2, 5, 0], # 来自第二个状态的未来Q值 [-2, 3, 7, 2] # 来自第三个状态的未来Q值]
然后,我们将
current_q_values
复制到target_q_values
进行更新。target_q_values = current_q_values
然后,对于批次中的每个经验,我们可以显示相关的值。
这不是代码,而只是每个阶段的值的示例。如果您迷失了,请确保回顾初始值以了解每个值的来源。
条目1
i = 0 # 这是批次中的第一个条目(第一次循环)# 相关值的第一个条目actions[i] = 1rewards[i] = 1dones[i] = False target_q_values[i] = [2, 5, -2, -3]next_q_values[i] = [1, 4, -1, -2]
因为此经验的
dones[i]
为false,我们需要考虑next_q_values
并应用gamma(0.99)。target_q_values[i, actions[i]] = rewards[i] + 0.99 * max(next_q_values[i])
为什么要获取
next_q_values[i]
的最大值?因为那将是下一个选择的动作,我们希望得到估计的奖励(Q值)。然后,我们将对应于
actions[i]
的索引的第i个target_q_values
更新为此状态/动作对的奖励加上下一个状态/动作对的折扣奖励。这里是在更新后的此经验中的目标值。
# 更新的target_q_values[i]target_q_values[i] = [2, 4.96, -2, -3] ^ ^ i = 0 action[i] = 1
正如您所见,对于当前状态,选择1(向下)现在更加可取,因为值更高,这种行为已经得到了强化。
计算这些值可能会有所帮助,以便更加明确。
条目2
i = 1 # 这是批次中的第二个条目# 相关值的第二个条目actions[i] = 2rewards[i] = -1dones[i] = False target_q_values[i] = [1, 3, 4, -1]next_q_values[i] = [2, 2, 5, 0]
dones[i]
在这里也是false,因此我们确实需要考虑next_q_values
。target_q_values[i,actions[i]] = rewards[i] + 0.99 * max(next_q_values[i])
同样,在索引
actions[i]
更新第i个经验的target_q_values
。# 更新后的target_q_values[i]target_q_values[i] = [1, 3, 3.95, -1] ^ ^ i = 1 action[i] = 2
选择2(向左)现在不太理想,因为Q值较低,这种行为是不鼓励的。
条目3
最后,批处理中的最后一个条目。
i = 2 #这是批次中的第三个也是最后一个条目#关联值的第二个输入actions[i] = 2rewards[i] = 100dones[i] = True target_q_values[i] = [-3, 2, 6, 1]next_q_values[i] = [-2, 3, 7, 2]
对于该条目,
dones[i]
为true,表示该 episode 已完成,不会再进行进一步的操作。这意味着我们在更新时不考虑next_q_values
。target_q_values[i, actions[i]] = rewards[i]
请注意,我们简单地将
target_q_values[i, action[i]]
设置为rewards[i]
的值,因为不会再进行更多的动作 - 不考虑未来。# 更新后的target_q_values[i]target_q_values[i] = [-3, 2, 100, 1] ^ ^ i = 2 action[i] = 2
在这种情况以及类似的状态中选择2(向左)现在会更加理想。
这是目标在代理之左时,选择该动作会给予全面的奖励的状态。
尽管这可能看起来相当困惑,但思想很简单,就是生成更新的 Q 值,准确地表示环境给予的奖励,以提供给神经网络。这就是 NN 应该近似的内容。
试着逆向想象。因为达到目标的奖励非常大,它会在引导到代理实现目标的状态之间产生传播效应。这就是伽玛考虑下一个状态及其在奖励值中向后扩散的作用的力量。
上面是 Q 值的简化版本以及折扣因子的影响,仅考虑目标的奖励,而不是递增的奖励或惩罚。
在网格中选择任何一个单元格并移动到最高质量的相邻单元格。你会发现它总是提供了通往目标的最佳路径。
这种效果不是立即出现的。它需要代理探索状态和动作空间,逐渐学习和调整策略,建立对不同动作如何随时间变化而导致不同奖励的理解。
如果奖励结构经过精心构建,这将逐渐引导我们的代理采取更有优势的动作。
拟合神经网络对于
learn
方法,最后要做的事情是向代理的神经网络提供states
及其关联的target_q_values
。然后 TensorFlow 将处理更新权重,使其更接近预测出这些值。...def learn(self, experiences): states = np.array([experience.state for experience in experiences]) actions = np.array([experience.action for experience in experiences]) rewards = np.array([experience.reward for experience in experiences]) next_states = np.array([experience.next_state for experience in experiences]) dones = np.array([experience.done for experience in experiences]) #为给定的状态批次预测 Q 值(动作值) current_q_values = self.model.predict(states, verbose=0) #为 next_state 批次预测 Q 值 next_q_values = self.model.predict(next_states, verbose=0) #将当前 Q 值初始化为目标 Q 值 target_q_values = current_q_values.copy() #遍历批处理中的每个经验 for i in range(len(experiences)): if dones[i]: #如果 episode 完成,将没有下一个 Q 值 target_q_values[i, actions[i]] = rewards[i] else: #更新的 Q 值是奖励加上下一个状态的折扣最大 Q 值 #[i, actions[i]]是[i][actions[i]]的 numpy 等效 target_q_values[i, actions[i]] = rewards[i] + self.gamma * np.max(next_q_values[i]) #训练模型 self.model.fit(states, target_q_values, epochs=1, verbose=0)
唯一的新部分是
self.model.fit(states, target_q_values, epochs=1, verbose=0)
。fit
方法接受两个主要参数:输入数据和目标值。在这种情况下,我们的输入是一批states
,而目标值是每个状态的更新的 Q 值。
epochs=1
只是设置了网络尝试拟合数据的次数。这个次数对于通用化已经足够了,而不是拟合特定的批处理。verbose=0
只是告诉 TensorFlow 不打印调试信息,如进度条。现在,
Agent
类已经具备了从经验中学习的能力,但它还需要两个简单的方法 —save
和load
。保存和加载已训练模型保存和加载模型可以避免每次都需要进行完整训练的情况。我们可以使用简单的 TensorFlow 方法,只需要一个参数
file_path
。from tensorflow.keras.models import load_modeldef load(self, file_path): self.model = load_model(file_path)def save(self, file_path): self.model.save(file_path)
创建一个名为 models(或其他你喜欢的名字)的目录,然后可以在设定的间隔内保存已训练的模型。这些文件以 .h5 结尾。所以每当你想要保存你的模型时,只需调用
agent.save(‘models/model_name.h5’)
。加载模型时也是同样的方法。完整的
Agent
类from tensorflow.keras.layers import Densefrom tensorflow.keras.models import Sequential, load_modelimport numpy as npclass Agent: def __init__(self, grid_size, epsilon=1, epsilon_decay=0.998, epsilon_end=0.01, gamma=0.99): self.grid_size = grid_size self.epsilon = epsilon self.epsilon_decay = epsilon_decay self.epsilon_end = epsilon_end self.gamma = gamma def build_model(self): # 创建一个带有 3 个层的序贯模型 model = Sequential([ # 输入层期望是一个平铺的网格,因此输入形状是 grid_size 的平方 Dense(128, activation='relu', input_shape=(self.grid_size**2,)), Dense(64, activation='relu'), # 输出层有 4 个单元,表示可能的动作(上、下、左、右) Dense(4, activation='linear') ]) model.compile(optimizer='adam', loss='mse') return model def get_action(self, state): # rand() 返回 0 到 1 之间的随机值 if np.random.rand() <= self.epsilon: # 探索:随机动作 action = np.random.randint(0, 4) else: # 为状态添加一个额外的维度,以创建一个批处理实例 state = np.expand_dims(state, axis=0) # 使用模型预测给定状态的 Q 值(动作值) q_values = self.model.predict(state, verbose=0) # 选择并返回具有最高 Q 值的动作 action = np.argmax(q_values[0]) # 从第一个(也是唯一的)条目中获得动作 # 随时间减小 epsilon 的值以减少探索 if self.epsilon > self.epsilon_end: self.epsilon *= self.epsilon_decay return action def learn(self, experiences): states = np.array([experience.state for experience in experiences]) actions = np.array([experience.action for experience in experiences]) rewards = np.array([experience.reward for experience in experiences]) next_states = np.array([experience.next_state for experience in experiences]) dones = np.array([experience.done for experience in experiences]) # 为给定状态批次预测 Q 值(动作值) current_q_values = self.model.predict(states, verbose=0) # 预测下一个状态批次的 Q 值 next_q_values = self.model.predict(next_states, verbose=0) # 将目标 Q 值初始化为当前 Q 值 target_q_values = current_q_values.copy() # 循环遍历批处理中的每个经验 for i in range(len(experiences)): if dones[i]: # 如果该集数已完成,则没有下一个 Q 值 target_q_values[i, actions[i]] = rewards[i] else: # 更新的 Q 值是奖励加上下一个状态的折扣最大 Q 值 # [i, actions[i]] 是 [i][actions[i]] 的 NumPy 等效形式 target_q_values[i, actions[i]] = rewards[i] + self.gamma * np.max(next_q_values[i]) # 训练模型 self.model.fit(states, target_q_values, epochs=1, verbose=0) def load(self, file_path): self.model = load_model(file_path) def save(self, file_path): self.model.save(file_path)
你的深度强化学习gym的每个类现在都已完整!你成功编码了
Agent
、Environment
和ExperienceReplay
。唯一剩下的就是主训练循环。8. 执行训练循环:把所有内容整合起来
我们来到了项目的最后阶段!我们编码好的每个部分,
Agent
、Environment
和ExperienceReplay
,都需要一种交互方式。这将是主程序,每个episode在这里运行,并在这里定义我们的超参数,如
epsilon
。虽然它相当简单,但我会在编码每个部分时进行拆解,使其更加清晰。
初始化每个部分首先,我们设置
grid_size
,并使用我们创建的类初始化每个实例。from environment import Environment from agent import Agent from experience_replay import ExperienceReplay if __name__ == '__main__': grid_size = 5 environment = Environment(grid_size=grid_size, render_on=True) agent = Agent(grid_size=grid_size, epsilon=1, epsilon_decay=0.998, epsilon_end=0.01) experience_replay = ExperienceReplay(capacity=10000, batch_size=32) ...
现在,我们已经有了主训练循环所需的所有部分。
Episode和步骤上限接下来,我们将定义训练运行的episode数量和每个episode允许的步骤的最大数。
限制步骤的数量有助于确保我们的Agent不会陷入循环,并鼓励选择更短的路径。我们会相当慷慨,对于5x5的网格,我们将将最大步数设置为200。对于较大的环境,需要增加此值。
from environment import Environment from agent import Agent from experience_replay import ExperienceReplay if __name__ == '__main__': grid_size = 5 environment = Environment(grid_size=grid_size, render_on=True) agent = Agent(grid_size=grid_size, epsilon=1, epsilon_decay=0.998, epsilon_end=0.01) experience_replay = ExperienceReplay(capacity=10000, batch_size=32) # 在训练停止之前运行的episode数量 episodes = 5000 # 每个episode中的最大步数 max_steps = 200 ...
Episode循环在每个episode中,我们将重置
environment
并保存初始state
。然后,我们执行每个步骤,直到done
为真或达到max_steps
。最后,我们保存模型。每个步骤的逻辑尚未实现。from environment import Environment from agent import Agent from experience_replay import ExperienceReplay if __name__ == '__main__': grid_size = 5 environment = Environment(grid_size=grid_size, render_on=True) agent = Agent(grid_size=grid_size, epsilon=1, epsilon_decay=0.998, epsilon_end=0.01) experience_replay = ExperienceReplay(capacity=10000, batch_size=32) # 在训练停止之前运行的episode数量 episodes = 5000 # 每个episode中的最大步数 max_steps = 200 for episode in range(episodes): # 获取环境的初始状态,并将done设置为False state = environment.reset() # 循环直到episode结束 for step in range(max_steps): # 每个步骤的逻辑 ... if done: break agent.save(f'models/model_{grid_size}.h5')
请注意,我们使用
grid_size
来命名模型,因为不同输入大小的神经网络架构是不同的。试图将一个5x5的模型加载到一个10x10的架构中会抛出错误。步骤逻辑最后,在步骤循环中,我们将按照之前讨论的将每个部分进行交互。
from environment import Environment from agent import Agent from experience_replay import ExperienceReplay if __name__ == '__main__': grid_size = 5 environment = Environment(grid_size=grid_size, render_on=True) agent = Agent(grid_size=grid_size, epsilon=1, epsilon_decay=0.998, epsilon_end=0.01) experience_replay = ExperienceReplay(capacity=10000, batch_size=32) # 在训练停止之前运行的episode数量 episodes = 5000 # 每个episode中的最大步数 max_steps = 200 for episode in range(episodes): # 获取环境的初始状态,并将done设置为False state = environment.reset() # 循环直到episode结束 for step in range(max_steps): print('Episode:', episode) print('Step:', step) print('Epsilon:', agent.epsilon) # 从agent的策略中获取动作选择 action = agent.get_action(state) # 在环境中执行一步,并保存经验 reward, next_state, done = environment.step(action) experience_replay.add_experience(state, action, reward, next_state, done) # 如果经验回放有足够的记忆来提供样本,训练agent if experience_replay.can_provide_sample(): experiences = experience_replay.sample_batch() agent.learn(experiences) # 将state设置为下一个状态 state = next_state if done: break agent.save(f'models/model_{grid_size}.h5')
对于每个步骤,我们首先打印剧集和步骤编号,以便了解我们在训练中的位置。此外,您可以打印
epsilon
,以查看代理的行动中有多少百分比是随机的。这也有助于停止代理的原因是您可以以相同的epsilon
值重新启动代理。打印完信息后,我们使用
agent
策略从此state
获取action
,以在environment
中采取一步,并记录返回的值。然后,我们将
state
、action
、reward
、next_state
和done
保存为一次经验。如果experience_replay
有足够的内存,我们会在experiences
的随机批次上训练agent
。最后,我们将
state
设置为next_state
并检查剧集是否done
。一旦运行至少一个剧集,您将保存了一个模型,您可以加载它并继续之前的位置或评估其性能。
在初始化
agent
之后,只需使用其加载方法,类似于我们所保存的方式 —agent.load(f'models/model_{grid_size}.h5')
当您评估模型时,在每个步骤上添加一点延迟使用时间 —
time.sleep(0.5)
。这会使每个步骤暂停半秒钟。确保包含import time
。完成的训练循环
from environment import Environmentfrom agent import Agentfrom experience_replay import ExperienceReplayimport timeif __name__ == '__main__': grid_size = 5 environment = Environment(grid_size=grid_size, render_on=True) agent = Agent(grid_size=grid_size, epsilon=1, epsilon_decay=0.998, epsilon_end=0.01) # agent.load(f'models/model_{grid_size}.h5') experience_replay = ExperienceReplay(capacity=10000, batch_size=32) # 在训练停止之前要运行的剧集数 episodes = 5000 # 每个剧集中的最大步数 max_steps = 200 for episode in range(episodes): # 获取环境的初始状态并将done设置为False state = environment.reset() # 循环直到剧集结束 for step in range(max_steps): print('剧集:', episode) print('步数:', step) print('Epsilon:', agent.epsilon) # 从代理的策略中获取动作选择 action = agent.get_action(state) # 在环境中进行一步并保存经验 reward, next_state, done = environment.step(action) experience_replay.add_experience(state, action, reward, next_state, done) # 如果经验回放有足够的内存提供样本,训练代理 if experience_replay.can_provide_sample(): experiences = experience_replay.sample_batch() agent.learn(experiences) # 将状态设置为下一个状态 state = next_state if done: break # 可选择地,暂停半秒钟以评估模型 # time.sleep(0.5) agent.save(f'models/model_{grid_size}.h5')
当您需要
time.sleep
或agent.load
时,您可以将其注释掉。运行程序运行一下!您应该能够成功训练代理以完成目标,直到一个8x8左右的网格环境。任何比这大得多的网格大小,训练就会变得困难。
尝试看看您可以扩展的环境有多大。您可以做一些事情,例如向神经网络添加层和神经元,更改
epsilon_decay
或给予更多训练时间。这样做可以加深对每个部分的理解。例如,您可能会注意到
epsilon
会很快达到epsilon_end
。如果愿意,可以放心地将epsilon_decay
更改为0.9998或0.99998等值。随着网格大小的增长,网络接收到的状态会指数增长。
我在结尾处添加了一个简短的奖励部分,以修复这个问题,并展示了代理环境的多种表示方法。
9. 总结
恭喜你完成了这个全面的强化学习和深度Q学习的旅程!
虽然仍然有很多内容需要涵盖,但你可以离开这里获得重要的见解和技能。
在本指南中,你:
- 了解了强化学习的核心概念以及为什么它在人工智能领域是至关重要的。
- 构建了一个简单的环境,为代理的交互和学习奠定了基础。
- 为使用深度Q学习定义了代理的神经网络架构,使代理能够在比传统Q学习更复杂的环境中做出决策。
- 了解了在利用学到的策略之前为什么探索是重要的,并实现了Epsilon-Greedy策略。
- 实现了奖励系统,引导代理去达到目标,并学习了稀疏奖励和稠密奖励之间的区别。
- 设计了经验回放机制,使代理能够从过去的经验中学习。
- 亲自体验了拟合神经网络的过程,这是一个关键的过程,代理根据环境的反馈来改善其性能。
- 将这些组件放在一个训练循环中,观察代理学习的过程,并根据需要进行调整,以达到最佳性能。
到目前为止,你应该对强化学习和深度Q学习有信心了。你不仅在理论上建立了坚实的基础,还通过从头开始构建了一个DRL游戏来实际应用这些知识。
这些知识使你能够解决更复杂的强化学习问题,并为在这个令人兴奋的人工智能领域进行更深入的探索铺平了道路。
上面是一个受到Agar.io启发的网格游戏,代理被鼓励通过吃对方来增长大小。每一步将环境绘制在图形上,使用Python库Matplotlib。代理周围的方框是它们的视野。这被作为它们从环境中获取的状态作为一个扁平化的网格输入,类似于我们在系统中所做的。
像这样的游戏,以及其他无数的用途,可以通过对你在这里完成的内容进行简单的修改来创建。
但是需要记住,深度Q学习只适用于离散动作空间,即具有有限数量的不同动作的空间。对于连续动作空间,比如基于物理的环境,你需要在DRL领域探索其他方法。
10. 奖励:优化状态表示
你可能不信,目前我们正在使用的状态表示方法对于这个问题来说并不是最优的。
实际上非常低效。
对于一个100x100的网格,存在着99,990,000种可能的状态。不仅模型需要相当大的内存来处理这么大的输入 —— 10,000个值,还需要大量的训练数据。根据可用的计算资源,这可能需要几天甚至几周的时间。
另一个不足之处是灵活性。当前的模型只适用于一个特定大小的网格。如果你想使用一个不同尺寸的网格,你需要完全重新训练另一个模型。
我们需要一种能够显著减少状态空间并且适用于任意网格大小的状态表示方法。
更好的方式虽然有多种方法可以做到这一点,但最简单、可能也最有效的方法是使用与目标的相对距离。
相比于一个5x5网格的状态看起来像这样:
[0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, -1, 0, 0, 0, 0, 0]
它可以用两个值来表示:
[-2, -1]
使用这种方法可以将100x100网格的状态空间从99,990,000降低到39,601!
不仅如此,它还能更好地进行泛化。它只需学习到当第一个值为负数时向下移动是正确的选择,当第二个值为负数时向右移动是适当的选择,而正数值则相反。
这使得模型只需探索状态空间的一小部分。
上图显示了在一个25x25网格上训练的模型的学习过程。它显示了代理在每个单元格中根据颜色选择的情况,目标位于中心。
一开始,在探索阶段,代理的策略完全错误。你可以看到,它选择朝上移动当处于目标上方,朝下移动当处于目标下方,等等。
但不到10个episodes,它就学会了一种策略,使它能以最少的步骤从任何单元格达到目标。
这个策略也适用于目标位于任何位置的情况。
最后,它能够非常好地泛化学习。
这个模型只见过一个25x25的网格,但它可以在一个远大于此的环境中使用自己的策略——201x201。在这样的环境中,共有1,632,200,400个代理-目标组合!
让我们对代码进行这个彻底改进。
实施我们实际上不需要做太多工作来使其运作,很幸运。
第一件事是在
Environment
中更新get_state
。def get_state(self): # 计算行距离和列距离 relative_distance = (self.agent_location[0] - self.goal_location[0], self.agent_location[1] - self.goal_location[1]) # 将元组拆包成Numpy数组 state = np.array([*relative_distance]) return state
我们不再返回网格的扁平版本,而是计算与目标的距离并将其作为一个Numpy数组返回。
*
操作符简单地将元组拆分为各个组件。它的效果与执行以下操作相同:state = np.array([relative_distance[0], relative_distance[1])
。此外,在
move_agent
中,我们可以将撞到边界的惩罚更新为与远离目标的移动相同。这样,当您更改网格大小时,代理不会因为移出最初接受训练的区域而受到阻碍。def move_agent(self, action): ... else: # 无效移动的同样惩罚 reward = -1.1 return reward, done
更新神经网络结构目前,我们的TensorFlow模型看起来如下所示。为了简化,我省略了其他部分。
class Agent: def __init__(self, grid_size, ...): self.grid_size = grid_size ... self.model = self.build_model() def build_model(self): # 创建一个具有3层的序列模型 model = Sequential([ # 输入层期望一个扁平化的网格,因此输入形状是网格大小的平方 Dense(128, activation='relu', input_shape=(self.grid_size**2,)), Dense(64, activation='relu'), # 输出层有4个单元,代表可能的动作(上、下、左、右) Dense(4, activation='linear') ]) model.compile(optimizer='adam', loss='mse') return model ...
如果你还记得,我们的模型结构需要有一个一致的输入。在这种情况下,输入大小依赖于
grid_size
。通过更新我们的状态表示,每个状态将只有两个值,不管
grid_size
是什么。我们可以更新模型来期望这个。此外,我们可以完全去掉self.grid_size
,因为Agent
类不再依赖它。class Agent: def __init__(self, ...): ... self.model = self.build_model() def build_model(self): # 创建一个含有三个层的序列模型 model = Sequential([ # 输入层预期是一个平坦的网格,因此输入形状是grid_size的平方 Dense(64, activation='relu', input_shape=(2,)), Dense(32, activation='relu'), # 输出层有4个单元表示可能的动作(上、下、左、右) Dense(4, activation='linear') ]) model.compile(optimizer='adam', loss='mse') return model ...
input_shape
参数预期一个代表输入状态的元组。
(2,)
表示一个具有两个值的一维数组。看起来像这样:[-2, 0]
而它的二维形式
(2,1)
例如,指定了两行和一列。看起来像这样:[[-2], [0]]
最后,我们将隐藏层的神经元数量分别降低到64和32。对于这种简单的状态表示来说,可能仍然有些多余,但是应该足够快速运行。
在开始训练时,试着看看你需要多少个神经元让模型有效地学习。你甚至可以尝试移除第二个隐藏层。
修复主训练循环训练循环只需要很少的调整。让我们更新它以匹配我们的改动。
from environment import Environmentfrom agent import Agentfrom experience_replay import ExperienceReplayimport timeif __name__ == '__main__': grid_size = 5 environment = Environment(grid_size=grid_size, render_on=True) agent = Agent(epsilon=1, epsilon_decay=0.998, epsilon_end=0.01) # agent.load(f'models/model.h5') experience_replay = ExperienceReplay(capacity=10000, batch_size=32) # 在训练停止之前要运行的剧集数 episodes = 5000 # 每个剧集中的最大步数 max_steps = 200 for episode in range(episodes): # 获取环境的初始状态,并将done设置为False state = environment.reset() # 循环直到剧集结束 for step in range(max_steps): print('剧集:', episode) print('步数:', step) print('Epsilon:', agent.epsilon) # 从代理的策略中选择动作 action = agent.get_action(state) # 在环境中采取一步动作并保存经验 reward, next_state, done = environment.step(action) experience_replay.add_experience(state, action, reward, next_state, done) # 如果经验回放有足够的记忆来提供样本,就训练代理 if experience_replay.can_provide_sample(): experiences = experience_replay.sample_batch() agent.learn(experiences) # 将状态设置为next_state state = next_state if done: break # 可选择地暂停0.5秒以评估模型 # time.sleep(0.5) agent.save(f'models/model.h5')
因为
agent
不再需要grid_size
,我们可以删除它以防止出现任何错误。我们也不再需要为每个
grid_size
给模型指定不同的名称,因为现在一个模型可以适用于任何尺寸。如果你对
ExperienceReplay
感兴趣,它将保持不变。请注意,没有一个通用的状态表示适用于所有情况。在某些情况下,像我们一样提供完整的网格可能是有意义的,或者像第9节中的多智能体系统那样只提供一部分网格。目标是在简化状态空间和提供足够的信息让代理学习之间找到平衡。
超参数即使是像我们这样简单的环境也需要调整超参数。请记住,这些是我们可以更改影响训练的值。
我们讨论的每个都包括:
epsilon
,epsilon_decay
,epsilon_end
(探索/利用)gamma
(折扣因子)- 神经元和层数量
batch_size
,capacity
(经验回放)max_steps
还有很多其他的,但是我们将讨论的最后一个将对学习至关重要。
学习率学习率(LR)是神经网络模型的超参数。
它基本上告诉神经网络每次拟合数据时,要调整其权重(用于转换输入的值)的大小。
LR的值通常在1到0.0000001之间,最常见的值为0.01、0.001和0.0001。
如果学习率太低,可能不足以快速更新Q值以学习最优策略,这个过程称为收敛。如果你注意到学习似乎停滞不前,或者根本没有学习,这可能是学习率不够高的迹象。
尽管这些关于学习率的图表大大简化了,但它们应该能够传达基本思想。
另一方面,学习率过高可能导致值“爆炸”或变得越来越大。模型做出的调整太大,导致它发散-随着时间的推移变得更糟。
什么是完美的学习率?绳子有多长?
在许多情况下,你只能使用简单的试错方法。确定学习率是否是问题的一种好方法是检查模型的输出。
这正是我在训练这个模型时遇到的问题。在切换到简化的状态表示后,它拒绝学习。agent实际上会在广泛测试每个超参数后继续前往网格的右下角。
这对我来说没有意义,所以我决定查看由
Agent
的get_action
方法输出的Q值。步骤10[[ 0.29763165 0.28393078 -0.01633328 -0.45749056]]步骤50[[ 7.173178 6.3558702 -0.48632553 -3.1968129 ]]步骤100[[ 33.015953 32.89661 33.11674 -14.883122]]步骤200[[573.52844 590.95685 592.3647 531.27576]]...步骤5000[[37862352. 34156752. 35527612. 37821140.]]
这是爆炸值的一个例子。
在TensorFlow中,我们用于调整权重的优化器Adam的默认学习率是0.001。对于这个特定的案例来说,它实际上太高了。
在测试了各种数值后,最佳数值似乎是0.00001。
让我们来实施一下。
from tensorflow.keras.optimizers import Adamdef build_model(self): # 创建一个包含3个层次的序贯模型 model = Sequential([ # 输入层期望的是一个展平的网格,因此输入形状为grid_size的平方 Dense(64, activation='relu', input_shape=(2,)), Dense(32, activation='relu'), # 输出层有4个单元,对应可能的行动(up, down, left, right) Dense(4, activation='linear') ]) # 更新学习率 optimizer = Adam(learning_rate=0.00001) # 使用自定义优化器编译模型 model.compile(optimizer=optimizer, loss='mse') return model
随意调整这些值,观察Q值的变化。另外,记得导入Adam。
最后,您可以再次开始训练!
热图代码以下是以前展示的绘制热图的代码,如果您感兴趣,可以使用它自己绘制热图。
import matplotlib.pyplot as pltimport numpy as npfrom tensorflow.keras.models import load_modeldef generate_heatmap(episode, grid_size, model_path): # 加载模型 model = load_model(model_path) goal_location = (grid_size // 2, grid_size // 2) # 网格中心位置 # 初始化一个数组来存储颜色强度 heatmap_data = np.zeros((grid_size, grid_size, 3)) # 定义每个行动的颜色 colors = { 0: np.array([0, 0, 1]), # 上(蓝色) 1: np.array([1, 0, 0]), # 下(红色) 2: np.array([0, 1, 0]), # 左(绿色) 3: np.array([1, 1, 0]) # 右(黄色) } # 计算每个状态的Q值并确定颜色强度 for x in range(grid_size): for y in range(grid_size): relative_distance = (x - goal_location[0], y - goal_location[1]) state = np.array([*relative_distance]).reshape(1, -1) q_values = model.predict(state) best_action = np.argmax(q_values) if (x, y) == goal_location: heatmap_data[x, y] = np.array([1, 1, 1]) else: heatmap_data[x, y] = colors[best_action] # 绘制热图 plt.imshow(heatmap_data, interpolation='nearest') plt.xlabel(f'Episode: {episode}') plt.axis('off') plt.tight_layout(pad=0) plt.savefig(f'./figures/heatmap_{grid_size}_{episode}', bbox_inches='tight')
只需在训练循环中导入并根据需求运行即可。
下一步一旦您成功训练了模型并尝试了超参数,我鼓励您真正将其变为您自己的东西。
一些扩展系统的想法:
- 在代理和目标之间添加障碍物
- 创建一个更多样化的环境,可能包括随机生成的房间和路径
- 实现多代理的合作/竞争系统,例如捉迷藏
- 创建一个受《乒乓球》启发的游戏
- 实现资源管理,例如代理在前往目标的途中需要收集食物的饥饿或能量系统
下面是一个超出我们简单网格系统的示例:
使用Pygame,一个用于制作2D游戏的流行Python库,我构建了一个《Flappy Bird》克隆版。然后,在我们预建的
Environment
类中定义了交互、约束和奖励结构。我将状态表示为代理的当前速度和位置、到最近管道的距离以及开口的位置。
对于
Agent
类,我只需将输入大小更新为(4,)
,为NN添加更多层,并更新网络以仅输出两个值 — 跳跃或不跳跃。你可以在GitHub的
flappy_bird
目录中找到并运行此代码。确保使用pip install pygame
命令安装了Pygame。这表明你所构建的代码适用于各种环境。你甚至可以让代理人探索一个3D环境或执行更抽象的任务,比如股票交易。
在扩展你的系统时,不要害怕对环境、状态表示和奖励系统进行创造性的探索。就像代理人一样,我们通过探索来实现最佳的学习效果!
我希望通过从零开始构建DRL gym,能让你对人工智能的美感有更深入的认识,并激发你深入研究。
本文的灵感来自于Harrison Kinsley(sentdex)和Daniel Kukieł发布的《用Python从零开始构建神经网络》(Neural Networks From Scratch In Python Book)和YouTube系列视频。其中的对话风格和从零开始的代码实现真正巩固了我对神经网络的理解。