这里写目录标题
- 1.算法简介
- 2. RNN算法原理
- 2.1 RNN基本结构介绍
- 2.2 计算流程
- 3.完整训练代码
1.算法简介
参考论文:Elman J L. Finding structure in time[J]. Cognitive science, 1990, 14(2): 179-211.,谷歌被引次数超16000!
说到循环递归结构就不得不提到其鼻祖RNN网络。首先我们先对RNN有个初步的概念:想象一下,你正在阅读一本非常吸引人的小说。每次你翻开新的一页,你的大脑不仅会处理这一页上的内容,还会结合之前读过的所有内容来理解故事的情节、人物关系和背景设定。这就是一种“记忆”和“上下文理解”的过程,因为你大脑中的信息不是孤立的,而是连续且相互关联的。RNN就是模仿了这种“记忆”功能的神经网络。在传统的神经网络中,假设每个输入都是独立的,没有前后联系。但RNN不同,它专门设计用来处理序列数据——也就是那些按顺序排列,其中每个元素都与前后的元素有关联的数据,比如时间序列数据(股价、温度变化)、自然语言(句子、对话)等。在RNN中,有一个特殊的循环连接,让信息能够从前一个时间点传递到下一个时间点。就像你在读书时,上一页的信息会影响你对下一页的理解。RNN的这个特性让它能够记住前面的信息,这样当处理后续数据时,它就能够利用这些记忆来做出更好的决策或预测。有了一个初步概念之后,我们现在来具体的讲一讲RNN到底是什么以及其是怎么运行的。
2. RNN算法原理
RNN的核心思想是利用序列中的时序信息。与传统的前馈神经网络不同,RNN引入了循环连接,使网络能够保留之前时间步的信息。如上面的GIF图所示(引自:RNN):
其中X表示当前时刻的输入,以一个天气预报任务为例,我们将使用过去3天(前天、昨天和今天)的温度(T’)、湿度(H)和风速(W)来预测下一天的温度(T)。对于这个任务而言,我们的输入数据是[ X T t − 2 X_{T_{t-2}} XTt−2, X T t − 1 X_{T_{t-1}} XTt−1, X T t X_{T_{t}} XTt, X H t − 2 X_{H_{t-2}} XHt−2, X H t − 1 X_{H_{t-1}} XHt−1, X H t X_{H_{t}} XHt, X W t − 2 X_{W_{t-2}} XWt−2, X W t − 1 X_{W_{t-1}} XWt−1, X W t X_{W_{t}} XWt]即X是一个1*9的输入。我们的输出是明天的温度 y T t + 1 y_{T_{t+1}} yTt+1。即我们需要建一个RNN模型 f ( x ) f(x) f(x):
y T t + 1 = f ( X T t − 2 , X T t − 1 , X T t , X H t − 2 , X H t − 1 , X H t , X W t − 2 , X W t − 1 , X W t ) y_{T_{t+1}}=f(X_{T_{t-2}},X_{T_{t-1}},X_{T_{t}},X_{H_{t-2}},X_{H_{t-1}},X_{H_{t}},X_{W_{t-2}},X_{W_{t-1}},X_{W_{t}}) yTt+1=f(XTt−2,XTt−1,XTt,XHt−2,XHt−1,XHt,XWt−2,XWt−1,XWt)
那么具体该怎么实现这个模型呢?待我一步步分解:
2.1 RNN基本结构介绍
RNN的核心是其循环结构,它允许信息在序列中传递。对于我们的天气预报任务,RNN的基本结构可以表示为:
h t = t a n h ( W h h ∗ h t − 1 + W x h ∗ x t + b h ) h_t = tanh(W_hh * h_{t-1} + W_xh * x_t + b_h) ht=tanh(Whh∗ht−1+Wxh∗xt+bh)
y t = W h y ∗ h t + b y y_t = W_hy * h_t + b_y yt=Why∗ht+by
其中:
- h t h_t ht 是当前时间步的隐藏状态
- x t x_t xt 是当前时间步的输入
- W h h W_hh Whh, W x h W_xh Wxh, W h y W_hy Why 是权重矩阵,也就是网络的可学习参数,是未知量。
- b h b_h bh, b y b_y by 是偏置项
- tanh 是激活函数
2.2 计算流程
对于我们的天气预报任务,计算流程如下:
a) 初始化隐藏状态 h 0 h_0 h0(通常为零向量)
b) 对于每个时间步 t = 1 to 3:
h t − 2 = h 0 h_{t-2}=h_0 ht−2=h0
h t − 1 = t a n h ( W h h ∗ h t − 2 + W x h ∗ [ X T t − 1 , X H t − 1 , X W t − 1 ] + b h ) h_{t-1} = tanh(W_{h}h * h_{t-2}+ W_xh*[X_{T_{t-1}}, X_{H_{t-1}}, X_{W_{t-1}}]+ b_h) ht−1=tanh(Whh∗ht−2+Wxh∗[XTt−1,XHt−1,XWt−1]+bh)
h t = t a n h ( W h h ∗ h t − 2 + W x h ∗ [ X T t , X H t , X W t ] + b h ) h_{t} = tanh(W_{h}h * h_{t-2}+ W_xh*[X_{T_t}, X_{H_t}, X_{W_t}]+ b_h) ht=tanh(Whh∗ht−2+Wxh∗[XTt,XHt,XWt]+bh)
c) 最后,我们使用最终的隐藏状态来预测明天的温度:
y T t + 1 = W h y ∗ h t + b y y_{T_{t+1}} = W_hy * h_t + b_y yTt+1=Why∗ht+by
然后我手搓了一个RNN代码便于各位理解:
class RNN:def __init__(self, input_size, hidden_size, output_size):self.input_size = input_sizeself.hidden_size = hidden_sizeself.output_size = output_size# 初始化权重self.Wxh = np.random.randn(hidden_size, input_size) * 0.01self.Whh = np.random.randn(hidden_size, hidden_size) * 0.01self.Why = np.random.randn(output_size, hidden_size) * 0.01# 初始化偏置self.bh = np.zeros((hidden_size, 1))self.by = np.zeros((output_size, 1))# 初始化Adam优化器self.optimizer = Adam({'Wxh': self.Wxh, 'Whh': self.Whh, 'Why': self.Why,'bh': self.bh, 'by': self.by})def forward(self, inputs):h = np.zeros((self.hidden_size, 1))self.last_inputs = inputsself.last_hs = {0: h}# 前向传播for t, x in enumerate(inputs):h = np.tanh(np.dot(self.Wxh, x) + np.dot(self.Whh, h) + self.bh)self.last_hs[t + 1] = hy = np.dot(self.Why, h) + self.byreturn y, hdef backward(self, d_y):n = len(self.last_inputs)# 初始化梯度d_Wxh = np.zeros_like(self.Wxh)d_Whh = np.zeros_like(self.Whh)d_Why = np.zeros_like(self.Why)d_bh = np.zeros_like(self.bh)d_by = np.zeros_like(self.by)d_h = np.dot(self.Why.T, d_y)# 反向传播for t in reversed(range(n)):temp = (1 - self.last_hs[t + 1] ** 2) * d_hd_Wxh += np.dot(temp, self.last_inputs[t].T)d_Whh += np.dot(temp, self.last_hs[t].T)d_bh += tempd_h = np.dot(self.Whh.T, temp)d_Why = np.dot(d_y, self.last_hs[n].T)d_by = d_y# 使用Adam优化器更新参数self.optimizer.step({'Wxh': d_Wxh, 'Whh': d_Whh, 'Why': d_Why,'bh': d_bh, 'by': d_by})
3.完整训练代码
import numpy as npclass Adam:def __init__(self, params, lr=0.001, beta1=0.9, beta2=0.999, epsilon=1e-8):self.params = paramsself.lr = lrself.beta1 = beta1self.beta2 = beta2self.epsilon = epsilonself.m = {k: np.zeros_like(v) for k, v in params.items()}self.v = {k: np.zeros_like(v) for k, v in params.items()}self.t = 0def step(self, grads):self.t += 1for k in self.params.keys():self.m[k] = self.beta1 * self.m[k] + (1 - self.beta1) * grads[k]self.v[k] = self.beta2 * self.v[k] + (1 - self.beta2) * (grads[k] ** 2)m_hat = self.m[k] / (1 - self.beta1 ** self.t)v_hat = self.v[k] / (1 - self.beta2 ** self.t)self.params[k] -= self.lr * m_hat / (np.sqrt(v_hat) + self.epsilon)class RNN:def __init__(self, input_size, hidden_size, output_size):self.input_size = input_sizeself.hidden_size = hidden_sizeself.output_size = output_size# 初始化权重self.Wxh = np.random.randn(hidden_size, input_size) * 0.01self.Whh = np.random.randn(hidden_size, hidden_size) * 0.01self.Why = np.random.randn(output_size, hidden_size) * 0.01# 初始化偏置self.bh = np.zeros((hidden_size, 1))self.by = np.zeros((output_size, 1))# 初始化Adam优化器self.optimizer = Adam({'Wxh': self.Wxh, 'Whh': self.Whh, 'Why': self.Why,'bh': self.bh, 'by': self.by})def forward(self, inputs):h = np.zeros((self.hidden_size, 1))self.last_inputs = inputsself.last_hs = {0: h}# 前向传播for t, x in enumerate(inputs):h = np.tanh(np.dot(self.Wxh, x) + np.dot(self.Whh, h) + self.bh)self.last_hs[t + 1] = hy = np.dot(self.Why, h) + self.byreturn y, hdef backward(self, d_y):n = len(self.last_inputs)# 初始化梯度d_Wxh = np.zeros_like(self.Wxh)d_Whh = np.zeros_like(self.Whh)d_Why = np.zeros_like(self.Why)d_bh = np.zeros_like(self.bh)d_by = np.zeros_like(self.by)d_h = np.dot(self.Why.T, d_y)# 反向传播for t in reversed(range(n)):temp = (1 - self.last_hs[t + 1] ** 2) * d_hd_Wxh += np.dot(temp, self.last_inputs[t].T)d_Whh += np.dot(temp, self.last_hs[t].T)d_bh += tempd_h = np.dot(self.Whh.T, temp)d_Why = np.dot(d_y, self.last_hs[n].T)d_by = d_y# 使用Adam优化器更新参数self.optimizer.step({'Wxh': d_Wxh, 'Whh': d_Whh, 'Why': d_Why,'bh': d_bh, 'by': d_by})# 生成模拟数据
def generate_data(num_samples, time_steps):X = np.random.rand(num_samples, time_steps, 3) # 3个特征:温度、湿度、风速y = np.sum(X[:, :, 0], axis=1) / 3 + np.random.normal(0, 0.1, num_samples) # 简单地用平均温度加噪声作为目标return X, y.reshape(-1, 1)# 数据标准化
def normalize(data):return (data - np.mean(data)) / np.std(data)# 生成训练和测试数据
X_train, y_train = generate_data(1000, 3)
X_test, y_test = generate_data(200, 3)# 标准化数据
X_train_norm = normalize(X_train)
y_train_norm = normalize(y_train)
X_test_norm = normalize(X_test)
y_test_norm = normalize(y_test)# 初始化RNN
input_size = 3
hidden_size = 64
output_size = 1
rnn = RNN(input_size, hidden_size, output_size)# 训练函数
def train(rnn, X, y, epochs):for epoch in range(epochs):total_loss = 0for i in range(len(X)):inputs = [X[i][t].reshape(-1, 1) for t in range(3)]target = y[i]# 前向传播output, _ = rnn.forward(inputs)# 计算损失loss = np.sum((output - target) ** 2)total_loss += loss# 反向传播d_y = 2 * (output - target)rnn.backward(d_y)if epoch % 10 == 0:print(f"Epoch {epoch}, Loss: {total_loss / len(X)}")# 初始化RNN
input_size = 3
hidden_size = 64
output_size = 1
rnn = RNN(input_size, hidden_size, output_size)# 训练模型
train(rnn, X_train_norm, y_train_norm, epochs=100)# 评估函数
def evaluate(rnn, X, y):total_loss = 0for i in range(len(X)):inputs = [X[i][t].reshape(-1, 1) for t in range(3)]target = y[i]output, _ = rnn.forward(inputs)loss = np.sum((output - target) ** 2)total_loss += lossreturn total_loss / len(X)# 评估模型
test_loss = evaluate(rnn, X_test_norm, y_test_norm)
print(f"Test Loss: {test_loss}")# 进行预测
def predict(rnn, X):inputs = [X[t].reshape(-1, 1) for t in range(3)]output, _ = rnn.forward(inputs)return output# 示例预测
sample_data = X_test_norm[0]
prediction = predict(rnn, sample_data)
print(f"Sample input: {X_test[0]}")
print(f"Normalized prediction: {prediction[0][0]}")# 反标准化预测结果
mean_y = np.mean(y_train)
std_y = np.std(y_train)
denormalized_prediction = prediction[0][0] * std_y + mean_y
print(f"Denormalized prediction: {denormalized_prediction}")
print(f"Actual value: {y_test[0][0]}")
创作不易,烦请各位观众老爷给个三连,小编在这里跪谢了!