代码实现了一个基于 OpenFOAM 的强化学习环境类OpenFoam,用于模拟流体动力学问题并与智能体进行交互。该环境类继承自gym.Env,提供了与强化学习算法进行交互的接口,包括初始化环境、执行动作、获取状态和奖励、重置环境以及关闭环境等方法。
#完整代码下载:https://download.csdn.net/download/huanghm88/89909280
from typing import Any, Callable, List, Optional, Tuple, Unionimport gym
import numpy as np
import os
import re
import shutil
from abc import ABCMeta, abstractmethod
from time import time
from scipy import signal
import numpy as np
import pandas as pd
from DRLinFluids import cfd, utils
from gym import spaces, logger
from gym.utils import seeding
from tianshou.utils import RunningMeanStd
from sklearn.preprocessing import StandardScalerclass OpenFoam(gym.Env):def __init__(self,foam_root_path:Optional[str]= None,foam_params: Optional[dict]= None,agent_params: Optional[dict]= None,state_params: Optional[dict]= None,server=True,**kwargs):self.foam_params = foam_paramsself.agent_params = agent_paramsself.state_params = state_paramsself.foam_root_path = foam_root_pathself.task = 'OpenFoam-v0'self.state_params['probe_info'] = utils.read_foam_file('/'.join([foam_root_path, 'system', 'probes']))self.dashboard_data = {}self.trajectory_start_time = 0self.trajectory_end_time = 0self.num_episode = 0self.info_list = []self.episode_reward_sequence = []self.exec_info = {}self.num_trajectory = 0self.trajectory_reward = np.array([])self.all_episode_trajectory_reward = pd.DataFrame()self.state_data = np.array([])self.episode_reward = 0self.decorated_actions = np.array([])self.actions_sequence = np.array([])self.start_actions = 0self.end_actions = 0self.single_step_actions = np.array([])self.all_episode_actions = pd.DataFrame()self.all_episode_decorated_actions = pd.DataFrame()self.all_episode_single_step_actions = pd.DataFrame()self.probe_velocity_df = pd.DataFrame()self.probe_pressure_df = pd.DataFrame()self.force_df = pd.DataFrame()self.force_Coeffs_df = pd.DataFrame()self.history_force_df = pd.DataFrame()self.initial_force_Coeffs_df = pd.DataFrame()self.history_force_Coeffs_df = pd.DataFrame()self.history_force_Coeffs_df_alltime = pd.DataFrame()self.history_force_Coeffs_df_stepnumber=0self.start_time_float=0self.end_time_float=0self.action_time = 0self.vortex_shedding = 0self.svd_rank_df=10self.cfd_init_time_str = str(float(foam_params['cfd_init_time'])).rstrip('0').rstrip('.')self.decimal = int(np.max([len(str(agent_params['interaction_period']).split('.')[-1]),len(str(foam_params['cfd_init_time']).split('.')[-1])]))self.pressure_DMD_initial_snapshot=np.array([])self.control_matrix_gammaDMDc=np.array([])if server:action_tocsv_list = [[0, 0, 0, 0],[self.foam_params['cfd_init_time'], 0, 0, 0]]pd.DataFrame(action_tocsv_list).to_csv(self.foam_root_path + '/system/jet.csv', index=False, header=False)for f_name in os.listdir(self.foam_root_path):if f_name == 'prosessor0':shutil.rmtree('/'.join([self.foam_root_path, f_name]))elif f_name == 'prosessor1':shutil.rmtree('/'.join([self.foam_root_path, f_name]))elif f_name == 'prosessor2':shutil.rmtree('/'.join([self.foam_root_path, f_name]))elif f_name == 'prosessor3':shutil.rmtree('/'.join([self.foam_root_path, f_name]))else:passcfd.run_init(foam_root_path, foam_params)self.velocity_table_init = utils.read_foam_file(foam_root_path + f'/postProcessing/probes/0.000/U',dimension=self.foam_params['num_dimension'])cfd_init_time = int(self.foam_