深度Q-Learning在算法交易中的应用

一、说明

        在《华尔街的随机漫步》一书中,作者伯顿·马尔基尔(Burton G. Malkiel)声称:“一只蒙着眼睛的猴子向报纸的财经版面投掷飞镖,可以选择一个与专家精心挑选的投资组合一样好的投资组合。

        如果我们让巴甫洛夫的狗接受强化学习训练,而不是猴子来选择最佳投资组合策略,会怎么样?在本文中,强化学习 (RL) 是一种机器学习技术,智能体在不确定的环境中学习动作,以最大化其价值。智能体从其操作的结果中学习,而无需使用特定于任务的规则进行显式编程,

        任何 RL 算法的目标都是找到价值最大化策略 (π):

        其中 γ (0 ≤ γ ≤ 1) 是控制智能体当前奖励的贴现因子,t 是时间步长,R 是该步骤中的回报。RL 中的策略是在状态 s 中采取措施 a 的概率。

        我们将采用的算法是 Q-Learning,这是一种无模型的 RL 算法,旨在通过离散状态的动作的 VALUE 间接学习策略(称为 Q 值),而不是策略本身。它在我们的案例中很有用,因为它不需要对环境进行建模——在我们的案例中,是动荡的资本市场。

        估计 Q 值是通过贝尔曼方程完成的:

        这些 Q 值被放置在一个详尽的表 (explorarion) 中,并由代理用作查找,以查找当前状态中所有可能操作的 Q 值。从那里,它可以选择具有最高 Q 值(利用)的操作。这在有限的空间内是好的,但在具有无限组合的随机环境中就不行了,我们将用我们的神经网络解决这个问题。

        本文中设计的该代理的灵感来自 Théate、Thibaut 和 Ernst, Damien (2021) 的论文。

二、代码和 TPU 要求

        本文代码说明,TF Agents Framework、Reverb 和 Gym 需要 Linux 操作系统。如果你和我们一样,喜欢在 VSCode 上工作,请在管理员 powershell 上安装 WSL:wsl — install

        更新发行版:sudo apt-get update 和 sudo apt-get install wget ca-certificates。在 WSL cli 中,运行代码 。它将为您打开一个 VSCode。安装 VSCode WSL 扩展以获得无缝的开发人员体验,请参阅此处的教程。

        要使用硬件加速,请在 Linux 机器上安装 NVidia 驱动程序。通过以下命令检查安装是否成功:nvidia-smi。安装 TensorFlow 的 cuda 框架:pip install tensorflow[and-cuda] 并验证安装:python3 -c “import tensorflow as tf;print(tf.config.list_physical_devices('GPU'))”

        我们准备在GPU或更好的GPU上运行我们的执行策略;通过定义策略,分布在 TPU 中:

if ('COLAB_TPU_ADDR' in os.environ and IN_COLAB) or (IN_KAGGLE and 'TPU_ACCELERATOR_TYPE' in os.environ):tpu = tf.distribute.cluster_resolver.TPUClusterResolver()tf.config.experimental_connect_to_cluster(tpu)tf.tpu.experimental.initialize_tpu_system(tpu)strategy = tf.distribute.experimental.TPUStrategy(tpu)print("Running on TPU:", resolver.master())
elif len(tf.config.list_physical_devices('GPU')) > 0:strategy = tf.distribute.MirroredStrategy()gpus = tf.config.experimental.list_physical_devices('GPU')for gpu in gpus:tf.config.experimental.set_memory_growth(gpu, True)print("Running on", len(tf.config.list_physical_devices('GPU')), "GPU(s)")
else:strategy = tf.distribute.get_strategy()print("Running on CPU")print("Number of accelerators:", strategy.num_replicas_in_sync)

        使用 TPU 就绪策略,您可以分配工作负载,例如 BATCH_SIZE = 32 * strategy.num_replicas_in_sync,底层基础设施会将张量分发到 TPU 或 GPU。

        注意:TF Agent 的框架在 Kaggle 上对 TPU 的分布式策略存在问题,并在最终迭代中被放弃!

三、财务数据

        我们下载了一些财务数据,这现在是我们所有文章的标准。

def get_tickerdata(tickers_symbols, start=START_DATE, end=END_DATE, interval=INTERVAL, datadir=DATA_DIR):tickers = {}earliest_end= datetime.strptime(end,'%Y-%m-%d')latest_start = datetime.strptime(start,'%Y-%m-%d')os.makedirs(DATA_DIR, exist_ok=True)for symbol in tickers_symbols:cached_file_path = f"{datadir}/{symbol}-{start}-{end}-{interval}.csv"try:if os.path.exists(cached_file_path):df = pd.read_parquet(cached_file_path)df.index = pd.to_datetime(df.index)assert len(df) > 0else:df = yf.download(symbol,start=START_DATE,end=END_DATE,progress=False,interval=INTERVAL,)assert len(df) > 0df.to_parquet(cached_file_path, index=True, compression="snappy")min_date = df.index.min()max_date = df.index.max()nan_count = df["Close"].isnull().sum()skewness = round(skew(df["Close"].dropna()), 2)kurt = round(kurtosis(df["Close"].dropna()), 2)outliers_count = (df["Close"] > df["Close"].mean() + (3 * df["Close"].std())).sum()print(f"{symbol} => min_date: {min_date}, max_date: {max_date}, kurt:{kurt}, skewness:{skewness}, outliers_count:{outliers_count},  nan_count: {nan_count}")tickers[symbol] = dfif min_date > latest_start:latest_start = min_dateif max_date < earliest_end:earliest_end = max_dateexcept Exception as e:print(f"Error with {symbol}: {e}")return tickers, latest_start, earliest_end
tickers, latest_start, earliest_end = get_tickerdata(TICKER_SYMBOLS)
tickers[TARGET]

四、问题定义

        通过 Q-Training,我们将教巴甫洛夫代理进行交易。我们的目标是进行顺序交互,从而实现最高夏普比,通过该策略形式化(记住 Q-Learning 是非策略的,我们不会直接学习这一点):

在每个时间步 t:

观察环境状态 st 并使用 f(.) 映射历史记录 来自历史 ht 的观察 ot 具有先前的操作 a_t-1、先前的观察 o_t-1 及其返回 r_t-1。

在我们的实验中,我们将这些编码为网络的特征。

执行动作a_t,可以是:hold、long、short以 γt 获得r_t折扣的退货。γ是防止代理人只为当前回报做出战术选择(错过更好的未来回报)的贴现因素。

π(at|ht) 在 = Qt 处对数量 Q 创建操作。其中正 Q 是多头,负 Q 表示空头,当其 0 时不采取任何行动。在本文中,我们将交替使用策略 π(at|ht) 和 Q 值 Q(at,st) 的定义。

五、观测和状态空间

        本文仅使用最高价、最低价、开盘价、收盘价和成交量作为智能体环境状态的观测值。

在后面的实验中,我们将用 3 个技术指标和 3 个宏观经济指标来扩充这个空间:

  • MACD 和 APT,来自我们的文章:“动量和回归交易信号分析”
  • 每日VIX作为市场波动和恐惧的代表,以及10年期国债作为通货膨胀和利率的代表,来自我们的文章:“具有广泛市场信号条件的时间卷积神经网络”

六、行动和奖励

        RL的一个核心概念是奖励工程。让我们看一下时间 t 的动作空间 A:

        考虑到我们的流动性vc_t(我们投资组合的价值 v,剩余现金 c)和以 p 股的价格购买Q_long(交易成本 C),如果我们还没有做多,则行动 Q_Long,t 旨在最大化买入回报:

        行动 Q_Short,t 旨在将负数的股票转换为回报(做空是借入股票,因此我们的v_c最初将是负数)。

        请注意,-2n 表示卖出两次,这意味着不仅要平仓多头头寸,还要为 Qn 股票开立空头头寸,因为做空是一条负轨迹,我们需要否定我们可以买入的金额以获得正确的持仓表示。如果我们没有开始的份额,那么 -2(0) 除了空头金额外,不会产生任何影响:

        空头是有风险的,我们需要给代理人设定界限,因为空头可能会招致无限的损失:

        鉴于我们的投资组合不能落入负数,我们需要对约束进行建模。

  1. 现金价值vc_t需要足够大才能恢复到中性n_t=0。
  2. 为了回到 0,我们需要调整由市场波动 epsilon ε引起的成本 C(想想滑点、点差等)。
  3. 我们重新定义了允许的行动空间,以确保我们始终可以恢复到中立状态。

        动作空间 A 被重新定义为边界 Q- 和 Q+ 之间的一组可接受的 Q_t值:

        其中,顶部边界 Q+ 为:

        下边界 Q- 是(对于从 delta t 为正的多头出来,或者反转空头并产生两倍的成本,delta t 为负):

        delta t 是投资组合持有价值在时间上的变化:

        在上述边界中,交易成本定义为:

        其中 C 是给定绝对数量的事务成本百分比 |Q_t|的股票及其价格p_t。

七、代理目标

        在本文中,他们利用百分比回报作为奖励信号,在 -1 和 1 之间剪裁,并通过贴现因子γ进行调整,为代理提供更多战术性的短期奖励:

        在本文中,我们还将使用年化夏普(从 N 个时间窗口开始,最多 252 个交易日),并教代理生成一个没有贴现因子的最佳比率:

这只是最大化:

        或投资组合的回报率(R平均值)减去无风险利率(Rf,在撰写本文时为5%)除以投资组合的波动率(σ)

八、交易环境

        使用 TensorFlow 的 PyEnvironment,我们将为代理提供实现上述规则的环境:

class TradingEnv(py_environment.PyEnvironment):"""A custom trading environment for reinforcement learning, compatible with tf_agents.
This environment simulates a simple trading scenario where an agent can take one of three actions:- Long (buy), Short (sell), or Hold a financial instrument, aiming to maximize profit through trading decisions.Parameters:- data: DataFrame containing the stock market data.- data_dim: Dimension of the data to be used for each observation.- money: Initial capital to start trading.- state_length: Number of past observations to consider for the state.- transaction_cost: Costs associated with trading actions."""def __init__(self, data, features = FEATURES, money=CAPITAL, state_length=STATE_LEN, transaction_cost=0, market_costs=TRADE_COSTS_PERCENT, reward_discount=DISCOUNT):super(TradingEnv, self).__init__()assert data is not Noneself.features = featuresself.data_dim = len(self.features)self.state_length = state_lengthself.current_step = self.state_lengthself.reward_discount = reward_discountself.balance = moneyself.initial_balance = moneyself.transaction_cost = transaction_costself.epsilon = max(market_costs, np.finfo(float).eps) # there is always volatility costsself.total_shares = 0self._episode_ended = Falseself._batch_size = 1self._action_spec = array_spec.BoundedArraySpec(shape=(), dtype=np.int32, minimum=ACT_SHORT, maximum=ACT_LONG, name='action')self._observation_spec = array_spec.BoundedArraySpec(shape=(self.state_length * self.data_dim, ), dtype=np.float32, name='observation')self.data = self.preprocess_data(data.copy())self.reset()@propertydef batched(self):return False #True@propertydef batch_size(self):return None #self._batch_size@batch_size.setterdef batch_size(self, size):self._batch_size = sizedef preprocess_data(self, df):def _log_rets(df):log_returns = np.log(df / df.shift(1))df = (log_returns - log_returns.mean()) / log_returns.std()df = df.dropna()return dfdef min_max_scale_tensor(tensor):min_val = tf.reduce_min(tensor)max_val = tf.reduce_max(tensor)return (tensor - min_val) / (max_val - min_val)price_raw = df['Close'].copy()for col in self.features:tensor = tf.convert_to_tensor(df[col], dtype=tf.float32)normalized_tensor = min_max_scale_tensor(tensor)df[col] = normalized_tensor.numpy()df = df.replace(0.0, np.nan)df = df.interpolate(method='linear', limit=5, limit_area='inside')df = df.ffill().bfill()df[TARGET_FEATURE] = price_rawdf['Sharpe'] = 0df['Position'] = 0df['Action'] = ACT_HOLDdf['Holdings'] = 0.df['Cash'] = float(self.balance)df['Money'] = df['Holdings'] + df['Cash']df['Returns'] = 0.assert not df.isna().any().any()return dfdef action_spec(self):"""Provides the specification of the action space."""return self._action_specdef observation_spec(self):"""Provides the specification of the observation space."""return self._observation_specdef _reset(self):"""Resets the environment state and prepares for a new episode."""self.balance = self.initial_balanceself.current_step = self.state_lengthself._episode_ended = Falseself.total_shares = 0self.data['Sharpe'] = 0self.data['Position'] = 0self.data['Action'] = ACT_HOLDself.data['Holdings'] = 0.self.data['Cash']  = float(self.balance)self.data['Money'] = self.data.iloc[0]['Holdings'] + self.data.iloc[0]['Cash']self.data['Returns'] = 0.initial_observation = self._next_observation()return ts.restart(initial_observation)def _next_observation(self):"""Generates the next observation based on the current step and history length."""start_idx = max(0, self.current_step - self.state_length + 1)end_idx = self.current_step + 1obs = self.data[self.features].iloc[start_idx:end_idx]# flatten because: https://stackoverflow.com/questions/67921084/dqn-agent-issue-with-custom-environmentobs_values = obs.values.flatten().astype(np.float32)return obs_valuesdef _step(self, action):"""Executes a trading action and updates the environment's state."""if self._episode_ended:return self.reset()self.current_step += 1current_price = self.data.iloc[self.current_step][TARGET_FEATURE]assert not self.data.iloc[self.current_step].isna().any().any()if action == ACT_LONG:self._process_long_position(current_price)elif action == ACT_SHORT:prev_current_price = self.data.iloc[self.current_step - 1][TARGET_FEATURE]self._process_short_position(current_price, prev_current_price)elif action == ACT_HOLD:self._process_hold_position()elif action == ACT_NEUTRAL:self._process_neutral_position(current_price)else:raise Exception(f"Invalid Actions: {action}")self._update_financials()done = self.current_step >= len(self.data) - 1reward = self._calculate_reward_signal()if done:self._episode_ended = Truereturn ts.termination(self._next_observation(), reward)else:return ts.transition(self._next_observation(), reward, discount=self.reward_discount)def _get_lower_bound(self, cash, total_shares, price):"""Compute the lower bound of the action space, particularly for short selling,based on current cash, the number of shares, and the current price."""delta = -cash - total_shares * price * (1 + self.epsilon) * (1 + self.transaction_cost)if delta < 0:lowerBound = delta / (price * (2 * self.transaction_cost + self.epsilon * (1 + self.transaction_cost)))else:lowerBound = delta / (price * self.epsilon * (1 + self.transaction_cost))if np.isinf(lowerBound):assert Falsereturn lowerBounddef _process_hold_position(self):step_idx = self.data.index[self.current_step]self.data.at[step_idx, "Cash"] = self.data.iloc[self.current_step - 1]["Cash"]self.data.at[step_idx, "Holdings"] = self.data.iloc[self.current_step - 1]["Holdings"]self.data.at[step_idx, "Position"] = self.data.iloc[self.current_step - 1]["Position"]self.data.at[step_idx, "Action"] = ACT_HOLDdef _process_neutral_position(self, current_price):step_idx = self.data.index[self.current_step]self.data.at[step_idx, "Cash"] = self.data.iloc[self.current_step - 1]['Cash'] - self.total_shares * current_price * (1 + self.transaction_cost)self.data.at[step_idx, "Holdings"] = 0.0self.data.at[step_idx, "Position"] = 0.0self.data.at[step_idx, "Action"] = ACT_NEUTRALdef _process_long_position(self, current_price):step_idx = self.data.index[self.current_step]self.data.at[step_idx, 'Position'] = 1if self.data.iloc[self.current_step - 1]['Position'] == 1:# more longself.data.at[step_idx, 'Cash'] = self.data.iloc[self.current_step - 1]['Cash']self.data.at[step_idx, 'Holdings'] = self.total_shares * current_priceelif self.data.iloc[self.current_step - 1]['Position'] == 0:# new longself.total_shares = math.floor(self.data.iloc[self.current_step - 1]['Cash'] / (current_price * (1 + self.transaction_cost)))self.data.at[step_idx, 'Cash'] = self.data.iloc[self.current_step - 1]['Cash'] - self.total_shares * current_price * (1 + self.transaction_cost)self.data.at[step_idx, 'Holdings'] = self.total_shares * current_priceself.data.at[step_idx, 'Action'] = 1else:# short to longself.data.at[step_idx, 'Cash'] = self.data.iloc[self.current_step - 1]['Cash'] - self.total_shares * current_price * (1 + self.transaction_cost)self.total_shares = math.floor(self.data.iloc[self.current_step]['Cash'] / (current_price * (1 + self.transaction_cost)))self.data.at[step_idx, 'Cash'] = self.data.iloc[self.current_step]['Cash'] - self.total_shares * current_price * (1 + self.transaction_cost)self.data.at[step_idx, 'Holdings'] = self.total_shares * current_priceself.data.at[step_idx, 'Action'] = 1def _process_short_position(self, current_price, prev_price):"""Adjusts the logic for processing short positions to include lower bound calculations."""step_idx = self.data.index[self.current_step]self.data.at[step_idx, 'Position'] = -1if self.data.iloc[self.current_step - 1]['Position'] == -1:# Short morelow = self._get_lower_bound(self.data.iloc[self.current_step - 1]['Cash'], -self.total_shares, prev_price)if low <= 0:self.data.at[step_idx, 'Cash'] = self.data.iloc[self.current_step - 1]["Cash"]self.data.at[step_idx, 'Holdings'] = -self.total_shares * current_priceelse:total_sharesToBuy = min(math.floor(low), self.total_shares)self.total_shares -= total_sharesToBuyself.data.at[step_idx, 'Cash'] = self.data.iloc[self.current_step - 1]["Cash"] - total_sharesToBuy * current_price * (1 + self.transaction_cost)self.data.at[step_idx, 'Holdings'] = -self.total_shares * current_priceelif self.data.iloc[self.current_step - 1]['Position'] == 0:# new shortself.total_shares = math.floor(self.data.iloc[self.current_step - 1]["Cash"] / (current_price * (1 + self.transaction_cost)))self.data.at[step_idx, 'Cash'] = self.data.iloc[self.current_step - 1]["Cash"] + self.total_shares * current_price * (1 - self.transaction_cost)self.data.at[step_idx, 'Holdings'] = -self.total_shares * current_priceself.data.at[step_idx, 'Action'] = -1else:# long to shortself.data.at[step_idx, 'Cash'] = self.data.iloc[self.current_step - 1]["Cash"] + self.total_shares * current_price * (1 - self.transaction_cost)self.total_shares = math.floor(self.data.iloc[self.current_step]["Cash"] / (current_price * (1 + self.transaction_cost)))self.data.at[step_idx, 'Cash'] = self.data.iloc[self.current_step]["Cash"] + self.total_shares * current_price * (1 - self.transaction_cost)self.data.at[step_idx, 'Holdings'] = -self.total_shares * current_priceself.data.at[step_idx, 'Action'] = -1def _update_financials(self):"""Updates the financial metrics including cash, money, and returns."""step_idx = self.data.index[self.current_step]self.balance = self.data.iloc[self.current_step]['Cash']self.data.at[step_idx,'Money'] = self.data.iloc[self.current_step]['Holdings'] + self.data.iloc[self.current_step]['Cash']self.data.at[step_idx,'Returns'] = ((self.data.iloc[self.current_step]['Money'] - self.data.iloc[self.current_step - 1]['Money'])) / self.data.iloc[self.current_step - 1]['Money']def _calculate_reward_signal(self, reward_clip=REWARD_CLIP):"""Calculates the reward for the current step. In the paper they use the %returns."""return np.clip(self.data.iloc[self.current_step]['Returns'], -reward_clip, reward_clip)def calculate_churn(self):churn = 0prev_position = 0for i in range(1, self.current_step + 1):current_position = self.data.at[self.data.index[i], 'Position']action = self.data.at[self.data.index[i], 'Action']if action in [ACT_LONG, ACT_SHORT] and current_position != prev_position:churn += self.transaction_costprev_position = current_positionif self.current_step > 0:churn /= self.current_stepreturn churndef calculate_drawdown_metrics(self):cumulative_returns = (1 + self.data['Returns'].iloc[:self.current_step + 1]).cumprod()peak = cumulative_returns.expanding(min_periods=1).max()drawdown = (cumulative_returns - peak) / peakpeak_dates = cumulative_returns[peak == cumulative_returns]drawdown_durations = pd.Series(index=drawdown.index, dtype='timedelta64[ns]')for date in drawdown.index:recent_peak_date = peak_dates[peak_dates.index <= date].index[-1]duration = date - recent_peak_datedrawdown_durations.at[date] = durationdrawdown_durations_days = drawdown_durations.dt.daysmax_dd_duration_days = drawdown_durations_days.max()return drawdown, max_dd_duration_daysdef get_trade_stats(self, riskfree_rate=0.05):rets = self.data['Returns'].iloc[:self.current_step + 1]monthly_riskfree_rate = (1 + riskfree_rate)**(1/12) - 1annualized_return = rets.mean() * 12annualized_vol = rets.std() * np.sqrt(12)sharpe_ratio = (rets.mean() - monthly_riskfree_rate) / rets.std() * np.sqrt(12)downside_deviation = rets[rets < 0].std() * np.sqrt(12)drawdowns, max_dd_duration_days = self.calculate_drawdown_metrics()sortino_ratio = (rets.mean() - monthly_riskfree_rate) / downside_deviationchurn = self.calculate_churn()return {"Annualized Return": annualized_return,"Annualized Vol": annualized_vol,"Sharpe Ratio": sharpe_ratio,"Downside Deviation": downside_deviation,"Sortino Ratio": sortino_ratio,"Max Drawdown": drawdowns.max(),"Max Drawdown Days": max_dd_duration_days,"Trade Churn": churn,"Skewness": skew(rets.values),"Kurtosis": kurtosis(rets.values)}def render(self, mode='human'):print(f'Step: {self.current_step}, Balance: {self.balance}, Holdings: {self.total_shares}')print(f"trade stats: {self.get_trade_stats()}")

Test the environment:

stock= tickers[TARGET]
train_data = stock[stock.index < pd.to_datetime(SPLIT_DATE)].copy()
test_data = stock[stock.index >= pd.to_datetime(SPLIT_DATE)].copy()train_env = TradingEnv(train_data)
utils.validate_py_environment(train_env, episodes=TRAIN_EPISODES)
test_env = TradingEnv(test_data)
utils.validate_py_environment(train_env, episodes=TRAIN_EPISODES//4)
print(f"TimeStep Specs: {train_env.time_step_spec()}")
print(f"Action Specs: {train_env.action_spec()}")
print(f"Reward Specs: {train_env.time_step_spec().reward}")
def execute_action_and_print_state(env, action):next_time_step = env.step(np.array(action, dtype=np.int32))print(f'Action taken: {action} at step: {env.current_step}')# print(f'New state: {next_time_step.observation}')print(f'New balance: {env.balance}')print(f'Total shares: {env.total_shares}')print(f'Reward: {next_time_step.reward}\n')
time_step = train_env.reset()
# print(f'Initial state: {time_step.observation}')
# Some dryruns to validate our env logic: Buy, Sell, we should have a positive balance with TSLA
execute_action_and_print_state(train_env, ACT_HOLD)
execute_action_and_print_state(train_env, ACT_LONG)
execute_action_and_print_state(train_env, ACT_SHORT)
execute_action_and_print_state(train_env, ACT_HOLD)
execute_action_and_print_state(train_env, ACT_LONG)

九、深度 Q-Network 架构

        深度 Q 神经网络架构 (DQN) 近似于动作值函数 π∗(at|st) 的 Q 表。这是一个近似值,因为您可以使用 Q-Table 的组合数量是巨大的且无法处理的。

        Q 网络也称为策略模型。我们还将在我们的架构中利用目标 Q 网络。Tha 目标模型的更新比 Q-Network 少,并且有助于稳定训练过程,因为 Q-Network 被训练以减少其输出和目标网络(更稳定的值)。

        最后,我们将添加一个回放记忆来为我们的模型提供样本数据。内存是固定大小的循环内存(因此它会“忘记”旧内存),并且在每个固定频率下,模型使用内存来计算其预测的 Q 值与内存中执行的 Q 值之间的损失。

十、强化学习流程

        一张图片胜过千言万语;下面的流程图将指导我们进行整个训练和更新目标模型:

        首先,我们启动了环境 St 和动作状态 Qt。

        然后,我们运行多次迭代来训练模型并记住状态、动作和预测的 Q 值。在每次迭代中,将发生以下事件:

  1. 获取状态。
  2. 采取随机行动(ε贪婪)或预测给定当前状态下的行动的 Q 值,前者称为探索,后者称为利用。ε会随着时间的流逝而衰减,随着模型的学习,它应该探索得更少。
  3. 在预测 Q 值时,它将使用策略模型。无论是探索还是爆炸,它都会保存状态、动作和给定 Q 值的记忆。
  4. 通过从目标网络获取最大预测来计算目标 Q 值。从前面的公式 rt + γt * Qtarget(s_t+1, a_t+1) 中,gamma γ 是贴现因子。
  5. 重新调整策略模型,以最小化不同模型的 Q 值差异(损失)。训练使用来自回放记忆的采样状态。
  6. 在剧集结束时,或者我们选择的任何时间间隔,我们将策略模型的余量复制到目标模型。
def create_q_network(env, fc_layer_params=LAYERS, dropout_rate=DROPOUT, l2_reg=L2FACTOR):"""Creates a Q-Network with dropout and batch normalization.Parameters:- env: The environment instance.- fc_layer_params: Tuple of integers representing the number of units in each dense layer.- dropout_rate: Dropout rate for dropout layers.- l2_reg: L2 regularization factor.Returns:- q_net: The Q-Network model."""env = tf_py_environment.TFPyEnvironment(env)action_tensor_spec = tensor_spec.from_spec(env.action_spec())num_actions = action_tensor_spec.maximum - action_tensor_spec.minimum + 1layers = []for num_units in fc_layer_params:layers.append(tf.keras.layers.Dense(num_units,activation=None,kernel_initializer=tf.keras.initializers.VarianceScaling(scale=2.0, mode='fan_in', distribution='truncated_normal'),kernel_regularizer=tf.keras.regularizers.l2(l2_reg)))layers.append(tf.keras.layers.BatchNormalization())layers.append(tf.keras.layers.LeakyReLU())layers.append(tf.keras.layers.Dropout(dropout_rate))q_values_layer = tf.keras.layers.Dense(num_actions,activation=None,kernel_initializer=tf.keras.initializers.GlorotNormal(),bias_initializer=tf.keras.initializers.GlorotNormal(),kernel_regularizer=tf.keras.regularizers.l2(l2_reg))q_net = sequential.Sequential(layers + [q_values_layer])return q_net
def create_agent(q_net, env, t_q_net = None, optimizer = None, eps=EPSILON_START, learning_rate=LEARN_RATE, gradient_clipping = GRAD_CLIP):"""Creates a DQN agent for a given environment with specified configurations.Parameters:- q_net (tf_agents.networks.Network): The primary Q-network for the agent.- env (tf_agents.environments.PyEnvironment or tf_agents.environments.TFPyEnvironment):The environment the agent will interact with. A TFPyEnvironment wrapper is appliedif not already wrapped.- t_q_net (tf_agents.networks.Network, optional): The target Q-network for the agent.If None, no target network is used.- optimizer (tf.keras.optimizers.Optimizer, optional): The optimizer to use for training the agent.If None, an Adam optimizer with exponential decay learning rate is used.- eps (float): The epsilon value for epsilon-greedy exploration.- learning_rate (float): The initial learning rate for the exponential decay learning rate schedule.Ignored if an optimizer is provided.- gradient_clipping (float): The value for gradient clipping. If 1., no clipping is applied.Returns:- agent (tf_agents.agents.DqnAgent): The initialized and configured DQN agent."""if optimizer is None:lr_schedule = tf.keras.optimizers.schedules.ExponentialDecay(learning_rate, decay_steps=1000, decay_rate=0.96, staircase=True)optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedule)env = tf_py_environment.TFPyEnvironment(env)# see: https://www.tensorflow.org/agents/api_docs/python/tf_agents/agents/DqnAgentagent = dqn_agent.DqnAgent(env.time_step_spec(),env.action_spec(),q_network=q_net,target_q_network = t_q_net,target_update_period = TARGET_UPDATE,optimizer=optimizer,epsilon_greedy = eps,reward_scale_factor = 1,gradient_clipping = gradient_clipping,td_errors_loss_fn=common.element_wise_huber_loss,train_step_counter=tf.compat.v1.train.get_or_create_global_step(),name="TradeAgent")agent.initialize()print(agent.policy)print(agent.collect_policy)return agent
with strategy.scope():q_net = create_q_network(train_env)t_q_net = create_q_network(train_env)agent = create_agent(q_net, train_env, t_q_net=t_q_net)

十一、交易操作

        使用 TensorFlow 代理的框架,培训我们的巴甫洛夫交易员应该比自己构建架构更容易。

        交易模拟器类将准备所有必需的变量。在这种情况下,它将使用 DeepMind 的 Reverb 初始化回放内存,并为代理创建收集器策略。与用于预测目标 Q 值的评估策略 (π(at|ht)) 不同,收集器将探索和收集带有动作的数据及其对内存的结果值,记忆被保存为轨迹 (τ) 在 tensorflow 中,它是当前观察到的状态 (ot)、采取的行动 (at)、收到的奖励 (r_t+1) 和以下观察到的状态 (o_t+1) 的集合,形式化为 r=(o_t-1, a_t-1, rt, ot, dt),其中 dt 是结束状态的标志,如果这是最后一次观测值。

        为了给我们的智能体提供学习的机会,我们将使用一个高 epsilon 让它进行大量探索,并使用以下公式慢慢衰减它:

哪里:

  • ε_decayed 是当前步骤的衰减 epsilon 值,
  • ε_initial 是训练开始时的初始 epsilon 值,我们将其设置为 1,这意味着它仅在开始时进行探索。
  • 我们希望代理利用的最终值是环境,ε_final最好是在部署时。
  • step 是训练过程中的当前步骤或迭代,decay_steps 是控制速率的参数,在本例中为 1000。随着台阶接近尾声,衰变会越来越小。

让我们编写模拟器代码来训练和评估我们的代理,在 TPU 上它运行 2 小时,在 CPU 上它可以运行长达 8 小时:

import tensorflow as tf
class CustomMetrics(tf.Module):def __init__(self, name=None):super(CustomMetrics, self).__init__(name=name)self.total_returns = tf.Variable([], dtype=tf.float32, trainable=False, shape=tf.TensorShape(None))self.average_returns = tf.Variable([], dtype=tf.float32, trainable=False, shape=tf.TensorShape(None))self.sharpe_ratios = tf.Variable([], dtype=tf.float32, trainable=False, shape=tf.TensorShape(None))self.losses = tf.Variable([], dtype=tf.float32, trainable=False, shape=tf.TensorShape(None))def update_metrics(self, losses=None, total_return=None, average_return=None, sharpe_ratio=None):if total_return is not None:self.total_returns.assign(tf.concat([self.total_returns.value(), [total_return]], axis=0))if total_return is not None:self.average_returns.assign(tf.concat([self.average_returns.value(), [average_return]], axis=0))if total_return is not None:self.sharpe_ratios.assign(tf.concat([self.sharpe_ratios.value(), [sharpe_ratio]], axis=0))if losses is not None:self.losses.assign(tf.concat([self.losses.value(), [losses]], axis=0))
class TradingSimulator:def __init__(self, env, eval_env, agent, episodes=TRAIN_EPISODES,batch_size=BATCH_SIZE, num_eval_episodes=TEST_INTERVALS,collect_steps_per_iteration=INIT_COLLECT,replay_buffer_max_length=MEMORY_LENGTH ,num_iterations = TOTAL_ITERS, log_interval=LOG_INTERVALS,eval_interval=None, global_step=None):self.py_env = envself.env =  tf_py_environment.TFPyEnvironment(self.py_env)self.py_eval_env = eval_envself.eval_env =  tf_py_environment.TFPyEnvironment(self.py_eval_env)self.agent = agentself.episodes = episodesself.log_interval = log_intervalself.eval_interval = eval_intervalself.global_step = global_stepself.batch_size = batch_sizeself.num_eval_episodes = num_eval_episodesself.collect_steps_per_iteration = collect_steps_per_iterationself.replay_buffer_max_length = replay_buffer_max_lengthself.num_iterations = num_iterationsself.policy = self.agent.policyself.collect_policy = self.agent.collect_policyself.random_policy = random_tf_policy.RandomTFPolicy(self.env.time_step_spec(),self.env.action_spec())self.replay_buffer_signature = tensor_spec.from_spec(self.agent.collect_data_spec)self.replay_buffer_signature = tensor_spec.add_outer_dim(self.replay_buffer_signature)def init_memory(self, table_name = 'uniform_table'):self.table = reverb.Table(table_name,max_size=self.replay_buffer_max_length,sampler=reverb.selectors.Uniform(),remover=reverb.selectors.Fifo(),rate_limiter=reverb.rate_limiters.MinSize(1),signature=self.replay_buffer_signature)self.reverb_server = reverb.Server([self.table])self.replay_buffer = reverb_replay_buffer.ReverbReplayBuffer(self.agent.collect_data_spec,table_name=table_name,sequence_length=2,local_server=self.reverb_server)self.rb_observer = reverb_utils.ReverbAddTrajectoryObserver(self.replay_buffer.py_client, table_name, sequence_length=2)self.dataset = self.replay_buffer.as_dataset(num_parallel_calls=3, sample_batch_size=self.batch_size, num_steps=2).prefetch(3)return self.dataset, iter(self.dataset)def eval_metrics(self, num_eval_episodes):@tf.functiondef _eval_step():time_step = self.eval_env.reset()episode_returns = tf.TensorArray(dtype=tf.float32, size=0, dynamic_size=True, clear_after_read=False)while not time_step.is_last():action_step = self.policy.action(time_step)time_step = self.eval_env.step(action_step.action)rewards = time_step.rewardepisode_returns = episode_returns.write(episode_returns.size(), rewards)episode_returns_stacked = episode_returns.stack()cumulative_returns = tf.math.cumprod(episode_returns_stacked + 1) - 1total_episode_return = cumulative_returns[-1]episode_avg_return = tf.reduce_mean(episode_returns_stacked)episode_std_dev = tf.math.reduce_std(episode_returns_stacked)episode_sharpe_ratio = tf.cond(episode_std_dev > 0,lambda: episode_avg_return / episode_std_dev,lambda: tf.constant(0.0))return total_episode_return, episode_avg_return, episode_std_dev, episode_sharpe_ratio# Initialize lists to hold the metrics for all episodestotal_returns_list = []episode_avg_returns_list = []episode_std_devs_list = []episode_sharpe_ratios_list = []for i in tqdm(range(0, num_eval_episodes), desc=f"Eval for {num_eval_episodes}"):total_episode_return, episode_avg_return, episode_std_dev, episode_sharpe_ratio = _eval_step()total_returns_list.append(total_episode_return)episode_avg_returns_list.append(episode_avg_return)episode_std_devs_list.append(episode_std_dev)episode_sharpe_ratios_list.append(episode_sharpe_ratio)# Convert lists to tensors for returningtotal_returns = tf.convert_to_tensor(total_returns_list)episode_avg_returns = tf.convert_to_tensor(episode_avg_returns_list)episode_std_devs = tf.convert_to_tensor(episode_std_devs_list)episode_sharpe_ratios = tf.convert_to_tensor(episode_sharpe_ratios_list)return total_returns, episode_avg_returns, episode_std_devs, episode_sharpe_ratiosdef train(self, checkpoint_path=MODELS_PATH, initial_epsilon= EPSILON_START, final_epsilon = EPSILON_END, decay_steps=EPSILON_DECAY):@tf.functiondef _train_step(experience, agent, metrics, global_step, initial_epsilon, final_epsilon, decay_steps):train_loss = agent.train(experience).lossmetrics.update_metrics(losses=train_loss)decayed_epsilon = final_epsilon + (initial_epsilon - final_epsilon) * tf.math.exp(-1. * tf.cast(global_step, tf.float32) / decay_steps)agent.collect_policy._epsilon = decayed_epsilonreturn train_lossprint("Preparing replay memory and dataset")_, iterator = self.init_memory()self.metrics = CustomMetrics()self.global_step = tf.compat.v1.train.get_or_create_global_step()checkpoint_dir = os.path.join(checkpoint_path, 'checkpoint')train_checkpointer = common.Checkpointer(ckpt_dir=checkpoint_dir,max_to_keep=1,agent=agent,metrics=self.metrics,policy=agent.policy,replay_buffer=self.replay_buffer,global_step=self.global_step)status = train_checkpointer.initialize_or_restore()self.global_step = tf.compat.v1.train.get_global_step()print(f'Next step restored: {self.global_step.numpy()} status: {status}')self.policy = agent.policyself.agent.train = common.function(self.agent.train)self.agent.train_step_counter.assign(self.global_step )time_step = self.py_env.reset()collect_driver = py_driver.PyDriver(self.py_env,py_tf_eager_policy.PyTFEagerPolicy(self.agent.collect_policy, use_tf_function=True),[self.rb_observer],max_steps=self.collect_steps_per_iteration)print(f"Running training starting {self.global_step.numpy()} to {self.num_iterations}")for _ in tqdm(range(self.global_step.numpy(), self.num_iterations), desc=f"Training for {self.global_step.numpy() - self.num_iterations}"):time_step, _ = collect_driver.run(time_step)experience, _ = next(iterator)train_loss = _train_step(experience, agent, self.metrics, self.global_step, initial_epsilon= initial_epsilon, final_epsilon = final_epsilon, decay_steps=decay_steps)if self.global_step.numpy() % self.log_interval == 0:print(f'step = {self.global_step.numpy()}: loss = {train_loss}')# Later call: saved_policy = tf.saved_model.load(policy_dir)train_checkpointer.save(self.global_step)if (self.eval_interval is not None) and (self.global_step.numpy() % self.eval_interval == 0):total_returns, episode_avg_returns, episode_std_devs, episode_sharpe_ratios= self.eval_metrics(self.eval_interval // 2)tr = np.mean(total_returns)av = np.mean(episode_avg_returns)sr = np.mean(episode_sharpe_ratios)sd = np.mean(episode_std_devs)print(f'step = {self.global_step.numpy()}: Average Return = {av}, Total Return = {tr}, Avg Sharpe = {sr} -- Saving {self.global_step} Checkpoint')self.metrics.update_metrics(total_return=tr, average_return=av, sharpe_ratio=sr)train_checkpointer.save(self.global_step)print(f'\nTraining completed. Loss: {np.mean(self.metrics.losses.numpy()):.4f}')policy_dir = os.path.join(checkpoint_path, 'policy')tf_policy_saver = policy_saver.PolicySaver(agent.policy)tf_policy_saver.save(policy_dir)self.zip_directories(checkpoint_path)print("Policy saved")self.rb_observer.close()self.reverb_server.stop()return self.metricsdef load_and_eval_policy(self, policy_path=MODELS_PATH, eval_interval=TEST_INTERVALS//4):policy_dir = os.path.join(policy_path, 'policy')self.policy = tf.saved_model.load(policy_dir)total_returns, avg_return, _, sharpe_ratio = self.eval_metrics(eval_interval)print(f'Average Return = {np.mean(avg_return)}, Total Return = {np.mean(total_returns)}, Sharpe = {np.mean(sharpe_ratio)}')return self.policy, total_returns, avg_return, sharpe_ratiodef clear_directories(self, directories = MODELS_PATH):try:if IN_COLAB:shutil.rmtree(f"{GDRIVE}/MyDrive/{directories}")print(f"Successfully cleared {GDRIVE}/MyDrive/{directories}")shutil.rmtree(directories)print(f"Successfully cleared {directories}")except Exception as e:print(f"Error clearing {directories}: {e}")def zip_directories(self, directories = MODELS_PATH, output_filename=f'{MODELS_PATH}/model_files'):"""Creates a zip archive containing the specified directories.Parameters:- directories: List of paths to directories to include in the archive.- output_filename: The base name of the file to create, including the path,minus any format-specific extension. Default is 'training_backup'."""if IN_COLAB:archive_path = shutil.make_archive(f'{GDRIVE}/MyDrive/{directories}', 'zip', root_dir='.', base_dir=directories)print(f"Archived {GDRIVE}/MyDrive/{directories}")else:archive_path = shutil.make_archive(output_filename, 'zip', root_dir='.', base_dir=directories)print(f"Archived {directories} into {archive_path}")def plot_performance(self, metrics):"""Plot the training performance including average returns and Sharpe Ratios on the same plot,with returns on the left y-axis and Sharpe Ratios on the right y-axis."""fig, axs = plt.subplots(1, 2, figsize=(18, 4))axs[0].set_xlabel('Iterations')axs[0].set_ylabel('Average Return')axs[0].plot(range(0, len(metrics.average_returns.numpy())), metrics.average_returns.numpy(), label='Average Return', color="blue")axs[0].tick_params(axis='y')axs[0].legend(loc="upper right")ax12 = axs[0].twinx()ax12.set_ylabel('Sharpe Ratio')ax12.plot(range(0, len(metrics.sharpe_ratios.numpy())), metrics.sharpe_ratios.numpy(), label='Sharpe Ratio', color="yellow")ax12.tick_params(axis='y')ax12.legend(loc="upper left")axs[1].set_xlabel('Iterations')axs[1].set_ylabel('Loss')axs[1].plot(range(0, len(metrics.losses.numpy())), metrics.losses.numpy(), label='Loss', color="red")axs[1].tick_params(axis='y')axs[1].legend()fig.tight_layout()plt.title('Training Performance: Average Returns and Sharpe Ratios')plt.show()def plot_returns_and_actions(self):"""Steps through the environment's data using the given policy and plots the actions,along with a subplot for cumulative returns."""data = self.py_eval_env.data.copy()actions = [ACT_HOLD]cumulative_returns = [0]time_step = self.eval_env.reset()for _ in tqdm(range(1, len(data)), desc=f"Live actions for {len(data)} iters"):action_step = self.policy.action(time_step)time_step = self.eval_env.step(action_step.action)actions.append(action_step.action)if len(cumulative_returns) > 1:cumulative_returns.append(cumulative_returns[-1] + time_step.reward.numpy()[-1] )else:cumulative_returns.append(time_step.reward.numpy()[-1])data['Action'] = actionsdata['Cumulative_Returns'] = np.cumsum(cumulative_returns)fig, axs = plt.subplots(2, 1, figsize=(18, 8), gridspec_kw={'height_ratios': (3, 1)})axs[0].plot(data.index, data[TARGET_FEATURE], label='Close', color='k', alpha=0.6)buys = data[data['Action'] == ACT_LONG]axs[0].scatter(buys.index, buys[TARGET_FEATURE], label='Buy', color='green', marker='^', alpha=1)sells = data[data['Action'] == ACT_SHORT]axs[0].scatter(sells.index, sells[TARGET_FEATURE], label='Sell', color='red', marker='v', alpha=1)axs[0].set_ylabel('Close ($)')axs[0].set_title('Trading Actions and Close Prices')axs[0].legend()axs[1].plot(data.index, data['Cumulative_Returns'], label='Cumulative Returns', color='blue')axs[1].set_ylabel('Cumulative Returns')axs[1].set_title('Cumulative Returns')axs[1].legend()for ax in axs:ax.xaxis.set_major_locator(mdates.AutoDateLocator())ax.xaxis.set_major_formatter(mdates.DateFormatter('%Y-%m-%d'))fig.autofmt_xdate()plt.xlabel('Date')plt.show()
with strategy.scope():sim = TradingSimulator(train_env, test_env, agent=agent)
try:%%time
except:pass
# Uncomment if you are training a new policy
def train():with strategy.scope():loss = sim.train()sim.plot_performance(loss)sim.eval_metrics(25)
train()

        在训练过程中,我们可以观察到 epsilon 贪婪策略的影响,其中初始训练事件会产生很高的损失,因为代理会采取随机操作:

        培训完成后,我们评估新策略并可视化其结果:

        鉴于我们目前设置的 0.6 epsilon 折扣和 +25% 的负回报的小惩罚,该代理的表现相当不错。

{'Annualized Return': 0.4157554099733984,'Annualized Vol': 0.526509836743958,'Sharpe Ratio': 0.6969680274041493,'Downside Deviation': 0.3457139483184213,'Sortino Ratio': 0.00421213131836031,'Max Drawdown': 0.0,'Max Drawdown Days': 261,'Trade Churn': 0.0,'Skewness': 0.7191282724766412,'Kurtosis': 5.44751948119778}

比较了本文的基准测试及其 TQDM 架构,我们在 Tensorflow 设置和架构方面做得更好(我们对一些超参数进行了猜测),除了回撤持续时间、Sharpe 和 Sortino 比率:

让我们尝试获取更多的短期收益,我们将 epsilon 降低到 0.3,将我们的代理变成“日内交易者”:

{'Annualized Return': -0.350555829680856,'Annualized Vol': 0.5188582006668314,'Sharpe Ratio': -0.7696721701935054,'Downside Deviation': 0.37696770151282327,'Sortino Ratio': -0.00420387492724144,'Max Drawdown': 0.0,'Max Drawdown Days': 672,'Trade Churn': 0.0,'Skewness': 0.352979579434461,'Kurtosis': 6.181142153328116}

追逐短期收益并不顺利,代理处于永久回撤状态!

十二、三组实验

        接下来,我们将通过在以下 3 个实验中增强数据和奖励函数来进行测试。

12.1 实验 1 — 夏普比率作为奖励

        这将是我们的下一个奖励功能,假设我们的代理人可以将其投资组合的波动性降至最低。在我们的例子中,我们需要将过去交易时段的数量(最多 252 个,一年中的交易日)平方,以给出年化夏普作为奖励:

def _calculate_sharpe_reward_signal(self, risk_free_rate=0.05, periods_per_year=252, reward_clip=REWARD_CLIP):"""Calculates the annualized Sharpe ratio up to the CURRENT STEP.Parameters:- risk_free_rate (float): The annual risk-free rate. It will be adjusted to match the period of the returns.- periods_per_year (int): Number of periods in a year (e.g., 252 for daily, 12 for monthly).Returns:- float: The annualized Sharpe ratio as reward."""period_risk_free_rate = (1 + risk_free_rate) ** (1 / periods_per_year) - 1observed_returns = self.data['Returns'].iloc[:self.current_step + 1]excess_returns = observed_returns - period_risk_free_ratemean_excess_return = np.mean(excess_returns)std_dev_returns = np.std(observed_returns)sharpe_ratio = mean_excess_return / std_dev_returns if std_dev_returns > 0 else 0annual_sr = sharpe_ratio * np.sqrt(periods_per_year)self.data.at[self.data.index[self.current_step], 'Sharpe'] = annual_srreturn np.clip(annual_sr, -reward_clip, reward_clip)

结果是有希望的,比基线要好——但看看这个回撤的持续时间!

{'Annualized Return': 0.2483099753903375,'Annualized Vol': 0.5315927401982844,'Sharpe Ratio': 0.3753156743014152,'Downside Deviation': 0.3649215721069904,'Sortino Ratio': 0.0021695799842264578,'Max Drawdown': -0.5284848329394528,'Max Drawdown Days': 493,'Trade Churn': 0.0,'Skewness': 0.547255666186771,'Kurtosis': 5.424081523143858}

从图表上看,我们看到它只交易了一次!我们应该改变折扣,以防止它变成只买入并持有的政策。

12.2 实验 2:技术分析 (TA) 信号

使用 Pandas-TA 库,我们用以下信号来增强我们的时间序列:

  • 移动平均收敛散度 (MACD) 可用于确认趋势的存在。此外,它还可以发现与价格的背离,这可能预示着潜在的反转。MACD 由 12 天快速移动平均线 (MA)、26 天慢移动平均线 (MA) 和信号创建,信号是它们差值的 9 天指数移动平均线 (EMA)。
  • 平均真实范围 (ATR) 将表明代理价格波动及其幅度,这将暗示环境的波动性。它是通过分解价格极端值的 14 天移动平均线构建的。

下面的代码创建以下信号:

stock_df = tickers[TARGET].copy()
macd = MACD(close=stock_df["Close"], window_slow=26, window_fast=12, window_sign=9, fillna=True)
stock_df['MACD'] = macd.macd()
stock_df['MACD_HIST'] = macd.macd_diff()
stock_df['MACD_SIG'] = macd.macd_signal()
atr = AverageTrueRange(stock_df["High"], stock_df["Low"], stock_df["Close"], window = 14, fillna = True)
stock_df['ATR'] = atr.average_true_range()
ema = EMAIndicator(stock_df["Close"], window = 14, fillna = True)
stock_df['EMA'] = ema.ema_indicator()
stock_df

与基线和夏普变体相比,结果很差,这是一个失败者,整个事件都是回撤:

{'Annualized Return': 0.08037565358057806,'Annualized Vol': 0.5327752235074609,'Sharpe Ratio': 0.05927596580709699,'Downside Deviation': 0.36637039877343286,'Sortino Ratio': 0.00034205956635066734,'Max Drawdown': 0.0,'Max Drawdown Days': 672,'Trade Churn': 0.0,'Skewness': 0.5699760228074306,'Kurtosis': 5.441197183719924}

视觉结果如下所示:

TA 信号导致代理在错误的时间搅动。从图中我们可以观察到,从战术上讲,代理在交易后立即看到了可观的奖励(奖励条),但是当它的奖励进入负值区域时,代理做了所有错误的行为。

请注意,这两个信号都是滞后信号,这可以解释为什么代理行动太晚。

如果我们有时间,我们会尝试使用单个 TA 和/或折扣因素,以给予代理更多的战略激励。

12.3 实验3:宏信号

在此实验中,我们将通过以下时间序列为智能体提供对其宏观环境的见解:

  1. VIX — 当前时期的波动率和恐惧指数。
  2. 10年期国库券收益率——作为通胀的指标。
  3. 标准普尔 500 指数 — 市场风险因素。

对于代码,它只是添加以下时间序列:

RATES_INDEX = "^TNX"  # 10 Year Treasury Note Yield
VOLATILITY_INDEX = "^VIX"  # CBOE Volatility Index
SMALLCAP_INDEX = "^RUT" # Russell 2000 Index
GOLD_INDEX = "GC=F" # Gold futures
TICKER_SYMBOLS = [TARGET, RATES_INDEX, VOLATILITY_INDEX, SMALLCAP_INDEX, GOLD_INDEX]stock_df[VOLATILITY_INDEX] = tickers[VOLATILITY_INDEX]["Close"]
stock_df[RATES_INDEX] = tickers[RATES_INDEX]["Close"]
stock_df[SMALLCAP_INDEX] = tickers[SMALLCAP_INDEX]["Close"]
stock_df[GOLD_INDEX] = tickers[GOLD_INDEX]["Close"]
stock_df[MARKET] = tickers[MARKET]["Close"]
stock_df

为我们提供了出色的结果,一个可以访问影响其边界环境的信息的代理可以创建优于我们的基线和所有论文基准的策略:

{'Annualized Return': 0.7375854975646395,'Annualized Vol': 0.48576500545216406,'Sharpe Ratio': 1.417950247927827,'Downside Deviation': 0.30906051769218884,'Sortino Ratio': 0.008843886276718567,'Max Drawdown': -0.38977234510237335,'Max Drawdown Days': 142,'Trade Churn': 0.0,'Skewness': 0.7135103541646352,'Kurtosis': 4.722124713372126}

        让我们想象一下代理的行为,它所做的最后一个严重错误,代理的行为就像一个明智的人类交易员:

十三、结论

        在本文中,我们使用我们的信号和改进的 Tensorflow 代理框架和架构,调整并改进了 Théate、Thibaut 和 Ernst, Damien (2021) 的 Trading Deep Q-Network (TDQN) 算法。

        我们的代理现在可以确定最佳交易头寸(买入、卖出或持有),以在模拟环境中最大化其投资组合回报。通过数据增强,代理的性能可以大幅改变,如下表所示:

        这位巴甫洛夫交易员是一个有趣的研究和测试练习。尽管测试和实现的回报和环境是不现实的,并且设置在现实世界环境中会失败。

参考文章:

 ·
Deep Q-Learning Applied to Algorithmic Trading | by Adam | Call For Atlas | Mar, 2024 | Medium

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/1256.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[卷积神经网络]YoloV8

一、YoloV8 1.网络详解 ①backbone部分&#xff1a;第一次卷积的卷积核缩小(由3变为6)&#xff1b;CSP模块的预处理卷积从3次变为2次&#xff1b;借鉴了YoloV7的多分支堆叠结构&#xff08;Multi_Concat_Block&#xff09;。 所小第一次卷积的卷积核尺寸会损失部分感受野&#…

2024年 10 款最佳免费数据恢复软件您值得收藏

免费的数据恢复软件或工具是最重要的工具之一&#xff0c;在我们的生活中发挥着非常重要和关键的作用&#xff0c;尽管现在您可以找到数十种&#xff0c;但事实是它们非常重要。 由于设备故障、勒索软件攻击或意外擦除数据而从设备中丢失数据可能会成为一个真正的头痛问题。 …

专题【二分查找】刷题日记

题目列表 4. 寻找两个正序数组的中位数 33. 搜索旋转排序数组 34. 在排序数组中查找元素的第一个和最后一个位置 35. 搜索插入位置 69. x 的平方根 167. 两数之和 II - 输入有序数组 209. 长度最小的子数组 222. 完全二叉树的节点个数 287. 寻找重复数 2023.04.14 4. 寻找两…

自然语言处理基础面试

文章目录 TF-IDFbag-of-wordsBert 讲道理肯定还得有Transformer&#xff0c;我这边先放着&#xff0c;以后再加吧。 TF-IDF TF&#xff08;全称TermFrequency&#xff09;&#xff0c;中文含义词频&#xff0c;简单理解就是关键词出现在网页当中的频次。 IDF&#xff08;全称…

spring boot: 使用MyBatis从hive中读取数据

一、hive表&#xff1a; 启动hiveserver2 二、添加mybatis starter和hive依赖 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instan…

力扣HOT100 - 24. 两两交换链表中的节点

解题思路&#xff1a; 递归 class Solution {public ListNode swapPairs(ListNode head) {if (head null || head.next null) {return head;}ListNode newHead head.next;head.next swapPairs(newHead.next);newHead.next head;return newHead;} }

案例实践 | InterMat:基于长安链的材料数据发现与共享系统

案例名称&#xff1a;InterMat-基于区块链的材料数据发现与共享系统 ■ 建设单位 北京钢研新材科技有限公司 ■ 用户群体 材料数据上下游单位 ■ 应用成效 已建设10共识节点、50轻节点&#xff0c;1万注册用户 案例背景 材料是构成各种装备和工程的物质载体&#xff0c…

驱动开发-windows驱动设计目标

驱动程序和应用程序不一样的&#xff0c;由于其直接运行于windows r0级&#xff0c;故对于开发有更多和更严格的标准&#xff0c;一般会有以下一些常见的设计目标: 安全性、可移植性、可配置性、 可被中断、多处理器安全、可重用 IRP、 支持异步 I/O这些是基本目标。 1. 安全…

高频前端面试题汇总之Vue篇

1. Vue的基本原理 当一个Vue实例创建时&#xff0c;Vue会遍历data中的属性&#xff0c;用 Object.defineProperty&#xff08;vue3.0使用proxy &#xff09;将它们转为 getter/setter&#xff0c;并且在内部追踪相关依赖&#xff0c;在属性被访问和修改时通知变化。 每个组件实…

Flutter 之 HTTP3/QUIC 和 Cronet 你了解过吗?

虽然 HTTP3/QUIC 和 cronet 跟 Flutter 没太大关系&#xff0c;只是最近在整理 Flutter 相关资料时发现还挺多人不了解&#xff0c;就放到一起聊聊。 本篇也是主要将现有资料做一些简化整合理解。 前言 其实为什么会有 HTTP3/QUIC &#xff1f;核心原因还是现有协议已经无法满…

机器学习周记(第三十五周:语义分割)2024.4.15~2024.4.21

目录 摘要 ABSTRACT 1 语义分割基本概念 1.1 数据集格式 ​编辑 1.2 语义分割评价指标 1.3 语义分割标注工具 2 转置卷积 3 FCN网络结构基本原理 摘要 本周主要学习了语义分割的基本概念及其在计算机视觉领域中的应用。了解了语义分割的几种经典网络&#xff0c;如全卷…

linux系统密码重置的方法

在linux系统中忘记密码&#xff0c;重置&#xff08;重启&#xff1a;shutdown -r now&#xff09; 1、在启动 Linux 时&#xff0c;按键盘上的上下左右键来止 Linux 的正常启动。 2、按下e鍵进入安全模式 3、找到首行是linux16&#xff0c;末尾是UTF-8的段落&#xff0c;在后门…

Python中的设计模式与最佳实践

&#x1f47d;发现宝藏 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。【点击进入巨牛的人工智能学习网站】。 Python中的设计模式与最佳实践 在软件开发中&#xff0c;设计模式是一种解决常见问题的经过…

【Django】调用django的pbkdf2_sha256加密算法测试

基于django搭建的系统中&#xff0c;用到pbkdf2_sha256&#xff08;&#xff08;Password-Based Key Derivation Function 2&#xff09;&#xff09;加密算法&#xff0c;这里做些代码测试、总结。 PBKDF2简介 PBKDF2是一种基于密码的密钥派生函数&#xff0c;用于从用户提供的…

2024-4-狼道

2024-4-狼道 2024-4-9 宋犀堃&#xff08;堃通坤&#xff0c;多用于人名&#xff09; fatux&#xff1a; 做人当如狗&#xff0c;和蔼可亲&#xff1b;做事当如狼&#xff0c;专注果决。 狼道 智慧生存的强者法则 走向卓越的成功之道 狼道&#xff0c;是追求卓越的野心&am…

C++_特殊类的设计和单例模式

文章目录 学习目标&#xff1a;1.请设计一个类&#xff0c;不能被拷贝2. 请设计一个类&#xff0c;只能在堆上创建对象3. 请设计一个类&#xff0c;只能在栈上创建对象4. 请设计一个类&#xff0c;不能被继承5. 请设计一个类&#xff0c;只能创建一个对象(单例模式) 特殊类的设…

如何在原生项目中集成flutter

两个前提条件&#xff1a; 从flutter v1.17版本开始&#xff0c;flutter module仅支持AndroidX的应用在release模式下flutter仅支持一下架构&#xff1a;x84_64、armeabi-v7a、arm6f4-v8a,不支持mips和x86;所以引入flutter前需要在app/build.gradle下配置flutter支持的架构 a…

《设计模式之美》- 总结

《设计模式之美》- 总结 第一章 概述 1.1 为什么学习代码设计 编写高质量的代码应对复杂代码的开发程序员的基本功职业发展的必备技能 1.2 如何评价代码的质量 1.2.1 可维护性 可维护性代码的特性&#xff1a;代码简洁、可读性好、可扩展性好代码分层结构清晰、模块化程度…

maven问题汇总

​ 1、报错 failed to transfer from http://0.0.0.0/ during a previous attempt. com.byd.xxx:xxx-parent:pom:1.1.0-SNAPSHOT failed to transfer from http://0.0.0.0/ during a previous attempt. This failure was cached in the local repository and resolution is no…

【Pytorch】PytorchCPU版或GPU报错异常处理(10X~4090D)

Pytorch为CPU版或GPU使用报错异常处理 文章目录 Pytorch为CPU版或GPU使用报错异常处理0.检查阶段1. 在conda虚拟环境中安装了torch2.卸载cpuonly3.从tsinghua清华源安装不完善误为cpu版本4.用tsinghua清华源安装成cpu错误版本5.conda中torch/vision/cudatoolkit版本与本机cuda版…