这个代码参考了时间序列预测模型实战案例(三)(LSTM)(Python)(深度学习)时间序列预测(包括运行代码以及代码讲解)_lstm预测模型-CSDN博客
结合我之前所学的lstm-seq2seq里所学习到的知识对其进行预测
import time
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from matplotlib import pyplot as plt
from sklearn.preprocessing import MinMaxScalernp.random.seed(0)def calculate_mae(y_true, y_pred):# 平均绝对误差mae = np.mean(np.abs(y_true - y_pred))return maetrue_data = pd.read_csv(r"C:\Users\33746\Desktop\DailyDelhiClimateTrain.csv") # 填你自己的数据地址target = 'meanpressure'# 这里加一些数据的预处理, 最后需要的格式是pd.seriestrue_data = np.array(true_data['meanpressure'])# 定义窗口大小
test_data_size = 32
# 训练集和测试集的尺寸划分
test_size = 0.15
train_size = 0.85
# 标准化处理
scaler_train = MinMaxScaler(feature_range=(0, 1))
scaler_test = MinMaxScaler(feature_range=(0, 1))
train_data = true_data[:int(train_size * len(true_data))]
test_data = true_data[-int(test_size * len(true_data)):]
print("训练集尺寸:", len(train_data))
print("测试集尺寸:", len(test_data))
train_data_normalized = scaler_train.fit_transform(train_data.reshape(-1, 1))
test_data_normalized = scaler_test.fit_transform(test_data.reshape(-1, 1))
# 转化为深度学习模型需要的类型Tensor
train_data_normalized = torch.FloatTensor(train_data_normalized).view(-1)
test_data_normalized = torch.FloatTensor(test_data_normalized).view(-1)def create_inout_sequences(input_data, tw, pre_len):inout_seq = []L = len(input_data)for i in range(L - tw):train_seq = input_data[i:i + tw]if (i + tw + 4) > len(input_data):breaktrain_label = input_data[i + tw:i + tw + pre_len]inout_seq.append((train_seq, train_label))return inout_seqpre_len = 4
train_window = 16
# 定义训练器的的输入
train_inout_seq = create_inout_sequences(train_data_normalized, train_window, pre_len)class LSTM(nn.Module):def __init__(self, input_dim=1, hidden_dim=350, output_dim=1):super(LSTM, self).__init__()self.hidden_dim = hidden_dimself.lstm = nn.LSTM(input_dim, hidden_dim, batch_first=True)self.fc = nn.Linear(hidden_dim, output_dim)def forward(self, x):x = x.unsqueeze(1)h0_lstm = torch.zeros(1, self.hidden_dim).to(x.device)c0_lstm = torch.zeros(1, self.hidden_dim).to(x.device)out, _ = self.lstm(x, (h0_lstm, c0_lstm))out = out[:, -1]out = self.fc(out)return outlstm_model = LSTM(input_dim=1, output_dim=pre_len, hidden_dim=train_window)
loss_function = nn.MSELoss()
optimizer = torch.optim.Adam(lstm_model.parameters(), lr=0.001)
epochs = 10
Train = False # 训练还是预测if Train:losss = []lstm_model.train() # 训练模式start_time = time.time() # 计算起始时间for i in range(epochs):for seq, labels in train_inout_seq:lstm_model.train()optimizer.zero_grad()y_pred = lstm_model(seq)single_loss = loss_function(y_pred, labels)single_loss.backward()optimizer.step()print(f'epoch: {i:3} loss: {single_loss.item():10.8f}')losss.append(single_loss.detach().numpy())torch.save(lstm_model.state_dict(), 'save_model.pth')print(f"模型已保存,用时:{(time.time() - start_time) / 60:.4f} min")plt.plot(losss)# 设置图表标题和坐标轴标签plt.title('Training Error')plt.xlabel('Epoch')plt.ylabel('Error')# 保存图表到本地plt.savefig('training_error.png')
else:# 加载模型进行预测lstm_model.load_state_dict(torch.load('save_model.pth'))lstm_model.eval() # 评估模式results = []reals = []losss = []test_inout_seq = create_inout_sequences(test_data_normalized, train_window, pre_len)for seq, labels in train_inout_seq:pred = lstm_model(seq)[0].item()results.append(pred)mae = calculate_mae(pred, labels.detach().numpy()) # MAE误差计算绝对值(预测值 - 真实值)reals.append(labels.detach().numpy())losss.append(mae)print("模型预测结果:", results)print("预测误差MAE:", losss)plt.style.use('ggplot')# 创建折线图plt.plot(results, label='real', color='blue') # 实际值plt.plot(reals, label='forecast', color='red', linestyle='--') # 预测值# 增强视觉效果plt.grid(True)plt.title('real vs forecast')plt.xlabel('time')plt.ylabel('value')plt.legend()plt.savefig('test——results.png')
train_data = true_data[:int(train_size * len(true_data))]
test_data = true_data[-int(test_size * len(true_data)):]是在干什么
这两行代码的作用是将数据集 true_data
划分为训练集 train_data
和测试集 test_data
。具体来说:
代码解释:
-
train_data = true_data[:int(train_size * len(true_data))]
:-
train_size
是一个比例值(例如0.8,表示80%),表示训练集占总数据集的比例。 -
len(true_data)
返回true_data
的长度。 -
train_size * len(true_data)
计算出训练集的大小。 -
int(train_size * len(true_data))
将计算结果转换为整数,表示训练集的实际样本数量。 -
true_data[:int(train_size * len(true_data))]
使用切片操作从true_data
的开头到训练集的样本数量,提取出训练集。
-
-
test_data = true_data[-int(test_size * len(true_data)):]
:-
test_size
是另一个比例值(例如0.2,表示20%),表示测试集占总数据集的比例。 -
test_size * len(true_data)
计算出测试集的大小。 -
int(test_size * len(true_data))
将计算结果转换为整数,表示测试集的实际样本数量。 -
true_data[-int(test_size * len(true_data)):]
使用切片操作从true_data
的末尾到测试集的样本数量,提取出测试集。
-
示例:
假设 true_data
有100个样本,train_size
为0.8,test_size
为0.2:
-
train_size * len(true_data) = 0.8 * 100 = 80
-
test_size * len(true_data) = 0.2 * 100 = 20
那么:
-
train_data = true_data[:80]
会提取true_data
的前80个样本作为训练集。 -
test_data = true_data[-20:]
会提取true_data
的后20个样本作为测试集。
注意事项:
-
数据顺序:如果
true_data
是有序的(例如时间序列数据),这种划分方式可能会导致训练集和测试集之间存在时间上的重叠,从而引入数据泄露。在这种情况下,应该按照时间顺序进行划分。 -
随机性:如果
true_data
是无序的,这种划分方式是合理的。但如果数据没有被打乱,建议在划分之前先打乱数据,以确保训练集和测试集能够代表整体数据的分布。 -
假设
input_data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
,tw = 3
,pre_len = 2
: -
第一次循环:
-
i = 0
-
train_seq = [1, 2, 3]
-
train_label = [4, 5]
-
inout_seq = [([1, 2, 3], [4, 5])]
-
-
第二次循环:
-
i = 1
-
train_seq = [2, 3, 4]
-
train_label = [5, 6]
-
inout_seq = [([1, 2, 3], [4, 5]), ([2, 3, 4], [5, 6])]
-
-
以此类推,直到
i = 7
时,train_seq = [8, 9, 10]
,train_label = []
,此时i + tw + pre_len
超出input_data
的范围,循环结束。