文章目录
- 强化学习概念
- 实现
- qLearning
- 基于这个思路,那么解决这个问题的代码如下
强化学习概念
强化学习有一个非常直观的表现,就是从出发点到目标之间存在着一个连续的状态转换,比如说从状态一到状态456,而每一个状态都有多种的行为,这些行为会有相应的惩罚和奖励。比如走迷宫问题,每上下左右,或者静止走一步都距离出口进入了不同的状态
我们可以根据每一步进入的状态得到不同的奖励,从而找到出口.最后就得到了我们无论进入哪一个状态,而预知的有益的下一步行动的奖励表。
这样就叫做环境状态感知,将一个智能体随机放入当前环境任何状态,他都可以根据学到的决策表选择最佳行动
接下来我们来实现这个走迷宫的问题。来具体的实践 qlearning的基础思想。
实现
状态矩阵,如下矩阵中的每一行都代表了从出发点到目标之间的的每一个状态,分别是状态一,状态二,状态三,状态4,而每一个状态都有5种行动,分别是上下左右静止,这个是四个状态上下左右行为的状态奖励表,0表示行动不可以进行,-1表示行动可以进行,但是不是终点,10则表示是终点。-10是陷阱
解读第一行就是状态一无法向上,得到0分,向下扣10分,无法向左得0分,向右扣1分,静止扣1分
reward = np.array([[0, -10, 0, -1, -1],[0, 10, -1, 0, -1],[-1, 0, 0, 10, -10],[-1, 0, -10, 0, 10]])
状态转移矩阵,这个矩阵表示的是,哪一个状态可以向哪一个状态进行迁移,分别是上下左右静止所进入的状态,负一表示无法进入新的状态,零表示进入状态0,一表示进入状态一,二表示进入状态二,三表示进入状态三。
对应这四个状态的五种行动,分别会进入到什么状态中
解读第一行就是状态一向上无法进入新状态,向下进入状态2,向左无法进入新状态,向右进入状态1,静止待在状态0
transition_matrix = np.array([[-1, 2, -1, 1, 0],[-1, 3, 0, -1, 1],[0, -1, -1, 3, 2],[1, -1, 2, -1, 3]])
有效行动矩阵,这个矩阵表示了每一个状态可行的行为。对应上面的状态转移矩阵。由此智能体无论落在哪一个状态,他都可以选择有效的行动。
valid_actions = np.array([[1, 3, 4],[1, 2, 4],[0, 3, 4],[0, 2, 4]])
综上所述,我们定义了,智能体从出发到终点,每一个状态的有效的行为,以及智能体如何进行状态转移。那么我们接下来所要求的是智能体无论落在哪一个状态他都要快速而有效的找到终点,得到每一个状态有效的行动奖励表
我们初始化这个行动表,并设置收益系数为0.8,即是用来计算智能体在他可预见的行为里,每一个行为可以得到的收益.
q_matrix = np.zeros((4, 5))
gamma = 0.8
然后我们随机将智能体放在任意的状态上,并让他随机地选择行动,我们来查看一下, Ta.所统计出的行动奖励表,以及每一个行动以及所得到的收益。
for i in range(10):start_state = np.random.choice([0, 1, 2], size=1)[0] # 随机初始起点print('start_state:{}'.format(start_state))current_state = start_statewhile current_state != 3: # 判断是否到达终点action = random.choice(valid_actions[current_state]) # greedy 随机选择当前状态下的有效动作next_state = transition_matrix[current_state][action] # 通过选择的动作得到下一个状态future_rewards = []for action_nxt in valid_actions[next_state]:future_rewards.append(q_matrix[next_state][action_nxt]) # 得到下一个状态所有可能动作的奖励q_state = reward[current_state][action] + gamma * max(future_rewards) # bellman equationq_matrix[current_state][action] = q_state # 更新 q 矩阵current_state = next_state # 将下一个状态变成当前状态print('episode: {}, q matrix: \n{}'.format(i, q_matrix))print()
我们上面这个程序是让每一个智能体随机的走,直到走出终点为止,然后对他所走过的路给出评分评价。我们可以先看到,第一次智能体落在了状态0,它尝试向右走,得到了-1分来到了状态1,然后向左走来到状态0,又是-1分,从状态0又向下走到状态2,得到了-10分,然后选择静止一次,扣了10分,最后从状态2向右走到终点状态3,得到10分,然后结束第一次尝试,更新记忆表
我怎么看出来的?
我已经说了,每行代表的是每个状态,每列代表的是上下左右静止5个动作,你从状态0去看,看看走哪个状态会得到-10和-1,结合那个迷宫走向,根据得分就可以看出来
qLearning
从上面的基础模型中我们知道,如果要寻找最佳行动策略,需要的事件数据有,当前环境状态,可以进行的行为,当前环境每个行为的下一步奖励,最后得到一个状态奖励表,表示从这个状态里如何走才是最佳策略
问题是,现实世界不简单啊,前面迷宫的状态是有限的,格子总是有限的,动作和状态还可以录入,但是现实中状态太多了.
我们将走迷宫问题改成下棋问题,比如围棋,我们每走一步,都有几百种落点可供选择,预示着几百个落点后棋盘会处于多少种不同的状态,然后再次走一步,继续是几百种不同的状态,我们要求ai下棋赢过我们,让ai可以估计接下来的10步如何走才能形成优势,让ai记忆棋谱是行不通的,状态太多了,状态可以逼近了万亿种状态,几乎无穷无尽
如果可以让ai自己探索,自己去学习和观察其中有益的动作状态,自己给自己喂数据,自己训练自己,最后只根据结果完成的好不好来判定,那么就好了
神经网络则提出了一种新的方式,不必知道所有的环境状态,只需要让神经网络学习曾经经历过的有益的状态就可以了,就像人类一样,形成经验,我们可以把当前围棋环境参数输入神经网络,让神经网络自主学习如何走出下一步,估计下一步得分,只要对局足够多,神经网络就会学习的足够多,它就会根据有益的下棋经历,形成一套自己的经验价值奖励表,至于其从对局中学习到了什么,我们是无从得知的,但是从实际结果上看,从aphaGO击败柯洁后,几乎围棋领域已经无人敢称尊了
那么我们不必告诉神经网络我们的环境中有多少种状态,我们只需要告诉他环境参数是如何的,有几种行动就可以了,让它自己找到最佳策略
接下来我们来解决一下gym里经典的摆锤问题:
有一辆小车和一个竖杆,初始状态下,杆子是倾斜的,要求智能体左右移动小车让竖杆保持直立
如果竖杆偏移小车中心2个单位智能体就会失败,相反杆子持续竖着,智能体坚持时间越长奖励越多
那么接下来就需要设计一下,如何让神经网络完成这个目标
先说一下目标,我们希望得到一个模型,这个模型可以根据环境状态给出每个行动状态的下一步状态的奖励预测,从而选择最佳状态去行动
那么我们这个神经网络学习的目标就是如何准确预测当前状态采取的每个行动所得到的奖励,即是q值,这是我们的优化目标
正常来说,需要一个神经网络A,根据当前的环境数据,动作数据,以及跟随动作的下一步环境状态反馈奖励数据,让这个A不断的自我学习,预测出的行动奖励和真实的环境奖励贴近就可以了
但是实际训练中,往往不会直接使用真实的环境奖励,而是采取了缓步更新的方式
这种方式是这样的:
策略网络A:根据当前环境给出估计的下一步最优行动,给出估计奖励值Q1
目标网络B:根据当前环境给出下一步行动的估计奖励值Q2
A与B的网络结构一样,A负责接收环境数据进行训练更新参数,
目标网络B不进行训练,其参数来自于A的复制,但是其估算值Q2和当前环境下一步实际奖励值Q进行一起计算得到Q3,Q3用于对A进行训练,作为A的优化目标值进行A的网络结构更新
B的参数随着A的训练缓步更新,B的参数复制更新总是迟滞于A,Q1向着Q3优化
这样使用同一个网络结构,B来引导A进行训练
这主要是基于以下几个问题考虑:
1.稳定性:使用目标网络可以提供一种稳定的目标值来更新在策略网络的Q值。如果直接使用策略网络的Q值来更新,那么每次更新都可能导致目标值发生变化,这会使得学习过程更加不稳定。通过使用目标网络,我们可以确保在一段时间内目标值是稳定的,这有助于算法的收敛。
2.减少过拟合:目标网络的作用类似于深度学习中的dropout或正则化技术,它可以减少在线网络对特定样本的过拟合。如果每次更新都直接依赖于当前策略网络的输出,那么策略网络可能会过于拟合当前的数据,导致泛化能力下降。
3.解耦目标和当前策略:使用目标网络可以解耦目标Q值的计算和当前策略的选择。这意味着我们可以独立地更新目标网络,而不需要在每个时间步都重新计算目标Q值。这提高了算法的效率。
4.平滑学习:目标网络通过保持一定的稳定性,使得学习过程更加平滑。在每次更新时,我们不会引入太大的变化,这有助于算法逐步逼近最优解。
5.避免陷入局部最优:通过引入目标网络,算法能够在一定程度上避免陷入局部最优解。因为目标网络提供的目标值是基于过去的策略和Q值估计,这有助于引导策略网络探索更多的状态空间
基于这个思路,那么解决这个问题的代码如下
import time
import matplotlib.pyplot as plt
from IPython import display
import gymnasium as gym
import math
import random
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from itertools import count
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import gymfrom gym import wrappersenv = gym.make("CartPole-v1",render_mode="human")
# set up matplotlib
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:from IPython import display
# 开启交互式绘图模式。动态展示训练过程
plt.ion()# if GPU is to be used
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# 命名元组,状态转移变量,state: 表示当前状态。
# action: 表示采取的动作。
# next_state: 表示转移到的下一个状态。
# reward: 表示从当前状态到下一个状态的奖励
Transition = namedtuple('Transition',('state', 'action', 'next_state', 'reward'))# 经验回放内存类
class ReplayMemory(object):# 经验回放内存的容量。双向队列长度,maxlen 参数限制了队列的最大长度,确保内存不会无限增长def __init__(self, capacity):self.memory = deque([], maxlen=capacity)# 保存一次状态转移,添加到内存队列def push(self, *args):"""Save a transition"""self.memory.append(Transition(*args))# 从内存中随机采样一批状态转移def sample(self, batch_size):return random.sample(self.memory, batch_size)# 获取内存中状态转移的数量def __len__(self):return len(self.memory)# 这个 DQN 模型是一个具有两个隐藏层的前馈神经网络,用于近似 Q-值函数
class DQN(nn.Module):# n_observations:表示输入状态的维度(观测值的数量)。# n_actions:表示输出动作的维度(动作的数量)def __init__(self, n_observations, n_actions):super(DQN, self).__init__()# 创建了一个线性层(nn.Linear),将输入状态映射到一个具有 128 个神经元的隐藏层。self.layer1 = nn.Linear(n_observations, 128)# 创建了第二个具有 128 个神经元的隐藏层self.layer2 = nn.Linear(128, 128)# 创建了一个线性层,将隐藏层的输出映射到动作空间的维度。self.layer3 = nn.Linear(128, n_actions)# Called with either one element to determine next action, or a batch# during optimization. Returns tensor([[left0exp,right0exp]...]).# 调用这个方法来获取模型的输出。# 将输入 x 通过 ReLU 激活函数传递给第一个隐藏层,然后再传递给第二个隐藏层。最后,我们返回输出层的结果,表示不同动作的预期值。def forward(self, x):x = F.relu(self.layer1(x))x = F.relu(self.layer2(x))return self.layer3(x)# 每次训练时从经验回放内存中随机选择的状态转移的数量
BATCH_SIZE = 128
# 计算未来奖励的折现值。较大的折扣因子意味着更重视未来奖励
GAMMA = 0.99
# ε-贪心策略 中的参数。在训练初期,智能体更倾向于探索新的动作(ε 较大)。
# 随着训练的进行,它逐渐减少探索,更倾向于选择当前估计最优的动作(ε 较小)。
EPS_START = 0.9
EPS_END = 0.05
# 探索率(ε)的衰减速率。每隔一定步数,ε 会减小一次,以平衡探索和利用。
EPS_DECAY = 1000
# 目标网络 更新的参数。目标网络用于计算目标 Q-值,通过软更新(滑动平均)来更新。较小的 TAU 值意味着更频繁的更新目标网络。
TAU = 0.005
# 调整模型权重的更新步长
LR = 1e-4
# 获取了 OpenAI Gym 环境中的动作数量(n_actions),然后获取了状态观测值的数量(n_observations)
# Get number of actions from gym action space
n_actions = env.action_space.n
# Get the number of state observations
state, info = env.reset()
n_observations = len(state)
# 主要模型,用于预测每个动作的 Q 值。它接受状态观测值作为输入,并输出每个动作的预期值。
policy_net = DQN(n_observations, n_actions).to(device)
# 目标网络,用于计算目标 Q 值。在训练过程中,它的参数会被软更新(滑动平均)以稳定训练。
target_net = DQN(n_observations, n_actions).to(device)
# 用于将 policy_net 的参数加载到 target_net 中,确保两者初始状态相同。可以使用 policy_net 进行训练和推断,同时使用 target_net 计算目标 Q 值
target_net.load_state_dict(policy_net.state_dict())
# 使用了 AdamW 优化器来训练模型,并创建了一个经验回放内存,用于存储状态转移数据
optimizer = optim.AdamW(policy_net.parameters(), lr=LR, amsgrad=True)
memory = ReplayMemory(1000)steps_done = 0# 负责根据ε-贪心策略选择动作
# 探索和利用之间的平衡由**探索率(epsilon)**控制。
# epsilon 越大,越倾向于探索(随机选择动作)。
# epsilon 值随着时间衰减,以鼓励智能体在学习过程中更多地进行利用。
def select_action(state):global steps_done# 生成一个介于 0 和 1 之间的随机样本。sample = random.random()# 根据当前 epsilon 值计算探索阈值。eps_threshold = EPS_END + (EPS_START - EPS_END) * \math.exp(-1. * steps_done / EPS_DECAY)steps_done += 1# 如果样本大于阈值,选择具有最高 Q 值的动作(利用)。if sample > eps_threshold:with torch.no_grad():# t.max(1) will return the largest column value of each row.# second column on max result is index of where max element was# found, so we pick action with the larger expected reward.return policy_net(state).max(1).indices.view(1, 1)# 否则,从动作空间中随机采样一个动作(探索)。else:return torch.tensor([[env.action_space.sample()]], device=device, dtype=torch.long)episode_durations = []# 绘制训练过程中的回合持续时间(Duration),用于绘图
def plot_durations(show_result=False):# 创建一个名为 plt.figure(1) 的图形。plt.figure(1)# 将回合持续时间(episode_durations)转换为张量 durations_t。durations_t = torch.tensor(episode_durations, dtype=torch.float)# 如果 show_result 为真,标题设置为“Result”;否则,标题设置为“Training…”。if show_result:plt.title('Result')else:plt.clf()plt.title('Training...')# 横轴表示回合数,纵轴表示持续时间。plt.xlabel('Episode')plt.ylabel('Duration')plt.plot(durations_t.numpy())# 绘制了回合持续时间的折线图,并计算了最近 100 个回合的平均值。如果回合数超过 100,还会绘制平均值的折线图。# 帮助我们可视化训练过程中的回合持续时间# Take 100 episode averages and plot them tooif len(durations_t) >= 100:means = durations_t.unfold(0, 100, 1).mean(1).view(-1)means = torch.cat((torch.zeros(99), means))plt.plot(means.numpy())plt.pause(0.001) # pause a bit so that plots are updatedif is_ipython:if not show_result:display.display(plt.gcf())display.clear_output(wait=True)else:display.display(plt.gcf())# Deep Q-Network (DQN) 训练过程中的关键部分
def optimize_model():# 如果回放内存的大小小于批次大小 (BATCH_SIZE),则提前返回(不进行优化)if len(memory) < BATCH_SIZE:return# 否则,从回放内存中随机采样一批过渡数据。transitions = memory.sample(BATCH_SIZE)# Transpose the batch (see https://stackoverflow.com/a/19343/3343043 for# detailed explanation). This converts batch-array of Transitions# to Transition of batch-arrays.# 转置批处理(参见https://stackoverflow.com/a/19343/3343043# 详细解释)。这将转换转换的批处理数组# to批量数组的转换。batch = Transition(*zip(*transitions))# Compute a mask of non-final states and concatenate the batch elements# (a final state would've been the one after which simulation ended)# 计算非最终状态的掩码,并连接批处理元素# (最终状态是模拟结束后的状态)non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,batch.next_state)), device=device, dtype=torch.bool)non_final_next_states = torch.cat([s for s in batch.next_stateif s is not None])state_batch = torch.cat(batch.state)action_batch = torch.cat(batch.action)reward_batch = torch.cat(batch.reward)# 使用 policy_net 计算当前状态-动作对的 Q 值。# Compute Q(s_t, a) - the model computes Q(s_t), then we select the# columns of actions taken. These are the actions which would've been taken# for each batch state according to policy_net# 计算Q(s_t, a) -模型计算Q(s_t),然后我们选择# 所采取的行动列。这些都是应该采取的行动# 根据policy_net对应每个批处理状态state_action_values = policy_net(state_batch).gather(1, action_batch)# Compute V(s_{t+1}) for all next states.# Expected values of actions for non_final_next_states are computed based# on the "older" target_net; selecting their best reward with max(1).values# This is merged based on the mask, such that we'll have either the expected# state value or 0 in case the state was final.# 计算所有下一个状态的V(s_{t+1})# non_final_next_states的动作期望值是基于计算的# 在“旧的”target_net上;选择最大(1)个值的最佳奖励# 这是基于掩码合并的,这样我们就会有预期的# state值,如果状态为final,则为0。# 使用 target_net 计算下一状态的预期 Q 值。next_state_values = torch.zeros(BATCH_SIZE, device=device)with torch.no_grad():next_state_values[non_final_mask] = target_net(non_final_next_states).max(1).values# Compute the expected Q values# 计算预期 Q 值与当前 Q 值之间的均方误差作为损失。通过反向传播更新 policy_net 的参数。# 这个函数使用从回放内存中采样的一批过渡数据来优化 policy_netexpected_state_action_values = (next_state_values * GAMMA) + reward_batch# SmoothL1Loss 是一种用于回归任务的损失函数,它在绝对误差和平方误差之间取得平衡。# 在 DQN 训练中,我们使用它来计算 Q 值的损失。这段代码计算了损失并通过反向传播更新了模型的参数# Compute Huber losscriterion = nn.SmoothL1Loss()loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))# Optimize the modeloptimizer.zero_grad()loss.backward()# In-place gradient clippingtorch.nn.utils.clip_grad_value_(policy_net.parameters(), 100)optimizer.step()if torch.cuda.is_available():num_episodes = 6000
else:num_episodes = 500for i_episode in range(num_episodes):# Initialize the environment and get it's state# 初始化环境并获取初始状态。env.render()state, info = env.reset()state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)for t in count():# 在每个回合中,根据当前状态选择动作。action = select_action(state)# 执行动作,获取奖励和下一状态。observation, reward, terminated, truncated, _ = env.step(action.item())reward = torch.tensor([reward], device=device)done = terminated or truncatedif terminated:next_state = Noneelse:next_state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)# 将过渡数据存储到回放内存中。# Store the transition in memorymemory.push(state, action, next_state, reward)# Move to the next statestate = next_state# Perform one step of the optimization (on the policy network)optimize_model()# 执行一步优化,更新策略网络的参数。# 软更新目标网络的权重。# Soft update of the target network's weights# θ′ ← τ θ + (1 −τ )θ′target_net_state_dict = target_net.state_dict()policy_net_state_dict = policy_net.state_dict()for key in policy_net_state_dict:target_net_state_dict[key] = policy_net_state_dict[key] * TAU + target_net_state_dict[key] * (1 - TAU)target_net.load_state_dict(target_net_state_dict)# 如果回合结束,记录回合持续时间并绘制图表。if done:episode_durations.append(t + 1)plot_durations()print("Episode finished after {} timesteps".format(t + 1))breakprint('Complete')
plot_durations(show_result=True)
plt.ioff()
plt.show()