概 述
月球着陆器代理是一个模拟飞行器在月球表面着陆的环境,它有八个连续的状态变量,分别是水平坐标、垂直坐标、水平速度、垂直速度、角度、角速度、腿1触地、腿2触地。它有四个离散的动作,分别是什么都不做、发动左方向引擎、发动主引擎、发动右方向引擎。
训练月球着陆器代理的目标是使飞行器能够安全地降落在两个黄色旗帜之间的停机坪上,最小化燃料消耗和着陆时间。为了实现这个目标,我们可以用策略梯度算法来训练一个神经网络,使其输出在每个状态下执行每个动作的概率。我们可以用蒙特卡罗方法来估计每个状态-动作对的回报,即从该状态-动作对开始,按照当前策略执行到终止状态的累积折扣回报。然后,我们可以用这个回报来更新神经网络的参数,使其更倾向于选择高回报的动作,从而改进策略。
示例代码
这是一个使用 PyTorch 和 OpenAI Gym 的强化学习项目,目的是训练一个智能体在月球着陆器环境中完成任务。
policy_network.py 一个使用策略梯度算法的强化学习的示例,它定义了一个策略网络(PolicyNetwork)和一个智能体(Agent)的类,以及它们的相关方法.
import torch # 导入 PyTorch 库,用于张量运算和自动求导
import torch.nn as nn # 导入 PyTorch 的神经网络模块,用于定义网络层和模型
import torch.optim as optim # 导入 PyTorch 的优化器模块,用于更新模型参数
import torch.nn.functional as F # 导入 PyTorch 的函数模块,用于激活函数和损失函数等class PolicyNetwork(nn.Module): # 定义一个策略网络类,继承自 nn.Module 基类def __init__(self, n_features, n_actions, lr): # 定义初始化方法,接受三个参数:状态特征数,动作数,学习率super(PolicyNetwork, self).__init__() # 调用父类的初始化方法self.layers = nn.Sequential( # 定义一个顺序容器,包含三个全连接层和两个 ReLU 激活函数nn.Linear(n_features, 128), # 第一个全连接层,输入特征数为 n_features,输出特征数为 128nn.ReLU(), # 第一个 ReLU 激活函数,对上一层的输出进行非线性变换nn.Linear(128, 128), # 第二个全连接层,输入和输出特征数都为 128nn.ReLU(), # 第二个 ReLU 激活函数,对上一层的输出进行非线性变换nn.Linear(128, 256), # 第三个全连接层,输入特征数为 128,输出特征数为 256nn.ReLU(), # 第三个 ReLU 激活函数,对上一层的输出进行非线性变换)# 定义一个全连接层,输入特征数为 256,输出特征数为 n_actions,用于输出每个动作的对数值self.pi = nn.Linear(256, n_actions)if lr is not None: # 如果学习率不为空,说明是训练模式# 定义一个优化器,使用 Adam 算法,传入模型的所有参数和学习率self.optimizer = optim.Adam(self.parameters(), lr=lr)self.device = "cuda" if torch.cuda.is_available() else "cpu" # 判断是否有 GPU 可用,如果有则使用 GPU,否则使用 CPUself.to(self.device) # 将模型移动到相应的设备上def forward(self, state: torch.Tensor): # 定义前向传播方法,接受一个参数:状态张量features = self.layers(state) # 将状态张量输入到顺序容器中,得到特征张量# log value for each actionaction_logs = self.pi(features) # 将特征张量输入到全连接层中,得到每个动作的对数值张量# calculate the probability of logs# 对每个动作的对数值进行 softmax 变换,得到每个动作的概率张量,并返回return F.softmax(action_logs, dim=0)class Agent: # 定义一个智能体类def __init__(self, observation_space, action_space, lr): # 定义初始化方法,接受三个参数:观察空间大小,动作空间大小,学习率self.policy = PolicyNetwork( # 创建一个策略网络对象,传入观察空间大小,动作空间大小,学习率n_features=observation_space, n_actions=action_space, lr=lr)self.rewards = [] # 定义一个空列表,用于存储每个时间步的奖励self.action_probs = [] # 定义一个空列表,用于存储每个时间步的动作概率self.lr = lr # 将学习率赋值给 self.lr,用于判断是否是训练模式self.gamma = 0.99 # 定义一个折扣因子,用于计算累积奖励def choose_action(self, state): # 定义一个选择动作的方法,接受一个参数:状态state = torch.tensor(state).to(self.policy.device) # 将状态转换为张量,并移动到相应的设备上log_probs = self.policy(state) # 将状态张量输入到策略网络中,得到每个动作的概率张量action_dist = torch.distributions.Categorical(log_probs) # 根据概率张量创建一个分类分布对象,用于采样动作action = action_dist.sample() # 从分类分布中采样一个动作# no calculations required during testing.if self.lr is not None: # 如果学习率不为空,说明是训练模式action_probs: int = action_dist.log_prob(action).unsqueeze(0) # 计算采样动作的对数概率,并增加一个维度self.action_probs.append(action_probs) # 将动作的对数概率添加到列表中return action.item() # 返回动作的数值def learn(self): # 定义一个学习的方法,用于更新策略网络的参数G = 0 # 定义一个变量,用于存储累积奖励returns = [] # 定义一个空列表,用于存储每个时间步的累积奖励for reward in reversed(self.rewards): # 从后向前遍历奖励列表G = self.gamma * G + reward # 计算当前时间步的累积奖励,使用折扣因子和当前奖励returns.append(G) # 将累积奖励添加到列表中returns.reverse() # 将累积奖励列表反转,使其与时间步对应# 将累积奖励列表转换为张量,并移动到相应的设备上returns = torch.tensor(returns, dtype=torch.float,device=self.policy.device)action_probs = torch.cat(self.action_probs) # 将动作概率列表拼接为一个张量# loss = G * log pi (at | st)# -loss: since gradient ascent is being performedloss = -(returns * action_probs).sum() # 计算损失函数,使用累积奖励和动作概率的乘积的负和self.policy.optimizer.zero_grad() # 将优化器的梯度清零loss.backward() # 对损失函数进行反向传播,计算梯度self.policy.optimizer.step() # 对优化器进行一步更新,更新模型参数self.action_probs = [] # 清空动作概率列表self.rewards = [] # 清空奖励列表return loss # 返回损失值
play.py 使用策略梯度算法来训练月球着陆器智能体的Python脚本。
代码首先导入了一些必要的库,如 numpy, gym, torch, tqdm 等,并从 policy_network 模块中导入了 Agent 类,该类定义了智能体的策略网络和学习算法。
代码使用 argparse 库来解析命令行参数,如 epochs, lr, logdir, env, chkpt 等,这些参数可以用来控制训练的过程和结果。
代码使用 SummaryWriter 来记录训练的指标,如损失和奖励,并将它们保存在 log_dir 中,方便用 TensorBoard 可视化。
代码使用 gym.make() 来创建一个月球着陆器的环境,并使用 agent 对象来与环境交互,选择动作,获取奖励,更新网络参数等。
代码使用 tqdm 来显示训练的进度条,以及每个回合的奖励,平均奖励和平均损失等信息。
代码使用 torch.save() 来保存智能体的策略网络的状态,以便在之后加载或继续训练。
代码最后使用 writer.close() 和 env.close() 来关闭 SummaryWriter 和环境对象。
# 导入numpy库,用于进行数值计算
import numpy as np
# 导入gymnasium库,用于创建和管理强化学习的环境
import gymnasium as gym
# 导入SummaryWriter类,用于记录和可视化训练过程的数据
from torch.utils.tensorboard import SummaryWriter
# 导入save函数,并重命名为torch_save,用于保存和加载模型的参数
from torch import save as torch_save
# 导入Agent类,用于定义和实现策略梯度算法的智能体
from policy_network import Agent
# 导入tqdm类,用于显示训练过程的进度条
from tqdm import tqdm
# 导入ArgumentParser类,用于解析命令行参数
from argparse import ArgumentParser
# 导入os库,用于进行文件和路径的操作
import os# 如果当前文件是主程序,那么执行以下代码
if __name__ == "__main__":# 创建一个ArgumentParser对象,用于解析命令行参数parser = ArgumentParser()# 添加一个名为--epochs的参数,表示要玩的游戏的次数,默认为20000,类型为整数parser.add_argument("--epochs", default=20_000, type=int, help="Number of games to play")# 添加一个名为--lr的参数,表示策略网络的学习率,默认为0.0005,类型为浮点数parser.add_argument("--lr", default=0.0005, help="Learning rate for NN Policy Network", type=float)# 添加一个名为--logdir的参数,表示记录和可视化数据的目录,默认为./plays,类型为字符串parser.add_argument("--logdir", default="./plays", type=str)# 添加一个名为--env的参数,表示要玩的游戏的环境,默认为LunarLander-v2,类型为字符串parser.add_argument("--env", default="LunarLander-v2", type=str)# 添加一个名为--chkpt的参数,表示保存和加载模型参数的文件地址,默认为./agent,类型为字符串parser.add_argument("--chkpt",default="./agent",help="Save/Load checkpoint file address for model",type=str,)# 解析命令行参数,并赋值给args变量args = parser.parse_args()# 根据args.logdir和args.env拼接出完整的记录和可视化数据的目录,并赋值给log_dir变量log_dir = os.path.join(args.logdir, args.env)# 根据args.chkpt和args.env拼接出完整的保存和加载模型参数的文件地址,并赋值给chkpt变量chkpt = os.path.join(args.chkpt, f"{args.env}.pt")# 创建一个SummaryWriter对象,用于记录和可视化数据,指定log_dir为记录和可视化数据的目录writer = SummaryWriter(log_dir=log_dir)# 调用gym库的make函数,根据args.env创建一个强化学习的环境,并赋值给env变量env = gym.make(args.env)# 创建一个Agent对象,用于实现策略梯度算法,指定环境的状态空间维度,动作空间数量,和学习率,并赋值给agent变量agent = Agent(env.observation_space.shape[0], env.action_space.n, lr=args.lr)# 打印运行时的细节,包括游戏的环境,次数,记录和可视化数据的目录,和保存和加载模型参数的文件地址print("RunTime Details: ")print(f" > Playing - {args.env} for {args.epochs} episodes")print(f" > TensorBoard Logdir - {log_dir} Checkpoint File - {chkpt}")# 创建一个tqdm对象,用于显示训练过程的进度条,指定总数为args.epochs,描述为Playing episodeprogress_bar = tqdm(total=args.epochs, desc="Playing episode")# 创建一个numpy数组,用于存储每个游戏的奖励,形状为(args.epochs, 1),并赋值给episode_rewards变量episode_rewards = np.zeros((args.epochs, 1))# 创建一个numpy数组,用于存储每个游戏的损失,形状为(args.epochs, 1),并赋值给episode_losses变量episode_losses = np.zeros((args.epochs, 1))# 对于每个游戏,执行以下代码for epoch in range(args.epochs):# 初始化一个布尔变量done为False,表示游戏是否结束done = False# 调用环境的reset方法,重置环境,并返回初始的状态和信息,并赋值给obs和info变量obs, info = env.reset()# 当游戏没有结束时,执行以下代码while not done:# 调用智能体的choose_action方法,根据当前的状态选择一个动作,并赋值给action变量action = agent.choose_action(obs)# 调用环境的step方法,执行选择的动作,并返回新的状态,奖励,是否截断,是否终止,和信息,并赋值给obs, reward, truncated, terminated, info变量obs, reward, truncated, terminated, info = env.step(action)# 判断是否终止或截断,如果是,那么将done变量设为True,表示游戏结束done = terminated or truncated# 将奖励添加到智能体的rewards列表中agent.rewards.append(reward)# 计算智能体的rewards列表中的奖励之和,并赋值给episode_rewards数组的对应位置episode_rewards[epoch] = sum(agent.rewards)# 调用智能体的learn方法,根据累积奖励和动作概率更新策略网络的参数,并返回损失,并赋值给loss变量loss = agent.learn()# 将损失转换为cpu上的numpy数组,并赋值给episode_losses数组的对应位置episode_losses[epoch] = loss.cpu().detach().numpy()# 如果游戏的次数大于100,那么执行以下代码if epoch > 100:# 调用SummaryWriter对象的add_scalar方法,记录并可视化最近100个游戏的损失和奖励的均值,以及当前游戏的奖励,指定标签和步数writer.add_scalar("Play/mean_loss", episode_losses[epoch - 100 : epoch].mean(), epoch)writer.add_scalar("Play/mean_rewards", episode_rewards[epoch - 100 : epoch].mean(), epoch)writer.add_scalar("Play/episode_rewards", episode_rewards[epoch], epoch)# 调用SummaryWriter对象的flush方法,将数据写入文件writer.flush()# 调用tqdm对象的set_postfix_str方法,设置进度条的后缀字符串,包括当前游戏的奖励,最近100个游戏的奖励和损失的均值progress_bar.set_postfix_str(f"episode_reward - {float(episode_rewards[epoch][0]):.2f} "f"mean_rewards - {float(episode_rewards[epoch-100: epoch].mean()):.2f} "f"mean_loss - {float(episode_losses[epoch-100: epoch].mean()):.2f}")# 调用torch_save函数,保存策略网络的参数到chkpt文件torch_save(agent.policy.state_dict(), chkpt)# 调用tqdm对象的update方法,更新进度条progress_bar.update(1)# 调用SummaryWriter对象的close方法,关闭文件writer.close()# 调用环境的close方法,关闭环境env.close()
终端输出:
RunTime Details: > Playing - LunarLander-v2 for 20000 episodes> TensorBoard Logdir - ./plays\LunarLander-v2 Checkpoint File - ./agent\LunarLander-v2.pt
Playing episode: 1%|▏ | 208/20000 [00:26<51:43, 6.38it/s, episode_reward - -106.63 mean_rewards - -179.48 mean_loss - -12856.09]
play_test.py 这段代码的目的是让一个智能体(agent)在一个仿真环境(env)中玩一个叫做 LunarLander-v2 的游戏,并将游戏过程录制成视频。
这段代码的主要逻辑如下:
# 导入time库,用于获取和处理时间
import time
# 导入gymnasium库,用于创建和管理强化学习的环境
import gymnasium as gym
# 导入imageio库,用于读写图像和视频
import imageio
# 导入Agent类,用于定义和实现策略梯度算法的智能体
from policy_network import Agent
# 导入load函数,并重命名为torch_load,用于加载模型的参数
from torch import load as torch_load
# 导入ArgumentParser类,用于解析命令行参数
from argparse import ArgumentParser
# 导入os库,用于进行文件和路径的操作
import os# 导入VideoRecorder类,用于录制视频
from gymnasium.wrappers.monitoring.video_recorder import VideoRecorder# 如果当前文件是主程序,那么执行以下代码
if __name__ == "__main__":# 创建一个ArgumentParser对象,用于解析命令行参数parser = ArgumentParser()# 添加一个名为--env的参数,表示要玩的游戏的环境,默认为LunarLander-v2,类型为字符串parser.add_argument("--env", default="LunarLander-v2", type=str)# 添加一个名为--chkpt的参数,表示保存和加载模型参数的文件地址,默认为./agent,类型为字符串parser.add_argument("--chkpt",default="./agent/",help="Save/Load checkpoint file address for model",type=str,)# 添加一个名为--render_mode的参数,表示环境的渲染模式,默认为rgb_array,类型为字符串parser.add_argument("--render_mode", default="rgb_array", help="Render Mode for Env", type=str) # human rgb_array# 解析命令行参数,并赋值给args变量args = parser.parse_args()# 根据args.chkpt和args.env拼接出完整的保存和加载模型参数的文件地址,并赋值给chkpt变量chkpt = os.path.join(args.chkpt, f"{args.env}.pt")# 调用gym库的make函数,根据args.env和args.render_mode创建一个强化学习的环境,并赋值给env变量env = gym.make(args.env, render_mode=args.render_mode)# 使用 time 模块获取时间戳timestamp = time.time()# 使用 time 模块将时间戳转换为字符串time_str = time.strftime("%Y-%m-%d-%H-%M-%S", time.localtime(timestamp))# 在文件名前添加时间字符串,并赋值给new_filename变量new_filename = time_str + "_" + "LunarLander-v2.mp4" # 创建一个 VideoRecorder 对象,指定视频文件名为new_filename,并赋值给video变量video = VideoRecorder(env, new_filename)# 创建一个Agent对象,用于实现策略梯度算法,指定环境的状态空间维度,动作空间数量,和学习率,并赋值给agent变量agent = Agent(env.observation_space.shape[0], env.action_space.n, lr=None)# 调用torch_load函数,加载chkpt文件中的模型参数,并赋值给智能体的策略网络的状态字典agent.policy.load_state_dict(torch_load(chkpt))# 调用智能体的策略网络的eval方法,将其设置为评估模式,不进行梯度计算和参数更新agent.policy.eval()# 无限循环,直到用户输入非y的值while True:# 获取用户的输入,提示是否玩游戏,赋值给play_one变量play_one = input("Play game - [y/N] ")# 如果用户输入y,那么执行以下代码if play_one == "y":# 初始化一个布尔变量done为False,表示游戏是否结束done = False# 调用环境的reset方法,重置环境,并返回初始的状态和信息,并赋值给obs和info变量obs, info = env.reset()# 当游戏没有结束时,执行以下代码while not done:# 调用环境的render方法,渲染环境,获取当前帧的图像env.render() # 调用VideoRecorder对象的capture_frame方法,捕捉当前帧,并将其写入视频文件video.capture_frame() # 调用智能体的choose_action方法,根据当前的状态选择一个动作,并赋值给action变量action = agent.choose_action(obs)# 调用环境的step方法,执行选择的动作,并返回新的状态,奖励,是否截断,是否终止,和信息,并赋值给obs, reward, truncated, terminated, info变量obs, reward, truncated, terminated, info = env.step(action)# 判断是否终止或截断,如果是,那么将done变量设为True,表示游戏结束done = truncated or terminated# 调用VideoRecorder对象的close方法,关闭视频文件video.close() # 调用环境的close方法,关闭环境env.close() # 跳出循环break# 否则,跳出循环else:break
录制的四次游戏视频:
结 语
策略梯度算法是一类强化学习算法的统称,它们都是基于策略梯度定理来更新策略参数的。策略梯度算法有很多种,其中一些常见的有:
REINFORCE:这是一种基于蒙特卡洛方法的策略梯度算法,它使用整个轨迹的回报作为动作值函数的估计,然后根据策略梯度定理更新策略参数。
Actor-Critic:这是一种结合了值函数和策略函数的策略梯度算法,它使用一个**演员(Actor)网络来输出策略,一个评论者(Critic)**网络来输出动作值函数,然后根据策略梯度定理和值函数的梯度更新策略参数和值函数参数。
TRPO:这是一种基于自然梯度的策略梯度算法,它使用一个**信任域(Trust Region)**来限制策略的更新幅度,从而保证策略的改进不会过大,导致性能下降。
PPO:这是一种基于比例裁剪的策略梯度算法,它使用一个**目标函数(Objective Function)**来衡量策略的改进,然后通过裁剪策略比率来避免策略的更新过大或过小,从而提高策略的稳定性和效率。
这些只是策略梯度算法的一部分,还有很多其他的策略梯度算法,如A2C,A3C,DDPG,TD3,SAC等。如果您想了解更多关于策略梯度算法的内容,您可以参考以下的资源:
一篇介绍策略梯度算法原理和实现的博客文章:https://zhuanlan.zhihu.com/p/21725498
一本介绍强化学习和策略梯度算法的书籍:《强化学习:原理与Python实现》
一份包含策略梯度算法的代码示例的GitHub仓库:https://github.com/ShangtongZhang/reinforcement-learning-an-introduction
The End