基于MLP算法实现交通流量预测(Pytorch版)

在海量的城市数据中,交通流量数据无疑是揭示城市运行脉络、洞察出行规律的关键要素之一。实时且精准的交通流量预测不仅能为交通规划者提供科学决策依据,助力提升道路使用效率、缓解交通拥堵,还能为公众出行提供参考,实现个性化导航服务,进而提升整个城市的运行效能。然而,交通流量受到众多因素的交织影响,如天气变化、特殊事件、节假日效应、时段特性等,其动态变化规律呈现出显著的非线性、时变性和不确定性,这无疑给预测工作带来了巨大挑战。

在此背景下,机器学习技术,尤其是深度学习方法,凭借其强大的模型构建能力和对复杂非线性关系的出色捕捉能力,逐渐崭露头角,成为交通流量预测领域的研究热点。其中,多层感知器(MLP)作为一类基础而经典的前馈型人工神经网络,以其简洁的结构、灵活的适应性和良好的泛化性能,在处理高维、非线性问题上展现出独特优势。 MLP通过模拟人脑神经元的工作机制,通过多层非线性变换对输入数据进行深度抽象和特征学习,能够有效挖掘交通流量数据背后的复杂关联与潜在模式,从而实现对未来的流量状态进行精准预测。

模型简介

多层感知器(Multi-Layer Perceptron, MLP)是一种基础且广泛使用的前馈型人工神经网络模型,它是深度学习领域的重要组成部分,尤其在处理非线性关系和模式识别问题中表现出强大的能力。MLP的核心特征在于其多层结构,由多个神经元组成的隐藏层与输入层和输出层相互连接,形成一个分层信息处理系统。下面是对MLP的详细介绍:

基本结构与组件:

  1. 输入层(Input Layer):接收原始数据作为网络的输入。这些数据通常是经过预处理后的数值向量,代表了待解决问题中的各种特征或变量。

  2. 隐藏层(Hidden Layers):位于输入层与输出层之间的中间层。MLP可以有一个或多个隐藏层,每个隐藏层包含多个神经元。隐藏层的主要功能是通过非线性变换对输入数据进行复杂的特征提取和表示学习,从而捕获数据中的潜在关系和模式。神经元之间通常是全连接的,即每个隐藏层神经元接收到前一层所有神经元的加权输出。

  3. 输出层(Output Layer):产生网络的最终输出,根据任务需求可以是一维或多维的。在交通流量预测等回归任务中,输出层通常只有一个神经元,给出连续的预测值;而在分类任务中,输出层神经元数量对应类别数,每个神经元的激活值代表对应类别的概率。

工作原理:

  1. 加权求和与激活函数:每个神经元接收前一层所有神经元的输出(或输入层的原始数据),对这些输入进行加权求和,加上一个偏置项后,通过一个非线性激活函数进行转换。常见的激活函数包括Sigmoid、Tanh、ReLU及其变种等。激活函数引入非线性,使得网络能够表达复杂的非线性关系。

  2. 前向传播(Forward Propagation):信息从输入层依次经过隐藏层直至输出层的过程称为前向传播。在这个过程中,数据通过各层神经元的加权求和和激活函数计算,逐步形成更高级的特征表示,最终在输出层生成预测结果。

  3. 反向传播(Backpropagation):当网络进行预测后,会计算预测结果与实际标签之间的误差(损失函数)。反向传播算法根据这个误差,从输出层开始,逐层反向调整各层神经元的权重和偏置,以最小化总体误差。这是通过链式法则计算梯度并应用梯度下降或其变种算法实现的。反向传播确保了网络在训练过程中能够自我学习并逐步改善其预测性能。

训练与应用:

  1. 训练过程:给定标记好的训练数据集,MLP通过迭代执行前向传播和反向传播,更新模型参数,直到达到预设的停止条件(如达到一定迭代次数、损失函数收敛或验证集性能不再提升等)。训练过程中可能涉及正则化、批量归一化、dropout等技术防止过拟合。

  2. 应用领域:MLP因其灵活性和普适性,被广泛应用于各种领域,如图像识别、语音识别、自然语言处理、时间序列预测(如交通流量预测)、金融风险评估、生物医学信号分析等。在交通流量预测中,MLP可以接收历史流量数据、气象信息、节假日标志等多元输入,学习并建模这些因素与未来流量之间的复杂非线性关系,从而做出准确预测。

综上所述,多层感知器(MLP)作为一种基础的前馈神经网络模型,通过多层非线性变换对输入数据进行抽象和学习,适用于各种非线性预测和分类任务。其训练过程依赖于反向传播算法来优化模型参数,使其能够捕捉数据中的复杂模式,并在诸多实际应用场景中展现出强大的预测性能。在交通流量预测中,MLP能够整合多种影响因素,为交通管理者提供有价值的决策支持。

模型构建

class MLP(nn.Module):def __init__(self, input_size, hidden_sizes, output_size):super(MLP, self).__init__()self.input_size = input_sizeself.hidden_sizes = hidden_sizesself.output_size = output_sizelayers = []prev_size = input_sizefor hidden_size in hidden_sizes:layers.append(nn.Linear(prev_size, hidden_size))layers.append(nn.Sigmoid())prev_size = hidden_sizelayers.append(nn.Linear(prev_size, output_size))self.model = nn.Sequential(*layers)def forward(self, x):# [Batch, Input_len, Node] --> [Batch, Node, Input_len]x = x.permute(0, 2, 1)y = self.model(x)# [Batch, Node, Output_len] --> [Batch, Output_len, Node]y = y.permute(0, 2, 1)return y

其中,input_size代表输入层节点数目,hidden_sizes为隐藏层的节点数目,如[24, 36, 24]则代表有3层隐藏层,隐藏层的节点数目分别为24、36、24,output_size则为输出层节点数目。

一个典型的MLP神经网络如下图所示:

在这里插入图片描述

在时间序列任务中,考虑到数据变化的趋势性,认为未来时间窗的特征和当前时刻的前置时间窗特征相关性较大。

在这里插入图片描述

在单变量的预测场景下,假设时间窗为5分钟,用最近12个时间窗的流量预测未来3个时间窗的流量,则可以将最近12个时间窗的流量作为输入特征,此时输入层的节点数目为12,而需要预测的未来的3个时间窗的流量作为输出特征,此时输出层的节点数目为3。

编写个主函数验证下网络shape变化是否符合预期:

if __name__ == '__main__':parser = argparse.ArgumentParser()parser.add_argument('--window_size', type=int, default=12)parser.add_argument('--horizon', type=int, default=3)parser.add_argument('--hidden_sizes', type=list, default=[24, 36, 24])args = parser.parse_args()model = DNN(input_size=args.window_size, hidden_sizes=args.hidden_sizes, output_size=args.horizon)print(model)x = torch.randn(8, 12, 96)y = model(x)print(y.shape)

输出为:

MLP((model): Sequential((0): Linear(in_features=12, out_features=24, bias=True)(1): Sigmoid()(2): Linear(in_features=24, out_features=36, bias=True)(3): Sigmoid()(4): Linear(in_features=36, out_features=24, bias=True)(5): Sigmoid()(6): Linear(in_features=24, out_features=3, bias=True))
)
torch.Size([8, 3, 96])

输入张量的shape为[batch_size, input_len, node],node代表不同的序列,如果是交通流量预测,代表的是不同路段或交叉口。

输出张量的shape为[batch_size, output_len, node]。

用最近12个时间窗的流量预测未来3个时间窗的流量,输入张量的shape为[8, 12, 96],输出张量的shape为[8, 3, 96],符合预期。

那如果是多变量的预测场景呢?

其实,也很简单,假设时间窗为5分钟,用最近12个时间窗的流量、速度、占有率来预测未来3个时间窗的流量,此时输入特征为最近12个时间窗的流量、速度、占有率,有12*3=36个特征,故输入层的节点数目为36,由于输出还是流量这个单变量,所以输出层的节点数目还是3。

数据输入

本文以PEMS数据集作为算法的训练、验证和测试数据。

PEMS数据集是针对加利福尼亚州不同区域高速公路网络收集的交通数据,数据集可能包含多个传感器站点的数据,每个站点每五分钟记录了特定路段或路口的交通状况,包括但不限于:

  • 流量

单位时间内通过某路段或交叉口的车辆数量,反映道路的使用程度。

  • 速度

车辆在道路上行驶的平均速度,用于评估道路的运行效率和拥堵状况。

  • 占有率

车道被车辆占用的比例,是衡量道路拥挤程度的另一个重要指标。

PEMS03数据,26208*358*3,358个检测器,26208个5分钟时间窗(2012/5/1开始,91天),3个变量分别为流量、速度和占有率。PEMS04数据,16992*307*3,307个检测器,16992个5分钟时间窗(2017/7/1开始,59天),3个变量分别为流量、速度和占有率。PEMS07数据,28224*883*3,883个检测器,28224个5分钟时间窗(2017/5/1开始,98天),3个变量分别为流量、速度和占有率。PEMS08数据,17856*170*3,170个检测器,17856个5分钟时间窗(2012/3/1开始,62天),3个变量分别为流量、速度和占有率。

读取数据后,可以将数据传入自定义的DataSet,以方便后续的训练:

class ForecastDataset(torch_data.Dataset):def __init__(self, df, window_size, horizon, normalize_method=None, norm_statistic=None, interval=1):self.window_size = window_size # 12self.interval = interval  #1self.horizon = horizonself.normalize_method = normalize_methodself.norm_statistic = norm_statisticdf = pd.DataFrame(df)df = df.fillna(method='ffill', limit=len(df)).fillna(method='bfill', limit=len(df)).valuesself.data = dfself.df_length = len(df)self.x_end_idx = self.get_x_end_idx()if normalize_method:self.data, _ = normalized(self.data, normalize_method, norm_statistic)def __getitem__(self, index):hi = self.x_end_idx[index] #12lo = hi - self.window_size #0train_data = self.data[lo: hi] #0:12target_data = self.data[hi:hi + self.horizon] #12:24x = torch.from_numpy(train_data).type(torch.float)y = torch.from_numpy(target_data).type(torch.float)return x, ydef __len__(self):return len(self.x_end_idx)def get_x_end_idx(self):# each element `hi` in `x_index_set` is an upper bound for get training data# training data range: [lo, hi), lo = hi - window_sizex_index_set = range(self.window_size, self.df_length - self.horizon + 1)x_end_idx = [x_index_set[j * self.interval] for j in range((len(x_index_set)) // self.interval)]return x_end_idxdef normalized(data, normalize_method, norm_statistic=None):if normalize_method == 'min_max':if not norm_statistic:norm_statistic = dict(max=np.max(data, axis=0), min=np.min(data, axis=0))scale = norm_statistic['max'] - norm_statistic['min'] + 1e-5data = (data - norm_statistic['min']) / scaledata = np.clip(data, 0.0, 1.0)elif normalize_method == 'z_score':if not norm_statistic:norm_statistic = dict(mean=np.mean(data, axis=0), std=np.std(data, axis=0))mean = norm_statistic['mean']std = norm_statistic['std']std = [1 if i == 0 else i for i in std]data = (data - mean) / stdnorm_statistic['std'] = stdreturn data, norm_statisticdef de_normalized(data, normalize_method, norm_statistic):if normalize_method == 'min_max':if not norm_statistic:norm_statistic = dict(max=np.max(data, axis=0), min=np.min(data, axis=0))scale = norm_statistic['max'] - norm_statistic['min'] + 1e-8data = data * scale + norm_statistic['min']elif normalize_method == 'z_score':if not norm_statistic:norm_statistic = dict(mean=np.mean(data, axis=0), std=np.std(data, axis=0))mean = norm_statistic['mean']std = norm_statistic['std']std = [1 if i == 0 else i for i in std]data = data * std + meanreturn data

上述代码定义了一个名为ForecastDataset的类,它是基于torch_data.Dataset的子类,用于处理时间序列预测任务的数据集。同时,还提供了normalizedde_normalized两个函数,分别用于数据的标准化(归一化)和反标准化。

ForecastDataset类

  1. 初始化方法__init__

    • 输入参数:
      • df:原始数据。
      • window_size:滑动窗口大小,用于截取历史数据作为模型训练的输入。
      • horizon:预测步长,即模型需要预测未来多少个时间点或时间窗的数据。
      • normalize_method:数据标准化方法,可选值为'min_max'(最小最大值归一化)或'z_score'(Z-score标准化)。
      • norm_statistic:若已知数据的统计信息(如最大值、最小值、均值、标准差),可直接传入;否则,将根据数据计算这些统计量。
      • interval:采样间隔。
    • 方法内部:
      • 将输入的DataFrame填充缺失值,并转化为NumPy数组。
      • 初始化类的属性:滑动窗口大小、采样间隔、预测步长、标准化方法、统计信息等。
      • 计算数据集长度和训练数据结束索引(x_end_idx),用于后续按索引获取训练数据和目标数据。
      • 如果指定了标准化方法,则对数据进行标准化处理,同时更新统计信息。
  2. __getitem__方法:

    • 输入参数:index,表示数据集中第index个样本的索引。
    • 方法内部:
      • 根据x_end_idx列表计算当前样本的训练数据起始索引lo和结束索引hi
      • 提取训练数据(历史数据)和目标数据(未来数据)。
      • 将提取到的训练数据和目标数据转化为PyTorch张量并设置为浮点类型。
      • 返回包含训练数据和目标数据的元组。
  3. __len__方法:

    • 返回数据集的样本数,即x_end_idx列表的长度。
  4. get_x_end_idx方法:

    • 该方法用于生成一个列表,其中每个元素hi表示一个训练数据结束索引。
    • 计算x_index_set,包含所有可能的训练数据结束索引(满足window_sizedf_length - horizon + 1的范围)。
    • 根据采样间隔intervalx_index_set中选取训练数据结束索引,组成x_end_idx列表并返回。

辅助函数

  1. normalized函数:

    • 输入参数:待标准化数据data、标准化方法normalize_method及统计信息字典norm_statistic(可选)。
    • 函数内部:
      • 根据指定的标准化方法进行数据标准化:
        • 若为'min_max'
          • 若未提供统计信息,计算数据的最大值和最小值,然后进行最小最大值归一化。
          • 确保数据在[0, 1]范围内。
        • 若为'z_score'
          • 若未提供统计信息,计算数据的均值和标准差,然后进行Z-score标准化。
          • 避免除以零错误,当标准差为零时将其置为1。
      • 返回标准化后的数据及更新后的统计信息字典。
  2. de_normalized函数:

    • 输入参数:已标准化数据data、标准化方法normalize_method及统计信息字典norm_statistic
    • 函数内部:
      • 根据指定的标准化方法进行数据反标准化:
        • 若为'min_max'
          • 使用提供的统计信息(最大值、最小值)进行反归一化。
        • 若为'z_score'
          • 使用提供的统计信息(均值、标准差)进行反Z-score标准化。
          • 避免除以零错误,当标准差为零时将其置为1。
      • 返回反标准化后的数据。

综上所述,上述代码实现了一个用于时间序列预测任务的数据集类ForecastDataset,支持滑动窗口、预测步长、数据标准化等功能,并提供了数据标准化与反标准化的辅助函数。

代码中,window_size即为前置时间窗的个数,horizon则为预测时间窗的个数,interval为采样间隔,默认为1。

比如现在我们有100个时间窗的历史数据,window_size为12,horizon为12,interval为1。

则第1个样本为:

x: 第1到第12个时间窗
y: 第13到第24个时间窗

第2个样本为:

x: 第2到第13个时间窗
y: 第14到第25个时间窗

以此类推,总共可构建出(his_num-window_size-horizon+1) // interval=(100-12-12+1)//1=77个样本。

同样的,若interval为2,则第1个样本为:

x: 第1到第12个时间窗
y: 第13到第24个时间窗

第2个样本为:

x: 第3到第14个时间窗
y: 第15到第26个时间窗

其他样本以此类推,可以看出,所谓的interval其实就是相邻样本之间的时间窗个数。

模型训练

为了适配不同的模型和数据集,我们采用类继承的方式来编写模型训练代码。

首先,我们定义一个父类。

class Exp_Basic(object):def __init__(self, args):self.args = argsself.device = self._acquire_device()self.model = self._build_model()def _acquire_device(self):if self.args.use_gpu:os.environ["CUDA_VISIBLE_DEVICES"] = str(self.args.gpu) if not self.args.use_multi_gpu else self.args.devicesdevice = torch.device('cuda:{}'.format(self.args.gpu))print('Use GPU: cuda:{}'.format(self.args.gpu))else:device = torch.device('cpu')print('Use CPU')return devicedef _get_data(self):pass# 创建模型def _build_model(self):raise NotImplementedErrorreturn Nonedef train(self):passdef valid(self):passdef test(self):pass

接着,编写具体的实现类Exp_MLP_PEMS(Exp_Basic)。

数据读取

首先实现_get_data方法来获取数据:

def _get_data(self):data_file = os.path.join('../../data/PEMS', self.args.dataset, self.args.dataset+'.npz')print('data file:',data_file)data = np.load(data_file,allow_pickle=True)data = data['data'][:, :, 0]train_ratio = self.args.train_length / (self.args.train_length + self.args.valid_length + self.args.test_length)valid_ratio = self.args.valid_length / (self.args.train_length + self.args.valid_length + self.args.test_length)train_data = data[:int(train_ratio * len(data))]valid_data = data[int(train_ratio * len(data)):int((train_ratio + valid_ratio) * len(data))]test_data = data[int((train_ratio + valid_ratio) * len(data)):]if len(train_data) == 0:raise Exception('Cannot organize enough training data')if len(valid_data) == 0:raise Exception('Cannot organize enough validation data')if len(test_data) == 0:raise Exception('Cannot organize enough test data')if self.args.normtype == 0:train_mean = np.mean(train_data, axis=0)train_std = np.std(train_data, axis=0)train_normalize_statistic = {"mean": train_mean.tolist(), "std": train_std.tolist()}val_mean = np.mean(valid_data, axis=0)val_std = np.std(valid_data, axis=0)val_normalize_statistic = {"mean": val_mean.tolist(), "std": val_std.tolist()}test_mean = np.mean(test_data, axis=0)test_std = np.std(test_data, axis=0)test_normalize_statistic = {"mean": test_mean.tolist(), "std": test_std.tolist()}elif self.args.normtype == 1:data_mean = np.mean(data, axis=0)data_std = np.std(data, axis=0)train_normalize_statistic = {"mean": data_mean.tolist(), "std": data_std.tolist()}val_normalize_statistic = {"mean": data_mean.tolist(), "std": data_std.tolist()}test_normalize_statistic = {"mean": data_mean.tolist(), "std": data_std.tolist()}else:train_mean = np.mean(train_data, axis=0)train_std = np.std(train_data, axis=0)train_normalize_statistic = {"mean": train_mean.tolist(), "std": train_std.tolist()}val_normalize_statistic = {"mean": train_mean.tolist(), "std": train_std.tolist()}test_normalize_statistic = {"mean": train_mean.tolist(), "std": train_std.tolist()}train_set = ForecastDataset(train_data, window_size=self.args.window_size, horizon=self.args.horizon,normalize_method=self.args.norm_method, norm_statistic=train_normalize_statistic)valid_set = ForecastDataset(valid_data, window_size=self.args.window_size, horizon=self.args.horizon,normalize_method=self.args.norm_method, norm_statistic=val_normalize_statistic)test_set = ForecastDataset(test_data, window_size=self.args.window_size, horizon=self.args.horizon,normalize_method=self.args.norm_method, norm_statistic=test_normalize_statistic)train_loader = DataLoader(train_set, batch_size=self.args.batch_size, drop_last=False, shuffle=True,num_workers=1)valid_loader = DataLoader(valid_set, batch_size=self.args.batch_size, shuffle=False, num_workers=1)test_loader = DataLoader(test_set, batch_size=self.args.batch_size, shuffle=False, num_workers=1)node_cnt = train_data.shape[1]return test_loader, train_loader, valid_loader,node_cnt,test_normalize_statistic,val_normalize_statistic

上述Python代码定义了一个名为_get_data的方法,其目的是根据传入的参数配置来读取PEMS 03/04/07/08数据集中的流量数据,进行预处理(如划分训练集、验证集、测试集,标准化),并创建相应的DataLoader对象。下面是代码的详细解读:

  1. 读取数据

    • 使用os.path.join()函数根据传入的args.dataset参数拼接数据文件路径,数据文件位于../../data/PEMS/{dataset}/{dataset}.npz目录下。
    • 使用np.load()函数加载.npz文件,其中数据以字典形式存储,键为'data'。加载后的数据是一个三维数组,第三维代表不同特征(流量、速度、占有率),这里取第一个特征(流量)作为建模数据。
    • 数据现在是一个形状为(len, node)的二维数组,其中len表示时间维度上的观测点数,node表示不同节点(如道路、路口)的数量。
  2. 划分数据集

    • 根据传入的train_lengthvalid_lengthtest_length参数,按照时间维度将数据划分为训练集、验证集和测试集。
    • 计算训练集、验证集、测试集在时间轴上的起始和结束索引,并分别切分数据。
    • 如果划分后任一数据集的长度为0,抛出异常,表示无法组织足够的数据。
  3. 数据标准化

    • 根据normtype参数选择不同的标准化方法:
      • normtype=0:分别计算训练集、验证集、测试集的均值和标准差,进行独立标准化。
      • normtype=1:使用整个数据集(训练集、验证集、测试集组合)的均值和标准差,对所有数据进行统一标准化。
      • normtype=2:仅使用训练集的均值和标准差,对训练集、验证集、测试集进行标准化。
    • 计算所需统计量(均值和标准差),并将结果存储为字典格式,如{"mean": [mean1, mean2, ...], "std": [std1, std2, ...]}
  4. 创建并返回数据集和DataLoader对象

    • 使用ForecastDataset类(未在提供的代码中定义)创建训练集、验证集、测试集对象。传入原始数据、窗口大小(window_size)、预测时域(horizon)、标准化方法(norm_method)和对应的标准化统计量。
    • 为每个数据集对象创建一个DataLoader,设置批次大小(batch_size)、是否丢弃最后一小批(drop_last)、是否打乱数据(shuffle)、并行处理的worker数(num_workers)等参数。
    • 返回测试集、训练集、验证集的DataLoader对象,以及节点数量(node_cnt)和测试集、验证集的标准化统计量。

总结:该方法完成了PEMS数据集的读取、划分、标准化,并为训练、验证、测试准备了相应的DataLoader对象,为后续模型训练和评估提供了数据支持。

构建模型

def _build_model(self):model = MLP(input_size=self.args.window_size, hidden_sizes=self.args.hidden_sizes, output_size=self.args.horizon)print(model)return model

训练

def train(self):my_optim = self._select_optimizer()my_lr_scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer=my_optim, gamma=self.args.decay_rate)test_loader, train_loader, valid_loader, node_cnt, test_normalize_statistic, val_normalize_statistic = self._get_data()forecast_loss = nn.L1Loss()best_validate_mae = np.infbest_test_mae = np.infvalidate_score_non_decrease_count = 0if self.args.resume:self.model, lr, epoch_start = load_model(self.model, self.result_file, model_name=self.args.dataset,horizon=self.args.horizon)else:epoch_start = 0for epoch in range(epoch_start, self.args.epoch):lr = adjust_learning_rate(my_optim, epoch, self.args)epoch_start_time = time.time()self.model.train()loss_total = 0cnt = 0for i, (inputs, target) in enumerate(train_loader):inputs = inputstarget = targetself.model.zero_grad()forecast = self.model(inputs)loss = forecast_loss(forecast, target)cnt += 1loss.backward()my_optim.step()loss_total += float(loss)print('| end of epoch {:3d} | time: {:5.2f}s | train_total_loss {:5.4f} '.format(epoch, (time.time() - epoch_start_time), loss_total / cnt))if (epoch + 1) % self.args.exponential_decay_step == 0:my_lr_scheduler.step()if (epoch + 1) % self.args.validate_freq == 0:is_best_for_now = Falseprint('------ validate on data: VALIDATE ------')valid_metrics = self.validate(self.model, valid_loader, self.args.norm_method,val_normalize_statistic,self.args.window_size, self.args.horizon,test=False)test_metrics = self.validate(self.model, test_loader, self.args.norm_method,test_normalize_statistic,self.args.window_size, self.args.horizon,test=True)if best_validate_mae > valid_metrics['mape']:best_validate_mae = valid_metrics['mape']is_best_for_now = Truevalidate_score_non_decrease_count = 0print('got best validation result:', valid_metrics, test_metrics)else:validate_score_non_decrease_count += 1if best_test_mae > test_metrics['mape']:best_test_mae = test_metrics['mape']print('got best test result:', test_metrics)# save modelif is_best_for_now:save_model(epoch, lr, model=self.model, model_dir=self.result_file, model_name=self.args.dataset,horizon=self.args.horizon)print('saved model!')# early stopif self.args.early_stop and validate_score_non_decrease_count >= self.args.early_stop_step:break

上述代码定义了一个名为train的方法,用于训练一个给定的模型。

  1. 初始化变量与加载模型:

    • 调用_select_optimizer方法选择优化器(optimizer)。
    • 创建指数衰减学习率调度器(learning rate scheduler),使用ExponentialLR类,指定优化器和衰减率(decay rate)。
    • 调用_get_data方法获取训练、验证、测试数据加载器、节点数量、测试数据与验证数据的标准化统计信息。
    • 定义损失函数(loss function)为L1损失(nn.L1Loss)。
    • 设置最佳验证MAE(Mean Absolute Error)和最佳测试MAE初始值为正无穷大。
    • 初始化验证分数非下降计数器(validate score non-decrease count)为0。
  2. 检查是否继续之前训练:

    • 如果args.resume参数为真(即继续之前训练),则加载模型、学习率(lr)和开始的训练轮数(epoch_start)。
    • 否则,设置epoch_start为0,从头开始训练。
  3. 主训练循环:

    • 对于每个训练轮(epoch),从epoch_startargs.epoch
      • 调整学习率(adjust_learning_rate函数)。
      • 记录当前轮开始时间。
      • 将模型设置为训练模式。
      • 初始化累计训练损失(loss_total)和样本计数器(cnt)为0。
      • 遍历训练数据加载器中的样本(enumerate(train_loader)):
        • 输入(inputs)和目标(target)保持不变。
        • 清除模型的梯度。
        • 使用模型对输入进行预测(forecast)。
        • 计算预测与目标之间的损失(loss)。
        • 更新样本计数器和累计训练损失。
        • 反向传播损失并更新模型参数。
      • 打印本训练轮的训练耗时和平均损失。
  4. 学习率调整与验证:

    • 如果当前训练轮数(epoch+1)能被指数衰减步长整除,执行学习率调度器的step方法,降低学习率。
    • 如果当前训练轮数能被验证频率整除,进行验证过程:
      • 验证模型在验证集上的表现,调用validate方法,传入模型、验证数据加载器、标准化方法、验证数据的统计信息、窗口大小、预测步长等参数,并设置test=False
      • 同样地,验证模型在测试集上的表现,此时设置test=True
      • 检查当前验证MAPE(Mean Absolute Percentage Error)是否优于历史最佳验证MAPE:
        • 如果是,则更新最佳验证MAPE、重置验证分数非下降计数器,并记录当前验证和测试结果。
      • 检查当前测试MAPE是否优于历史最佳测试MAPE,如果是,则更新最佳测试MAPE。
      • 若当前验证结果为最佳,保存模型(save_model函数),并打印提示信息。
  5. 早停条件判断:

    • 如果启用了早停(args.early_stop为真)且连续args.early_stop_step个验证周期内验证分数未下降,则跳出训练循环。

对应的_select_optimizer方法为:

def _select_optimizer(self):if self.args.optimizer == 'RMSProp':my_optim = torch.optim.RMSprop(params=self.model.parameters(), lr=self.args.lr, eps=1e-08)else:my_optim = torch.optim.Adam(params=self.model.parameters(), lr=self.args.lr, betas=(0.9, 0.999),weight_decay=self.args.weight_decay)return my_optim

验证

def validate(self, model, dataloader, normalize_method, statistic,window_size, horizon, test=False):if test:print("===================Test Normal=========================")else:print("===================Validate Normal=========================")forecast_norm, target_norm, input_norm = self.inference(model, dataloader, window_size, horizon)if normalize_method and statistic:forecast = de_normalized(forecast_norm, normalize_method, statistic)target = de_normalized(target_norm, normalize_method, statistic)else:forecast, target, input = forecast_norm, target_norm, input_normscore = evaluate(target, forecast)score_final_detail = evaluate(target, forecast, by_step=True)print('by each step: MAPE & MAE & RMSE', score_final_detail)if test:print(f'TEST: RAW : MAE {score[1]:7.2f};MAPE {score[0]:7.2f}; RMSE {score[2]:7.2f}.')else:print(f'VAL: RAW : MAE {score[1]:7.2f};MAPE {score[0]:7.2f}; RMSE {score[2]:7.2f}.')return dict(mae=score[1], mape=score[0], rmse=score[2])

上述代码定义了一个名为validate的方法,用于评估模型在给定数据集上的预测性能。

  1. 判断验证或测试模式:

    • 根据test参数的值(True或False)输出不同的提示信息,表明正在进行的是测试(Test)还是验证(Validate)。
  2. 模型推理与数据标准化恢复:

    • 调用inference方法,传入模型、数据加载器、窗口大小、预测步长,得到模型对数据集的预测结果(forecast_norm)、真实目标值(target_norm)和输入数据(input_norm)。
    • 判断是否进行了数据标准化:
      • 如果指定了标准化方法(normalize_method)且提供了统计信息(statistic):
        • 使用de_normalized函数对预测结果和真实目标值进行反标准化,恢复到原始数值范围。
      • 否则,直接使用标准化后的预测结果、真实目标值和输入数据。
  3. 计算评估指标:

    • 调用evaluate函数,传入真实目标值和模型预测结果,计算各项评估指标(MAE、MAPE、RMSE)。
  4. 输出详细评估结果:

    • 调用evaluate函数,传入额外参数by_step=True,得到按预测步长分解的各项评估指标。
    • 输出按预测步长分解的评估指标(MAPE、MAE、RMSE)。
  5. 打印总体评估结果:

    • 根据test参数的值,打印相应的测试或验证结果标签。
    • 输出总体的MAE、MAPE、RMSE值,保留两位小数。
  6. 返回评估指标字典:

    • 将计算得到的MAE、MAPE、RMSE值封装到一个字典中,以maemapermse为键,对应值为值,返回该字典。

综上,validate方法通过模型推理、数据标准化恢复、计算评估指标、输出结果等步骤,对模型在给定数据集上的预测性能进行评估,并返回评估指标的字典。根据test参数的不同,该方法可用于模型的验证或测试阶段。

测试

def test(self):test_loader, train_loader, valid_loader, node_cnt, test_normalize_statistic, val_normalize_statistic = self._get_data()model, lr, epoch = load_model(self.model, self.result_file, model_name=self.args.dataset, horizon=self.args.horizon)return self.validate(model, test_loader, self.args.norm_method, test_normalize_statistic,self.args.window_size, self.args.horizon, test=True)

实时推理

def inference(self, model, dataloader, window_size, horizon):forecast_set = []target_set = []input_set = []self.model.eval()with torch.no_grad():for i, (inputs, target) in enumerate(dataloader):inputs = inputstarget = targetinput_set.append(inputs.detach().cpu().numpy())step = 0forecast_steps = np.zeros([inputs.size()[0], horizon, inputs.size()[2]], dtype=np.float64)# 适配迭代预测和非迭代预测while step < horizon:forecast_result = model(inputs)len_model_output = forecast_result.size()[1]if len_model_output == 0:raise Exception('Get blank inference result')inputs[:, :window_size - len_model_output, :] = inputs[:, len_model_output:window_size,:].clone()inputs[:, window_size - len_model_output:, :] = forecast_result.clone()forecast_steps[:, step:min(horizon - step, len_model_output) + step, :] = \forecast_result[:, :min(horizon - step, len_model_output), :].detach().cpu().numpy()step += min(horizon - step, len_model_output)forecast_set.append(forecast_steps)target_set.append(target.detach().cpu().numpy())return np.concatenate(forecast_set, axis=0), np.concatenate(target_set, axis=0), np.concatenate(input_set,axis=0)

上述代码定义了一个名为inference的方法,用于执行模型在给定数据集上的推理(预测)。

  1. 初始化变量:

    • 创建三个空列表forecast_settarget_setinput_set,分别用于存储模型预测结果、真实目标值和输入数据。
  2. 设置模型为评估模式并禁用梯度计算:

    • 将模型设为评估模式(self.model.eval()),避免在推理过程中进行不必要的前向传播计算。
    • 使用with torch.no_grad():语句块,确保在该块内的计算不记录梯度,节省内存并提高推理速度。
  3. 遍历数据加载器中的样本:

    • 使用enumerate(dataloader)遍历数据加载器中的每个样本,包括输入数据(inputs)和目标值(target)。
    • 将当前样本的输入数据和目标值分别添加到input_settarget_set列表中。
  4. 进行模型推理:

    • 初始化变量step为0,用于记录当前已预测的时间步数。
    • 初始化forecast_steps数组,用于存储当前样本的所有预测结果,形状为(batch_size, horizon, input_feature_dim),其中batch_size为样本批次大小,horizon为预测步长,input_feature_dim为输入特征维度。
    • 迭代预测逻辑:
      • 使用模型对当前输入数据进行预测,得到forecast_result
      • 检查模型输出长度(len_model_output),若为0则抛出异常。
      • 更新输入数据(inputs):将较旧的历史数据移至左侧,并将最新预测结果移至右侧,以准备下一轮预测。
      • 将模型当前输出的预测结果按需填充到forecast_steps数组中,确保每个样本的预测结果按时间步正确排列。
      • 更新step变量,累加已预测的时间步数。
  5. 处理完所有时间步后,将当前样本的预测结果、真实目标值和输入数据添加到相应列表中。

  6. 合并所有样本的预测结果、真实目标值和输入数据:

    • 使用np.concatenate函数,将forecast_settarget_setinput_set列表中的数据沿第一个维度(样本维度)拼接成一个完整的数组。
  7. 返回结果:

    • 返回合并后的预测结果数组、真实目标值数组和输入数据数组。

综上,inference方法通过遍历数据加载器、执行模型推理并收集预测结果、真实目标值和输入数据,最终返回这些数据的完整数组。该方法支持迭代预测(递归使用模型输出作为下一轮输入的一部分)和非迭代预测(单次模型预测即可得到全部结果),适用于不同类型的预测模型。

模型效果

最后,将我们构建好的MLP网络在PEMS数据集进行了准确性测试,算法测试的相关配置如下:

torch.manual_seed(4321)  # reproducible
parser = argparse.ArgumentParser(description='MLP on pems datasets')
### -------  dataset settings --------------
parser.add_argument('--dataset', type=str, default='PEMS08',choices=['PEMS03', 'PEMS04', 'PEMS07', 'PEMS08'])  # sometimes use: PeMS08
parser.add_argument('--norm_method', type=str, default='z_score')
parser.add_argument('--normtype', type=int, default=0)
### -------  input/output length settings --------------
parser.add_argument('--window_size', type=int, default=12)
parser.add_argument('--horizon', type=int, default=12)
parser.add_argument('--train_length', type=float, default=6)
parser.add_argument('--valid_length', type=float, default=2)
parser.add_argument('--test_length', type=float, default=2)
### -------  training settings --------------
parser.add_argument('--use_gpu', type=bool, default=False)
parser.add_argument('--train', type=bool, default=True)
parser.add_argument('--resume', type=bool, default=False)
parser.add_argument('--evaluate', type=bool, default=False)
parser.add_argument('--finetune', type=bool, default=False)
parser.add_argument('--validate_freq', type=int, default=1)
parser.add_argument('--epoch', type=int, default=80)
parser.add_argument('--lr', type=float, default=0.001)
parser.add_argument('--batch_size', type=int, default=8)
parser.add_argument('--optimizer', type=str, default='N')  #
parser.add_argument('--early_stop', type=bool, default=True)
parser.add_argument('--early_stop_step', type=int, default=5)
parser.add_argument('--exponential_decay_step', type=int, default=5)
parser.add_argument('--decay_rate', type=float, default=0.5)
parser.add_argument('--lradj', type=int, default=1, help='adjust learning rate')
parser.add_argument('--weight_decay', type=float, default=1e-5)
parser.add_argument('--model_name', type=str, default='MLP')
### -------  model settings --------------
parser.add_argument('--hidden_sizes', type=list, default=[24, 36, 24])
args = parser.parse_args()

结果如下:

数据集MAEMAPERMSE
PEMS0319.72260.18440331.5506
PEMS0427.24220.16668742.5976
PEMS0728.80020.12248444.1366
PEMS0820.85040.12366832.5307

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/827572.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

指令和界面【Linux】

指令和界面 前言一、指令 vs 界面交互的需求满足需求的第一阶段——指令满足需求的第二阶段-界面时间 二、指令和界面交互区别为什么要学命令行总结 前言 Linux操作系统提供了丰富的命令行界面和图形用户界面工具&#xff0c;用户可以根据自己的需求选择适合的界面进行操作。命…

【好书推荐7】《机器学习平台架构实战》

【好书推荐7】《机器学习平台架构实战》 写在最前面《机器学习平台架构实战》编辑推荐内容简介作者简介目  录前  言本书读者内容介绍充分利用本书下载示例代码文件下载彩色图像本书约定 &#x1f308;你好呀&#xff01;我是 是Yu欸 &#x1f30c; 2024每日百字篆刻时光&…

论文阅读:BEVBert: Multimodal Map Pre-training for Language-guided Navigation

BEVBert&#xff1a;语言引导导航的多模态地图预训练 摘要 现存的问题&#xff1a;目前大多数现有的预训练方法都采用离散的全景图来学习视觉-文本关联。这要求模型隐式关联全景图中不完整、重复的观察结果&#xff0c;这可能会损害智能体的空间理解。 本文解决方案&#xf…

TikTok账号0播放是限流了吗?想要播放破万,试试这些方法!

前言 账号0播放问题&#xff0c;想必困扰着许多的TikTok运营同学。精心制作的短视频发布在TikTok&#xff0c;不是零播放&#xff0c;就是仅自己可见。那么&#xff0c;TikTok账号0播放是不是真的意味着被限流了呢&#xff1f;本篇总结了账号0播放的原因并附上解决方案&#xf…

[Flutter3] Json转dart模型举例

记录一下 Android studio plugin -> FlutterJsonBeanFactory 处理json转dart 模型 案例 json字符串, 一个 response的data返回数据 {"code":1,"msg":"\u64cd\u4f5c\u6210\u529f","data":{"list":{"id":"8…

SwiftUI 5.0(iOS 17.0)触摸反馈“震荡波”与触发器模式趣谈

概览 要想创作出一款精彩绝伦的 App&#xff0c;绚丽的界面和灵动的动画并不是唯一吸引用户的要素。有时我们还希望让用户真切的感受到操作引发的触觉反馈&#xff0c;直击使用者的灵魂。 所幸的是新版 SwiftUI 原生提供了实现触觉震动反馈的机制。在介绍它之后我们还将进一步…

等保测评之主机测评详解(二级)

等保测评之主机测评详解&#xff08;二级&#xff09;服务器——Windows 身份鉴别: 测评项a&#xff09;&#xff1a; a&#xff09;应对登录的用户进行身份标识和鉴别&#xff0c;身份标识具有唯一性&#xff0c;身份鉴别信息具有复杂度要求并定期更换&#xff1b; 整改方…

antd中Upload上传图片宽高限制以及上传文件的格式限制

项目中有一个需求&#xff0c;要上传轮播图&#xff0c;且有尺寸要求&#xff0c;所以就需要在上传图片的时候进行尺寸限制&#xff0c;使用了Upload组件&#xff0c;需要在组件的beforeUpload方法中进行限制。 定义一个上传前的方法&#xff0c;并且添加一个图片尺寸获取的方…

【Redis】Zset 数据类型

文章目录 常用命令zaddzcard & zcountzrange & zrevrangezpopmax & bzpopmaxzpopmin & bzpopminzrank & zrevrankzscore & zremzremrangebyrank & zremrangebyscorezincrby 多个集合间的交互命令交集 & zinterstore并集 & sunionstore 内部…

【声呐仿真】学习记录0.5-配置ssh远程连接docker、在docker中使用nvidia显卡

【声呐仿真】学习记录0.5-配置ssh远程连接docker、在docker中使用nvidia显卡 配置ssh远程连接docker1.端口映射2.配置ssh 在docker中使用nvidia显卡配置CUDA 注意&#xff1a;之前已经创建过容器的&#xff0c;需要打包成镜像&#xff0c;重新创建容器&#xff0c;因为要在创建…

【C++庖丁解牛】C++11---右值引用和移动语义

&#x1f341;你好&#xff0c;我是 RO-BERRY &#x1f4d7; 致力于C、C、数据结构、TCP/IP、数据库等等一系列知识 &#x1f384;感谢你的陪伴与支持 &#xff0c;故事既有了开头&#xff0c;就要画上一个完美的句号&#xff0c;让我们一起加油 目录 1 左值引用和右值引用2 左…

第一个Spring Boot程序

目录 一、Spring Boot介绍 二、创建Spring Boot项目 1、插件安装&#xff08;专业版不需要&#xff09; 2、创建SpringBoot项目 &#xff08;1&#xff09;这里如果插件下载失败&#xff0c;解决方案&#xff1a; &#xff08;2&#xff09;项目启动失败&#xff0c;解决…

web测试基础知识

目录 web系统的基础 web概念(worldwideweb) 网络结构 发展 架构 B/S C/S P2P 工作原理 静态页面 动态页面 web客户端技术 浏览器的核心--渲染引擎 web服务器端技术 web服务器 应用服务器 集群环境 数据库 案例-URL 协议类型 主机名 端口 IP地址 分类 …

C#开发的全套成熟的LIS系统源码JavaScript+SQLserver 2012区域云LIS系统源码

C#开发的全套成熟的LIS系统源码JavaScriptSQLserver 2012区域云LIS系统源码 医院云LIS系统是一套成熟的实验室信息管理系统&#xff0c;目前已在多家三级级医院应用&#xff0c;并不断更新。云LIS系统是为病人为中心、以业务处理为基础、以提高检验科室管理水平和工作效率为目标…

贪心算法练习day.1

理论基础 贪心算法是一种常见的解决优化问题的方法&#xff0c;其基本思想就是在问题的每个决策阶段&#xff0c;都选择当前看起来最优的选择&#xff0c;即贪心地做出局部的最优决策&#xff0c;以此得到全局的最优解&#xff0c;例如在十张面额不同的钞票&#xff0c;让我们…

润申信息企业标准化管理系统 AddNewsHandler.ashx 任意用户创建漏洞复现

0x01 产品简介 润申信息科技企业标准化管理系统通过给客户提供各种灵活的标准法规信息化管理解决方案,帮助他们实现了高效的标准法规管理,完成个性化标准法规库的信息化建设。 0x02 漏洞概述 润申信息企业标准化管理系统 AddNewsHandler.ashx 接口处存在任意用户创建漏洞,…

Linux安装部署Tomcat

个人简介&#xff1a;Java领域新星创作者&#xff1b;阿里云技术博主、星级博主、专家博主&#xff1b;正在Java学习的路上摸爬滚打&#xff0c;记录学习的过程~ 个人主页&#xff1a;.29.的博客 学习社区&#xff1a;进去逛一逛~ Linux安装部署Tomcat //将tomcat压缩包解压到对…

python识别电脑是windows还是linux

代码实现 import osif os.name nt:print(当前操作系统是 Windows) elif os.name posix:print(当前操作系统是 Linux 或 Unix 类型的系统) else:print(未知的操作系统)

kubernetes中的副本控制器rc(replicationcontrollers)和rs(replicasets)

一、rc控制器replicationcontrollers rc控制器就是控制相同pod副本数量 使用rc控制器资源创建pod&#xff0c;设定创建pod资源的数量 1.1 案例 1.1.1、创建资源清单 [rootmaster rc-demo]# cat rc.yaml apiVersion: v1 kind: ReplicationController metadata: name: rc01 …

个人搭建alist网盘的经验记录备忘

1、搭建宝塔LINUX面板&#xff0c;安装Docker 2、添加仓库 3、从镜像拉取xhofe/alist:latest 4、添加容器 5、新建一个网站&#xff0c;别忘记申请个SSL证书&#xff0c;重要的是反向代理 6、新建个mysql数据库 7、修改alist数据库的链接地址&#xff0c;方便自己备份&a…