这段代码是为了并行地处理多个 CSV 文件,并使用机器学习模型进行预测和回测。主要涉及以下步骤:
-
初始化环境与设置:
- 引入必要的库,如
ray
用于并行计算,pandas
用于数据处理,tqdm
用于进度条显示等。 - 设置一些路径,用于保存结果、图像、模型等。
- 定义一些处理特征、数据预处理的函数。
- 引入必要的库,如
-
并行处理函数
csv_predict
:- 使用
ray.remote
将csv_predict
函数并行化。 - 在每个函数中,加载训练好的模型,并对新的 CSV 文件进行预测和回测。
- 使用
-
具体步骤:
- 读取 CSV 文件:读取并处理每个 CSV 文件,确保数据格式正确。
- 数据预处理:包括特征计算、标准化等。
- 构建验证数据集:将处理后的数据转换为模型可接受的格式。
- 预测与回测:使用模型对数据进行预测,并根据预测结果进行回测计算,模拟交易策略。
-
结果保存:
- 根据回测结果,将交易数据保存到不同的文件夹中。
- 以不同的策略和条件,将结果分门别类保存。
代码解读
import ray# 验证集数据处理
a = []
sum_dam_data = []# 定义并行处理函数
@ray.remote
def csv_predict(csv_path):# 创建和训练模型参数nhits_params = {'sampling_stride': 8,'eval_metrics': ["mse", "mae"],'batch_size': 32,'max_epochs': 100,'patience': 10}rnn_params = {'sampling_stride': 8,'eval_metrics': ["mse", "mae"],'batch_size': 32,'max_epochs': 100,'patience': 10,}mlp_params = {'sampling_stride': 8,'eval_metrics': ["mse", "mae"],'batch_size': 32,'max_epochs': 100,'patience': 10,'use_bn': True,}# 加载训练好的加权集成预测模型reg = WeightingEnsembleForecaster(in_chunk_len=64,out_chunk_len=1,skip_chunk_len=0,estimators=[(NHiTSModel, nhits_params), (RNNBlockRegressor, rnn_params), (MLPRegressor, mlp_params)])reg = reg.load(os.path.join(model_center, "low_high"))# 读取 CSV 文件new_data = pd.read_csv(csv_path)new_data[['open', 'high', 'low', 'close', 'pre_close', 'change', 'pct_chg', 'vol', 'amount']] = new_data[['open', 'high', 'low', 'close', 'pre_close', 'change', 'pct_chg', 'vol', 'amount']].apply(pd.to_numeric, errors='coerce')# 如果数据长度不足,返回空结果if len(new_data) < 2048:return {}base_case = 0base_num = 0money = 0reverse_data = new_data.iloc[::-1] # 反转数据顺序# 计算特征reverse_data = calculate_features(reverse_data)# 逐天进行预测和回测for day_i in range(64):new_data = reverse_data[-256:-64+day_i]new_data['index_new'] = range(1, len(new_data) + 1)# 构建验证数据集valid_tsdataset = TSDataset.load_from_dataframe(new_data,time_col="index_new",target_cols=['open', 'high', 'low', 'close', 'pre_close', 'change', 'pct_chg', 'MA5', 'MA10', 'MA20', 'EMA12', 'EMA26', 'Volatility_5', 'Volatility_10', 'Volume_MA5', 'Volume_Change_Rate', 'RSI14', 'Momentum_3', 'Momentum_7', 'Middle_Band', 'Upper_Band', 'Lower_Band'])valid_tsdataset = scaler.transform(valid_tsdataset)predicted = reg.recursive_predict(valid_tsdataset, 3)predicted = scaler.inverse_transform(predicted)predicted = predicted.to_dataframe()# 根据预测结果进行回测计算high_value = predicted.max().to_dict()['high']low_value = predicted.min().to_dict()['low']round_value = round((high_value - low_value) / low_value, 3) * 1000high_index = predicted[predicted['high'] == high_value].index.values[0] - len(new_data)low_index = predicted[predicted['low'] == low_value].index.values[0] - len(new_data)if high_value > low_value:if high_index > low_index:if base_num < 100000:if reverse_data[-(64 - day_i)][3] > low_value > reverse_data[-(64 - day_i)][4]:base_case += 10000 * low_valuebase_num += 10000money -= 10000 * low_valueif low_index > high_index:high_value = predicted['high'].tolist()[0]if reverse_data[-(64 - day_i)][3] > high_value > reverse_data[-(64 - day_i)][4]:base_case -= base_num * high_valuemoney += base_num * high_valuebase_num = 0else:base_case -= base_num * high_valuemoney += base_num * high_valuebase_num = 0sum_money = money + base_num * reverse_data[-(64 - day_i)][5]# 保存回测结果deal.append({"base_case": base_case,"base_num": base_num,"money": money,"index": reverse_data[-(64 - day_i)][-1],"close": reverse_data[-(64 - day_i)][5] * 10000,"total": base_num * reverse_data[-(64 - day_i)][5],"sum": sum_money,"rate": 100 * (sum_money / (reverse_data[-(64 - day_i)][5] * 10000))})try:pd.DataFrame(deal).to_csv(os.path.join("./back_test/low_high_128_5_100", last_price_data, str(int(deal[-1]['rate'])) + "_" + export_csv),index=False)except Exception as e:print(e)returnif deal[-1]['rate'] > 10:if pd.DataFrame(deal)['rate'].sum() > 0:pd.DataFrame(deal).to_csv(os.path.join("./back_test/good_low_high_5_100_deal_101", last_price_data, str(int(deal[-1]['rate'])) + "_" + export_csv),index=False)if deal[-1]['rate'] > 50:if pd.DataFrame(deal)['rate'].sum() > 0:pd.DataFrame(deal).to_csv(os.path.join("./back_test/good_low_high_5_100_deal_105", last_price_data, str(int(deal[-1]['rate'])) + "_" + export_csv),index=False)
主要功能
-
模型加载与预测:
- 加载预训练模型
WeightingEnsembleForecaster
并进行预测。 - 预测未来几天的高低价格。
- 加载预训练模型
-
回测策略:
- 根据预测的高低价进行模拟交易,计算收益。
- 基于交易规则买入或卖出,计算资金和持仓。
-
结果保存:
- 将回测结果保存到 CSV 文件中。
- 根据不同的收益率将结果分开保存。
使用说明
- 确保已安装
ray
库用于并行计算。 - 确保所有依赖库(如
pandas
,paddlets
,tqdm
等)已安装。 - 将代码中的路径和参数调整为实际数据和模型的位置。
- 运行代码,通过
ray
并行处理多个 CSV 文件,提高处理效率。
注意事项
- 确保数据格式和模型参数与实际情况匹配。
- 在并行化时,要确保每个子任务的独立性,避免数据冲突。
- 根据需要调整回测策略和交易规则,以满足实际需求。