I have recently been studying neural networks, mainly following the book 《深度学习入门(基于Python的理论与实现)》 (*Deep Learning from Scratch*). Below are my annotated and modified versions of the sample programs from Chapter 5, "Backpropagation", kept here for later reference.
The code was written and run in the Eric7 IDE.
Training a neural network means searching for a (possibly local) minimum of the loss function, i.e. a point where the partial derivatives (the gradient) of the loss are zero or close to zero; the weight and bias matrices found at that point are then used for subsequent prediction (predict). Backpropagation provides an efficient way to compute these partial derivatives (gradients) and greatly reduces the amount of computation required.
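Each training iteration therefore ends with the update W ← W − η·∂L/∂W for every parameter. A minimal sketch of that update step (the dictionary keys match the TwoLayerNet parameters used later in this post):
# one SGD update step: move every parameter a small step against its gradient
def sgd_step(params, grads, lr=0.1):
    for key in ('W1', 'b1', 'W2', 'b2'):
        params[key] -= lr * grads[key]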
1. Main training program for the neural network
The annotated and modified train_neuralnet.py is as follows:
# coding: utf-8
import sys, os
sys.path.append(os.pardir)
import numpy as np
from dataset.mnist import load_mnist
from two_layer_net import TwoLayerNet

# load the data
(x_train, t_train), (x_test, t_test) = load_mnist(normalize=True, one_hot_label=True)  # normalize: scale pixels via /= np.float32(255.0) => 0.0~1.0

network = TwoLayerNet(input_size=784, hidden_size=50, output_size=10)  # 28*28 = 784

iters_num = 10000              # number of iterations
train_size = x_train.shape[0]  # number of training samples
print("Train_size:", train_size)
batch_size = 100               # mini-batch size
learning_rate = 0.1            # learning rate

train_loss_list = []
train_acc_list = []
test_acc_list = []

iter_per_epoch = max(train_size / batch_size, 1)
print("Iters/epoch:", iter_per_epoch)
print("epochs:", int(iters_num / iter_per_epoch) + 1)

epochs = 0
for i in range(iters_num):
    # the remainder gives the index of the mini-batch within the current epoch
    batch_index = i % iter_per_epoch
    # at the start of every epoch, refresh the shuffled index array shuffled_indices
    if batch_index == 0:
        shuffled_indices = np.random.permutation(train_size)
    # take the shuffled sample indices in order, batch_size at a time
    batch_indices = shuffled_indices[int(batch_index * batch_size) : int((batch_index + 1) * batch_size)]
    # batch_mask = np.random.choice(train_size, batch_size)
    x_batch = x_train[batch_indices]
    t_batch = t_train[batch_indices]

    # gradient
    # grad = network.numerical_gradient(x_batch, t_batch)
    grad = network.gradient(x_batch, t_batch)

    # update
    for key in ('W1', 'b1', 'W2', 'b2'):
        network.params[key] -= learning_rate * grad[key]

    loss = network.loss(x_batch, t_batch)
    train_loss_list.append(loss)

    if i % iter_per_epoch == 0:
        train_acc = network.accuracy(x_train, t_train)
        test_acc = network.accuracy(x_test, t_test)
        train_acc_list.append(train_acc)
        test_acc_list.append(test_acc)
        print("epochs:", epochs)
        print("train/test accuracy:", train_acc, test_acc)
        epochs += 1
In the program above:
- batch_size is the number of samples the network processes in one iteration (the size of one mini-batch); the iterations search for a point where the gradient of the loss with respect to the weights is locally zero, i.e. a local minimum of the loss.
- One epoch is one full pass over all the training data.
- iter_per_epoch = train_size / batch_size is the number of iterations needed to complete one epoch.
- Completing all iterations therefore takes epochs = iters_num / iter_per_epoch epochs (a quick check with the actual numbers follows this list).
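Plugging in the values used in the script above:
train_size, batch_size, iters_num = 60000, 100, 10000
iter_per_epoch = max(train_size / batch_size, 1)   # 600 iterations make up one epoch
print(iters_num / iter_per_epoch)                  # ~16.7, so the run covers about 17 epochs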
In the original program, np.random.choice(train_size, batch_size) cannot guarantee that one epoch visits every training sample exactly once (without repeats or omissions), because each draw is independent and, by default, with replacement. An alternative is the following: shuffle the data at the start of each epoch, then take the batches sequentially.
Core idea:
At the beginning of each epoch, shuffle the indices of the training data:
shuffled_indices = np.random.permutation(train_size)
then split them into batches in order. This keeps the randomness while ensuring that every sample is used exactly once per epoch:
# iterate over all batches in order
for i in range(0, train_size, batch_size):
    batch_indices = shuffled_indices[i:i + batch_size]
    # use batch_indices to select the data for this training step
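A small sketch contrasting the two sampling strategies (the drawn indices vary from run to run):
import numpy as np

train_size, batch_size = 10, 5
# with replacement (default): indices may repeat and some may never appear
print(np.random.choice(train_size, batch_size))       # e.g. [3 3 7 0 7]
# shuffle once, then slice: every index appears exactly once per epoch
shuffled = np.random.permutation(train_size)
print(shuffled[0:batch_size], shuffled[batch_size:])   # two disjoint batches covering 0..9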
The program uses several np.random methods for generating random data; they are summarized below:
# generate 3 integers in [0, 10)
arr = np.random.randint(0, 10, size=3)
# generate a 2x3 array with elements in [0.0, 1.0)
arr = np.random.rand(2, 3)
# generate a 2x2 array with elements in [1.0, 5.0)
arr = np.random.uniform(1.0, 5.0, size=(2, 2))
# generate a 2x3 matrix drawn from the standard normal distribution (mean 0, variance 1)
normal_data = np.random.randn(2, 3)
2. Handling the MNIST dataset
The training above uses the MNIST dataset. mnist.py, which downloads and preprocesses the dataset, is listed below:
# coding: utf-8
try:
    import urllib.request
except ImportError:
    raise ImportError('You should use Python 3.x')
import os.path
import gzip
import pickle
import os
import numpy as np

url_base = 'https://ossci-datasets.s3.amazonaws.com/mnist/'  # mirror site
key_file = {  # file dictionary: images and labels for train and test
    'train_img': 'train-images-idx3-ubyte.gz',
    'train_label': 'train-labels-idx1-ubyte.gz',
    'test_img': 't10k-images-idx3-ubyte.gz',
    'test_label': 't10k-labels-idx1-ubyte.gz'
}

dataset_dir = os.path.dirname(os.path.abspath(__file__))  # absolute path of this file's directory
save_file = dataset_dir + "/mnist.pkl"  # pickle file used to cache the dataset

train_num = 60000
test_num = 10000
img_dim = (1, 28, 28)
img_size = 784


def _download(file_name):
    file_path = dataset_dir + "/" + file_name

    if os.path.exists(file_path):
        return

    print("Downloading " + file_name + " ... ")
    urllib.request.urlretrieve(url_base + file_name, file_path)  # download by file name and save into the dataset directory
    print("Done")


def download_mnist():
    for v in key_file.values():
        _download(v)


def _load_label(file_name):
    file_path = dataset_dir + "/" + file_name

    print("Converting " + file_name + " to NumPy Array ...")  # convert the file into a NumPy array
    with gzip.open(file_path, 'rb') as f:
        labels = np.frombuffer(f.read(), np.uint8, offset=8)
    print("Done")

    return labels


def _load_img(file_name):
    file_path = dataset_dir + "/" + file_name

    print("Converting " + file_name + " to NumPy Array ...")
    with gzip.open(file_path, 'rb') as f:
        data = np.frombuffer(f.read(), np.uint8, offset=16)
    data = data.reshape(-1, img_size)
    print("Done")

    return data


def _convert_numpy():
    dataset = {}
    dataset['train_img'] = _load_img(key_file['train_img'])
    dataset['train_label'] = _load_label(key_file['train_label'])
    dataset['test_img'] = _load_img(key_file['test_img'])
    dataset['test_label'] = _load_label(key_file['test_label'])

    return dataset


def init_mnist():
    download_mnist()
    dataset = _convert_numpy()
    print("Creating pickle file ...")
    with open(save_file, 'wb') as f:
        # pickle the dataset dict using the highest protocol available
        pickle.dump(dataset, f, -1)
    print("Done!")


def _change_one_hot_label(X):
    T = np.zeros((X.size, 10))  # zero matrix of shape (X.size, 10); X.size is the number of images
    for idx, row in enumerate(T):  # idx is the row index, row is a 1-D array
        row[X[idx]] = 1  # if the image's label is k, the k-th element of that row becomes 1

    return T


def load_mnist(normalize=True, flatten=True, one_hot_label=False):
    if not os.path.exists(save_file):
        init_mnist()

    with open(save_file, 'rb') as f:
        dataset = pickle.load(f)

    if normalize:
        '''
        Normalize the image data, e.g.:
        data = np.random.randint(0, 256, size=(4, 16), dtype=np.uint8)  # four single-channel 4x4 images
        data.astype(np.float32)
        '''
        for key in ('train_img', 'test_img'):
            # the files store np.uint8; convert to np.float32 before normalizing
            # (dividing uint8 by 255.0 would default to float64 and increase memory usage)
            dataset[key] = dataset[key].astype(np.float32)
            dataset[key] /= np.float32(255.0)

    if one_hot_label:
        dataset['train_label'] = _change_one_hot_label(dataset['train_label'])
        dataset['test_label'] = _change_one_hot_label(dataset['test_label'])

    if not flatten:
        '''
        If flatten is False, each input image is a 1x28x28 three-dimensional array;
        if True, each input image is stored as a one-dimensional array of 784 elements.
        '''
        for key in ('train_img', 'test_img'):
            # reshape uses -1 to compute that dimension automatically; the second 1 is the single
            # (grayscale) channel, whereas ordinary RGB images have three channels
            dataset[key] = dataset[key].reshape(-1, 1, 28, 28)

    return (dataset['train_img'], dataset['train_label']), (dataset['test_img'], dataset['test_label'])


if __name__ == '__main__':
    init_mnist()
Parameters of load_mnist:
- normalize: normalize the image pixel values to the range 0.0~1.0
- one_hot_label: when True, labels are returned as one-hot arrays. A one-hot array such as [0,0,1,0,0,0,0,0,0,0] is 1 only at the position of the correct class; here the third element is set, so the label is the digit 2.
- flatten: whether to flatten each image into a one-dimensional array
Return format:
(training images, training labels), (test images, test labels)
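A quick check of what load_mnist returns with the settings used in the training script (a minimal sketch; it assumes the dataset files are available under dataset/):
from dataset.mnist import load_mnist

(x_train, t_train), (x_test, t_test) = load_mnist(normalize=True, one_hot_label=True)
print(x_train.shape, t_train.shape)  # (60000, 784) (60000, 10): flattened images, one-hot labels
print(x_test.shape, t_test.shape)    # (10000, 784) (10000, 10)
print(t_train[0])                    # a length-10 one-hot vector with a single 1 at the true digit's index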
In the program above, _load_label(file_name) reads the label data. offset=8 is used because the first 8 bytes hold the magic number and the number of labels (4-byte magic number + 4-byte item count).
- Magic number: a 4-byte unsigned integer identifying the file type and version. For label data, the magic number is 0x00000801 for both the training and test sets.
- Number of items: a 4-byte unsigned integer giving the number of labels.
- Label data: all remaining bytes are label data; each byte is the label (a digit from 0 to 9) of one image.
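A minimal sketch that reads the label header by hand (it assumes train-labels-idx1-ubyte.gz has already been downloaded into the current directory):
import gzip, struct
import numpy as np

with gzip.open('train-labels-idx1-ubyte.gz', 'rb') as f:
    raw = f.read()
magic, num = struct.unpack('>II', raw[:8])   # big-endian: 4-byte magic number + 4-byte item count
print(hex(magic), num)                       # 0x801 60000
labels = np.frombuffer(raw, np.uint8, offset=8)
print(labels.shape)                          # (60000,)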
In the program above, _load_img(file_name) reads the image data. offset=16 is used because the first 16 bytes hold the magic number and the image dimensions (4-byte magic number + 4-byte image count + 4-byte row count + 4-byte column count). The actual image data therefore starts at the 17th byte, and from there each image occupies 28x28 = 784 consecutive pixel bytes.
- Magic number: a 4-byte (32-bit) unsigned integer identifying the file type and version.
- For image data, the magic number is 0x00000803 for both the training and test sets.
- Number of images: a 4-byte unsigned integer giving the number of images in the file.
- Number of rows: a 4-byte unsigned integer giving the image height (28 here).
- Number of columns: a 4-byte unsigned integer giving the image width (28 here).
- Image data: all remaining bytes are pixel data, stored image by image in row-major order. Each pixel is a grayscale value from 0 to 255 and occupies one byte.
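The corresponding sketch for an image file (again assuming the .gz file is already present locally):
import gzip, struct
import numpy as np

with gzip.open('train-images-idx3-ubyte.gz', 'rb') as f:
    raw = f.read()
magic, num, rows, cols = struct.unpack('>IIII', raw[:16])  # four big-endian uint32 header fields
print(hex(magic), num, rows, cols)                          # 0x803 60000 28 28
images = np.frombuffer(raw, np.uint8, offset=16).reshape(-1, rows * cols)
print(images.shape)                                         # (60000, 784)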
In the program above, init_mnist() initializes the MNIST dataset: it downloads the files, converts the images and labels to NumPy arrays, and saves them as a pickle (.pkl) file. The .pkl format is Python's object-serialization format (full name: pickle). It converts arbitrary Python objects into a form that can be saved to disk or sent over a network and later restored to the original objects, which makes it very useful for saving and loading complex data structures or custom objects. Working with .pkl files relies on Python's pickle module, which provides two main functions:
- pickle.dump(obj, file, protocol) serializes a Python object and writes it to a file. If protocol is omitted, Python's default protocol is used; a negative value or pickle.HIGHEST_PROTOCOL selects the highest protocol version available.
- pickle.load(file) reads a serialized object from a file and restores the original Python object. Files are opened in binary write mode ('wb') for dumping and binary read mode ('rb') for loading.
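A minimal sketch of the two calls (the file name mnist_demo.pkl is just an illustration):
import pickle

data = {'train_img': [1, 2, 3], 'train_label': [7]}
with open('mnist_demo.pkl', 'wb') as f:   # binary write mode
    pickle.dump(data, f, -1)              # -1: use the highest protocol available
with open('mnist_demo.pkl', 'rb') as f:   # binary read mode
    restored = pickle.load(f)
print(restored == data)                   # True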
3. Hidden layers of the neural network
The network and its hidden layers are defined in two_layer_net.py, annotated as follows:
# coding: utf-8
import sys, os
sys.path.append(os.pardir)  # setting to allow importing files from the parent directory
import numpy as np
from common.layers import *
from common.gradient import numerical_gradient
from collections import OrderedDict


class TwoLayerNet:

    def __init__(self, input_size, hidden_size, output_size, weight_init_std=0.01):
        # initialize the weights
        self.params = {}
        self.params['W1'] = weight_init_std * np.random.randn(input_size, hidden_size)  # np.random.randn: normal distribution with mean 0 and standard deviation 1
        self.params['b1'] = np.zeros(hidden_size)
        self.params['W2'] = weight_init_std * np.random.randn(hidden_size, output_size)
        self.params['b2'] = np.zeros(output_size)

        # build the layers
        self.layers = OrderedDict()
        # affine layer 1: a1 = x@W1 + b1
        self.layers['Affine1'] = Affine(self.params['W1'], self.params['b1'])
        # activation layer 1: z1 = Relu(a1)
        self.layers['Relu1'] = Relu()
        # affine layer 2: a2 = z1@W2 + b2
        self.layers['Affine2'] = Affine(self.params['W2'], self.params['b2'])
        # last layer: y = softmax(a2), loss = cross_entropy_error(y, t)
        self.lastLayer = SoftmaxWithLoss()

    def predict(self, x):
        # forward pass: the output of each layer is the input of the next;
        # self.layers holds the layers in order
        for layer in self.layers.values():  # .values() are the layers in order, .keys() the layer names in order
            x = layer.forward(x)
        return x

    # x: input data, t: supervised (target) data
    def loss(self, x, t):
        y = self.predict(x)
        return self.lastLayer.forward(y, t)  # returns the softmax-with-loss value

    def accuracy(self, x, t):
        y = self.predict(x)
        y = np.argmax(y, axis=1)  # index of the largest output (highest probability) in each row; the 2-D output becomes a 1-D array
        if t.ndim != 1:
            t = np.argmax(t, axis=1)  # if t is not 1-D, convert it from one-hot format

        # y == t gives a bool array: True where y and t agree, False elsewhere
        # np.sum(y == t) counts the True entries; x.shape[0] is the batch size
        accuracy = np.sum(y == t) / float(x.shape[0])
        return accuracy

    # x: input data, t: supervised (target) data
    def numerical_gradient(self, x, t):
        loss_W = lambda W: self.loss(x, t)

        grads = {}
        grads['W1'] = numerical_gradient(loss_W, self.params['W1'])
        grads['b1'] = numerical_gradient(loss_W, self.params['b1'])
        grads['W2'] = numerical_gradient(loss_W, self.params['W2'])
        grads['b2'] = numerical_gradient(loss_W, self.params['b2'])
        return grads

    def gradient(self, x, t):
        # forward
        self.loss(x, t)

        # backward
        dout = 1
        dout = self.lastLayer.backward(dout)

        layers = list(self.layers.values())
        layers.reverse()  # reverse the list
        for layer in layers:
            # the dout returned by one layer's backward is fed into the previous layer's backward;
            # the Affine layers also compute their dW and db along the way
            dout = layer.backward(dout)

        # collect the gradients
        grads = {}
        grads['W1'], grads['b1'] = self.layers['Affine1'].dW, self.layers['Affine1'].db
        grads['W2'], grads['b2'] = self.layers['Affine2'].dW, self.layers['Affine2'].db

        return grads
The gradient method implements backpropagation of the partial derivatives (gradients): the gradient produced by each layer's backward pass is passed on as the input to the backward pass of the layer before it.
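A common sanity check is to compare the backprop gradients with the numerical gradients on a tiny batch; the sketch below assumes the MNIST data and the TwoLayerNet defined above are importable:
import numpy as np
from dataset.mnist import load_mnist
from two_layer_net import TwoLayerNet

(x_train, t_train), _ = load_mnist(normalize=True, one_hot_label=True)
network = TwoLayerNet(input_size=784, hidden_size=50, output_size=10)

x_batch, t_batch = x_train[:3], t_train[:3]          # a tiny batch keeps numerical_gradient fast
grad_numerical = network.numerical_gradient(x_batch, t_batch)
grad_backprop = network.gradient(x_batch, t_batch)

for key in grad_numerical.keys():
    diff = np.average(np.abs(grad_backprop[key] - grad_numerical[key]))
    print(key + ":" + str(diff))                     # should be very small, e.g. on the order of 1e-10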
4. Activation functions used by the layers
The hidden layers of the network apply different activation functions to their outputs. layers.py, which defines the individual layers and their activation functions, is annotated below:
# coding: utf-8
import numpy as np
from common.functions import *
from common.util import im2col, col2im


class Relu:
    def __init__(self):
        self.mask = None

    def forward(self, x):
        self.mask = (x <= 0)  # bool array: True where the element is <= 0, False elsewhere
        out = x.copy()        # out = x would only copy the reference, so modifying x would also change out
        out[self.mask] = 0    # zero out the positions where x <= 0 (the True positions)
        return out

    def backward(self, dout):
        # dx = ∂L/∂x = ∂L/∂out * ∂out/∂x = dout * ∂out/∂x; where the forward output was zero, ∂out/∂x = 0, so dx = 0 there
        dout[self.mask] = 0
        dx = dout
        return dx


class Sigmoid:  # sigmoid(x) = 1 / (1 + np.exp(-x))
    def __init__(self):
        self.out = None

    def forward(self, x):
        out = sigmoid(x)
        self.out = out  # store the output as an instance variable for use in backward
        return out

    def backward(self, dout):
        # dx = ∂L/∂x = ∂L/∂out * ∂out/∂x = dout * ∂out/∂x = dout * (1 - out) * out
        dx = dout * (1.0 - self.out) * self.out
        return dx


class Affine:
    '''
    Affine layer: out = X@W + b
    __init__(self, W, b): initialization with parameters W and b
    '''
    def __init__(self, W, b):
        self.W = W
        self.b = b
        self.x = None
        self.original_x_shape = None
        # derivatives of the weights and bias
        self.dW = None
        self.db = None

    def forward(self, x):
        '''
        forward(self, x): forward pass with input x.
        The input is first reshaped into a 2-D tensor:
        1. If the input is multi-dimensional (e.g. a conv-layer output of shape (batch, C, H, W)),
           it is flattened to a 2-D matrix (batch, C*H*W), so each sample becomes a 1-D vector
           suitable for the matrix product np.dot(x, W).
        2. If the input is already 2-D (e.g. the output of an ordinary fully connected layer), it is left unchanged.
        '''
        self.original_x_shape = x.shape
        x = x.reshape(x.shape[0], -1)
        self.x = x

        out = np.dot(self.x, self.W) + self.b
        return out

    def backward(self, dout):
        '''
        out = X@W + b
        backward(self, dout): backward pass with input dout = ∂L/∂out, hence:
        self.dW = ∂L/∂out * ∂out/∂W = (self.x.T)@dout
        self.db = ∂L/∂out * ∂out/∂b = np.sum(dout, axis=0)
        dx      = ∂L/∂out * ∂out/∂x = dout@(self.W.T)
        '''
        dx = np.dot(dout, self.W.T)
        self.dW = np.dot(self.x.T, dout)
        self.db = np.sum(dout, axis=0)

        dx = dx.reshape(*self.original_x_shape)  # restore the original input shape (for tensor inputs)
        return dx


class SoftmaxWithLoss:
    '''
    y = softmax(x)
    loss = cross_entropy_error(y, t), the cross-entropy loss
    '''
    def __init__(self):
        self.loss = None
        self.y = None  # softmax output
        self.t = None  # supervised (target) data

    def forward(self, x, t):
        self.t = t
        self.y = softmax(x)
        self.loss = cross_entropy_error(self.y, self.t)
        return self.loss

    def backward(self, dout=1):
        batch_size = self.t.shape[0]
        if self.t.size == self.y.size:  # when t is a one-hot vector
            dx = (self.y - self.t) / batch_size
        else:  # here self.t has shape (batch_size,)
            dx = self.y.copy()
            # dx = y - t: self.t is a 1-D array whose i-th element is the correct class of sample i,
            # so it suffices to subtract 1 from the self.t[i]-th entry of row i of y:
            #     dx[0, self.t[0]] -= 1
            #     dx[1, self.t[1]] -= 1
            #     ...
            #     dx[n, self.t[n]] -= 1   (the row indices come from np.arange(batch_size), n = batch_size - 1)
            # If self.t were converted to one-hot format, it would be 1 exactly at these positions and 0 elsewhere, i.e.:
            #     temp_t = np.zeros_like(y)
            #     temp_t[np.arange(batch_size), self.t] = 1
            dx[np.arange(batch_size), self.t] -= 1
            dx = dx / batch_size
        return dx


class Dropout:
    """
    http://arxiv.org/abs/1207.0580
    """
    def __init__(self, dropout_ratio=0.5):
        self.dropout_ratio = dropout_ratio
        self.mask = None

    def forward(self, x, train_flg=True):
        if train_flg:
            self.mask = np.random.rand(*x.shape) > self.dropout_ratio
            return x * self.mask
        else:
            return x * (1.0 - self.dropout_ratio)

    def backward(self, dout):
        return dout * self.mask


class BatchNormalization:
    """
    http://arxiv.org/abs/1502.03167
    """
    def __init__(self, gamma, beta, momentum=0.9, running_mean=None, running_var=None):
        self.gamma = gamma
        self.beta = beta
        self.momentum = momentum
        self.input_shape = None  # 4-D for conv layers, 2-D for fully connected layers

        # mean and variance used at test time
        self.running_mean = running_mean
        self.running_var = running_var

        # intermediate data used in backward
        self.batch_size = None
        self.xc = None
        self.std = None
        self.dgamma = None
        self.dbeta = None

    def forward(self, x, train_flg=True):
        self.input_shape = x.shape
        if x.ndim != 2:
            N, C, H, W = x.shape
            x = x.reshape(N, -1)

        out = self.__forward(x, train_flg)

        return out.reshape(*self.input_shape)

    def __forward(self, x, train_flg):
        if self.running_mean is None:
            N, D = x.shape
            self.running_mean = np.zeros(D)
            self.running_var = np.zeros(D)

        if train_flg:
            mu = x.mean(axis=0)
            xc = x - mu
            var = np.mean(xc**2, axis=0)
            std = np.sqrt(var + 10e-7)
            xn = xc / std

            self.batch_size = x.shape[0]
            self.xc = xc
            self.xn = xn
            self.std = std
            self.running_mean = self.momentum * self.running_mean + (1 - self.momentum) * mu
            self.running_var = self.momentum * self.running_var + (1 - self.momentum) * var
        else:
            xc = x - self.running_mean
            xn = xc / (np.sqrt(self.running_var + 10e-7))

        out = self.gamma * xn + self.beta
        return out

    def backward(self, dout):
        if dout.ndim != 2:
            N, C, H, W = dout.shape
            dout = dout.reshape(N, -1)

        dx = self.__backward(dout)

        dx = dx.reshape(*self.input_shape)
        return dx

    def __backward(self, dout):
        dbeta = dout.sum(axis=0)
        dgamma = np.sum(self.xn * dout, axis=0)
        dxn = self.gamma * dout
        dxc = dxn / self.std
        dstd = -np.sum((dxn * self.xc) / (self.std * self.std), axis=0)
        dvar = 0.5 * dstd / self.std
        dxc += (2.0 / self.batch_size) * self.xc * dvar
        dmu = np.sum(dxc, axis=0)
        dx = dxc - dmu / self.batch_size

        self.dgamma = dgamma
        self.dbeta = dbeta

        return dx


class Convolution:
    def __init__(self, W, b, stride=1, pad=0):
        self.W = W  # convolution filters, W.shape: (Filter_Num, Channel, Filter_Height, Filter_Width)
        self.b = b  # convolution bias, b.shape: (Filter_Num,)
        self.stride = stride
        self.pad = pad

        # intermediate data (used in backward)
        self.x = None
        self.col = None
        self.col_W = None

        # gradients of the weights and bias
        self.dW = None
        self.db = None

    def forward(self, x):
        FN, C, FH, FW = self.W.shape
        N, C, H, W = x.shape
        out_h = 1 + int((H + 2 * self.pad - FH) / self.stride)  # number of windows per column = output height (OH)
        out_w = 1 + int((W + 2 * self.pad - FW) / self.stride)  # number of windows per row = output width (OW)

        # col.shape: (N*OH*OW, C*FH*FW); each window is multiplied by the filter and summed over all channels
        col = im2col(x, FH, FW, self.stride, self.pad)
        col_W = self.W.reshape(FN, -1).T  # col_W.shape: (C*FH*FW, FN)

        out = np.dot(col, col_W) + self.b  # np.dot(col, col_W).shape: (N*OH*OW, FN), self.b.shape: (FN,)
        out = out.reshape(N, out_h, out_w, -1).transpose(0, 3, 1, 2)  # (N, OH, OW, FN) ==> (N, FN, OH, OW)

        self.x = x
        self.col = col
        self.col_W = col_W

        return out

    def backward(self, dout):
        FN, C, FH, FW = self.W.shape  # filter shape
        dout = dout.transpose(0, 2, 3, 1).reshape(-1, FN)  # dout.shape: (N, FN, OH, OW) ==> (N, OH, OW, FN) ==> (N*OH*OW, FN)

        self.db = np.sum(dout, axis=0)
        self.dW = np.dot(self.col.T, dout)  # out = self.col @ self.col_W, hence dW = self.col.T @ dout
        self.dW = self.dW.transpose(1, 0).reshape(FN, C, FH, FW)  # dW.shape: (C*FH*FW, FN) ==> (FN, C*FH*FW) ==> (FN, C, FH, FW)

        dcol = np.dot(dout, self.col_W.T)  # dcol = dout @ self.col_W.T: (N*OH*OW, FN) @ (FN, C*FH*FW) ==> (N*OH*OW, C*FH*FW)
        dx = col2im(dcol, self.x.shape, FH, FW, self.stride, self.pad)  # dcol ==> dx: (N*OH*OW, C*FH*FW) ==> (N, C, H, W)

        return dx


class Pooling:
    def __init__(self, pool_h, pool_w, stride=1, pad=0):
        self.pool_h = pool_h
        self.pool_w = pool_w
        self.stride = stride
        self.pad = pad

        self.x = None
        self.arg_max = None

    def forward(self, x):
        N, C, H, W = x.shape
        out_h = int(1 + (H - self.pool_h) / self.stride)
        out_w = int(1 + (W - self.pool_w) / self.stride)

        col = im2col(x, self.pool_h, self.pool_w, self.stride, self.pad)
        col = col.reshape(-1, self.pool_h * self.pool_w)

        arg_max = np.argmax(col, axis=1)
        out = np.max(col, axis=1)
        out = out.reshape(N, out_h, out_w, C).transpose(0, 3, 1, 2)

        self.x = x
        self.arg_max = arg_max

        return out

    def backward(self, dout):
        dout = dout.transpose(0, 2, 3, 1)

        pool_size = self.pool_h * self.pool_w
        dmax = np.zeros((dout.size, pool_size))
        dmax[np.arange(self.arg_max.size), self.arg_max.flatten()] = dout.flatten()
        dmax = dmax.reshape(dout.shape + (pool_size,))

        dcol = dmax.reshape(dmax.shape[0] * dmax.shape[1] * dmax.shape[2], -1)
        dx = col2im(dcol, self.x.shape, self.pool_h, self.pool_w, self.stride, self.pad)

        return dx