VectorBT:使用PyTorch+Transformer训练和回测股票模型 进阶五

VectorBT:使用PyTorch+Transformer训练和回测股票模型 进阶五

本方案基于PyTorch框架与Transformer模型,结合VectorBT回测引擎构建多股票量化交易系统,采用滑动窗口技术构建时序特征,通过自注意力机制捕捉市场规律预测收益率,集成双EMA交叉策略动态生成交易信号,利用Optuna优化模型超参与策略参数,支持增量训练更新特征分布,结合波动率调整非线性仓位,并通过分组标准化与股票分组计算严格规避数据泄漏风险,实现端到端的量化策略研发闭环。
文中内容仅限技术学习与代码实践参考,市场存在不确定性,技术分析需谨慎验证,不构成任何投资建议。适合量化新手建立系统认知,为策略开发打下基础。

Backtest Strategy

本文是进阶指南🚀,推荐先阅读了解基础知识‼️

  • VectorBT:Python量化交易策略开发与回测评估详解 🔥
  • VectorBT:使用PyTorch+LSTM训练和回测股票模型 进阶一 🔥
  • VectorBT:使用PyTorch+LSTM训练和回测股票模型 进阶二 🔥
  • VectorBT:使用PyTorch+LSTM训练和回测股票模型 进阶三 🔥
  • VectorBT:使用PyTorch+LSTM训练和回测股票模型 进阶四 🔥

1. 方案概述

本方案基于PyTorch框架与Transformer模型,结合VectorBT回测引擎构建多股票量化交易系统,采用滑动窗口技术构建时序特征,通过自注意力机制捕捉市场规律预测收益率,集成双EMA交叉策略动态生成交易信号,利用Optuna优化模型超参与策略参数,支持增量训练更新特征分布,结合波动率调整非线性仓位,并通过分组标准化与股票分组计算严格规避数据泄漏风险,实现端到端的量化策略研发闭环。

1.1 核心原理

  1. 多时序特征编码:通过滑动窗口技术构建三维特征矩阵(样本×时间步×特征)
  2. Transformer建模:利用自注意力机制捕捉时序依赖关系
  3. 动态仓位管理:结合波动率调整仓位大小(使用tanh函数压缩)
  4. 增量学习机制:支持在线更新模型参数和特征分布

1.2 关键特点

  • 多股票联合训练:共享特征表示,提升模型泛化能力
  • 非线性仓位控制:position_size = tanh(|return|/volatility)
  • 参数自动优化:使用Optuna进行双重优化(模型超参+策略参数)
  • 特征鲁棒处理:分组标准化(趋势类用RobustScaler,成交量用StandardScaler)

1.3 注意事项

  • 数据泄漏风险:严格按股票分组计算收益率和技术指标
  • 设备兼容性:支持CUDA/MPS/CPU多设备自动切换
  • 内存管理:滑动窗口生成时需控制窗口大小(默认5天)
  • 过拟合预防:采用早停机制和学习率动态调整

2. 系统架构

数据层
模型层
训练层
回测层
应用层
调用
调用
调用
加载数据
生成/筛选特征
全量/增量训练
加载/保存模型
构建
执行引擎
策略逻辑
核心组件
注意力机制
data_processing.py
load_data
model_definition.py
TransformerModel
MultiHeadAttention
training.py
OnlineFeatureEngineer
IncrementalTrainer
TrainingStateManager
backtesting.py
BacktestStrategy
DualEMACrossoverStrategy
main.py

架构说明:

  1. 应用层:通过main.py整合

    • 统一训练/回测接口
    • 设备自动检测
    • 全流程种子控制
  2. 数据层:通过data_processing.py实现

    • 多股票数据加载与合并
    • 收益率计算与异常值处理
    • 严格的时间序列管理
  3. 模型层:定义于model_definition.py

    • Transformer架构实现时序预测
    • 包含位置编码和多头注意力机制
    • 支持动态维度调整的Encoder结构
  4. 训练层:通过training.py实现

    • 增量式训练框架
    • 在线特征工程系统
    • 自适应特征选择机制
    • Optuna超参优化集成
  5. 回测层:定义于backtesting.py

    • 双EMA交叉策略引擎
    • 波动率自适应仓位管理
    • 策略参数动态优化模块

2.1 数据层(Data Layer)

对应代码data_processing.py

def load_data(ts_codes, data_path="./data", test=False):# 核心数据加载逻辑combined_df["returns"] = combined_df.groupby("ts_code")["close"].pct_change().shift(-1)
  • 数据源:本地存储的Parquet文件(含复权处理)
  • 关键处理
    • 跨股票数据合并与日期对齐
    • 严格避免未来信息:按股票分组计算次日收益率
    • 收益率截断(±10%边界)
  • 输出格式:带时间戳的DataFrame,包含open/high/low/close/vol等原始字段

2.2 特征工程(Feature Engineering)

对应代码training.py

class OnlineFeatureEngineer:def generate_features(self, df):# 生成8大类技术指标self.feature_groups = {"Trend": ["MA20","EMA12","MACD"...],"Momentum": ["RSI","KDJ_K"...],...}
  • 动态特征生成

    • 趋势类(MA20, MACD, …)
    • 动量类(RSI, KDJ, …)
    • 波动率(布林带, ATR, …)
    • 成交量(OBV, MFI, …)
  • 标准化策略

    self.scalers = {"Trend": RobustScaler(),"Momentum": MinMaxScaler(),"Volatility": RobustScaler(),...
    }
    
  • 特征选择

    selector = RandomForestRegressor(n_estimators=50)
    feature_selector = SelectFromModel(selector)
    

2.3 模型系统(Model System)

对应代码model_definition.py

class TransformerModel(nn.Module):def __init__(self, input_size, d_model=64, num_heads=4...):# Encoder-Only结构self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, num_heads, dff) for _ in range(num_layers)])
  • 核心架构

    • 可配置的Encoder层数(2-4层)
    • 多头注意力(4-8头)
    • 位置编码使用正弦/余弦混合编码
  • 训练机制

    criterion = nn.HuberLoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-4)
    scheduler = ReduceLROnPlateau(optimizer, 'min', patience=5)
    

2.4 交易策略(Trading Strategy)

对应代码backtesting.py

class DualEMACrossoverStrategy:def generate_signals(self):ema_fast = pred_returns.ewm(span=fast_span).mean()ema_slow = pred_returns.ewm(span=slow_span).mean()position_size = np.tanh(pred_returns.abs()/volatility)
  • 动态参数
    • 快线EMA周期:10-30日
    • 慢线EMA周期:50-100日
  • 仓位控制
    • 基于波动率的tanh函数非线性映射
    • 最大仓位限制在30%-80%之间

2.5 回测引擎(Backtesting)

对应代码backtesting.py

class BacktestStrategy:def _configure_vbt(self):vbt.settings.portfolio["fees"] = 0.0025vbt.settings.portfolio["slippage"] = 0.0025
  • 成本模型

    • 双向0.25%佣金
    • 0.25%滑点成本
  • 执行机制

    Portfolio.from_signals(entries=signals.shift(1) == 1,  # T+1信号执行size=position_size,size_type="percent"
    )
    

3. 系统流程

3.1 训练流程序列图

Main DataProcessing FeatureEngineer FeatureSelector Optuna Trainer Model StateManager load_data() generate_features() feature_selection() 传递特征矩阵 创建study 参数建议 训练评估 返回指标 loop [Optuna超参优化] 正式训练 保存状态 持久化存储 Main DataProcessing FeatureEngineer FeatureSelector Optuna Trainer Model StateManager

3.2 回测流程序列图

Main BacktestStrategy DualEMA Optuna VectorBT 初始化 模型预测 创建策略实例 生成参数组合 模拟交易 返回收益 loop [参数优化] 执行最终回测 返回组合结果 Main BacktestStrategy DualEMA Optuna VectorBT

3.3 流程关键点说明

  1. 训练流程:

    • 采用两阶段优化:特征选择 → 超参优化
    • 使用Optuna进行贝叶斯优化
    • 支持断点续训的模型保存机制
  2. 回测流程:

    • 策略参数动态搜索空间
    • 基于波动率的非线性仓位控制
    • 交易成本的双向收取模型
    • 结果可视化自动适配暗色主题
  3. 跨模块协作:

    • 特征工程与模型输入的维度一致性保证
    • 训练/推理的设备自动适配
    • 时间序列的严格对齐机制

4. 总结与优化建议

4.1 方案优势

  • 架构灵活性:模块化设计支持策略快速迭代
  • 计算效率:GPU加速训练+VectorBT高效回测
  • 风险控制:动态波动率调整+仓位限制(0.3-0.8)

4.2 优化方向

维度当前实现优化建议
特征工程8类技术指标增加市场情绪数据(新闻舆情、资金流向)
模型结构Encoder-only增加Decoder实现Seq2Seq预测
仓位策略固定比例引入强化学习动态调整
数据增强原始序列添加随机时频变换增强
风险控制波动率约束加入VaR压力测试模块

4.3 实践建议

  1. 增量更新频率:建议每月更新模型,每周更新特征分布
  2. 参数搜索空间:可扩展EMA参数范围(快线5-50,慢线30-200)
  3. 硬件配置:推荐使用至少16GB显存的GPU设备
  4. 回测验证:建议采用Walk-Forward分析法,避免过拟合

5. 工程代码

目录结构:

data/
├── processed_600000.SH.parquet
├── processed_600036.SH.parquet
├── processed_600519.SH.parquet
├── processed_000001.SZ.parquet
models/
├── vectorbt_5_model.pth
├── vectorbt_5_preprocessors.pkl
src/
└── vectorbt_5/├── data_processing.py├── model_definition.py├── training.py├── backtesting.py├── main.py└── __init__.py

5.1 data_processing.py

import pandas as pddef load_data(ts_codes, data_path="./data"):"""加载预处理后的股票数据:param ts_code: 股票代码(如["600000.SH", "600519.SH", "000001.SZ"]):param data_path: 数据存储路径):return: 合并后的DataFrame(含ts_code列标识股票)处理步骤:1. 读取parquet格式的本地数据2. 转换交易日期格式3. 计算次日收益率(目标变量)4. 删除缺失值"""dfs = []for ts_code in ts_codes:file_path = f"{data_path}/processed_{ts_code}.parquet"df = pd.read_parquet(file_path)df["ts_code"] = ts_codedfs.append(df)combined_df = pd.concat(dfs)combined_df["trade_date"] = pd.to_datetime(combined_df["trade_date"], format="%Y%m%d")combined_df.set_index("trade_date", inplace=True)combined_df.sort_index(inplace=True)# 按股票分组计算收益率,避免跨股票计算,严格避免未来信息combined_df["returns"] = combined_df.groupby("ts_code", group_keys=False)["close"].apply(lambda x: x.pct_change().shift(-1).clip(-0.1, 0.1)  # 添加收益率截断)combined_df.dropna(inplace=True)return combined_df

5.2 model_definition.py

import mathimport torch
import torch.nn as nnclass MultiHeadAttention(nn.Module):"""多头注意力机制。:param d_model: 输入和输出的维度:param num_heads: 注意力头的数量"""def __init__(self, d_model, num_heads):super().__init__()self.num_heads = num_heads  # 注意力头的数量self.d_model = d_model  # 输入和输出的维度self.depth = d_model // num_heads  # 每个头的维度self.wq = nn.Linear(d_model, d_model)  # 查询线性变换self.wk = nn.Linear(d_model, d_model)  # 键线性变换self.wv = nn.Linear(d_model, d_model)  # 值线性变换self.dense = nn.Linear(d_model, d_model)  # 输出线性变换def split_heads(self, x, batch_size):"""将输入张量分割成多个头。:param x: 输入张量 (batch_size, seq_len, d_model):param batch_size: 批次大小:return: 分割后的张量 (batch_size, num_heads, seq_len, depth)"""x = x.view(batch_size, -1, self.num_heads, self.depth)  # 将d_model维度拆分成num_heads和depthreturn x.permute(0, 2, 1, 3)  # 调整维度顺序为 (batch_size, num_heads, seq_len, depth)def forward(self, q, k, v):"""前向传播函数。:param q: 查询张量 (batch_size, seq_len, d_model):param k: 键张量 (batch_size, seq_len, d_model):param v: 值张量 (batch_size, seq_len, d_model):return: 输出张量 (batch_size, seq_len, d_model) 和注意力权重 (batch_size, num_heads, seq_len, seq_len)"""batch_size = q.size(0)  # 获取批次大小q = self.wq(q)  # 对查询进行线性变换k = self.wk(k)  # 对键进行线性变换v = self.wv(v)  # 对值进行线性变换q = self.split_heads(q, batch_size)  # 将查询张量分割成多个头k = self.split_heads(k, batch_size)  # 将键张量分割成多个头v = self.split_heads(v, batch_size)  # 将值张量分割成多个头# 计算注意力scaled_attention, attention_weights = self.scaled_dot_product_attention(q, k, v)scaled_attention = scaled_attention.permute(0, 2, 1, 3)  # 调整维度顺序为 (batch_size, seq_len, num_heads, depth)concat_attention = scaled_attention.reshape(batch_size, -1, self.d_model)  # 合并头,恢复原始维度output = self.dense(concat_attention)  # 进行最终的线性变换return output, attention_weightsdef scaled_dot_product_attention(self, q, k, v):"""计算缩放点积注意力。:param q: 查询张量 (batch_size, num_heads, seq_len, depth):param k: 键张量 (batch_size, num_heads, seq_len, depth):param v: 值张量 (batch_size, num_heads, seq_len, depth):return: 注意力输出 (batch_size, num_heads, seq_len, depth) 和注意力权重 (batch_size, num_heads, seq_len, seq_len)"""matmul_qk = torch.matmul(q, k.transpose(-1, -2))  # 计算Q和K的点积dk = torch.tensor(k.size(-1), dtype=torch.float32)  # 获取深度dkscaled_attention_logits = matmul_qk / torch.sqrt(dk)  # 缩放点积attention_weights = torch.softmax(scaled_attention_logits, dim=-1)  # 计算注意力权重output = torch.matmul(attention_weights, v)  # 应用注意力权重到值上return output, attention_weightsclass EncoderLayer(nn.Module):"""编码器层。:param d_model: 输入和输出的维度:param num_heads: 注意力头的数量:param dff: 前馈神经网络的中间维度:param dropout: dropout 概率,默认为 0.1"""def __init__(self, d_model, num_heads, dff, dropout=0.1):super().__init__()self.mha = MultiHeadAttention(d_model, num_heads)  # 多头注意力机制self.ffn = nn.Sequential(nn.Linear(d_model, dff),  # 线性变换到dff维度nn.GELU(),  # GELU激活函数nn.Linear(dff, d_model),  # 线性变换回d_model维度)self.layer_norm1 = nn.LayerNorm(d_model)  # 第一个层归一化self.layer_norm2 = nn.LayerNorm(d_model)  # 第二个层归一化self.dropout = nn.Dropout(dropout)  # Dropout层def forward(self, x):"""前向传播函数。:param x: 输入张量 (batch_size, seq_len, d_model):return: 输出张量 (batch_size, seq_len, d_model)"""# 多头注意力attn_output, _ = self.mha(x, x, x)  # 计算多头注意力attn_output = self.dropout(attn_output)  # 应用dropoutout1 = self.layer_norm1(x + attn_output)  # 残差连接和层归一化# 前馈神经网络ffn_output = self.ffn(out1)  # 前馈神经网络ffn_output = self.dropout(ffn_output)  # 应用dropoutout2 = self.layer_norm2(out1 + ffn_output)  # 残差连接和层归一化return out2class DecoderLayer(nn.Module):"""解码器层。:param d_model: 输入和输出的维度:param num_heads: 注意力头的数量:param dff: 前馈神经网络的中间维度:param dropout: dropout 概率,默认为 0.1"""def __init__(self, d_model, num_heads, dff, dropout=0.1):super().__init__()self.mha1 = MultiHeadAttention(d_model, num_heads)  # 掩码多头注意力self.mha2 = MultiHeadAttention(d_model, num_heads)  # 编码器-解码器注意力self.ffn = nn.Sequential(nn.Linear(d_model, dff),  # 线性变换到dff维度nn.GELU(),  # GELU激活函数nn.Linear(dff, d_model),  # 线性变换回d_model维度)self.layer_norm1 = nn.LayerNorm(d_model)  # 第一个层归一化self.layer_norm2 = nn.LayerNorm(d_model)  # 第二个层归一化self.layer_norm3 = nn.LayerNorm(d_model)  # 第三个层归一化self.dropout = nn.Dropout(dropout)  # Dropout层def forward(self, x, enc_output):"""前向传播函数。:param x: 输入张量 (batch_size, seq_len, d_model):param enc_output: 编码器输出 (batch_size, src_seq_len, d_model):return: 输出张量 (batch_size, seq_len, d_model) 和两个注意力权重"""# 掩码多头注意力attn1, attn_weights1 = self.mha1(x, x, x)  # 计算掩码多头注意力attn1 = self.dropout(attn1)  # 应用dropoutout1 = self.layer_norm1(x + attn1)  # 残差连接和层归一化# 编码器-解码器注意力attn2, attn_weights2 = self.mha2(out1, enc_output, enc_output)  # 计算编码器-解码器注意力attn2 = self.dropout(attn2)  # 应用dropoutout2 = self.layer_norm2(out1 + attn2)  # 残差连接和层归一化# 前馈神经网络ffn_output = self.ffn(out2)  # 前馈神经网络ffn_output = self.dropout(ffn_output)  # 应用dropoutout3 = self.layer_norm3(out2 + ffn_output)  # 残差连接和层归一化return out3, attn_weights1, attn_weights2class PositionalEncoding(nn.Module):"""位置编码。:param d_model: 输入和输出的维度:param max_len: 最大序列长度,默认为5000:param dropout: dropout 概率,默认为 0.1"""def __init__(self, d_model, max_len=5000, dropout=0.1):super().__init__()self.dropout = nn.Dropout(dropout)  # Dropout层pe = torch.zeros(max_len, d_model)  # 初始化位置编码position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)  # 位置索引div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))  # 用于计算正弦和余弦的位置项pe[:, 0::2] = torch.sin(position * div_term)  # 正弦位置编码pe[:, 1::2] = torch.cos(position * div_term)  # 余弦位置编码pe = pe.unsqueeze(0)  # 增加批次维度self.register_buffer("pe", pe)  # 注册位置编码为缓冲区def forward(self, x):"""前向传播函数。:param x: 输入张量 (batch_size, seq_len, d_model):return: 添加了位置编码的张量 (batch_size, seq_len, d_model)"""x = x + self.pe[:, : x.size(1)]  # 添加位置编码return self.dropout(x)  # 应用dropoutclass TransformerModel(nn.Module):"""Transformer模型。:param input_size: 输入特征的维度:param d_model: 输入和输出的维度:param num_heads: 注意力头的数量:param num_layers: 编码器层数:param dff: 前馈神经网络的中间维度:param dropout: dropout 概率,默认为 0.1"""def __init__(self, input_size, d_model, num_heads, num_layers, dff, dropout=0.1):super().__init__()self.embedding = nn.Linear(input_size, d_model)  # 输入嵌入self.pos_encoding = PositionalEncoding(d_model, dropout=dropout)  # 位置编码self.encoder_layers = nn.ModuleList([EncoderLayer(d_model, num_heads, dff, dropout)for _ in range(num_layers)]  # 编码器层)self.fc = nn.Linear(d_model, 1)  # 全连接层def forward(self, x):"""前向传播函数。:param x: 输入张量 (batch_size, seq_len, input_size):return: 输出张量 (batch_size, 1)"""x = self.embedding(x)  # 输入嵌入x = self.pos_encoding(x)  # 位置编码for enc_layer in self.encoder_layers:x = enc_layer(x)  # 通过每个编码器层x = self.fc(x.mean(dim=1))  # 使用全局平均池化并进行最终线性变换return x

5.3 training.py

import osimport joblib
import numpy as np
import optuna
import pandas as pd
import torch
import torch.nn as nn
from sklearn.ensemble import RandomForestRegressor
from sklearn.feature_selection import SelectFromModel
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, RobustScaler, StandardScaler
from torch.utils.data import DataLoader, Dataset
from tqdm.auto import tqdmfrom vectorbt_5.model_definition import TransformerModelclass SingleWindowDataset(Dataset):def __init__(self, X, y):"""单窗口数据集类,用于将时间序列数据转换为PyTorch数据集格式。:param X: 滑动窗口特征数据:param y: 目标变量"""self.X = X.astype(np.float32)  # 将滑动窗口特征数据转换为float32类型self.y = y.astype(np.float32)  # 将目标变量转换为float32类型def __len__(self):return len(self.y)  # 返回目标变量的长度def __getitem__(self, idx):x = torch.from_numpy(self.X[idx]).float()  # 将滑动窗口特征数据转换为PyTorch张量label = torch.tensor(self.y[idx], dtype=torch.float32)  # 将目标变量转换为PyTorch张量return x, label  # 返回特征和标签class TrainingStateManager:def __init__(self, model_dir="./models"):"""训练状态管理器类,负责模型和预处理器的保存和加载。:param model_dir: 模型保存目录,defaults to "./models""""self.model_dir = model_dir  # 设置模型保存目录self.model_path = f"{self.model_dir}/vectorbt_5_model.pth"  # 设置模型文件路径self.preprocessors_path = (f"{self.model_dir}/vectorbt_5_preprocessors.pkl"  # 设置预处理器文件路径)os.makedirs(model_dir, exist_ok=True)  # 创建模型保存目录(如果不存在)def save(self, model, feature_engineer, feature_selector, config):"""保存模型和预处理器。:param model: 训练好的模型:param feature_engineer: 特征工程对象:param feature_selector: 特征选择器:param config: 模型配置"""torch.save(model.state_dict(), self.model_path)  # 保存模型参数joblib.dump({"feature_engineer": feature_engineer,"feature_selector": feature_selector,"config": config,},self.preprocessors_path,)  # 保存预处理器和配置print(f"Model saved to {self.model_path}")  # 打印模型保存路径print(f"Preprocessor saved to {self.preprocessors_path}")  # 打印预处理器保存路径def load(self, device):"""加载模型和预处理器。:param device: 计算设备(CPU/GPU):return: 加载的模型、特征工程对象、特征选择器和模型配置"""model = None  # 初始化模型feature_engineer = None  # 初始化特征工程对象feature_selector = None  # 初始化特征选择器config = None  # 初始化配置if os.path.exists(self.preprocessors_path):  # 如果预处理器文件存在preprocess = joblib.load(self.preprocessors_path)  # 加载预处理器feature_engineer = preprocess["feature_engineer"]  # 获取特征工程对象feature_selector = preprocess["feature_selector"]  # 获取特征选择器config = preprocess["config"]  # 获取配置if os.path.exists(self.model_path):  # 如果模型文件存在model = TransformerModel(input_size=config["input_size"],d_model=config["d_model"],num_heads=config["num_heads"],num_layers=config["num_layers"],dff=config["dff"],dropout=config["dropout"],).to(device)model.load_state_dict(torch.load(self.model_path, weights_only=False, map_location=device))  # 加载模型参数return (model,feature_engineer,feature_selector,config,)  # 返回加载的模型、特征工程对象、特征选择器和配置class OnlineFeatureEngineer:def __init__(self, windows=[5]):"""在线特征工程生成器类,用于生成技术指标特征并进行标准化。:param windows: 滑动窗口列表(用于特征生成),defaults to [5]"""self.windows = windows  # 设置滑动窗口列表self.n_features = 10  # 设置特征数量self.scalers = {"Trend": RobustScaler(),  # 趋势类指标使用RobustScaler"Momentum": MinMaxScaler(),  # 动量类指标使用MinMaxScaler"Volatility": RobustScaler(),  # 波动率类指标使用RobustScaler"Volume": StandardScaler(),  # 成交量类指标使用StandardScaler# "Sentiment": StandardScaler(),  # 市场情绪类指标使用StandardScaler"SupportResistance": MinMaxScaler(),  # 支撑阻力类指标使用MinMaxScaler"Statistical": StandardScaler(),  # 统计类指标使用StandardScaler"Composite": RobustScaler(),  # 复合型指标使用RobustScaler}# "price": ["open", "high", "low", "close"],self.feature_groups = {# Trend-Following Indicators 趋势类指标"Trend": ["MA20", "EMA12", "MACD", "ADX", "SAR"],# Momentum Indicators 动量类指标"Momentum": ["RSI", "KDJ_K", "KDJ_D", "KDJ_J", "CCI", "WILLR"],# Volatility Indicators 波动率类指标"Volatility": ["BB_upper", "BB_middle", "BB_lower", "ATR", "STD20"],# Volume Indicators 成交量类指标"Volume": ["OBV", "MFI"],# Market Sentiment Indicators 市场情绪类指标 (需要外部数据)# "Sentiment": [],# Support/Resistance Indicators 支撑阻力类指标"SupportResistance": ["Fib_0.382", "Fib_0.618", "Pivot"],# Statistical Indicators 统计类指标"Statistical": ["LR_slope", "LR_angle"],# Composite Indicators 复合型指标"Composite": ["Ichimoku_tenkan","Ichimoku_kijun","Ichimoku_senkou_a","Ichimoku_senkou_b","Ichimoku_chikou",],}self.all_features = [f for sublist in self.feature_groups.values() for f in sublist]  # 生成所有特征列表def partial_fit(self, new_df):"""对新数据进行部分拟合。:param new_df: 新数据DataFrame"""new_features = self.generate_features(new_df, refit=True)  # 生成新数据的特征for group, features in self.feature_groups.items():  # 遍历每个特征组if hasattr(self.scalers[group], "partial_fit"):  # 如果该缩放器支持部分拟合self.scalers[group].partial_fit(new_features[features])  # 对新数据进行部分拟合def generate_features(self, df, refit=False):"""生成技术指标特征。:param df: 原始数据DataFrame:param refit: 是否重新拟合标准化器,defaults to False:return: 特征DataFrame生成8大类技术指标:1. 趋势类指标(MA, MACD等)2. 动量类指标(RSI, KDJ等)3. 波动率指标(布林带, ATR等)4. 成交量指标(OBV, MFI等)5. 市场情绪类指标 (需要外部数据) -- 忽略6. 支撑阻力指标(斐波那契回撤等)7. 统计指标(线性回归斜率等)8. 复合指标(Ichimoku云图等)"""processed = []  # 初始化处理后的特征列表for group, features in self.feature_groups.items():  # 遍历每个特征组scaler = self.scalers[group]  # 获取对应的缩放器if not refit:  # 如果不重新拟合scaler.fit(df[features])  # 拟合缩放器scaled = scaler.transform(df[features])  # 标准化特征processed.append(scaled)  # 添加到处理后的特征列表processed_df = pd.DataFrame(np.hstack(processed), index=df.index, columns=self.all_features)  # 将处理后的特征合并为DataFrameprocessed_df["ts_code"] = df["ts_code"]  # 添加股票代码processed_df["returns"] = df["returns"]  # 添加收益return processed_df.dropna()  # 删除缺失值def feature_selection(self, X, y):"""进行特征选择。:param X: 特征数据:param y: 目标变量:return: 选择后的特征数据"""selector = RandomForestRegressor(n_estimators=50, n_jobs=-1)  # 初始化随机森林回归器selector.fit(X, y)  # 拟合随机森林回归器feature_selector = SelectFromModel(selector, prefit=True)  # 初始化特征选择器return feature_selector  # 返回特征选择器class IncrementalDataHandler:def __init__(self, feature_engineer, feature_selector, window_size=5):"""增量数据处理器类,用于处理增量数据并生成滑动窗口数据。:param feature_engineer: 特征工程对象:param feature_selector: 特征选择器:param window_size: 滑动窗口大小,defaults to 5"""self.feature_engineer = feature_engineer  # 设置特征工程对象self.feature_selector = feature_selector  # 设置特征选择器self.window_size = window_size  # 设置滑动窗口大小self.buffer = pd.DataFrame()  # 初始化数据缓冲区def update_buffer(self, new_df):"""更新数据缓冲区。:param new_df: 新数据DataFrame"""self.buffer = pd.concat([self.buffer, new_df]).sort_index()  # 更新数据缓冲区并排序def prepare_incremental_data(self):"""准备增量数据。:return: 训练数据和测试数据"""processed_df = self.feature_engineer.generate_features(self.buffer)  # 生成特征X_selected = self.feature_selector.transform(processed_df[self.feature_engineer.all_features].values)  # 选择特征X, y = self.sliding_window(processed_df, X_selected)  # 生成滑动窗口数据X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42, shuffle=False)  # 划分训练集和测试集return (X_train, y_train), (X_test, y_test)  # 返回训练集和测试集def sliding_window(self, df, features):"""生成时序滑动窗口数据"""sequences = []  # 初始化序列列表labels = []  # 初始化标签列表for ts_code, group in df.groupby("ts_code"):  # 按股票代码分组group_features = features[df["ts_code"] == ts_code]  # 获取该股票的特征for i in range(self.window_size, len(group)):  # 生成滑动窗口sequences.append(group_features[i - self.window_size : i])  # 添加滑动窗口特征labels.append(group["returns"].iloc[i])  # 添加标签return np.array(sequences, dtype=np.float32), np.array(labels, dtype=np.float32)  # 返回滑动窗口特征和标签class IncrementalTrainer:def __init__(self, device, window_size=5):"""增量训练控制器类,负责模型的初始训练和增量更新。:param device: 计算设备(CPU/GPU)"""self.device = device  # 设置计算设备self.window_size = window_size  # 设置滑动窗口大小self.state_manager = TrainingStateManager()  # 初始化训练状态管理器self.model = None  # 初始化模型self.feature_engineer = None  # 初始化特征工程对象self.feature_selector = None  # 初始化特征选择器self.config = None  # 初始化配置self.load_state()  # 加载状态def load_state(self):"""加载模型和预处理器。"""(self.model,self.feature_engineer,self.feature_selector,self.config,) = self.state_manager.load(self.device)  # 加载模型、特征工程对象、特征选择器和配置def initial_train(self, df):"""初始训练模型。:param df: 原始数据DataFrame"""# 特征生成self.feature_engineer = OnlineFeatureEngineer(windows=[self.window_size])  # 初始化特征工程对象processed_df = self.feature_engineer.generate_features(df)  # 生成特征# 特征选择X, y = processed_df[self.feature_engineer.all_features], df["returns"].reindex(processed_df.index)  # 准备特征和标签self.feature_selector = self.feature_engineer.feature_selection(X, y)  # 选择特征# 准备训练数据data_handler = IncrementalDataHandler(feature_engineer=self.feature_engineer,feature_selector=self.feature_selector,window_size=self.window_size,)  # 初始化增量数据处理器data_handler.buffer = df  # 更新数据缓冲区(X_train, y_train), (X_test, y_test) = (data_handler.prepare_incremental_data())  # 准备训练数据# Optuna超参优化self._optimize_parameters(X_train, y_train, X_test, y_test)  # 优化超参数# 最佳模型训练print(f"Initial Model Config: {self.config}")  # 打印初始模型配置self._train(X_train, y_train)  # 训练模型print("Initial Model Evaluation:")  # 打印初始模型评估self._evaluate(X_test, y_test)  # 评估模型print("Initial Model Save:")  # 打印初始模型保存self.state_manager.save(self.model, self.feature_engineer, self.feature_selector, self.config)  # 保存模型def incremental_update(self, new_df):"""增量更新模型。:param new_df: 新数据DataFrame"""if not self.model:print("The model does not exist, please train it first.")  # 如果模型不存在,提示先训练模型returnself.feature_engineer.partial_fit(new_df)  # 对新数据进行部分拟合data_handler = IncrementalDataHandler(self.feature_engineer, self.feature_selector)  # 初始化增量数据处理器data_handler.update_buffer(new_df)  # 更新数据缓冲区(X_train, y_train), (X_test, y_test) = (data_handler.prepare_incremental_data())  # 准备增量数据print(f"Incremental Model Config: {self.config}")  # 打印增量模型配置self._train(X_train, y_train)  # 训练模型print("\nIncremental Model Evaluation:")  # 打印增量模型评估self._evaluate(X_test, y_test)  # 评估模型print("\nIncremental Model Save:")  # 打印增量模型保存self.state_manager.save(self.model, self.feature_engineer, self.feature_selector, self.config)  # 保存模型def _optimize_parameters(self, X_train, y_train, X_test, y_test):"""Optuna超参优化:param X_train: 训练集滑动窗口特征数据:param y_train: 训练集目标变量:param X_test: 测试集滑动窗口特征数据:param y_test: 测试集目标变量"""def objective(trial):self.config = {"num_heads": trial.suggest_categorical("num_heads", [4, 8]),"d_model": trial.suggest_int("d_model", 64, 256, step=32),  # 以32为步长"num_layers": trial.suggest_int("num_layers", 2, 4),"dff": trial.suggest_int("dff", 128, 512, step=64),"dropout": trial.suggest_float("dropout", 0.1, 0.3),"lr": trial.suggest_float("lr", 1e-4, 1e-3, log=True),"input_size": X_train[0].shape[-1],  # 输入特征维度"batch_size": trial.suggest_categorical("batch_size", [32, 64, 128]),"epochs": 100,  # 训练轮数"window_size": self.window_size,  # 滑动窗口大小}self._train(X_train, y_train)  # 训练模型val_loss = self._evaluate(X_test, y_test)  # 评估模型print(f"Val Loss: {val_loss:.4f}")  # 打印验证损失return val_loss  # 返回验证损失# 超参优化study = optuna.create_study(direction="minimize")  # 创建研究study.optimize(objective, n_trials=10, show_progress_bar=True)  # 优化目标函数print(f"Training Best params: {study.best_params}")  # 打印最佳参数# 最佳模型参数self.config.update(study.best_params)  # 更新配置def _train(self, X_train, y_train):"""模型训练内部方法。:param X_train: 训练集滑动窗口特征数据:param y_train: 训练集目标变量"""print(f"Training Model Config: {self.config}")  # 打印模型配置num_heads = self.config.get("num_heads", 8)d_model = self.config.get("d_model", 64)d_model = (d_model // num_heads) * num_headsnum_layers = self.config.get("num_layers", 3)dff = self.config.get("dff", 256)dropout = self.config.get("dropout", 0.1)lr = self.config.get("lr", 1e-4)input_size = self.config.get("input_dim", X_train[0].shape[-1])  # 获取输入特征维度epochs = self.config.get("epochs", 100)  # 获取训练轮数batch_size = self.config.get("batch_size", 128)# 强制维度约束if d_model % num_heads != 0:d_model = (d_model // num_heads) * num_heads  # 自动调整为最近的可整除数dataset = SingleWindowDataset(X_train, y_train)  # 初始化数据集loader = DataLoader(dataset,batch_size=batch_size,shuffle=False,)  # 初始化数据加载器# 初始化模型self.model = TransformerModel(input_size, d_model, num_heads, num_layers, dff, dropout).to(self.device)optimizer = torch.optim.Adam(self.model.parameters(), lr=lr)  # 初始化优化器scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, "min", patience=5)  # 初始化学习率调度器criterion = nn.HuberLoss()  # 初始化损失函数# 训练循环for epoch in tqdm(range(epochs), desc="Training"):  # 进行训练self.model.train()  # 设置模型为训练模式total_loss = 0  # 初始化总损失for X_batch, y_batch in loader:  # 遍历数据加载器X_batch = X_batch.to(self.device)  # 将特征数据移动到指定设备y_batch = y_batch.to(self.device).unsqueeze(1)  # 将标签数据移动到指定设备并扩展维度optimizer.zero_grad()  # 清零梯度preds = self.model(X_batch)  # 前向传播loss = criterion(preds, y_batch)  # 计算损失loss.backward()  # 反向传播nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)  # 梯度裁剪optimizer.step()  # 更新参数total_loss += loss.item()  # 累加损失# 学习率调整avg_loss = total_loss / len(loader)  # 计算平均损失scheduler.step(avg_loss)  # 更新学习率def _evaluate(self, X_test, y_test):"""模型评估内部方法。:param X_test: 测试集滑动窗口特征数据:param y_test: 测试集目标变量:return: 平均损失"""test_dataset = SingleWindowDataset(X_test, y_test)  # 初始化测试数据集test_loader = DataLoader(test_dataset,batch_size=128,shuffle=False,)  # 初始化测试数据加载器self.model.eval()  # 设置模型为评估模式total_loss = 0  # 初始化总损失criterion = nn.HuberLoss()  # 初始化损失函数with torch.no_grad():  # 关闭梯度计算for X_batch, y_batch in test_loader:  # 遍历测试数据加载器X_batch = X_batch.to(self.device)  # 将特征数据移动到指定设备y_batch = y_batch.to(self.device).unsqueeze(1)  # 将标签数据移动到指定设备并扩展维度preds = self.model(X_batch)  # 前向传播loss = criterion(preds, y_batch)  # 计算损失total_loss += loss.item() * len(y_batch)  # 累加损失test_loss = total_loss / len(test_dataset)  # 计算平均损失print(f"Test Loss: {test_loss}")  # 打印测试损失return test_loss  # 返回测试损失

5.4 backtesting.py

import numpy as np
import optuna
import pandas as pd
import torch
import vectorbt as vbtclass DualEMACrossoverStrategy:def __init__(self, pred_returns, volatility, params):"""双EMA交叉策略类。:param pred_returns: 预测的收益率序列:param volatility: 波动率序列(用于仓位计算):param params: 参数字典,包含快慢EMA的时间跨度"""self.pred_returns = pred_returns  # 预测的收益率序列self.volatility = volatility.clip(lower=0.01)  # 防止波动率为0,导致除零错误self.fast_span = params["fast_span"]  # 快速EMA的时间跨度self.slow_span = params["slow_span"]  # 慢速EMA的时间跨度def generate_signals(self):"""生成交易信号。:return: (交易信号, 仓位大小)"""ema_fast = self.pred_returns.ewm(span=self.fast_span, min_periods=self.fast_span).mean()  # 计算快速EMAema_slow = self.pred_returns.ewm(span=self.slow_span, min_periods=self.slow_span).mean()  # 计算慢速EMAsignals = pd.Series(0, index=self.pred_returns.index)  # 初始化信号序列signals[(ema_fast > ema_slow) & (ema_fast.shift(1) <= ema_slow.shift(1))] = (1  # 买入信号)signals[(ema_fast < ema_slow) & (ema_fast.shift(1) >= ema_slow.shift(1))] = (-1)  # 卖出信号# 使用tanh函数压缩仓位大小,实现非线性映射position_size = np.tanh(self.pred_returns.abs() / self.volatility).clip(0.3, 0.8)return signals, position_size  # 返回信号和仓位大小class BacktestStrategy:def __init__(self, model, device):"""回测执行引擎。:param model: 训练好的预测模型:param device: 计算设备(CPU/GPU)"""self.model = model  # 训练好的预测模型self.device = device  # 计算设备self._configure_vbt()  # 配置VectorBT全局参数def _configure_vbt(self):"""配置VectorBT全局参数。"""vbt.settings.array_wrapper["freq"] = "D"  # 设置时间频率为日频vbt.settings.plotting["layout"]["template"] = "vbt_dark"  # 使用暗色主题vbt.settings.plotting["layout"]["width"] = 1200  # 设置图表宽度vbt.settings.portfolio["init_cash"] = 100000.0  # 初始资金10万元vbt.settings.portfolio["fees"] = 0.0025  # 交易成本(手续费)0.25%vbt.settings.portfolio["slippage"] = 0.0025  # 交易成本(滑点)0.25%def _optimize_parameters(self, result_df):"""优化参数逻辑调整。:param result_df: 包含预测收益率的结果DataFrame:return: 最优参数"""def objective(trial):params = {"fast_span": trial.suggest_int("fast_span", 10, 30),  # 建议快速EMA的时间跨度"slow_span": trial.suggest_int("slow_span", 50, 100),  # 建议慢速EMA的时间跨度}strategy = DualEMACrossoverStrategy(pred_returns=result_df["pred_returns"],volatility=result_df["volatility"],params=params,)  # 创建策略实例signals, position_size = strategy.generate_signals()  # 生成交易信号pf = vbt.Portfolio.from_signals(close=result_df["close"],entries=signals.shift(1) == 1,  # 买入信号exits=signals.shift(1) == -1,  # 卖出信号size=position_size,  # 固定仓位freq="D",)return pf.total_profit()  # 返回总利润study = optuna.create_study(direction="maximize")  # 创建Optuna研究study.optimize(objective, n_trials=10, show_progress_bar=True)  # 优化参数print(f"Strategy Best params: {study.best_params}")  # 打印最优参数return study.best_params  # 返回最优参数def run(self, test_data, df):"""执行完整回测流程。:param test_data: 测试数据集元组(X_test, y_test):param df: 原始数据DataFrame:return: (组合对象, 结果DataFrame)"""X_test = test_data  # 测试数据self.model.eval()  # 将模型设置为评估模式with torch.no_grad():  # 禁用梯度计算test_tensor = torch.FloatTensor(X_test).to(self.device)  # 转换为Tensor并移动到指定设备preds = (self.model(test_tensor).cpu().numpy().flatten())  # 获取预测值并转换为NumPy数组test_dates = df.index[-len(preds) :]  # 获取测试日期result_df = pd.DataFrame({"close": df["close"].values[-len(preds) :],"pred_returns": preds,  # 使用rolling窗口计算动态波动率"volatility": df["ATR"].values[-len(preds) :]/ df["close"].values[-len(preds) :],},index=test_dates,)  # 创建结果DataFramebest_params = self._optimize_parameters(result_df)  # 运行参数优化strategy = DualEMACrossoverStrategy(pred_returns=result_df["pred_returns"],volatility=result_df["volatility"],params=best_params,)  # 创建策略实例signals, position_size = strategy.generate_signals()  # 生成交易信号return vbt.Portfolio.from_signals(close=result_df["close"],entries=signals == 1,  # 买入信号exits=signals == -1,  # 卖出信号size=position_size,size_type="percent",freq="D",)  # 执行组合回测

5.5 main.py

import randomimport numpy as np
import torchfrom vectorbt_5.backtesting import BacktestStrategy
from vectorbt_5.data_processing import load_data
from vectorbt_5.training import IncrementalTrainer, TrainingStateManagerdef set_random_seed(seed=42):"""设置全局随机种子:param seed: 随机种子, defaults to 42影响范围:- Python内置随机模块- Numpy随机数生成- PyTorch CPU/CUDA随机种子"""random.seed(seed)np.random.seed(seed)torch.manual_seed(seed)if torch.cuda.is_available():torch.cuda.manual_seed(seed)torch.cuda.manual_seed_all(seed)  # 如果使用多个GPUtorch.backends.cudnn.deterministic = Truetorch.backends.cudnn.benchmark = Falsedef prepare_backtest_data(ts_code, device):"""准备回测数据。:param ts_code: 股票代码:param device: 计算设备(CPU/GPU):return: 滑动窗口特征数据、原始数据DataFrame、模型、特征工程对象、特征选择器和配置"""state_manager = TrainingStateManager()  # 初始化训练状态管理器model, feature_engineer, feature_selector, config = state_manager.load(device)  # 加载模型和预处理器print(f"Model Config: {config}")  # 打印模型配置# 加载并处理数据test_df = load_data([ts_code])  # 加载数据test_df = test_df[-300:]  # 取最近300条数据processed_df = feature_engineer.generate_features(test_df)  # 生成特征X_selected = feature_selector.transform(processed_df[feature_engineer.all_features].values)  # 选择特征# 构建滑动窗口window_size = config["window_size"]  # 获取滑动窗口大小sequences = []  # 初始化序列列表for i in range(window_size, len(X_selected)):  # 遍历数据sequences.append(X_selected[i - window_size : i])  # 添加滑动窗口特征return (np.array(sequences, dtype=np.float32),  # 返回滑动窗口特征数据test_df,  # 返回原始数据DataFramemodel,  # 返回模型feature_engineer,  # 返回特征工程对象feature_selector,  # 返回特征选择器config,  # 返回配置)if __name__ == "__main__":# 设置随机种子# 函数确保了整个训练过程的可重复性。# 通过设置相同的随机种子,可以保证每次运行时生成的随机数序列一致,这对于调试和实验验证非常重要。set_random_seed()# 检测可用计算设备(优先使用CUDA)device = torch.device("cuda"if torch.cuda.is_available()else "mps" if torch.backends.mps.is_available() else "cpu")# 股票行情# start_date = "20180101"# end_date = "20241231"trainer = IncrementalTrainer(device)# 多股票初始训练示例# 浦发银行(600000.SH)# 招商银行(600036.SH)# 平安银行(000001.SZ)train_codes = ["600000.SH", "600036.SH", "000001.SZ"]train_df = load_data(train_codes)model = trainer.initial_train(train_df)# 增量训练示例# 贵州茅台(600519.SH)# new_df = load_data(["600519.SH"])# model = trainer.incremental_update(new_df)# 单股票回测(test_data, test_df, model, feature_engineer, feature_selector, config) = (prepare_backtest_data("600036.SH", device))backtester = BacktestStrategy(model, device)pf = backtester.run(test_data, test_df)print("回测结果统计:")print(pf.stats())pf.plot().show()

风险提示与免责声明
本文内容基于公开信息研究整理,不构成任何形式的投资建议。历史表现不应作为未来收益保证,市场存在不可预见的波动风险。投资者需结合自身财务状况及风险承受能力独立决策,并自行承担交易结果。作者及发布方不对任何依据本文操作导致的损失承担法律责任。市场有风险,投资须谨慎。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/76904.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

4.3-4.6学习总结 Java:Set系列集合+双列集合+Map

Set系列集合&#xff1a; 元素是唯一的。 HashSet&#xff1a; 哈希值&#xff1a; 如果没有重写hashcode方法和equals方法&#xff0c;那么哈希值是根据地址值计算的。 LinkedHashSet&#xff1a; TreeSet底层为红黑树。 红黑树&#xff1a;两个红色节点不能相连。 双列集合&…

unreal engine5开发仿鬼泣5的游戏,把敌人击飞到空中4连击

UE5系列文章目录 文章目录 UE5系列文章目录前言一、实现思路二、具体蓝图 前言 unreal engine5开发仿鬼泣5的游戏&#xff0c;把敌人击飞到空中4连击&#xff0c;先看下效果 一、实现思路 unreal engine5开发仿鬼泣5的游戏&#xff0c;把敌人击飞到空中4连击 在Unreal Engi…

功耗日志抓取需求

最近罗列了一些功耗分析需要的常见日志&#xff1a; 测试功耗前&#xff1a; adb shell dumpsys batterystats --reset adb shell dumpsys batterystats --enable full-wake-history 测试功耗后&#xff0c;使用脚本导出如下功耗日志&#xff1a; 脚本 chmod x collect_logs.s…

Java后端开发流程

Java后端开发流程 目录 开发流程概述具体实现步骤开发最佳实践项目结构示例代码示例常见问题与解决方案 开发流程概述 Java后端开发是一个系统化的过程&#xff0c;通常包括以下几个主要阶段&#xff1a; 1. 需求分析阶段 业务需求收集&#xff1a;与产品经理、业务方沟通…

Java项目之基于ssm的孩童收养信息管理(源码+文档)

项目简介 孩童收养信息管理实现了以下功能&#xff1a; 实现了用户在线选择试题并完成答题&#xff0c;在线查看考核分数。管理员管理字典管理、收养管理、收养信息更改记录管理、收养者配偶管理、送养管理、员工管理、管理员管理等功能。 &#x1f495;&#x1f495;作者&am…

查询条件与查询数据的ajax拼装

下面我将介绍如何使用 AJAX 动态拼装查询条件和获取查询数据&#xff0c;包括前端和后端的完整实现方案。 一、前端实现方案 1. 基础 HTML 结构 html 复制 <div class"query-container"><!-- 查询条件表单 --><form id"queryForm">…

【算法竞赛】状态压缩型背包问题经典应用(蓝桥杯2019A4分糖果)

在蓝桥杯中遇到的这道题&#xff0c;看上去比较普通&#xff0c;但其实蕴含了很巧妙的“状态压缩 背包”的思想&#xff0c;本文将从零到一&#xff0c;详细解析这个问题。 目录 一、题目 二、思路分析&#xff1a;状态压缩 最小覆盖 1. 本质&#xff1a;最小集合覆盖问题…

STL 性能优化实战:解决项目中标准模板库的性能瓶颈

&#x1f9d1; 博主简介&#xff1a;CSDN博客专家、全栈领域优质创作者、高级开发工程师、高级信息系统项目管理师、系统架构师&#xff0c;数学与应用数学专业&#xff0c;10年以上多种混合语言开发经验&#xff0c;从事DICOM医学影像开发领域多年&#xff0c;熟悉DICOM协议及…

大模型如何优化数字人的实时交互与情感表达

标题:大模型如何优化数字人的实时交互与情感表达 内容:1.摘要 随着人工智能技术的飞速发展&#xff0c;数字人在多个领域的应用愈发广泛&#xff0c;其实时交互与情感表达能力成为提升用户体验的关键因素。本文旨在探讨大模型如何优化数字人的实时交互与情感表达。通过分析大模…

qt designer 软件主题程序设计

对于使用Qt Designer设计的界面&#xff0c;主题切换的实现需要结合Qt的信号槽机制、样式表动态加载以及资源管理。以下是针对Qt Designer UI的详细解决方案&#xff1a; 一、UI文件与主题系统的整合架构 二、核心实现步骤 1. 动态样式表加载系统 // ThemeManager.h class …

一、STM32简介

一、实验器材介绍 二、STM32简介 1.STM32 名词解释 STM32是ST公司基于ARM Cortex-M内核开发的32位微控制器。 ST&#xff0c;指ST公司&#xff08;意法半导体&#xff09;;M&#xff0c;MicroController 微控制器&#xff08;MCU,MicroController Unit 微控制器单元/单片机&…

JVM虚拟机篇(一)深入理解JVM:组成部分、运行流程及程序计数器详解

JVM虚拟机篇&#xff08;一&#xff09;深入理解JVM&#xff1a;组成部分、运行流程及程序计数器详解 JVM虚拟机篇&#xff08;一&#xff09;深入理解JVM&#xff1a;组成部分、运行流程及程序计数器详解一、引言二、JVM的组成部分2.1 类加载子系统2.2 运行时数据区2.3 执行引…

elementui的默认样式修改

今天用element ui &#xff0c;做了个消息提示&#xff0c;发现提示的位置总是在上面&#xff0c;如图&#xff1a; 可是我想让提示的位置到下面来&#xff0c;该怎么办&#xff1f; 最后还是看了官方的api 原来有个自定义样式属性 customClass 设置下就好了 js代码 css代码 效…

游戏引擎学习第204天

回顾并为今天的内容做铺垫 好&#xff0c;现在开始这一集。今天我们将进行一些用户界面编程&#xff0c;觉得这是一个展示如何编写这类代码的好时机。很多人对如何做用户界面代码都很好奇&#xff0c;所以展示一下如何编写是非常有意义的。 我之所以在现在的这个地方做这些工…

我的世界1.20.1forge模组开发进阶教程——TerraBlender

TerraBlender介绍 从模组开发者的视角来看,TerraBlender为Minecraft生物群系类模组的开发提供了全方位的技术支持,显著降低了开发门槛并提升了模组的质量与扩展性: 跨平台兼容性架构支持Forge/Fabric/Quilt/NeoForge四大主流加载器,开发者无需为不同平台单独适配代码客户端…

借助mcpo在open-webui中使用mcp

open-webui前几天发布了0.6版本&#xff0c;我立即进行了升级。新版本中一个重要功能是通过mcpo方式支持了mcp server。本文将介绍mcpo是什么&#xff0c;以及如何在open-webui中使用它。同时&#xff0c;我也会分享几个在接入过程中遇到的问题及解决方案。 首先来介绍mcpo&…

安装gpu版本的dgl

1.先去网址&#xff0c;找到对应版本的dgl,然后下载到本地。 dgl-whl下载地址 我的是python 3.8 &#xff0c;cuda 11.6. windows 2.在虚拟环境里 输入 pip install E:\dgl-1.0.2cu116-cp38-cp38-win_amd64.whl &#xff08;因为我下载到E盘里了&#xff09; 这样GPU版本的d…

PyTorch使用(7)-张量常见运算函数

1. 基本数学运算 1.1 平方根和幂运算 import torchx torch.tensor([4.0, 9.0, 16.0])# 平方根 sqrt_x torch.sqrt(x) # tensor([2., 3., 4.])# 平方 square_x torch.square(x) # tensor([16., 81., 256.])# 任意幂次 pow_x torch.pow(x, 3) # tensor([64., 729., 4096…

Nginx功能及应用全解:从负载均衡到反向代理的全面剖析

Nginx作为一款开源的高性能HTTP服务器和反向代理服务器&#xff0c;凭借其高效的资源利用率和灵活的配置方式&#xff0c;已成为互联网领域中最受欢迎的Web服务器之一。无论是作为HTTP服务器、负载均衡器&#xff0c;还是作为反向代理和缓存服务器&#xff0c;Nginx的多种功能广…

安徽京准:NTP时间同步服务器操作使用说明

安徽京准&#xff1a;NTP时间同步服务器操作使用说明 3.1 连接天线 天线连接到“ANT”口。 3.2 连接电源 将220V电源线连到AC220V座上或将电源适配器&#xff08;7.5V~12V&#xff09;接到DC口上。也可以同时接上&#xff0c;提高供电可靠性。 3.3 LAN网口 网线连接到NTP…