本文使用 TensorFlow 1.15 环境搭建深度神经网络(PINN)求解一维 Poisson 方程:
− Δ u = f in Ω , u = 0 on Γ : = ∂ Ω . \begin{align} -\Delta u &= f \quad & \text{in } \Omega,\\ u & =0 \quad & \text{on } \Gamma:=\partial \Omega. \end{align} −Δuu=f=0in Ω,on Γ:=∂Ω.
其中 Ω = [ X a , X b ] \Omega = [X_a,X_b] Ω=[Xa,Xb] 是一段区间,一维情况 Δ u = u x x \Delta u = u_{xx} Δu=uxx.
完整代码及其注释如下:
import tensorflow as tf
#print(tf.__version__)
import numpy as np
import math
# def is_log2_close_to_int(n, eps=1e-9):
# log_n = math.log2(n)
# return math.isclose(math.fmod(log_n, 1), 0, abs_tol=eps) # 定义Exact类,用于计算精确解
class Exact: def __init__(self, xa, xb): # 初始化类,接受两个参数xa和xb,代表区间的两个端点 self.xa = xa # 区间左端点 self.xb = xb # 区间右端点 def u_exact(self, X): # 计算并返回精确解u(X) # 这里使用正弦函数作为精确解,其频率与区间长度(xb-xa)有关 u = np.sin(2*np.pi*X / (self.xb - self.xa)) return u # 定义Dataset类,用于生成数据
class Dataset: def __init__(self, x_range, N_res, N_b, xa, xb): # 初始化类,接受多个参数 self.x_range = x_range # 区间范围,例如[0, 1] self.N_res = N_res # 内部节点数,用于构建内部网格 self.N_b = N_b # 边界条件点数(通常这里N_b为2,因为有两个边界) self.xa = xa # 区间左端点 self.xb = xb # 区间右端点 def bc(self, X_b): # 计算并返回边界条件上的精确解 # 创建一个Exact对象,用于计算精确解 U_bc = Exact(self.xa, self.xb) u_bc = U_bc.u_exact(X_b) # 计算边界条件X_b上的精确解 return u_bc def build_data(self): # 构建并返回数据集,包括内部网格点、边界网格点和区间端点 x0 = self.x_range[0] # 区间左端点 x1 = self.x_range[1] # 区间右端点 # 区间端点(最小值和最大值) Xmin = np.array([[x0]]) Xmax = np.array([[x1]]) # 构建内部网格点 # 可以选择使用均匀网格或随机网格 ## For the equation, you can choose using the uniform mesh"""N = self.N_resX_res_input = np.linspace(x0, x1, N).reshape((-1, 1))"""# 这里展示了如何使用随机网格 X_res_input = x0 + (x1-x0)*np.random.rand(self.N_res,1) # 生成N_res个在[x0, x1]区间内的随机点 # 边界网格点(在这个例子中,我们手动指定了边界点) X_b0_input = np.array([[x0]]) # 左边界点 X_b1_input = np.array([[x1]]) # 右边界点 # 返回构建的数据集 return X_res_input, X_b0_input, X_b1_input, Xmin, Xmaxdef calculate_errors(sess, x_res_train, u_pred, x_t, u_e): """ 计算并打印L2范数和最大模范数的误差。 """ u_pred_vals = sess.run(u_pred, feed_dict={x_res_train: x_t}) error_l2 = np.linalg.norm(u_pred_vals - u_e, ord=2) / np.linalg.norm(u_e, ord=2) error_max = np.max(np.abs(u_pred_vals - u_e)) / np.max(np.abs(u_e)) print(f"L2 Error: {error_l2:.8f}") print(f"Max Error: {error_max:.8f}")
神经网络及其训练过程所需要的函数定义:
import tensorflow as tf
import numpy as np
import time
import matplotlib.pyplot as plt class Train: def __init__(self, train_dict): """ 初始化Train类。 Args: train_dict (dict): 用于训练的feed_dict,包含训练数据和其他必要的TensorFlow变量。 """ self.train_dict = train_dict self.step = 0 # 初始化训练步数计数器 def callback(self, loss_): """ 回调函数,用于在LBFGS优化器每次迭代后打印损失。 Args: loss_ (float): 当前迭代的损失值。 """ self.step += 1 if math.isclose(math.fmod(math.log2(self.step), 1), 0, abs_tol=1e-9): print('Loss: %.3e' % (loss_)) def nntrain(self, sess, u_pred, loss, test_dict, u_e, x_t, train_adam, train_lbfgs): """ 执行神经网络训练。 Args: sess (tf.Session): TensorFlow会话。 u_pred (tf.Tensor): 预测值的Tensor。 loss (tf.Tensor): 损失函数的Tensor。 test_dict (dict): 用于测试的feed_dict。 u_e (np.array): 精确解的数值数组。 x_t (np.array): 测试点或网格点的x坐标数组。 train_adam (tf.Operation): Adam优化器的TensorFlow操作。 train_lbfgs (LBFGSOptimizer 或类似): 用于精细调整的LBFGS优化器。 Returns: None """ n = 0 # 初始化迭代计数器 nmax = 10000 # 最大迭代次数 loss_c = 1.0e-4 # 收敛条件:当损失小于此值时停止训练 loss_ = 1.0 # 初始化损失值 while n < nmax and loss_ > loss_c: n += 1 # 使用Adam优化器进行训练 u_, loss_, _ = sess.run([u_pred, loss, train_adam], feed_dict=self.train_dict) # 每2^n步打印一次损失并绘制结果 if math.isclose(math.fmod(math.log2(n), 1), 0, abs_tol=1e-9): print('Steps: %d, loss: %.3e' % (n, loss_)) # 在测试集上评估模型 u_test = sess.run(u_pred, feed_dict=test_dict) # 绘制精确解和预测解的对比图 plt.cla() # 清除之前的图表 plt.plot(x_t, u_e, 'bo', markersize=0.4, label='Exact solution') plt.plot(x_t, u_test, 'rv', markersize=0.4, label='PINN solution') plt.legend() plt.show() plt.pause(0.1) # 暂停一段时间以便观察图表 # 使用LBFGS优化器进行精细调整 train_lbfgs.minimize(sess, feed_dict=self.train_dict, fetches=[loss], loss_callback=self.callback)
import tensorflow as tf
import numpy as np class DNN: """ 深度神经网络类,用于构建和训练神经网络。 """ def __init__(self, layer_size, Xmin, Xmax): """ 初始化DNN类。 Args: layer_size (list): 网络各层的神经元数量。 Xmin (numpy.ndarray): 输入数据的最小值。 Xmax (numpy.ndarray): 输入数据的最大值。 """ self.size = layer_size self.Xmin = Xmin self.Xmax = Xmax def hyper_initial(self): """ 初始化网络的权重和偏置。 Returns: tuple: 包含权重和偏置的列表。 """ L = len(self.size) Weights = [] Biases = [] for l in range(1, L): in_dim = self.size[l-1] out_dim = self.size[l] std = np.sqrt(2/(in_dim + out_dim)) weight = tf.Variable(tf.random_normal(shape=[in_dim, out_dim], stddev=std)) bias = tf.Variable(tf.zeros(shape=[1, out_dim])) Weights.append(weight) Biases.append(bias) return Weights, Biases def fnn(self, X, W, b): """ 前馈神经网络的前向传播。 Args: X (tf.Tensor): 输入数据。 W (list): 权重列表。 b (list): 偏置列表。 Returns: tf.Tensor: 网络的输出。 """ A = 2.0*(X - self.Xmin)/(self.Xmax - self.Xmin) - 1.0 # 归一化和缩放输入 L = len(W) for i in range(L-1): A = tf.tanh(tf.add(tf.matmul(A, W[i]), b[i])) # 应用激活函数和线性变换 u = tf.add(tf.matmul(A, W[-1]), b[-1]) # 输出层 return u def pdenn(self, x, W, b): """ 计算物理驱动的神经网络残差(无边界条件)。 Args: x (tf.Tensor): 输入数据。 W (list): 权重列表。 b (list): 偏置列表。 Returns: tf.Tensor: 残差f。 """ u = self.fnn(x, W, b) u_x = tf.gradients(u, x)[0] u_xx = tf.gradients(u_x, x)[0] rhf = np.pi**2 * tf.sin(np.pi*x) # 右侧手边项 f = -u_xx - rhf # 计算残差 return f def fnn_BC(self, X, W, b): """ 应用边界条件的前馈神经网络。 Args: X (tf.Tensor): 输入数据。 W (list): 权重列表。 b (list): 偏置列表。 Returns: tf.Tensor: 应用边界条件后的输出。 """ Xmax = self.XmaxXmin = self.Xminu = self.fnn(X, W, b)ua = self.fnn(tf.cast(Xmin, tf.float32), W, b)ub = self.fnn(tf.cast(Xmax, tf.float32), W, b)K = tf.subtract(ub, ua)/(Xmax[0,0] - Xmin[0,0])c = tf.subtract(ua, Xmin*K)u = tf.subtract(u, tf.add(tf.matmul(X, K), c))return udef pdenn_BC(self, x, W, b): """ 计算物理驱动的神经网络残差(带边界条件)。 Args: x (tf.Tensor): 输入数据。 W (list): 权重列表。 b (list): 偏置列表。 Returns: tf.Tensor: 残差f。 """ u = self.fnn_BC(x, W, b)u_x = tf.gradients(u, x)[0]u_xx = tf.gradients(u_x, x)[0]rhf = np.pi**2 * tf.sin(np.pi*x)f = -u_xx - rhfreturn fdef fnn_BC2(self, X, W, b): """ 另一种应用边界条件的方法(示例)。 Args: X (tf.Tensor): 输入数据。 W (list): 权重列表。 b (list): 偏置列表。 Returns: tf.Tensor: 应用边界条件后的输出。 """ Xmax = self.XmaxXmin = self.Xminu = self.fnn(X, W, b)u = (X-Xmax)*(X-Xmin) * ureturn udef pdenn_BC2(self, x, W, b): """ 使用另一种边界条件计算物理驱动的神经网络残差。 Args: x (tf.Tensor): 输入数据。 W (list): 权重列表。 b (list): 偏置列表。 Returns: tf.Tensor: 残差f。 """ u = self.fnn_BC2(x, W, b)u_x = tf.gradients(u, x)[0]u_xx = tf.gradients(u_x, x)[0]rhf = np.pi**2 * tf.sin(np.pi*x)f = -u_xx - rhfreturn f
画图:
# 导入必要的库
import tensorflow as tf # TensorFlow库,用于构建和训练神经网络
import numpy as np # NumPy库,用于处理数值数据
import matplotlib.pyplot as plt # Matplotlib库,用于绘图
import os # os库,用于与操作系统交互,如文件路径操作 # 设置保存结果的路径
savepath='./Output'
if not os.path.exists(savepath): os.makedirs(savepath) # 如果路径不存在,则创建该路径 # 定义一个类SavePlot,用于保存预测结果并绘制图形
class SavePlot: def __init__(self, sess, x_range, N, xa, xb): # 初始化函数,设置类的属性 self.x_range = x_range # 预测时x的范围 self.N = N # 预测时x的样本数 self.sess = sess # TensorFlow会话,用于执行TensorFlow操作 self.xa = xa # 精确解计算时可能需要的参数a self.xb = xb # 精确解计算时可能需要的参数b def saveplt(self, u_pred, x_res_train): # 在给定范围内生成均匀的x点 x_t = np.linspace(self.x_range[0], self.x_range[1], self.N).reshape((-1, 1)) # 构建feed_dict,用于在TensorFlow会话中执行u_pred test_dict = {x_res_train: x_t} # 使用TensorFlow会话执行u_pred,得到预测结果 u_test = self.sess.run(u_pred, feed_dict=test_dict) # 将预测结果保存到文件 np.savetxt('./Output/u_pred', u_test, fmt='%e') # 计算并保存精确解 Exact_sln = Exact(self.xa, self.xb) u_e = Exact_sln.u_exact(x_t) np.savetxt('./Output/u_e', u_e, fmt='%e') # 计算并打印预测误差 err_ = np.linalg.norm(u_test - u_e)/np.linalg.norm(u_e) print(err_) # 绘制精确解和预测解的对比图 plt.plot(x_t, u_e, 'bo', markersize=0.4, label='Exact solution') plt.plot(x_t, u_test, 'rv', markersize=0.4, label='PINN solution') plt.legend() # 显示图例 plt.show() # 显示图形 # 注意:代码中注释掉的plt.close()和.close()通常不是必要的,除非在循环中多次调用plt.plot或打开文件需要关闭。
# plt.close()用于关闭当前图形窗口,但在这里调用plt.show()后通常不需要。
下面是主函数:
# 导入必要的库
import os
import tensorflow as tf
import numpy as np
import time
import matplotlib.pyplot as plt
import scipy.io # 设置随机数种子以确保结果的可重复性
np.random.seed(1234)
tf.set_random_seed(1234) # 注意:在TensorFlow 2.x中,应使用tf.random.set_seed def main(): # 定义问题域和分辨率等参数 x_range = [-1.0, 1.0] # 定义x的范围 N_res = 50 # 残差点数量 N_bx = 2 # 边界点数量 xa, xb = x_range # 边界值 # 创建数据集对象 data = Dataset(x_range, N_res, N_bx, xa, xb) # 构建数据 X_res, X_b0, X_b1, Xmin, Xmax = data.build_data() # 定义神经网络结构 layers = [1] + 5*[40] + [1] # 神经网络层数和每层的神经元数 # 创建占位符 x_res_train = tf.placeholder(shape=[None, 1], dtype=tf.float32) x_b0_train = tf.placeholder(shape=[None, 1], dtype=tf.float32) x_b1_train = tf.placeholder(shape=[None, 1], dtype=tf.float32) # 创建PINN对象 pinn = DNN(layers, Xmin, Xmax) W, b = pinn.hyper_initial() # 初始化权重和偏置 # 定义网络输出和物理方程残差 u_pred = pinn.fnn_BC2(x_res_train, W, b) # 使用特定边界条件的网络输出 f_pred = pinn.pdenn_BC2(x_res_train, W, b) # 残差,即物理方程的不满足度 # 定义边界条件输出 u_b0_pred = pinn.fnn(x_b0_train, W, b) u_b1_pred = pinn.fnn(x_b1_train, W, b) # 定义损失函数(这里只考虑了残差的平方和) loss = tf.reduce_mean(tf.square(f_pred)) # 定义优化器 train_adam = tf.train.AdamOptimizer(0.0008).minimize(loss) train_lbfgs = tf.contrib.opt.ScipyOptimizerInterface(loss,method = "L-BFGS-B",options = {'maxiter': 80000,'ftol': 1.0*np.finfo(float).eps}) # 注意:tf.contrib在TensorFlow 2.x中已被移除 # TensorFlow 1.x 会话管理 sess = tf.Session() sess.run(tf.global_variables_initializer()) # 准备训练和测试数据字典 train_dict = {x_res_train: X_res, x_b0_train: X_b0, x_b1_train: X_b1}x_t = np.linspace(xa, xb, 101).reshape((-1, 1))test_dict = {x_res_train: x_t}Exact_sln = Exact(xa, xb) # 真实解的计算对象 u_e = Exact_sln.u_exact(x_t) # 真实解 # 训练模型 Model = Train(train_dict) start_time = time.perf_counter()Model.nntrain(sess, u_pred, loss, test_dict, u_e, x_t, train_adam, train_lbfgs) # 打印训练时间 stop_time = time.perf_counter()print('Duration time is %.3f seconds'%(stop_time - start_time))# 在模型训练结束后,计算并打印误差 calculate_errors(sess, x_res_train, u_pred, x_t, u_e) #Save the dataN_test = 101datasave = SavePlot(sess, x_range, N_test, xa, xb)datasave.saveplt(u_pred, x_res_train)if __name__ == '__main__': main()
我是在 Jupyter 文件上运行的,在其他集成开发环境中也可以类似运行。
运行结果:
效果不错!
下期预告:
- Python 机器学习求解 PDE 学习项目——PINN 求解二维 Poisson 方程
本专栏目标从简单的一维 Poisson 方程,到对流扩散方程,Burges 方程,到二维,三维以及非线性方程,发展方程,积分方程等等,所有文章包含全部可运行代码。请持续关注!