PokéLLMon 源码解析(四)

.\PokeLLMon\poke_env\exceptions.py

"""
This module contains exceptions.
"""# 定义一个自定义异常类 ShowdownException,继承自内置异常类 Exception
class ShowdownException(Exception):"""This exception is raised when a non-managed messageis received from the server."""# 当从服务器接收到非受控消息时引发此异常pass

.\PokeLLMon\poke_env\player\baselines.py

# 导入必要的模块
from typing import List
import json
import osfrom poke_env.environment.abstract_battle import AbstractBattle
from poke_env.environment.double_battle import DoubleBattle
from poke_env.environment.move_category import MoveCategory
from poke_env.environment.pokemon import Pokemon
from poke_env.environment.side_condition import SideCondition
from poke_env.player.player import Player
from poke_env.data.gen_data import GenData# 从文件中加载招式效果数据
with open("./poke_env/data/static/moves/moves_effect.json", "r") as f:move_effect = json.load(f)# 计算招式类型的伤害倍率
def calculate_move_type_damage_multipier(type_1, type_2, type_chart, constraint_type_list):# 定义所有可能的宝可梦类型TYPE_list = 'BUG,DARK,DRAGON,ELECTRIC,FAIRY,FIGHTING,FIRE,FLYING,GHOST,GRASS,GROUND,ICE,NORMAL,POISON,PSYCHIC,ROCK,STEEL,WATER'.split(",")move_type_damage_multiplier_list = []# 如果存在第二个类型if type_2:# 计算每种类型对应的伤害倍率for type in TYPE_list:move_type_damage_multiplier_list.append(type_chart[type_1][type] * type_chart[type_2][type])move_type_damage_multiplier_dict = dict(zip(TYPE_list, move_type_damage_multiplier_list))else:move_type_damage_multiplier_dict = type_chart[type_1]effective_type_list = []extreme_type_list = []resistant_type_list = []extreme_resistant_type_list = []immune_type_list = []# 根据伤害倍率将类型分为不同的类别for type, value in move_type_damage_multiplier_dict.items():if value == 2:effective_type_list.append(type)elif value == 4:extreme_type_list.append(type)elif value == 1 / 2:resistant_type_list.append(type)elif value == 1 / 4:extreme_resistant_type_list.append(type)elif value == 0:immune_type_list.append(type)else:  # value == 1continue# 如果约束类型列表不为空if constraint_type_list:# 更新极端类型列表,取交集extreme_type_list = list(set(extreme_type_list).intersection(set(constraint_type_list)))# 更新有效类型列表,取交集effective_type_list = list(set(effective_type_list).intersection(set(constraint_type_list)))# 更新抗性类型列表,取交集resistant_type_list = list(set(resistant_type_list).intersection(set(constraint_type_list)))# 更新极端抗性类型列表,取交集extreme_resistant_type_list = list(set(extreme_resistant_type_list).intersection(set(constraint_type_list)))# 更新免疫类型列表,取交集immune_type_list = list(set(immune_type_list).intersection(set(constraint_type_list)))# 返回更新后的各类型列表return extreme_type_list, effective_type_list, resistant_type_list, extreme_resistant_type_list, immune_type_list
# 定义一个函数,根据给定的参数计算并返回对应的移动类型伤害提示
def move_type_damage_wraper(pokemon_name, type_1, type_2, type_chart, constraint_type_list=None):# 初始化移动类型伤害提示字符串move_type_damage_prompt = ""# 调用函数计算移动类型伤害倍数,得到各种类型的列表extreme_effective_type_list, effective_type_list, resistant_type_list, extreme_resistant_type_list, immune_type_list = calculate_move_type_damage_multipier(type_1, type_2, type_chart, constraint_type_list)# 如果存在有效的、抵抗的或免疫的类型列表if effective_type_list or resistant_type_list or immune_type_list:# 构建移动类型伤害提示字符串move_type_damage_prompt = f"{pokemon_name}"if extreme_effective_type_list:move_type_damage_prompt = move_type_damage_prompt + " can be super-effectively attacked by " + ", ".join(extreme_effective_type_list) + " moves"if effective_type_list:move_type_damage_prompt = move_type_damage_prompt + ", can be effectively attacked by " + ", ".join(effective_type_list) + " moves"if resistant_type_list:move_type_damage_prompt = move_type_damage_prompt + ", is resistant to " + ", ".join(resistant_type_list) + " moves"if extreme_resistant_type_list:move_type_damage_prompt = move_type_damage_prompt + ", is super-resistant to " + ", ".join(extreme_resistant_type_list) + " moves"if immune_type_list:move_type_damage_prompt = move_type_damage_prompt + ", is immuned to " + ", ".join(immune_type_list) + " moves"# 返回移动类型伤害提示字符串return move_type_damage_prompt# 定义一个类,继承自Player类,实现最大基础伤害玩家
class MaxBasePowerPlayer(Player):# 重写choose_move方法def choose_move(self, battle: AbstractBattle):# 如果存在可用的移动if battle.available_moves:# 选择基础伤害最大的移动best_move = max(battle.available_moves, key=lambda move: move.base_power)return self.create_order(best_move)# 如果没有可用的移动,则随机选择一个移动return self.choose_random_move(battle)# 定义一个类,继承自Player类,实现简单启发式玩家
class SimpleHeuristicsPlayer(Player):# 定义了各种入场危害效果,将字符串映射到对应的SideCondition枚举值ENTRY_HAZARDS = {"spikes": SideCondition.SPIKES,"stealhrock": SideCondition.STEALTH_ROCK,"stickyweb": SideCondition.STICKY_WEB,"toxicspikes": SideCondition.TOXIC_SPIKES,}# 定义了反危害招式,使用集合存储ANTI_HAZARDS_MOVES = {"rapidspin", "defog"}# 定义了速度等级系数SPEED_TIER_COEFICIENT = 0.1# 定义了生命值分数系数HP_FRACTION_COEFICIENT = 0.4# 定义了交换出场匹配阈值SWITCH_OUT_MATCHUP_THRESHOLD = -2# 估算对战情况,返回得分def _estimate_matchup(self, mon: Pokemon, opponent: Pokemon):# 计算对手对我方造成的伤害倍率的最大值score = max([opponent.damage_multiplier(t) for t in mon.types if t is not None])# 减去我方对对手造成的伤害倍率的最大值score -= max([mon.damage_multiplier(t) for t in opponent.types if t is not None])# 根据速度等级差异调整得分if mon.base_stats["spe"] > opponent.base_stats["spe"]:score += self.SPEED_TIER_COEFICIENTelif opponent.base_stats["spe"] > mon.base_stats["spe"]:score -= self.SPEED_TIER_COEFICIENT# 根据生命值分数调整得分score += mon.current_hp_fraction * self.HP_FRACTION_COEFICIENTscore -= opponent.current_hp_fraction * self.HP_FRACTION_COEFICIENTreturn score# 判断是否应该使用极巨化def _should_dynamax(self, battle: AbstractBattle, n_remaining_mons: int):if battle.can_dynamax and self._dynamax_disable is False:# 最后一个满血的精灵if (len([m for m in battle.team.values() if m.current_hp_fraction == 1])== 1and battle.active_pokemon.current_hp_fraction == 1):return True# 有优势且双方都是满血if (self._estimate_matchup(battle.active_pokemon, battle.opponent_active_pokemon)> 0and battle.active_pokemon.current_hp_fraction == 1and battle.opponent_active_pokemon.current_hp_fraction == 1):return True# 只剩下一个精灵if n_remaining_mons == 1:return Truereturn False# 判断是否应该替换出当前精灵def _should_switch_out(self, battle: AbstractBattle):# 获取当前精灵和对手精灵active = battle.active_pokemonopponent = battle.opponent_active_pokemon# 如果有一个适合替换的精灵...if [mfor m in battle.available_switchesif self._estimate_matchup(m, opponent) > 0]:# ...并且有一个“好”的理由替换出去if active.boosts["def"] <= -3 or active.boosts["spd"] <= -3:return Trueif (active.boosts["atk"] <= -3and active.stats["atk"] >= active.stats["spa"]):return Trueif (active.boosts["spa"] <= -3and active.stats["atk"] <= active.stats["spa"]):return Trueif (self._estimate_matchup(active, opponent)< self.SWITCH_OUT_MATCHUP_THRESHOLD):return Truereturn False# 估算精灵的状态def _stat_estimation(self, mon: Pokemon, stat: str):# 计算状态提升值if mon.boosts[stat] > 1:boost = (2 + mon.boosts[stat]) / 2else:boost = 2 / (2 - mon.boosts[stat])return ((2 * mon.base_stats[stat] + 31) + 5) * boost# 计算奖励值def calc_reward(self, current_battle: AbstractBattle) -> float:# 计算奖励值return self.reward_computing_helper(current_battle, fainted_value=2.0, hp_value=1.0, victory_value=30.0)# 根据状态和等级返回加成倍数def boost_multiplier(self, state, level):# 如果状态是准确度if state == "accuracy":# 根据等级返回对应的加成倍数if level == 0:return 1.0if level == 1:return 1.33if level == 2:return 1.66if level == 3:return 2.0if level == 4:return 2.5if level == 5:return 2.66if level == 6:return 3.0if level == -1:return 0.75if level == -2:return 0.6if level == -3:return 0.5if level == -4:return 0.43if level == -5:return 0.36if level == -6:return 0.33# 如果状态不是准确度else:# 根据等级返回对应的加成倍数if level == 0:return 1.0if level == 1:return 1.5if level == 2:return 2.0if level == 3:return 2.5if level == 4:return 3.0if level == 5:return 3.5if level == 6:return 4.0if level == -1:return 0.67if level == -2:return 0.5if level == -3:return 0.4if level == -4:return 0.33if level == -5:return 0.29if level == -6:return 0.25# 检查给定状态的值,并返回相应的状态字符串def check_status(self, status):# 如果状态存在if status:# 根据状态值返回相应的状态字符串if status.value == 1:return "burnt"elif status.value == 2:return "fainted"elif status.value == 3:return "frozen"elif status.value == 4:return "paralyzed"elif status.value == 5:return "poisoned"elif status.value == 7:return "toxic"elif status.value == 6:return "sleeping"# 如果状态不存在,则返回"healthy"else:return "healthy"

.\PokeLLMon\poke_env\player\battle_order.py

# 从 dataclasses 模块中导入 dataclass 装饰器
# 从 typing 模块中导入 Any, List, Optional, Union 类型
# 从 poke_env.environment.double_battle 模块中导入 DoubleBattle 类
# 从 poke_env.environment.move 模块中导入 Move 类
# 从 poke_env.environment.pokemon 模块中导入 Pokemon 类
from dataclasses import dataclass
from typing import Any, List, Optional, Union# 定义一个名为 BattleOrder 的数据类
@dataclass
class BattleOrder:# order 属性可以是 Move 或 Pokemon 类型,初始值为 Noneorder: Optional[Union[Move, Pokemon]]# mega, z_move, dynamax, terastallize, move_target 属性的默认值mega: bool = Falsez_move: bool = Falsedynamax: bool = Falseterastallize: bool = Falsemove_target: int = DoubleBattle.EMPTY_TARGET_POSITION# 默认的指令字符串DEFAULT_ORDER = "/choose default"# 返回对象的字符串表示形式def __str__(self) -> str:return self.message# 返回消息字符串@propertydef message(self) -> str:# 如果 order 是 Move 类型if isinstance(self.order, Move):# 如果 order 的 id 是 "recharge"if self.order.id == "recharge":return "/choose move 1"# 构建消息字符串message = f"/choose move {self.order.id}"if self.mega:message += " mega"elif self.z_move:message += " zmove"elif self.dynamax:message += " dynamax"elif self.terastallize:message += " terastallize"# 如果 move_target 不是空目标位置if self.move_target != DoubleBattle.EMPTY_TARGET_POSITION:message += f" {self.move_target}"return message# 如果 order 是 Pokemon 类型elif isinstance(self.order, Pokemon):return f"/choose switch {self.order.species}"else:return ""# 定义一个名为 DefaultBattleOrder 的类,继承自 BattleOrder 类
class DefaultBattleOrder(BattleOrder):# 初始化方法,不执行任何操作def __init__(self, *args: Any, **kwargs: Any):pass# 返回默认指令字符串@propertydef message(self) -> str:return self.DEFAULT_ORDER# 定义一个名为 DoubleBattleOrder 的数据类,继承自 BattleOrder 类
@dataclass
class DoubleBattleOrder(BattleOrder):# 初始化方法,接受两个可选的 BattleOrder 参数def __init__(self,first_order: Optional[BattleOrder] = None,second_order: Optional[BattleOrder] = None,):self.first_order = first_orderself.second_order = second_order# 返回消息字符串@property# 返回合并后的消息字符串def message(self) -> str:# 如果存在第一和第二指令,则返回两者消息的组合if self.first_order and self.second_order:return (self.first_order.message+ ", "+ self.second_order.message.replace("/choose ", ""))# 如果只存在第一指令,则返回第一指令消息和默认消息的组合elif self.first_order:return self.first_order.message + ", default"# 如果只存在第二指令,则返回第二指令消息和默认消息的组合elif self.second_order:return self.second_order.message + ", default"# 如果都不存在指令,则返回默认指令消息else:return self.DEFAULT_ORDER# 静态方法,用于合并第一和第二指令列表生成双重战斗指令列表@staticmethoddef join_orders(first_orders: List[BattleOrder], second_orders: List[BattleOrder]):# 如果第一和第二指令列表都存在if first_orders and second_orders:# 生成双重战斗指令列表,排除特定条件下的指令orders = [DoubleBattleOrder(first_order=first_order, second_order=second_order)for first_order in first_ordersfor second_order in second_ordersif not first_order.mega or not second_order.megaif not first_order.z_move or not second_order.z_moveif not first_order.dynamax or not second_order.dynamaxif not first_order.terastallize or not second_order.terastallizeif first_order.order != second_order.order]# 如果生成了双重战斗指令列表,则返回该列表if orders:return orders# 如果只存在第一指令列表,则生成只包含第一指令的双重战斗指令列表elif first_orders:return [DoubleBattleOrder(first_order=order) for order in first_orders]# 如果只存在第二指令列表,则生成只包含第二指令的双重战斗指令列表elif second_orders:return [DoubleBattleOrder(first_order=order) for order in second_orders]# 如果两个指令列表都不存在,则返回只包含默认指令的双重战斗指令列表return [DefaultBattleOrder()]
# 定义一个名为ForfeitBattleOrder的类,继承自BattleOrder类
class ForfeitBattleOrder(BattleOrder):# 初始化方法,接受任意数量的位置参数和关键字参数def __init__(self, *args: Any, **kwargs: Any):# pass表示不做任何操作,保持方法的结构完整pass# 定义一个名为message的属性,返回字符串"/forfeit"@propertydef message(self) -> str:return "/forfeit"

.\PokeLLMon\poke_env\player\gpt_player.py

import json  # 导入 json 模块
import os  # 导入 os 模块
import random  # 导入 random 模块
from typing import List  # 导入 List 类型提示
from poke_env.environment.abstract_battle import AbstractBattle  # 导入 AbstractBattle 类
from poke_env.environment.double_battle import DoubleBattle  # 导入 DoubleBattle 类
from poke_env.environment.move_category import MoveCategory  # 导入 MoveCategory 类
from poke_env.environment.pokemon import Pokemon  # 导入 Pokemon 类
from poke_env.environment.side_condition import SideCondition  # 导入 SideCondition 类
from poke_env.player.player import Player, BattleOrder  # 导入 Player 和 BattleOrder 类
from typing import Dict, List, Optional, Union  # 导入 Dict, List, Optional, Union 类型提示
from poke_env.environment.move import Move  # 导入 Move 类
import time  # 导入 time 模块
import json  # 再次导入 json 模块(重复导入)
from openai import OpenAI  # 导入 OpenAI 类
from poke_env.data.gen_data import GenData  # 导入 GenData 类def calculate_move_type_damage_multipier(type_1, type_2, type_chart, constraint_type_list):TYPE_list = 'BUG,DARK,DRAGON,ELECTRIC,FAIRY,FIGHTING,FIRE,FLYING,GHOST,GRASS,GROUND,ICE,NORMAL,POISON,PSYCHIC,ROCK,STEEL,WATER'.split(",")move_type_damage_multiplier_list = []  # 初始化一个空列表,用于存储每种类型的伤害倍率if type_2:  # 如果存在第二种类型for type in TYPE_list:  # 遍历每种类型move_type_damage_multiplier_list.append(type_chart[type_1][type] * type_chart[type_2][type])  # 计算两种类型之间的伤害倍率并添加到列表中move_type_damage_multiplier_dict = dict(zip(TYPE_list, move_type_damage_multiplier_list))  # 将类型和对应的伤害倍率组成字典else:  # 如果只有一种类型move_type_damage_multiplier_dict = type_chart[type_1]  # 直接使用第一种类型的伤害倍率字典effective_type_list = []  # 初始化有效类型列表extreme_type_list = []  # 初始化极效类型列表resistant_type_list = []  # 初始化抵抗类型列表extreme_resistant_type_list = []  # 初始化极度抵抗类型列表immune_type_list = []  # 初始化免疫类型列表for type, value in move_type_damage_multiplier_dict.items():  # 遍历每种类型及其对应的伤害倍率if value == 2:  # 如果伤害倍率为 2effective_type_list.append(type)  # 添加到有效类型列表elif value == 4:  # 如果伤害倍率为 4extreme_type_list.append(type)  # 添加到极效类型列表elif value == 1 / 2:  # 如果伤害倍率为 1/2resistant_type_list.append(type)  # 添加到抵抗类型列表elif value == 1 / 4:  # 如果伤害倍率为 1/4extreme_resistant_type_list.append(type)  # 添加到极度抵抗类型列表elif value == 0:  # 如果伤害倍率为 0immune_type_list.append(type)  # 添加到免疫类型列表else:  # 如果伤害倍率为 1continue  # 继续循环# 如果约束类型列表不为空if constraint_type_list:# 将极端类型列表与约束类型列表的交集作为新的极端类型列表extreme_type_list = list(set(extreme_type_list).intersection(set(constraint_type_list)))# 将有效类型列表与约束类型列表的交集作为新的有效类型列表effective_type_list = list(set(effective_type_list).intersection(set(constraint_type_list)))# 将抗性类型列表与约束类型列表的交集作为新的抗性类型列表resistant_type_list = list(set(resistant_type_list).intersection(set(constraint_type_list)))# 将极端抗性类型列表与约束类型列表的交集作为新的极端抗性类型列表extreme_resistant_type_list = list(set(extreme_resistant_type_list).intersection(set(constraint_type_list)))# 将免疫类型列表与约束类型列表的交集作为新的免疫类型列表immune_type_list = list(set(immune_type_list).intersection(set(constraint_type_list)))# 返回各类型列表的首字母大写形式return (list(map(lambda x: x.capitalize(), extreme_type_list)),list(map(lambda x: x.capitalize(), effective_type_list)),list(map(lambda x: x.capitalize(), resistant_type_list)),list(map(lambda x: x.capitalize(), extreme_resistant_type_list)),list(map(lambda x: x.capitalize(), immune_type_list)))
# 定义一个函数,用于计算给定精灵对应的移动类型伤害提示
def move_type_damage_wraper(pokemon, type_chart, constraint_type_list=None):# 初始化变量,用于存储精灵的两种类型type_1 = Nonetype_2 = None# 如果精灵有第一种类型if pokemon.type_1:# 获取第一种类型的名称type_1 = pokemon.type_1.name# 如果精灵有第二种类型if pokemon.type_2:# 获取第二种类型的名称type_2 = pokemon.type_2.name# 初始化移动类型伤害提示字符串move_type_damage_prompt = ""# 调用函数计算移动类型伤害倍数,得到不同类型的列表extreme_effective_type_list, effective_type_list, resistant_type_list, extreme_resistant_type_list, immune_type_list = calculate_move_type_damage_multipier(type_1, type_2, type_chart, constraint_type_list)# 根据不同类型的列表生成移动类型伤害提示if extreme_effective_type_list:move_type_damage_prompt = (move_type_damage_prompt + " " + ", ".join(extreme_effective_type_list) +f"-type attack is extremely-effective (4x damage) to {pokemon.species}.")if effective_type_list:move_type_damage_prompt = (move_type_damage_prompt + " " + ", ".join(effective_type_list) +f"-type attack is super-effective (2x damage) to {pokemon.species}.")if resistant_type_list:move_type_damage_prompt = (move_type_damage_prompt + " " + ", ".join(resistant_type_list) +f"-type attack is ineffective (0.5x damage) to {pokemon.species}.")if extreme_resistant_type_list:move_type_damage_prompt = (move_type_damage_prompt + " " + ", ".join(extreme_resistant_type_list) +f"-type attack is highly ineffective (0.25x damage) to {pokemon.species}.")if immune_type_list:move_type_damage_prompt = (move_type_damage_prompt + " " + ", ".join(immune_type_list) +f"-type attack is zero effect (0x damage) to {pokemon.species}.")# 返回移动类型伤害提示字符串return move_type_damage_prompt# 定义一个类,继承自Player类
class LLMPlayer(Player):# 使用 OpenAI API 进行对话生成,返回生成的文本def chatgpt(self, system_prompt, user_prompt, model, temperature=0.7, json_format=False, seed=None, stop=[], max_tokens=200) -> str:# 创建 OpenAI 客户端对象client = OpenAI(api_key=self.api_key)# 如果需要返回 JSON 格式的响应if json_format:# 调用 API 完成对话生成,返回 JSON 格式的响应response = client.chat.completions.create(response_format={"type": "json_object"},model=model,messages=[{"role": "system", "content": system_prompt},{"role": "user", "content": user_prompt}],temperature=temperature,stream=False,# seed=seed,stop=stop,max_tokens=max_tokens)else:# 调用 API 完成对话生成response = client.chat.completions.create(model=model,messages=[{"role": "system", "content": system_prompt},{"role": "user", "content": user_prompt}],temperature=temperature,stream=False,# seed=seed,max_tokens=max_tokens,stop=stop)# 获取生成的文本内容outputs = response.choices[0].message.content# 记录完成的 token 数量self.completion_tokens += response.usage.completion_tokens# 记录 prompt 的 token 数量self.prompt_tokens += response.usage.prompt_tokens# 返回生成的文本return outputs# 估算两只精灵之间的对战得分def _estimate_matchup(self, mon: Pokemon, opponent: Pokemon):# 计算对手对该精灵造成的伤害加成中的最大值score = max([opponent.damage_multiplier(t) for t in mon.types if t is not None])# 计算该精灵对对手造成的伤害加成中的最大值score -= max([mon.damage_multiplier(t) for t in opponent.types if t is not None])# 根据速度判断得分if mon.base_stats["spe"] > opponent.base_stats["spe"]:score += self.SPEED_TIER_COEFICIENTelif opponent.base_stats["spe"] > mon.base_stats["spe"]:score -= self.SPEED_TIER_COEFICIENT# 根据当前生命值比例调整得分score += mon.current_hp_fraction * self.HP_FRACTION_COEFICIENTscore -= opponent.current_hp_fraction * self.HP_FRACTION_COEFICIENTreturn score# 判断是否应该使用极巨化def _should_dynamax(self, battle: AbstractBattle):# 统计队伍中剩余未倒下的精灵数量n_remaining_mons = len([m for m in battle.team.values() if m.fainted is False])if battle.can_dynamax and self._dynamax_disable is False:# 如果只剩下一只全血的精灵if (len([m for m in battle.team.values() if m.current_hp_fraction == 1])== 1and battle.active_pokemon.current_hp_fraction == 1):return True# 如果有对战优势且双方都是全血状态if (self._estimate_matchup(battle.active_pokemon, battle.opponent_active_pokemon)> 0and battle.active_pokemon.current_hp_fraction == 1and battle.opponent_active_pokemon.current_hp_fraction == 1):return True# 如果只剩下一只精灵if n_remaining_mons == 1:return Truereturn False# 解析LLM输出,找到JSON内容的起始位置json_start = llm_output.find('{')# 找到JSON内容的结束位置,从后往前找第一个}json_end = llm_output.rfind('}') + 1# 提取JSON内容json_content = llm_output[json_start:json_end]# 将JSON内容加载为Python对象llm_action_json = json.loads(json_content)# 初始化下一个动作为Nonenext_action = None# 如果JSON中包含"move"字段if "move" in llm_action_json.keys():# 获取LLM中的移动ID并处理格式llm_move_id = llm_action_json["move"]llm_move_id = llm_move_id.replace(" ","").replace("-", "")# 遍历可用的移动列表,匹配LLM中的移动IDfor i, move in enumerate(battle.available_moves):if move.id.lower() == llm_move_id.lower():# 创建相应的移动指令next_action = self.create_order(move, dynamax=self._should_dynamax(battle))# 如果JSON中包含"switch"字段elif "switch" in llm_action_json.keys():# 获取LLM中的交换精灵种类并匹配可用的交换精灵列表llm_switch_species = llm_action_json["switch"]for i, pokemon in enumerate(battle.available_switches):if pokemon.species.lower() == llm_switch_species.lower():# 创建相应的交换指令next_action = self.create_order(pokemon)# 如果下一个动作仍为None,则抛出数值错误异常if next_action is None:raise ValueError("Value Error")# 返回下一个动作return next_action# 解析LLM输出,找到JSON内容的起始位置json_start = llm_output.find('{')# 找到JSON内容的结束位置,从后往前找第一个}json_end = llm_output.rfind('}') + 1# 提取JSON内容json_content = llm_output[json_start:json_end]# 将JSON内容转换为Python对象llm_action_json = json.loads(json_content)next_action = None# 获取动作和目标action = llm_action_json["decision"]["action"]target = llm_action_json["decision"]["target"]# 处理目标字符串,去除空格和下划线target = target.replace(" ", "").replace("_", "")# 如果动作是移动if action.lower() == "move":# 遍历可用的移动for i, move in enumerate(battle.available_moves):# 如果移动ID匹配目标if move.id.lower() == target.lower():# 创建移动指令next_action = self.create_order(move, dynamax=self._should_dynamax(battle))# 如果动作是交换elif action.lower() == "switch":# 遍历可用的交换精灵for i, pokemon in enumerate(battle.available_switches):# 如果精灵种类匹配目标if pokemon.species.lower() == target.lower():# 创建交换指令next_action = self.create_order(pokemon)# 如果没有找到下一步动作,抛出数值错误if next_action is None:raise ValueError("Value Error")# 返回下一步动作return next_action# 检查状态并返回对应的字符串def check_status(self, status):if status:if status.value == 1:return "burnt"elif status.value == 2:return "fainted"elif status.value == 3:return "frozen"elif status.value == 4:return "paralyzed"elif status.value == 5:return "poisoned"elif status.value == 7:return "toxic"elif status.value == 6:return "sleeping"else:return ""# 根据状态和等级返回加成倍数def boost_multiplier(self, state, level):# 如果状态是准确度if state == "accuracy":# 根据等级返回对应的加成倍数if level == 0:return 1.0if level == 1:return 1.33if level == 2:return 1.66if level == 3:return 2.0if level == 4:return 2.5if level == 5:return 2.66if level == 6:return 3.0if level == -1:return 0.75if level == -2:return 0.6if level == -3:return 0.5if level == -4:return 0.43if level == -5:return 0.36if level == -6:return 0.33# 如果状态不是准确度else:# 根据等级返回对应的加成倍数if level == 0:return 1.0if level == 1:return 1.5if level == 2:return 2.0if level == 3:return 2.5if level == 4:return 3.0if level == 5:return 3.5if level == 6:return 4.0if level == -1:return 0.67if level == -2:return 0.5if level == -3:return 0.4if level == -4:return 0.33if level == -5:return 0.29if level == -6:return 0.25# 返回战斗摘要信息,包括击败得分、剩余得分、胜利列表和标签列表def battle_summary(self):# 初始化空列表用于存储击败得分、剩余得分、胜利列表和标签列表beat_list = []remain_list = []win_list = []tag_list = []# 遍历每场战斗,计算击败得分、剩余得分、是否胜利以及标签for tag, battle in self.battles.items():beat_score = 0# 计算对手队伍的击败得分for mon in battle.opponent_team.values():beat_score += (1-mon.current_hp_fraction)beat_list.append(beat_score)remain_score = 0# 计算己方队伍的剩余得分for mon in battle.team.values():remain_score += mon.current_hp_fractionremain_list.append(remain_score)# 如果战斗胜利,则在胜利列表中添加1if battle.won:win_list.append(1)tag_list.append(tag)# 返回击败得分列表、剩余得分列表、胜利列表和标签列表return beat_list, remain_list, win_list, tag_list# 辅助计算奖励值的函数def reward_computing_helper(self,battle: AbstractBattle,*,fainted_value: float = 0.0,hp_value: float = 0.0,number_of_pokemons: int = 6,starting_value: float = 0.0,status_value: float = 0.0,victory_value: float = 1.0,) -> float:"""A helper function to compute rewards."""# 如果战斗不在奖励缓冲区中,则将其添加,并设置初始值if battle not in self._reward_buffer:self._reward_buffer[battle] = starting_valuecurrent_value = 0# 遍历我方队伍中的每只精灵for mon in battle.team.values():# 根据当前生命值比例计算当前值current_value += mon.current_hp_fraction * hp_value# 如果精灵已经倒下,则减去倒下值if mon.fainted:current_value -= fainted_value# 如果精灵有异常状态,则减去异常状态值elif mon.status is not None:current_value -= status_value# 根据己方队伍中精灵数量与总精灵数量的差值计算当前值current_value += (number_of_pokemons - len(battle.team)) * hp_value# 遍历对方队伍中的每只精灵for mon in battle.opponent_team.values():# 根据当前生命值比例计算当前值current_value -= mon.current_hp_fraction * hp_value# 如果精灵已经倒下,则加上倒下值if mon.fainted:current_value += fainted_value# 如果精灵有异常状态,则加上异常状态值elif mon.status is not None:current_value += status_value# 根据对方队伍中精灵数量与总精灵数量的差值计算当前值current_value -= (number_of_pokemons - len(battle.opponent_team)) * hp_value# 如果战斗胜利,则加上胜利值if battle.won:current_value += victory_value# 如果战斗失败,则减去胜利值elif battle.lost:current_value -= victory_value# 计算当前值与奖励缓冲区中的值的差值作为返回值to_return = current_value - self._reward_buffer[battle] # the return value is the deltaself._reward_buffer[battle] = current_valuereturn to_returndef choose_max_damage_move(self, battle: AbstractBattle):# 如果有可用的招式,则选择基础威力最大的招式if battle.available_moves:best_move = max(battle.available_moves, key=lambda move: move.base_power)return self.create_order(best_move)# 如果没有可用的招式,则随机选择一个招式return self.choose_random_move(battle)

.\PokeLLMon\poke_env\player\llama_player.py

# 导入所需的模块
from poke_env.player.gpt_player import LLMPlayer
from poke_env.environment.abstract_battle import AbstractBattle
import json
from peft import PeftModel
import transformers
import torch
from poke_env.player.player import BattleOrder# 设置空字符串作为默认的令牌
my_token = ""
# 定义忽略索引
IGNORE_INDEX = -100
# 定义默认的填充令牌
DEFAULT_PAD_TOKEN = "[PAD]"
# 定义默认的结束令牌
DEFAULT_EOS_TOKEN = "</s>"
# 定义默认的开始令牌
DEFAULT_BOS_TOKEN = "<s>"
# 定义默认的未知令牌
DEFAULT_UNK_TOKEN = "<unk>"# 定义 LLAMAPlayer 类,继承自 LLMPlayer
class LLAMAPlayer(LLMPlayer):# 初始化函数,接受多个参数def __init__(self, battle_format,model_name_or_path: str = "",# tokenizer_path: str = "",lora_weights: str = "",model_max_length: int = 2048,w_reason = False,log_dir = "",account_configuration=None,server_configuration=None,):# 调用父类的初始化函数super().__init__(battle_format=battle_format,account_configuration=account_configuration,server_configuration=server_configuration)# 初始化 LLAMA 模型# 加载 LLAMA 模型self.except_cnt = 0self.total_cnt = 0self.log_dir = log_dirself.w_reason = w_reasonself.last_output = Noneself.last_state_prompt = None# 断言确保模型路径已指定assert (model_name_or_path), "Please specify the model path"# 使用指定的模型路径加载 tokenizerself.tokenizer = transformers.AutoTokenizer.from_pretrained(model_name_or_path,model_max_length=model_max_length,padding_side="right",use_fast=False,use_auth_token=my_token)# 使用指定的模型路径加载模型self.model = transformers.AutoModelForCausalLM.from_pretrained(model_name_or_path,load_in_8bit=False,torch_dtype=torch.float16,device_map="auto",use_auth_token=my_token)# 如果有 LoRA 权重,则加载if lora_weights:print("Recover LoRA weights..")self.model = PeftModel.from_pretrained(self.model,lora_weights,torch_dtype=torch.float16,)# 输出加载完成信息print("Loading finished...")# 设置模型为评估模式self.model.eval()

.\PokeLLMon\poke_env\player\openai_api.py

"""This module defines a player class with the OpenAI API on the main thread.
For a black-box implementation consider using the module env_player.
"""
# 导入必要的模块
from __future__ import annotationsimport asyncio
import copy
import random
import time
from abc import ABC, abstractmethod
from logging import Logger
from typing import Any, Awaitable, Callable, Dict, Generic, List, Optional, Tuple, Union# 导入自定义模块
from gymnasium.core import ActType, Env, ObsType
from gymnasium.spaces import Discrete, Space# 导入自定义模块
from poke_env.concurrency import POKE_LOOP, create_in_poke_loop
from poke_env.environment.abstract_battle import AbstractBattle
from poke_env.player.battle_order import BattleOrder, ForfeitBattleOrder
from poke_env.player.player import Player
from poke_env.ps_client import AccountConfiguration
from poke_env.ps_client.server_configuration import (LocalhostServerConfiguration,ServerConfiguration,
)
from poke_env.teambuilder.teambuilder import Teambuilder# 定义一个异步队列类
class _AsyncQueue:def __init__(self, queue: asyncio.Queue[Any]):self.queue = queue# 异步获取队列中的元素async def async_get(self):return await self.queue.get()# 获取队列中的元素def get(self):res = asyncio.run_coroutine_threadsafe(self.queue.get(), POKE_LOOP)return res.result()# 异步向队列中放入元素async def async_put(self, item: Any):await self.queue.put(item)# 向队列中放入元素def put(self, item: Any):task = asyncio.run_coroutine_threadsafe(self.queue.put(item), POKE_LOOP)task.result()# 判断队列是否为空def empty(self):return self.queue.empty()# 阻塞直到队列中的所有元素都被处理def join(self):task = asyncio.run_coroutine_threadsafe(self.queue.join(), POKE_LOOP)task.result()# 异步等待队列中的所有元素都被处理async def async_join(self):await self.queue.join()# 定义一个异步玩家类
class _AsyncPlayer(Generic[ObsType, ActType], Player):actions: _AsyncQueueobservations: _AsyncQueuedef __init__(self,user_funcs: OpenAIGymEnv[ObsType, ActType],username: str,**kwargs: Any,# 定义一个类,继承自AsyncPlayer类):# 设置类名为usernameself.__class__.__name__ = username# 调用父类的初始化方法super().__init__(**kwargs)# 设置类名为"_AsyncPlayer"self.__class__.__name__ = "_AsyncPlayer"# 初始化observations为一个异步队列self.observations = _AsyncQueue(create_in_poke_loop(asyncio.Queue, 1))# 初始化actions为一个异步队列self.actions = _AsyncQueue(create_in_poke_loop(asyncio.Queue, 1))# 初始化current_battle为Noneself.current_battle: Optional[AbstractBattle] = None# 初始化_user_funcs为user_funcs# 定义一个方法,用于选择移动def choose_move(self, battle: AbstractBattle) -> Awaitable[BattleOrder]:# 返回_env_move方法的结果return self._env_move(battle)# 定义一个异步方法,用于处理环境移动async def _env_move(self, battle: AbstractBattle) -> BattleOrder:# 如果当前战斗为空或已结束,则将当前战斗设置为传入的战斗if not self.current_battle or self.current_battle.finished:self.current_battle = battle# 如果当前战斗不等于传入的战斗,则抛出异常if not self.current_battle == battle:raise RuntimeError("Using different battles for queues")# 将战斗嵌入到用户函数中,并异步放入observations队列中battle_to_send = self._user_funcs.embed_battle(battle)await self.observations.async_put(battle_to_send)# 从actions队列中异步获取动作action = await self.actions.async_get()# 如果动作为-1,则返回放弃战斗的指令if action == -1:return ForfeitBattleOrder()# 将动作转换为移动指令并返回return self._user_funcs.action_to_move(action, battle)# 定义一个回调方法,用于处理战斗结束时的操作def _battle_finished_callback(self, battle: AbstractBattle):# 将战斗嵌入到用户函数中,并异步放入observations队列中to_put = self._user_funcs.embed_battle(battle)# 在POKE_LOOP中安全地运行异步放入操作asyncio.run_coroutine_threadsafe(self.observations.async_put(to_put), POKE_LOOP)
# 定义一个元类,继承自 ABC 类型
class _ABCMetaclass(type(ABC)):pass# 定义一个元类,继承自 Env 类型
class _EnvMetaclass(type(Env)):pass# 定义一个元类,继承自 _EnvMetaclass 和 _ABCMetaclass
class _OpenAIGymEnvMetaclass(_EnvMetaclass, _ABCMetaclass):pass# 定义一个类 OpenAIGymEnv,继承自 Env[ObsType, ActType] 和 ABC 类型,使用 _OpenAIGymEnvMetaclass 元类
class OpenAIGymEnv(Env[ObsType, ActType],ABC,metaclass=_OpenAIGymEnvMetaclass,
):"""Base class implementing the OpenAI Gym API on the main thread."""# 初始化重试次数_INIT_RETRIES = 100# 重试之间的时间间隔_TIME_BETWEEN_RETRIES = 0.5# 切换挑战任务的重试次数_SWITCH_CHALLENGE_TASK_RETRIES = 30# 切换重试之间的时间间隔_TIME_BETWEEN_SWITCH_RETIRES = 1# 初始化方法def __init__(self,account_configuration: Optional[AccountConfiguration] = None,*,avatar: Optional[int] = None,battle_format: str = "gen8randombattle",log_level: Optional[int] = None,save_replays: Union[bool, str] = False,server_configuration: Optional[ServerConfiguration] = LocalhostServerConfiguration,start_timer_on_battle_start: bool = False,start_listening: bool = True,ping_interval: Optional[float] = 20.0,ping_timeout: Optional[float] = 20.0,team: Optional[Union[str, Teambuilder]] = None,start_challenging: bool = False,# 抽象方法,计算奖励@abstractmethoddef calc_reward(self, last_battle: AbstractBattle, current_battle: AbstractBattle) -> float:"""Returns the reward for the current battle state. The battle state in the previousturn is given as well and can be used for comparisons.:param last_battle: The battle state in the previous turn.:type last_battle: AbstractBattle:param current_battle: The current battle state.:type current_battle: AbstractBattle:return: The reward for current_battle.:rtype: float"""pass# 抽象方法@abstractmethod# 根据给定的动作和当前战斗状态返回相应的战斗指令def action_to_move(self, action: int, battle: AbstractBattle) -> BattleOrder:"""Returns the BattleOrder relative to the given action.:param action: The action to take.:type action: int:param battle: The current battle state:type battle: AbstractBattle:return: The battle order for the given action in context of the current battle.:rtype: BattleOrder"""pass# 返回当前战斗状态的嵌入,格式与OpenAI gym API兼容@abstractmethoddef embed_battle(self, battle: AbstractBattle) -> ObsType:"""Returns the embedding of the current battle state in a format compatible withthe OpenAI gym API.:param battle: The current battle state.:type battle: AbstractBattle:return: The embedding of the current battle state."""pass# 返回嵌入的描述,必须返回一个指定了下限和上限的Space@abstractmethoddef describe_embedding(self) -> Space[ObsType]:"""Returns the description of the embedding. It must return a Space specifyinglow bounds and high bounds.:return: The description of the embedding.:rtype: Space"""pass# 返回动作空间的大小,如果大小为x,则动作空间从0到x-1@abstractmethoddef action_space_size(self) -> int:"""Returns the size of the action space. Given size x, the action space goesfrom 0 to x - 1.:return: The action space size.:rtype: int"""pass# 返回将在挑战循环的下一次迭代中挑战的对手(或对手列表)# 如果返回一个列表,则在挑战循环期间将随机选择一个元素@abstractmethoddef get_opponent(self,) -> Union[Player, str, List[Player], List[str]]:"""Returns the opponent (or list of opponents) that will be challengedon the next iteration of the challenge loop. If a list is returned,a random element will be chosen at random during the challenge loop.:return: The opponent (or list of opponents).:rtype: Player or str or list(Player) or list(str)"""pass# 获取对手玩家或字符串def _get_opponent(self) -> Union[Player, str]:# 获取对手opponent = self.get_opponent()# 如果对手是列表,则随机选择一个对手,否则直接返回对手random_opponent = (random.choice(opponent) if isinstance(opponent, list) else opponent)return random_opponent# 重置环境def reset(self,*,seed: Optional[int] = None,options: Optional[Dict[str, Any]] = None,) -> Tuple[ObsType, Dict[str, Any]]:# 如果有种子值,则使用种子值重置环境if seed is not None:super().reset(seed=seed)  # type: ignoreself._seed_initialized = True# 如果种子值未初始化,则使用当前时间戳作为种子值elif not self._seed_initialized:super().reset(seed=int(time.time()))  # type: ignoreself._seed_initialized = True# 如果当前没有对战,则等待对战开始if not self.agent.current_battle:count = self._INIT_RETRIESwhile not self.agent.current_battle:if count == 0:raise RuntimeError("Agent is not challenging")count -= 1time.sleep(self._TIME_BETWEEN_RETRIES)# 如果当前对战未结束,则等待对战结束if self.current_battle and not self.current_battle.finished:if self.current_battle == self.agent.current_battle:self._actions.put(-1)self._observations.get()else:raise RuntimeError("Environment and agent aren't synchronized. Try to restart")# 等待当前对战与对手对战不同while self.current_battle == self.agent.current_battle:time.sleep(0.01)# 更新当前对战为对手对战self.current_battle = self.agent.current_battlebattle = copy.copy(self.current_battle)battle.logger = Noneself.last_battle = copy.deepcopy(battle)return self._observations.get(), self.get_additional_info()# 获取额外信息def get_additional_info(self) -> Dict[str, Any]:"""Returns additional info for the reset method.Override only if you really need it.:return: Additional information as a Dict:rtype: Dict"""return {}def step(self, action: ActType) -> Tuple[ObsType, float, bool, bool, Dict[str, Any]]:"""Execute the specified action in the environment.:param ActType action: The action to be executed.:return: A tuple containing the new observation, reward, termination flag, truncation flag, and info dictionary.:rtype: Tuple[ObsType, float, bool, bool, Dict[str, Any]]"""# 如果当前战斗为空,则重置环境并返回初始观察和信息if not self.current_battle:obs, info = self.reset()return obs, 0.0, False, False, info# 如果当前战斗已经结束,则抛出异常if self.current_battle.finished:raise RuntimeError("Battle is already finished, call reset")# 复制当前战斗对象,以便进行操作battle = copy.copy(self.current_battle)battle.logger = None# 深度复制当前战斗对象,用于记录上一次的战斗状态self.last_battle = copy.deepcopy(battle)# 将动作放入动作队列self._actions.put(action)# 从观察队列中获取观察结果observation = self._observations.get()# 计算奖励reward = self.calc_reward(self.last_battle, self.current_battle)terminated = Falsetruncated = False# 如果当前战斗已经结束if self.current_battle.finished:size = self.current_battle.team_size# 计算剩余队伍中未被击倒的精灵数量remaining_mons = size - len([mon for mon in self.current_battle.team.values() if mon.fainted])remaining_opponent_mons = size - len([monfor mon in self.current_battle.opponent_team.values()if mon.fainted])# 如果一方队伍的精灵全部被击倒,则游戏结束if (remaining_mons == 0) != (remaining_opponent_mons == 0):terminated = Trueelse:truncated = True# 返回观察结果、奖励、游戏是否结束、游戏是否截断以及额外信息return observation, reward, terminated, truncated, self.get_additional_info()# 渲染当前战斗状态,显示当前回合信息和双方精灵状态def render(self, mode: str = "human"):# 如果当前存在战斗if self.current_battle is not None:# 打印当前回合信息和双方精灵状态print("  Turn %4d. | [%s][%3d/%3dhp] %10.10s - %10.10s [%3d%%hp][%s]"% (self.current_battle.turn,"".join(["⦻" if mon.fainted else "●"for mon in self.current_battle.team.values()]),self.current_battle.active_pokemon.current_hp or 0,self.current_battle.active_pokemon.max_hp or 0,self.current_battle.active_pokemon.species,self.current_battle.opponent_active_pokemon.species,self.current_battle.opponent_active_pokemon.current_hp or 0,"".join(["⦻" if mon.fainted else "●"for mon in self.current_battle.opponent_team.values()]),),end="\n" if self.current_battle.finished else "\r",)# 关闭当前战斗,清理资源def close(self, purge: bool = True):# 如果当前没有战斗或者当前战斗已结束if self.current_battle is None or self.current_battle.finished:# 等待1秒time.sleep(1)# 如果当前战斗不是代理的当前战斗if self.current_battle != self.agent.current_battle:self.current_battle = self.agent.current_battle# 创建一个异步任务来停止挑战循环closing_task = asyncio.run_coroutine_threadsafe(self._stop_challenge_loop(purge=purge), POKE_LOOP)# 获取异步任务的结果closing_task.result()def background_send_challenge(self, username: str):"""Sends a single challenge to a specified player asynchronously. The function immediately returnsto allow use of the OpenAI gym API.:param username: The username of the player to challenge.:type username: str"""# 检查是否已经有挑战任务在进行,如果有则抛出异常if self._challenge_task and not self._challenge_task.done():raise RuntimeError("Agent is already challenging opponents with the challenging loop. ""Try to specify 'start_challenging=True' during instantiation or call ""'await agent.stop_challenge_loop()' to clear the task.")# 在另一个线程中异步运行发送挑战的方法self._challenge_task = asyncio.run_coroutine_threadsafe(self.agent.send_challenges(username, 1), POKE_LOOP)def background_accept_challenge(self, username: str):"""Accepts a single challenge from a specified player asynchronously. The function immediately returnsto allow use of the OpenAI gym API.:param username: The username of the player to challenge.:type username: str"""# 检查是否已经有挑战任务在进行,如果有则抛出异常if self._challenge_task and not self._challenge_task.done():raise RuntimeError("Agent is already challenging opponents with the challenging loop. ""Try to specify 'start_challenging=True' during instantiation or call ""'await agent.stop_challenge_loop()' to clear the task.")# 在另一个线程中异步运行接受挑战的方法self._challenge_task = asyncio.run_coroutine_threadsafe(self.agent.accept_challenges(username, 1, self.agent.next_team), POKE_LOOP)async def _challenge_loop(self,n_challenges: Optional[int] = None,callback: Optional[Callable[[AbstractBattle], None]] = None,# 如果没有指定挑战次数,则持续挑战直到 self._keep_challenging 为 False):# 如果没有挑战次数且 self._keep_challenging 为 Trueif not n_challenges:# 持续挑战直到 self._keep_challenging 为 Falsewhile self._keep_challenging:# 获取对手opponent = self._get_opponent()# 如果对手是 Player 类型if isinstance(opponent, Player):# 进行一场对战await self.agent.battle_against(opponent, 1)else:# 发送挑战请求await self.agent.send_challenges(opponent, 1)# 如果有回调函数且当前对战不为 Noneif callback and self.current_battle is not None:# 复制当前对战并调用回调函数callback(copy.deepcopy(self.current_battle))# 如果指定了挑战次数且挑战次数大于 0elif n_challenges > 0:# 循环指定次数for _ in range(n_challenges):# 获取对手opponent = self._get_opponent()# 如果对手是 Player 类型if isinstance(opponent, Player):# 进行一场对战await self.agent.battle_against(opponent, 1)else:# 发送挑战请求await self.agent.send_challenges(opponent, 1)# 如果有回调函数且当前对战不为 Noneif callback and self.current_battle is not None:# 复制当前对战并调用回调函数callback(copy.deepcopy(self.current_battle))# 如果挑战次数小于等于 0else:# 抛出数值错误异常raise ValueError(f"Number of challenges must be > 0. Got {n_challenges}")# 开始挑战def start_challenging(# 指定挑战次数,默认为 Noneself,n_challenges: Optional[int] = None,# 回调函数,接受 AbstractBattle 类型参数并返回 Nonecallback: Optional[Callable[[AbstractBattle], None]] = None,):"""Starts the challenge loop.:param n_challenges: The number of challenges to send. If empty it will run untilstopped.:type n_challenges: int, optional:param callback: The function to callback after each challenge with a copy ofthe final battle state.:type callback: Callable[[AbstractBattle], None], optional"""# 检查是否存在正在进行的挑战任务,如果有则等待直到完成if self._challenge_task and not self._challenge_task.done():count = self._SWITCH_CHALLENGE_TASK_RETRIESwhile not self._challenge_task.done():if count == 0:raise RuntimeError("Agent is already challenging")count -= 1time.sleep(self._TIME_BETWEEN_SWITCH_RETIRES)# 如果没有指定挑战次数,则设置为持续挑战if not n_challenges:self._keep_challenging = True# 启动挑战循环任务self._challenge_task = asyncio.run_coroutine_threadsafe(self._challenge_loop(n_challenges, callback), POKE_LOOP)async def _ladder_loop(self,n_challenges: Optional[int] = None,callback: Optional[Callable[[AbstractBattle], None]] = None,):# 如果指定了挑战次数,则进行相应次数的挑战if n_challenges:if n_challenges <= 0:raise ValueError(f"Number of challenges must be > 0. Got {n_challenges}")for _ in range(n_challenges):await self.agent.ladder(1)# 如果有回调函数且当前战斗状态不为空,则执行回调函数if callback and self.current_battle is not None:callback(copy.deepcopy(self.current_battle))# 如果未指定挑战次数,则持续挑战直到停止else:while self._keep_challenging:await self.agent.ladder(1)# 如果有回调函数且当前战斗状态不为空,则执行回调函数if callback and self.current_battle is not None:callback(copy.deepcopy(self.current_battle))# 启动 ladder 循环挑战def start_laddering(self,n_challenges: Optional[int] = None,callback: Optional[Callable[[AbstractBattle], None]] = None,):"""Starts the laddering loop.:param n_challenges: The number of ladder games to play. If empty itwill run until stopped.:type n_challenges: int, optional:param callback: The function to callback after each challenge with acopy of the final battle state.:type callback: Callable[[AbstractBattle], None], optional"""# 检查是否存在正在进行的挑战任务,如果有则等待直到完成if self._challenge_task and not self._challenge_task.done():count = self._SWITCH_CHALLENGE_TASK_RETRIESwhile not self._challenge_task.done():if count == 0:raise RuntimeError("Agent is already challenging")count -= 1time.sleep(self._TIME_BETWEEN_SWITCH_RETIRES)# 如果没有指定挑战次数,则设置为持续挑战if not n_challenges:self._keep_challenging = True# 使用 asyncio 在另一个线程中运行 _ladder_loop 方法,传入挑战次数和回调函数self._challenge_task = asyncio.run_coroutine_threadsafe(self._ladder_loop(n_challenges, callback), POKE_LOOP)async def _stop_challenge_loop(self, force: bool = True, wait: bool = True, purge: bool = False):  # 定义一个方法,接受多个参数self._keep_challenging = False  # 将属性_keep_challenging设置为Falseif force:  # 如果force为真if self.current_battle and not self.current_battle.finished:  # 如果存在当前战斗且未结束if not self._actions.empty():  # 如果_actions队列不为空await asyncio.sleep(2)  # 异步等待2秒if not self._actions.empty():  # 如果_actions队列仍不为空raise RuntimeError(  # 抛出运行时错误"The agent is still sending actions. ""Use this method only when training or ""evaluation are over.")if not self._observations.empty():  # 如果_observations队列不为空await self._observations.async_get()  # 异步获取_observations队列中的数据await self._actions.async_put(-1)  # 异步将-1放入_actions队列中if wait and self._challenge_task:  # 如果wait为真且_challenge_task存在while not self._challenge_task.done():  # 当_challenge_task未完成时await asyncio.sleep(1)  # 异步等待1秒self._challenge_task.result()  # 获取_challenge_task的结果self._challenge_task = None  # 将_challenge_task设置为Noneself.current_battle = None  # 将current_battle设置为Noneself.agent.current_battle = None  # 将agent的current_battle设置为Nonewhile not self._actions.empty():  # 当_actions队列不为空时await self._actions.async_get()  # 异步获取_actions队列中的数据while not self._observations.empty():  # 当_observations队列不为空时await self._observations.async_get()  # 异步获取_observations队列中的数据if purge:  # 如果purge为真self.agent.reset_battles()  # 调用agent的reset_battles方法def reset_battles(self):  # 定义一个方法reset_battles"""Resets the player's inner battle tracker."""  # 重置玩家的内部战斗追踪器self.agent.reset_battles()  # 调用agent的reset_battles方法# 检查任务是否完成,可设置超时时间def done(self, timeout: Optional[int] = None) -> bool:"""Returns True if the task is done or is done after the timeout, false otherwise.:param timeout: The amount of time to wait for if the task is not already done.If empty it will wait until the task is done.:type timeout: int, optional:return: True if the task is done or if the task gets completed after thetimeout.:rtype: bool"""# 如果挑战任务为空,则返回Trueif self._challenge_task is None:return True# 如果超时时间为空,则等待任务完成if timeout is None:self._challenge_task.result()return True# 如果挑战任务已完成,则返回Trueif self._challenge_task.done():return True# 等待一段时间后再次检查任务是否完成time.sleep(timeout)return self._challenge_task.done()# 暴露Player类的属性@propertydef battles(self) -> Dict[str, AbstractBattle]:return self.agent.battles@propertydef format(self) -> str:return self.agent.format@propertydef format_is_doubles(self) -> bool:return self.agent.format_is_doubles@propertydef n_finished_battles(self) -> int:return self.agent.n_finished_battles@propertydef n_lost_battles(self) -> int:return self.agent.n_lost_battles@propertydef n_tied_battles(self) -> int:return self.agent.n_tied_battles@propertydef n_won_battles(self) -> int:return self.agent.n_won_battles@propertydef win_rate(self) -> float:return self.agent.win_rate# 暴露Player Network Interface Class的属性@propertydef logged_in(self) -> asyncio.Event:"""Event object associated with user login.:return: The logged-in event:rtype: Event"""return self.agent.ps_client.logged_in@property# 返回与玩家相关联的日志记录器def logger(self) -> Logger:"""Logger associated with the player.:return: The logger.:rtype: Logger"""return self.agent.logger# 返回玩家的用户名@propertydef username(self) -> str:"""The player's username.:return: The player's username.:rtype: str"""return self.agent.username# 返回 WebSocket 的 URL@propertydef websocket_url(self) -> str:"""The websocket url.It is derived from the server url.:return: The websocket url.:rtype: str"""return self.agent.ps_client.websocket_url# 获取属性的值def __getattr__(self, item: str):return getattr(self.agent, item)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/734675.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

openssl3.2 - exp - generate prime

文章目录 openssl3.2 - exp - generate prime概述笔记END openssl3.2 - exp - generate prime 概述 openssl3.2.命令行可以生成质数 openssl prime -generate -bits 256 -hex -safe 单步调试openssl.exe工程, 整理了一个函数, 用openssl API来产生质数. openssl命令行是将结果…

2024/3/10打卡借教室——二分+差分

题目 在大学期间&#xff0c;经常需要租借教室。 大到院系举办活动&#xff0c;小到学习小组自习讨论&#xff0c;都需要向学校申请借教室。 教室的大小功能不同&#xff0c;借教室人的身份不同&#xff0c;借教室的手续也不一样。  面对海量租借教室的信息&#xff0c;我们自…

IDEA打开项目文件目录不见了

偶尔发生新拉下来的代码&#xff0c;或者旧代码修改了包名&#xff0c;项目名称等&#xff0c;idea左侧project一栏不显示代码的文件目录。例如下面此时不要慌张&#xff0c;不用删除项目重新拉取&#xff0c;通过以下方式解决&#xff1a; 本人尝试能够解决&#xff0c;如果无…

c# 二分查找(迭代与递归)

二分搜索被定义为一种在排序数组中使用的搜索算法&#xff0c;通过重复将搜索间隔一分为二。二分查找的思想是利用数组已排序的信息&#xff0c;将时间复杂度降低到O(log N)。 二分查找算法示例 何时在数据结构中应用二分查找的条件&#xff1a; 应用二分查找算法&#xff1a…

stable diffusion faceswaplab换脸插件报错解决

错误提示&#xff1a; ERROR - Failed to swap face in postprocess method : apply_overlay() takes 3 positional arguments but 4 were given 打开插件对应目录&#xff1a; \sd-webui-aki-v4.6.1\extensions\sd-webui-faceswaplab\scripts\faceswaplab_utils中 imgutil…

架构:Apache Kafka Connect实现sqlserver数据实时同步

实现Apache Kafka Connect与SQL Server之间的实时数据同步&#xff0c;您可以使用Kafka Connect的JDBC Source Connector。以下是一个基本的步骤&#xff1a; 1. 安装Kafka Connect&#xff1a;确保您已经安装了Apache Kafka 和 Kafka Connect。您可以从Apache Kafka的官方网站…

Servlet API 详细讲解

Servlet API 详细讲解 API就是一组类和方法的集合&#xff0c;servlet 中的 类是非常多的&#xff0c;咱们只需要学习 3个类即可。 HttpServletHttpServletRequest&#xff08;服务器如何读取客户端响应&#xff09;HttpServletResponse&#xff08;服务器如何把响应返回给客…

【ros2 control 机器人驱动开发】双关节多控制器机器人学习-example 4

【ros2 control 机器人驱动开发】双关节多控制器机器人学习-example 4 文章目录 前言一、创建controller相关二、测试运行测试forward_position_controller总结前言 本篇文章在上篇文章的基础上主要讲解双轴机器人驱动怎么编写机器人外部/内部(扭矩、压力)传感器数据反馈1,…

delphi7中出现“无法更改以命令对象为源的记录集对象..“的错误解决

我在delphi7环境下写一个数据库应用程序&#xff0c;每次关闭界面时总出现“无法更改以命令对象为源的记录集对象.."的错误。如图所示。 经查阅资料&#xff0c;我得到一些思路&#xff1a;最 这个错误信息通常表示在关闭窗体时&#xff0c;有一个或多个数据库组件&…

大数据开发(Hadoop面试真题-卷四)

大数据开发&#xff08;Hadoop面试真题&#xff09; 1、Hadoop小文件处理问题&#xff1f;2、介绍下HDFS&#xff0c;说下HDFS优缺点&#xff0c;以及使用场景&#xff1f;3、HDFS作用4、HDFS的容错机制5、HDFS的副本机制6、HDFS的常见数据格式&#xff0c;列式存储格式和行存储…

Uniapp开发模板unibest

&#x1f3e0;简介 unibest 是一个集成了多种工具和技术的 uniapp 开发模板&#xff0c;由 uniapp Vue3 Ts Vite4 UnoCss uv-ui VSCode 构建&#xff0c;模板具有代码提示、自动格式化、统一配置、代码片段等功能&#xff0c;并内置了许多常用的基本组件和基本功能&#…

软件安全——堆栈基础知识点总结

一、堆栈基础——内存区域 1、内存区域相关概念 内存区域&#xff1a;一个进程可能被分配到不同的内存区域去执行&#xff1a; 代码区&#xff1a;这个区域存储着被装入执行的二进制机器代码&#xff0c;处理器会到这个区域取指并执行。 数据区&#xff1a;用于存储全局变量…

重拾C++之菜鸟刷算法第10篇---二叉树(下)

十五、合并二叉树 题目 给你两棵二叉树&#xff1a; root1 和 root2 。 想象一下&#xff0c;当你将其中一棵覆盖到另一棵之上时&#xff0c;两棵树上的一些节点将会重叠&#xff08;而另一些不会&#xff09;。你需要将这两棵树合并成一棵新二叉树。合并的规则是&#xff1…

【NR 定位】3GPP NR Positioning 5G定位标准解读(八)- OTDOA定位

前言 3GPP NR Positioning 5G定位标准&#xff1a;3GPP TS 38.305 V18 3GPP 标准网址&#xff1a;Directory Listing /ftp/ 【NR 定位】3GPP NR Positioning 5G定位标准解读&#xff08;一&#xff09;-CSDN博客 【NR 定位】3GPP NR Positioning 5G定位标准解读&#xff08;…

车规芯片为什么需要信息安全(1)

目录 1.汽车出现过被黑客攻击事件吗&#xff1f; 2.汽车信息安全标准汇总 2.1 国际标准 2.2 国内标准 3.车规芯片的信息安全应该从什么地方考虑 3.1 芯片硬件安全防护能力 3.2 车规芯片的信息安全服务 3.3 芯片厂如何证明芯片的信息安全能力 4.小结 这个来自家里人的灵…

WebGL之创建 3D 对象

现在让我们给之前的正方形添加五个面从而可以创建一个三维的立方体。最简单的方式就是通过调用方法 gl.drawElements() 使用顶点数组列表来替换之前的通过方法gl.drawArrays() 直接使用顶点数组。而顶点数组列表里保存着将会被引用到一个个独立的顶点。 其实现在会存在这样一个…

设计模式学习系列 -- 随记

文章目录 前言 一、设计模式是什么&#xff1f; 二、设计模式的历史 三、为什么以及如何学习设计模式&#xff1f; 四、关于模式的争议 一种针对不完善编程语言的蹩脚解决方案 低效的解决方案 不当使用 五、设计模式分类 总结 前言 最近可能工作生活上的稳定慢慢感觉自己丢失…

【轮式平衡机器人】——TMS320F28069片内外设之ePWM

声明&#xff1a;本系列博客参考有关专业书籍&#xff0c;截图均为自己实操&#xff0c;仅供交流学习&#xff01; 引入 脉冲宽度调制&#xff08;PWM&#xff09;是一种对模拟信号进行数字编码的方法&#xff0c;硬件原理是根据相应载荷的变化来调制晶体管栅极或基极的偏置&…

【c++】string模拟实现

string类的接口 namespace zjw {class string{public:typedef char* iterator;typedef const char* const_iterator;private:char* _str;int _size;int _capacity;};这里的迭代器直接使用原生指针来封装。 _str为指向string数组的首地址的指针。 _size为string数组的大小。 …

测试工具使用技巧01-->jmeter链接mysql

前言 在做接口或者性能测试的时候&#xff0c;有时需要jmeter连接数据库做操作&#xff0c;可以看看如下实例。操作实例 在mysql数据库中有如下数据表 在jmeter导入jdbc驱动插件&#xff08;需要的留言找我拿&#xff09; 在jmeter测试计划元件最下面&#xff0c;导入jdbc.…