"""
This module contains exceptions.
"""# 定义一个自定义异常类 ShowdownException,继承自内置异常类 ExceptionclassShowdownException(Exception):"""This exception is raised when a non-managed messageis received from the server."""# 当从服务器接收到非受控消息时引发此异常pass
.\PokeLLMon\poke_env\player\baselines.py
# 导入必要的模块from typing import List
import json
import osfrom poke_env.environment.abstract_battle import AbstractBattle
from poke_env.environment.double_battle import DoubleBattle
from poke_env.environment.move_category import MoveCategory
from poke_env.environment.pokemon import Pokemon
from poke_env.environment.side_condition import SideCondition
from poke_env.player.player import Player
from poke_env.data.gen_data import GenData# 从文件中加载招式效果数据withopen("./poke_env/data/static/moves/moves_effect.json","r")as f:move_effect = json.load(f)# 计算招式类型的伤害倍率defcalculate_move_type_damage_multipier(type_1, type_2, type_chart, constraint_type_list):# 定义所有可能的宝可梦类型TYPE_list ='BUG,DARK,DRAGON,ELECTRIC,FAIRY,FIGHTING,FIRE,FLYING,GHOST,GRASS,GROUND,ICE,NORMAL,POISON,PSYCHIC,ROCK,STEEL,WATER'.split(",")move_type_damage_multiplier_list =[]# 如果存在第二个类型if type_2:# 计算每种类型对应的伤害倍率fortypein TYPE_list:move_type_damage_multiplier_list.append(type_chart[type_1][type]* type_chart[type_2][type])move_type_damage_multiplier_dict =dict(zip(TYPE_list, move_type_damage_multiplier_list))else:move_type_damage_multiplier_dict = type_chart[type_1]effective_type_list =[]extreme_type_list =[]resistant_type_list =[]extreme_resistant_type_list =[]immune_type_list =[]# 根据伤害倍率将类型分为不同的类别fortype, value in move_type_damage_multiplier_dict.items():if value ==2:effective_type_list.append(type)elif value ==4:extreme_type_list.append(type)elif value ==1/2:resistant_type_list.append(type)elif value ==1/4:extreme_resistant_type_list.append(type)elif value ==0:immune_type_list.append(type)else:# value == 1continue# 如果约束类型列表不为空if constraint_type_list:# 更新极端类型列表,取交集extreme_type_list =list(set(extreme_type_list).intersection(set(constraint_type_list)))# 更新有效类型列表,取交集effective_type_list =list(set(effective_type_list).intersection(set(constraint_type_list)))# 更新抗性类型列表,取交集resistant_type_list =list(set(resistant_type_list).intersection(set(constraint_type_list)))# 更新极端抗性类型列表,取交集extreme_resistant_type_list =list(set(extreme_resistant_type_list).intersection(set(constraint_type_list)))# 更新免疫类型列表,取交集immune_type_list =list(set(immune_type_list).intersection(set(constraint_type_list)))# 返回更新后的各类型列表return extreme_type_list, effective_type_list, resistant_type_list, extreme_resistant_type_list, immune_type_list
# 定义一个函数,根据给定的参数计算并返回对应的移动类型伤害提示defmove_type_damage_wraper(pokemon_name, type_1, type_2, type_chart, constraint_type_list=None):# 初始化移动类型伤害提示字符串move_type_damage_prompt =""# 调用函数计算移动类型伤害倍数,得到各种类型的列表extreme_effective_type_list, effective_type_list, resistant_type_list, extreme_resistant_type_list, immune_type_list = calculate_move_type_damage_multipier(type_1, type_2, type_chart, constraint_type_list)# 如果存在有效的、抵抗的或免疫的类型列表if effective_type_list or resistant_type_list or immune_type_list:# 构建移动类型伤害提示字符串move_type_damage_prompt =f"{pokemon_name}"if extreme_effective_type_list:move_type_damage_prompt = move_type_damage_prompt +" can be super-effectively attacked by "+", ".join(extreme_effective_type_list)+" moves"if effective_type_list:move_type_damage_prompt = move_type_damage_prompt +", can be effectively attacked by "+", ".join(effective_type_list)+" moves"if resistant_type_list:move_type_damage_prompt = move_type_damage_prompt +", is resistant to "+", ".join(resistant_type_list)+" moves"if extreme_resistant_type_list:move_type_damage_prompt = move_type_damage_prompt +", is super-resistant to "+", ".join(extreme_resistant_type_list)+" moves"if immune_type_list:move_type_damage_prompt = move_type_damage_prompt +", is immuned to "+", ".join(immune_type_list)+" moves"# 返回移动类型伤害提示字符串return move_type_damage_prompt# 定义一个类,继承自Player类,实现最大基础伤害玩家classMaxBasePowerPlayer(Player):# 重写choose_move方法defchoose_move(self, battle: AbstractBattle):# 如果存在可用的移动if battle.available_moves:# 选择基础伤害最大的移动best_move =max(battle.available_moves, key=lambda move: move.base_power)return self.create_order(best_move)# 如果没有可用的移动,则随机选择一个移动return self.choose_random_move(battle)# 定义一个类,继承自Player类,实现简单启发式玩家classSimpleHeuristicsPlayer(Player):# 定义了各种入场危害效果,将字符串映射到对应的SideCondition枚举值ENTRY_HAZARDS ={"spikes": SideCondition.SPIKES,"stealhrock": SideCondition.STEALTH_ROCK,"stickyweb": SideCondition.STICKY_WEB,"toxicspikes": SideCondition.TOXIC_SPIKES,}# 定义了反危害招式,使用集合存储ANTI_HAZARDS_MOVES ={"rapidspin","defog"}# 定义了速度等级系数SPEED_TIER_COEFICIENT =0.1# 定义了生命值分数系数HP_FRACTION_COEFICIENT =0.4# 定义了交换出场匹配阈值SWITCH_OUT_MATCHUP_THRESHOLD =-2# 估算对战情况,返回得分def_estimate_matchup(self, mon: Pokemon, opponent: Pokemon):# 计算对手对我方造成的伤害倍率的最大值score =max([opponent.damage_multiplier(t)for t in mon.types if t isnotNone])# 减去我方对对手造成的伤害倍率的最大值score -=max([mon.damage_multiplier(t)for t in opponent.types if t isnotNone])# 根据速度等级差异调整得分if mon.base_stats["spe"]> opponent.base_stats["spe"]:score += self.SPEED_TIER_COEFICIENTelif opponent.base_stats["spe"]> mon.base_stats["spe"]:score -= self.SPEED_TIER_COEFICIENT# 根据生命值分数调整得分score += mon.current_hp_fraction * self.HP_FRACTION_COEFICIENTscore -= opponent.current_hp_fraction * self.HP_FRACTION_COEFICIENTreturn score# 判断是否应该使用极巨化def_should_dynamax(self, battle: AbstractBattle, n_remaining_mons:int):if battle.can_dynamax and self._dynamax_disable isFalse:# 最后一个满血的精灵if(len([m for m in battle.team.values()if m.current_hp_fraction ==1])==1and battle.active_pokemon.current_hp_fraction ==1):returnTrue# 有优势且双方都是满血if(self._estimate_matchup(battle.active_pokemon, battle.opponent_active_pokemon)>0and battle.active_pokemon.current_hp_fraction ==1and battle.opponent_active_pokemon.current_hp_fraction ==1):returnTrue# 只剩下一个精灵if n_remaining_mons ==1:returnTruereturnFalse# 判断是否应该替换出当前精灵def_should_switch_out(self, battle: AbstractBattle):# 获取当前精灵和对手精灵active = battle.active_pokemonopponent = battle.opponent_active_pokemon# 如果有一个适合替换的精灵...if[mfor m in battle.available_switchesif self._estimate_matchup(m, opponent)>0]:# ...并且有一个“好”的理由替换出去if active.boosts["def"]<=-3or active.boosts["spd"]<=-3:returnTrueif(active.boosts["atk"]<=-3and active.stats["atk"]>= active.stats["spa"]):returnTrueif(active.boosts["spa"]<=-3and active.stats["atk"]<= active.stats["spa"]):returnTrueif(self._estimate_matchup(active, opponent)< self.SWITCH_OUT_MATCHUP_THRESHOLD):returnTruereturnFalse# 估算精灵的状态def_stat_estimation(self, mon: Pokemon, stat:str):# 计算状态提升值if mon.boosts[stat]>1:boost =(2+ mon.boosts[stat])/2else:boost =2/(2- mon.boosts[stat])return((2* mon.base_stats[stat]+31)+5)* boost# 计算奖励值defcalc_reward(self, current_battle: AbstractBattle)->float:# 计算奖励值return self.reward_computing_helper(current_battle, fainted_value=2.0, hp_value=1.0, victory_value=30.0)# 根据状态和等级返回加成倍数defboost_multiplier(self, state, level):# 如果状态是准确度if state =="accuracy":# 根据等级返回对应的加成倍数if level ==0:return1.0if level ==1:return1.33if level ==2:return1.66if level ==3:return2.0if level ==4:return2.5if level ==5:return2.66if level ==6:return3.0if level ==-1:return0.75if level ==-2:return0.6if level ==-3:return0.5if level ==-4:return0.43if level ==-5:return0.36if level ==-6:return0.33# 如果状态不是准确度else:# 根据等级返回对应的加成倍数if level ==0:return1.0if level ==1:return1.5if level ==2:return2.0if level ==3:return2.5if level ==4:return3.0if level ==5:return3.5if level ==6:return4.0if level ==-1:return0.67if level ==-2:return0.5if level ==-3:return0.4if level ==-4:return0.33if level ==-5:return0.29if level ==-6:return0.25# 检查给定状态的值,并返回相应的状态字符串defcheck_status(self, status):# 如果状态存在if status:# 根据状态值返回相应的状态字符串if status.value ==1:return"burnt"elif status.value ==2:return"fainted"elif status.value ==3:return"frozen"elif status.value ==4:return"paralyzed"elif status.value ==5:return"poisoned"elif status.value ==7:return"toxic"elif status.value ==6:return"sleeping"# 如果状态不存在,则返回"healthy"else:return"healthy"
"""This module defines a player class with the OpenAI API on the main thread.
For a black-box implementation consider using the module env_player.
"""# 导入必要的模块from __future__ import annotationsimport asyncio
import copy
import random
import time
from abc import ABC, abstractmethod
from logging import Logger
from typing import Any, Awaitable, Callable, Dict, Generic, List, Optional, Tuple, Union# 导入自定义模块from gymnasium.core import ActType, Env, ObsType
from gymnasium.spaces import Discrete, Space# 导入自定义模块from poke_env.concurrency import POKE_LOOP, create_in_poke_loop
from poke_env.environment.abstract_battle import AbstractBattle
from poke_env.player.battle_order import BattleOrder, ForfeitBattleOrder
from poke_env.player.player import Player
from poke_env.ps_client import AccountConfiguration
from poke_env.ps_client.server_configuration import(LocalhostServerConfiguration,ServerConfiguration,)from poke_env.teambuilder.teambuilder import Teambuilder# 定义一个异步队列类class_AsyncQueue:def__init__(self, queue: asyncio.Queue[Any]):self.queue = queue# 异步获取队列中的元素asyncdefasync_get(self):returnawait self.queue.get()# 获取队列中的元素defget(self):res = asyncio.run_coroutine_threadsafe(self.queue.get(), POKE_LOOP)return res.result()# 异步向队列中放入元素asyncdefasync_put(self, item: Any):await self.queue.put(item)# 向队列中放入元素defput(self, item: Any):task = asyncio.run_coroutine_threadsafe(self.queue.put(item), POKE_LOOP)task.result()# 判断队列是否为空defempty(self):return self.queue.empty()# 阻塞直到队列中的所有元素都被处理defjoin(self):task = asyncio.run_coroutine_threadsafe(self.queue.join(), POKE_LOOP)task.result()# 异步等待队列中的所有元素都被处理asyncdefasync_join(self):await self.queue.join()# 定义一个异步玩家类class_AsyncPlayer(Generic[ObsType, ActType], Player):actions: _AsyncQueueobservations: _AsyncQueuedef__init__(self,user_funcs: OpenAIGymEnv[ObsType, ActType],username:str,**kwargs: Any,# 定义一个类,继承自AsyncPlayer类):# 设置类名为usernameself.__class__.__name__ = username# 调用父类的初始化方法super().__init__(**kwargs)# 设置类名为"_AsyncPlayer"self.__class__.__name__ ="_AsyncPlayer"# 初始化observations为一个异步队列self.observations = _AsyncQueue(create_in_poke_loop(asyncio.Queue,1))# 初始化actions为一个异步队列self.actions = _AsyncQueue(create_in_poke_loop(asyncio.Queue,1))# 初始化current_battle为Noneself.current_battle: Optional[AbstractBattle]=None# 初始化_user_funcs为user_funcs# 定义一个方法,用于选择移动defchoose_move(self, battle: AbstractBattle)-> Awaitable[BattleOrder]:# 返回_env_move方法的结果return self._env_move(battle)# 定义一个异步方法,用于处理环境移动asyncdef_env_move(self, battle: AbstractBattle)-> BattleOrder:# 如果当前战斗为空或已结束,则将当前战斗设置为传入的战斗ifnot self.current_battle or self.current_battle.finished:self.current_battle = battle# 如果当前战斗不等于传入的战斗,则抛出异常ifnot self.current_battle == battle:raise RuntimeError("Using different battles for queues")# 将战斗嵌入到用户函数中,并异步放入observations队列中battle_to_send = self._user_funcs.embed_battle(battle)await self.observations.async_put(battle_to_send)# 从actions队列中异步获取动作action =await self.actions.async_get()# 如果动作为-1,则返回放弃战斗的指令if action ==-1:return ForfeitBattleOrder()# 将动作转换为移动指令并返回return self._user_funcs.action_to_move(action, battle)# 定义一个回调方法,用于处理战斗结束时的操作def_battle_finished_callback(self, battle: AbstractBattle):# 将战斗嵌入到用户函数中,并异步放入observations队列中to_put = self._user_funcs.embed_battle(battle)# 在POKE_LOOP中安全地运行异步放入操作asyncio.run_coroutine_threadsafe(self.observations.async_put(to_put), POKE_LOOP)# 定义一个元类,继承自 ABC 类型class_ABCMetaclass(type(ABC)):pass# 定义一个元类,继承自 Env 类型class_EnvMetaclass(type(Env)):pass# 定义一个元类,继承自 _EnvMetaclass 和 _ABCMetaclassclass_OpenAIGymEnvMetaclass(_EnvMetaclass, _ABCMetaclass):pass# 定义一个类 OpenAIGymEnv,继承自 Env[ObsType, ActType] 和 ABC 类型,使用 _OpenAIGymEnvMetaclass 元类classOpenAIGymEnv(Env[ObsType, ActType],ABC,metaclass=_OpenAIGymEnvMetaclass,):"""Base class implementing the OpenAI Gym API on the main thread."""# 初始化重试次数_INIT_RETRIES =100# 重试之间的时间间隔_TIME_BETWEEN_RETRIES =0.5# 切换挑战任务的重试次数_SWITCH_CHALLENGE_TASK_RETRIES =30# 切换重试之间的时间间隔_TIME_BETWEEN_SWITCH_RETIRES =1# 初始化方法def__init__(self,account_configuration: Optional[AccountConfiguration]=None,*,avatar: Optional[int]=None,battle_format:str="gen8randombattle",log_level: Optional[int]=None,save_replays: Union[bool,str]=False,server_configuration: Optional[ServerConfiguration]= LocalhostServerConfiguration,start_timer_on_battle_start:bool=False,start_listening:bool=True,ping_interval: Optional[float]=20.0,ping_timeout: Optional[float]=20.0,team: Optional[Union[str, Teambuilder]]=None,start_challenging:bool=False,# 抽象方法,计算奖励@abstractmethoddefcalc_reward(self, last_battle: AbstractBattle, current_battle: AbstractBattle)->float:"""Returns the reward for the current battle state. The battle state in the previousturn is given as well and can be used for comparisons.:param last_battle: The battle state in the previous turn.:type last_battle: AbstractBattle:param current_battle: The current battle state.:type current_battle: AbstractBattle:return: The reward for current_battle.:rtype: float"""pass# 抽象方法@abstractmethod# 根据给定的动作和当前战斗状态返回相应的战斗指令defaction_to_move(self, action:int, battle: AbstractBattle)-> BattleOrder:"""Returns the BattleOrder relative to the given action.:param action: The action to take.:type action: int:param battle: The current battle state:type battle: AbstractBattle:return: The battle order for the given action in context of the current battle.:rtype: BattleOrder"""pass# 返回当前战斗状态的嵌入,格式与OpenAI gym API兼容@abstractmethoddefembed_battle(self, battle: AbstractBattle)-> ObsType:"""Returns the embedding of the current battle state in a format compatible withthe OpenAI gym API.:param battle: The current battle state.:type battle: AbstractBattle:return: The embedding of the current battle state."""pass# 返回嵌入的描述,必须返回一个指定了下限和上限的Space@abstractmethoddefdescribe_embedding(self)-> Space[ObsType]:"""Returns the description of the embedding. It must return a Space specifyinglow bounds and high bounds.:return: The description of the embedding.:rtype: Space"""pass# 返回动作空间的大小,如果大小为x,则动作空间从0到x-1@abstractmethoddefaction_space_size(self)->int:"""Returns the size of the action space. Given size x, the action space goesfrom 0 to x - 1.:return: The action space size.:rtype: int"""pass# 返回将在挑战循环的下一次迭代中挑战的对手(或对手列表)# 如果返回一个列表,则在挑战循环期间将随机选择一个元素@abstractmethoddefget_opponent(self,)-> Union[Player,str, List[Player], List[str]]:"""Returns the opponent (or list of opponents) that will be challengedon the next iteration of the challenge loop. If a list is returned,a random element will be chosen at random during the challenge loop.:return: The opponent (or list of opponents).:rtype: Player or str or list(Player) or list(str)"""pass# 获取对手玩家或字符串def_get_opponent(self)-> Union[Player,str]:# 获取对手opponent = self.get_opponent()# 如果对手是列表,则随机选择一个对手,否则直接返回对手random_opponent =(random.choice(opponent)ifisinstance(opponent,list)else opponent)return random_opponent# 重置环境defreset(self,*,seed: Optional[int]=None,options: Optional[Dict[str, Any]]=None,)-> Tuple[ObsType, Dict[str, Any]]:# 如果有种子值,则使用种子值重置环境if seed isnotNone:super().reset(seed=seed)# type: ignoreself._seed_initialized =True# 如果种子值未初始化,则使用当前时间戳作为种子值elifnot self._seed_initialized:super().reset(seed=int(time.time()))# type: ignoreself._seed_initialized =True# 如果当前没有对战,则等待对战开始ifnot self.agent.current_battle:count = self._INIT_RETRIESwhilenot self.agent.current_battle:if count ==0:raise RuntimeError("Agent is not challenging")count -=1time.sleep(self._TIME_BETWEEN_RETRIES)# 如果当前对战未结束,则等待对战结束if self.current_battle andnot self.current_battle.finished:if self.current_battle == self.agent.current_battle:self._actions.put(-1)self._observations.get()else:raise RuntimeError("Environment and agent aren't synchronized. Try to restart")# 等待当前对战与对手对战不同while self.current_battle == self.agent.current_battle:time.sleep(0.01)# 更新当前对战为对手对战self.current_battle = self.agent.current_battlebattle = copy.copy(self.current_battle)battle.logger =Noneself.last_battle = copy.deepcopy(battle)return self._observations.get(), self.get_additional_info()# 获取额外信息defget_additional_info(self)-> Dict[str, Any]:"""Returns additional info for the reset method.Override only if you really need it.:return: Additional information as a Dict:rtype: Dict"""return{}defstep(self, action: ActType)-> Tuple[ObsType,float,bool,bool, Dict[str, Any]]:"""Execute the specified action in the environment.:param ActType action: The action to be executed.:return: A tuple containing the new observation, reward, termination flag, truncation flag, and info dictionary.:rtype: Tuple[ObsType, float, bool, bool, Dict[str, Any]]"""# 如果当前战斗为空,则重置环境并返回初始观察和信息ifnot self.current_battle:obs, info = self.reset()return obs,0.0,False,False, info# 如果当前战斗已经结束,则抛出异常if self.current_battle.finished:raise RuntimeError("Battle is already finished, call reset")# 复制当前战斗对象,以便进行操作battle = copy.copy(self.current_battle)battle.logger =None# 深度复制当前战斗对象,用于记录上一次的战斗状态self.last_battle = copy.deepcopy(battle)# 将动作放入动作队列self._actions.put(action)# 从观察队列中获取观察结果observation = self._observations.get()# 计算奖励reward = self.calc_reward(self.last_battle, self.current_battle)terminated =Falsetruncated =False# 如果当前战斗已经结束if self.current_battle.finished:size = self.current_battle.team_size# 计算剩余队伍中未被击倒的精灵数量remaining_mons = size -len([mon for mon in self.current_battle.team.values()if mon.fainted])remaining_opponent_mons = size -len([monfor mon in self.current_battle.opponent_team.values()if mon.fainted])# 如果一方队伍的精灵全部被击倒,则游戏结束if(remaining_mons ==0)!=(remaining_opponent_mons ==0):terminated =Trueelse:truncated =True# 返回观察结果、奖励、游戏是否结束、游戏是否截断以及额外信息return observation, reward, terminated, truncated, self.get_additional_info()# 渲染当前战斗状态,显示当前回合信息和双方精灵状态defrender(self, mode:str="human"):# 如果当前存在战斗if self.current_battle isnotNone:# 打印当前回合信息和双方精灵状态print(" Turn %4d. | [%s][%3d/%3dhp] %10.10s - %10.10s [%3d%%hp][%s]"%(self.current_battle.turn,"".join(["⦻"if mon.fainted else"●"for mon in self.current_battle.team.values()]),self.current_battle.active_pokemon.current_hp or0,self.current_battle.active_pokemon.max_hp or0,self.current_battle.active_pokemon.species,self.current_battle.opponent_active_pokemon.species,self.current_battle.opponent_active_pokemon.current_hp or0,"".join(["⦻"if mon.fainted else"●"for mon in self.current_battle.opponent_team.values()]),),end="\n"if self.current_battle.finished else"\r",)# 关闭当前战斗,清理资源defclose(self, purge:bool=True):# 如果当前没有战斗或者当前战斗已结束if self.current_battle isNoneor self.current_battle.finished:# 等待1秒time.sleep(1)# 如果当前战斗不是代理的当前战斗if self.current_battle != self.agent.current_battle:self.current_battle = self.agent.current_battle# 创建一个异步任务来停止挑战循环closing_task = asyncio.run_coroutine_threadsafe(self._stop_challenge_loop(purge=purge), POKE_LOOP)# 获取异步任务的结果closing_task.result()defbackground_send_challenge(self, username:str):"""Sends a single challenge to a specified player asynchronously. The function immediately returnsto allow use of the OpenAI gym API.:param username: The username of the player to challenge.:type username: str"""# 检查是否已经有挑战任务在进行,如果有则抛出异常if self._challenge_task andnot self._challenge_task.done():raise RuntimeError("Agent is already challenging opponents with the challenging loop. ""Try to specify 'start_challenging=True' during instantiation or call ""'await agent.stop_challenge_loop()' to clear the task.")# 在另一个线程中异步运行发送挑战的方法self._challenge_task = asyncio.run_coroutine_threadsafe(self.agent.send_challenges(username,1), POKE_LOOP)defbackground_accept_challenge(self, username:str):"""Accepts a single challenge from a specified player asynchronously. The function immediately returnsto allow use of the OpenAI gym API.:param username: The username of the player to challenge.:type username: str"""# 检查是否已经有挑战任务在进行,如果有则抛出异常if self._challenge_task andnot self._challenge_task.done():raise RuntimeError("Agent is already challenging opponents with the challenging loop. ""Try to specify 'start_challenging=True' during instantiation or call ""'await agent.stop_challenge_loop()' to clear the task.")# 在另一个线程中异步运行接受挑战的方法self._challenge_task = asyncio.run_coroutine_threadsafe(self.agent.accept_challenges(username,1, self.agent.next_team), POKE_LOOP)asyncdef_challenge_loop(self,n_challenges: Optional[int]=None,callback: Optional[Callable[[AbstractBattle],None]]=None,# 如果没有指定挑战次数,则持续挑战直到 self._keep_challenging 为 False):# 如果没有挑战次数且 self._keep_challenging 为 Trueifnot n_challenges:# 持续挑战直到 self._keep_challenging 为 Falsewhile self._keep_challenging:# 获取对手opponent = self._get_opponent()# 如果对手是 Player 类型ifisinstance(opponent, Player):# 进行一场对战await self.agent.battle_against(opponent,1)else:# 发送挑战请求await self.agent.send_challenges(opponent,1)# 如果有回调函数且当前对战不为 Noneif callback and self.current_battle isnotNone:# 复制当前对战并调用回调函数callback(copy.deepcopy(self.current_battle))# 如果指定了挑战次数且挑战次数大于 0elif n_challenges >0:# 循环指定次数for _ inrange(n_challenges):# 获取对手opponent = self._get_opponent()# 如果对手是 Player 类型ifisinstance(opponent, Player):# 进行一场对战await self.agent.battle_against(opponent,1)else:# 发送挑战请求await self.agent.send_challenges(opponent,1)# 如果有回调函数且当前对战不为 Noneif callback and self.current_battle isnotNone:# 复制当前对战并调用回调函数callback(copy.deepcopy(self.current_battle))# 如果挑战次数小于等于 0else:# 抛出数值错误异常raise ValueError(f"Number of challenges must be > 0. Got {n_challenges}")# 开始挑战defstart_challenging(# 指定挑战次数,默认为 Noneself,n_challenges: Optional[int]=None,# 回调函数,接受 AbstractBattle 类型参数并返回 Nonecallback: Optional[Callable[[AbstractBattle],None]]=None,):"""Starts the challenge loop.:param n_challenges: The number of challenges to send. If empty it will run untilstopped.:type n_challenges: int, optional:param callback: The function to callback after each challenge with a copy ofthe final battle state.:type callback: Callable[[AbstractBattle], None], optional"""# 检查是否存在正在进行的挑战任务,如果有则等待直到完成if self._challenge_task andnot self._challenge_task.done():count = self._SWITCH_CHALLENGE_TASK_RETRIESwhilenot self._challenge_task.done():if count ==0:raise RuntimeError("Agent is already challenging")count -=1time.sleep(self._TIME_BETWEEN_SWITCH_RETIRES)# 如果没有指定挑战次数,则设置为持续挑战ifnot n_challenges:self._keep_challenging =True# 启动挑战循环任务self._challenge_task = asyncio.run_coroutine_threadsafe(self._challenge_loop(n_challenges, callback), POKE_LOOP)asyncdef_ladder_loop(self,n_challenges: Optional[int]=None,callback: Optional[Callable[[AbstractBattle],None]]=None,):# 如果指定了挑战次数,则进行相应次数的挑战if n_challenges:if n_challenges <=0:raise ValueError(f"Number of challenges must be > 0. Got {n_challenges}")for _ inrange(n_challenges):await self.agent.ladder(1)# 如果有回调函数且当前战斗状态不为空,则执行回调函数if callback and self.current_battle isnotNone:callback(copy.deepcopy(self.current_battle))# 如果未指定挑战次数,则持续挑战直到停止else:while self._keep_challenging:await self.agent.ladder(1)# 如果有回调函数且当前战斗状态不为空,则执行回调函数if callback and self.current_battle isnotNone:callback(copy.deepcopy(self.current_battle))# 启动 ladder 循环挑战defstart_laddering(self,n_challenges: Optional[int]=None,callback: Optional[Callable[[AbstractBattle],None]]=None,):"""Starts the laddering loop.:param n_challenges: The number of ladder games to play. If empty itwill run until stopped.:type n_challenges: int, optional:param callback: The function to callback after each challenge with acopy of the final battle state.:type callback: Callable[[AbstractBattle], None], optional"""# 检查是否存在正在进行的挑战任务,如果有则等待直到完成if self._challenge_task andnot self._challenge_task.done():count = self._SWITCH_CHALLENGE_TASK_RETRIESwhilenot self._challenge_task.done():if count ==0:raise RuntimeError("Agent is already challenging")count -=1time.sleep(self._TIME_BETWEEN_SWITCH_RETIRES)# 如果没有指定挑战次数,则设置为持续挑战ifnot n_challenges:self._keep_challenging =True# 使用 asyncio 在另一个线程中运行 _ladder_loop 方法,传入挑战次数和回调函数self._challenge_task = asyncio.run_coroutine_threadsafe(self._ladder_loop(n_challenges, callback), POKE_LOOP)asyncdef_stop_challenge_loop(self, force:bool=True, wait:bool=True, purge:bool=False):# 定义一个方法,接受多个参数self._keep_challenging =False# 将属性_keep_challenging设置为Falseif force:# 如果force为真if self.current_battle andnot self.current_battle.finished:# 如果存在当前战斗且未结束ifnot self._actions.empty():# 如果_actions队列不为空await asyncio.sleep(2)# 异步等待2秒ifnot self._actions.empty():# 如果_actions队列仍不为空raise RuntimeError(# 抛出运行时错误"The agent is still sending actions. ""Use this method only when training or ""evaluation are over.")ifnot self._observations.empty():# 如果_observations队列不为空await self._observations.async_get()# 异步获取_observations队列中的数据await self._actions.async_put(-1)# 异步将-1放入_actions队列中if wait and self._challenge_task:# 如果wait为真且_challenge_task存在whilenot self._challenge_task.done():# 当_challenge_task未完成时await asyncio.sleep(1)# 异步等待1秒self._challenge_task.result()# 获取_challenge_task的结果self._challenge_task =None# 将_challenge_task设置为Noneself.current_battle =None# 将current_battle设置为Noneself.agent.current_battle =None# 将agent的current_battle设置为Nonewhilenot self._actions.empty():# 当_actions队列不为空时await self._actions.async_get()# 异步获取_actions队列中的数据whilenot self._observations.empty():# 当_observations队列不为空时await self._observations.async_get()# 异步获取_observations队列中的数据if purge:# 如果purge为真self.agent.reset_battles()# 调用agent的reset_battles方法defreset_battles(self):# 定义一个方法reset_battles"""Resets the player's inner battle tracker."""# 重置玩家的内部战斗追踪器self.agent.reset_battles()# 调用agent的reset_battles方法# 检查任务是否完成,可设置超时时间defdone(self, timeout: Optional[int]=None)->bool:"""Returns True if the task is done or is done after the timeout, false otherwise.:param timeout: The amount of time to wait for if the task is not already done.If empty it will wait until the task is done.:type timeout: int, optional:return: True if the task is done or if the task gets completed after thetimeout.:rtype: bool"""# 如果挑战任务为空,则返回Trueif self._challenge_task isNone:returnTrue# 如果超时时间为空,则等待任务完成if timeout isNone:self._challenge_task.result()returnTrue# 如果挑战任务已完成,则返回Trueif self._challenge_task.done():returnTrue# 等待一段时间后再次检查任务是否完成time.sleep(timeout)return self._challenge_task.done()# 暴露Player类的属性@propertydefbattles(self)-> Dict[str, AbstractBattle]:return self.agent.battles@propertydefformat(self)->str:return self.agent.format@propertydefformat_is_doubles(self)->bool:return self.agent.format_is_doubles@propertydefn_finished_battles(self)->int:return self.agent.n_finished_battles@propertydefn_lost_battles(self)->int:return self.agent.n_lost_battles@propertydefn_tied_battles(self)->int:return self.agent.n_tied_battles@propertydefn_won_battles(self)->int:return self.agent.n_won_battles@propertydefwin_rate(self)->float:return self.agent.win_rate# 暴露Player Network Interface Class的属性@propertydeflogged_in(self)-> asyncio.Event:"""Event object associated with user login.:return: The logged-in event:rtype: Event"""return self.agent.ps_client.logged_in@property# 返回与玩家相关联的日志记录器deflogger(self)-> Logger:"""Logger associated with the player.:return: The logger.:rtype: Logger"""return self.agent.logger# 返回玩家的用户名@propertydefusername(self)->str:"""The player's username.:return: The player's username.:rtype: str"""return self.agent.username# 返回 WebSocket 的 URL@propertydefwebsocket_url(self)->str:"""The websocket url.It is derived from the server url.:return: The websocket url.:rtype: str"""return self.agent.ps_client.websocket_url# 获取属性的值def__getattr__(self, item:str):returngetattr(self.agent, item)
错误提示:
ERROR - Failed to swap face in postprocess method : apply_overlay() takes 3 positional arguments but 4 were given
打开插件对应目录:
\sd-webui-aki-v4.6.1\extensions\sd-webui-faceswaplab\scripts\faceswaplab_utils中
imgutil…