在TensorFlow框架中,实现卷积层(2维)的代码是 tf.keras.layers.Conv2D()。它主要接收如下几个参数,
filters:卷积核的个数,也就是卷积层输出的通道数(沿axis=-1的维度)
kernel_size:一个含有两个整数的元组(kernel_row,kernel_column),它分别规定了卷积核的高和宽。也可以是一个整数,此时卷积核的高和宽是一样的,都等于kernel_size。
strides:一个含有两个整数的元组(row_strides,column_strides),它分别规定了卷积核每次计算后,沿高和宽移动的步长。也可以是一个整数,此时沿高和沿宽的步长是一样的,都等于strides
padding:一个字符串,padding = 'valid' 时不采用padding机制。
data_format:一个字符串。它规定了输入的通道数是否在最后一维,即(batch_size,row_pixels,column_pixels,channels)→默认类型。此外另一种表达类型为(batch_size,channels,row_pixels,column_pixels )
input_shape:当卷积层作为模型的第一层时,需要指定输入的shape(除batch_size轴外)。
假设输入input(batch_size,input_row,input_column,channels)
卷积层会根据指定参数生成一个核kernel(kernel_row,kernel_column,input_channels,filters),其中input_channels = channels。
经过卷积层后会得到一个output(batch_size,output_row,output_column,filters)。在没采用Padding机制的情况下,
在其底层的源码实现中,2维的卷积层主要干了这么几件事,
- 先将kernel(kernel_row,kernel_column,input_channels,filters)重构成一个二维的reshape_kernel(kernel_row * kernel_column * input_channels,filters)
- 同时将input(batch_size,row_pixels,column_pixels,channels)重构成一个input_patchs(batch_size,output_row,output_column,kernel_row * kernel_column * input_channels)。input_patchs在axis=-1上代表着一个个patch,这些patch是卷积核在每步计算上所截取的input部分。
- 最后将input_patchs和reshape_kernel进行"abcd,de -> abce"张量运算,得到卷积结果。之后再加上bias和激活函数,得到卷积层的最终输出。
接着仿照上述步骤,我将实现卷积步长strides = 1,Padding = ' valid ' 时的卷积层的前向传播,
''' 在这里核移动步长为1,且不采用Padding机制。input->(batch_size, row_pixel, column_pixel, channels)kernel->(kernel_row, kernel_column, input_channels, filters)bias->(filters,)return->(batch_size, row_pixel-kernel_row+1, column_pixel-kernel_column+1, filters)'''def ConV_2D(input, kernel, bias):''' 重构kernel '''reshape_kernel = np.empty((kernel.shape[0] * kernel.shape[1] * kernel.shape[2], kernel.shape[3]))i = 0for kernel_row in range(kernel.shape[0]):for kernel_column in range(kernel.shape[1]):for input_channels in range(kernel.shape[2]):reshape_kernel[i] = kernel[kernel_row][kernel_column][input_channels]i += 1''' 从input中提取出一个个patch '''out_row = input.shape[1] - kernel.shape[0] + 1out_column = input.shape[2] - kernel.shape[1] + 1patchs_length = reshape_kernel.shape[0]input_patchs = np.empty((input.shape[0], out_row, out_column, patchs_length))for a in range(input_patchs.shape[0]):for b in range(out_row):for c in range(out_column):j = 0for d in range(kernel.shape[0]):for e in range(kernel.shape[1]):for f in range(kernel.shape[2]):input_patchs[a][b][c][j] = input[a][d+b][e+c][f]j += 1''' 经过前面的张量处理,卷积操作转变成了向量相乘 '''output = tf.add(tf.einsum('abcd, de -> abce', input_patchs, reshape_kernel), bias)return output
除此之外我本人还想出了另外一种实现形式,
def Convolution_2D(input, kernel, bias):output = np.empty((input.shape[0], input.shape[1] - kernel.shape[0] + 1, input.shape[2] - kernel.shape[1] + 1, kernel.shape[-1]))for batch_size in range(output.shape[0]):for row in range(output.shape[1]):for column in range(output.shape[2]):for filters in range(output.shape[3]):sum = 0for kernel_row in range(kernel.shape[0]):for kernel_column in range(kernel.shape[1]):for channels in range(input.shape[-1]):sum += \input[batch_size][kernel_row + row][kernel_column + column][channels] * \kernel[kernel_row][kernel_column][channels][filters]output[batch_size][row][column][filters] = sumoutput = tf.add(output, bias)return output
为了验证前面编写的卷积层前向传播的正确性,我训练了一个简单的卷积神经网络。
import tensorflow as tfinput_shape = (2, 5, 5, 3)class model(tf.keras.Model):def __init__(self, filters, kernel_size, strides):super().__init__()self.Conv_2D = tf.keras.layers.Conv2D(filters=filters,kernel_size=kernel_size,strides=strides,input_shape=input_shape[1:])self.flatten = tf.keras.layers.Flatten()self.output_dense = tf.keras.layers.Dense(units=1)def call(self, x):x = self.Conv_2D(x)x = self.flatten(x)x = self.output_dense(x)return x''' 为了验证试验的方便,整个神经网络中均未使用激活函数 '''
model = model(filters=2, kernel_size=2, strides=1)
model.compile(loss=tf.keras.losses.BinaryCrossentropy(from_logits=True),optimizer="Adam",metrics=['accuracy'])input_train = tf.constant([[[[1, 2, 3], [4, 5, 6], [4, 5, 6], [4, 5, 6], [4, 5, 6]],[[1, 1, 1], [2, 2, 6], [4, 5, 6], [4, 5, 6], [4, 5, 6]],[[1, 1, 1], [2, 2, 6], [4, 5, 6], [4, 5, 6], [4, 5, 6]],[[1, 1, 1], [2, 2, 6], [4, 5, 6], [4, 5, 6], [4, 5, 6]],[[1, 1, 1], [2, 2, 6], [4, 5, 6], [4, 5, 6], [4, 5, 6]]],[[[1, 2, 3], [4, 5, 6], [4, 5, 6], [4, 5, 6], [4, 5, 6]],[[1, 1, 1], [2, 2, 6], [4, 5, 6], [4, 5, 6], [4, 5, 6]],[[1, 1, 1], [2, 2, 6], [4, 5, 6], [4, 5, 6], [4, 5, 6]],[[1, 1, 1], [2, 2, 6], [4, 5, 6], [4, 5, 6], [4, 5, 6]],[[1, 1, 1], [2, 2, 6], [4, 5, 6], [4, 5, 6], [4, 5, 6]]]], dtype=tf.float32)output_label = tf.constant([[1], [0]], dtype=tf.float32)tf_callback = tf.keras.callbacks.TensorBoard(log_dir="./logs")
model.fit(x=input_train,y=output_label,epochs=10,callbacks=[tf_callback])tf.saved_model.save(model, 'ConV_2D')
打印模型参数,
import tensorflow as tfsave_path = 'ConV_2D/variables/variables' #reader = tf.train.load_checkpoint(save_path) # 得到CheckpointReader""" 打印Checkpoint中存储的所有参数名和参数shape """
for variable_name, variable_shape in reader.get_variable_to_shape_map().items():print(f'{variable_name} : {variable_shape}')print(reader.get_tensor("variables/0/.ATTRIBUTES/VARIABLE_VALUE")) //Conv_2D_kernel
print(reader.get_tensor("variables/1/.ATTRIBUTES/VARIABLE_VALUE")) //Conv_2D_bias
print(reader.get_tensor("variables/2/.ATTRIBUTES/VARIABLE_VALUE")) //Dense_kernel
print(reader.get_tensor("variables/3/.ATTRIBUTES/VARIABLE_VALUE")) //Dense_bias
编写模型前向传播,
import tensorflow as tf
import numpy as np''' 在这里核移动步长为1,且不采用Padding机制。input->(batch_size, row_pixel, column_pixel, channels)kernel->(kernel_row, kernel_column, input_channels, filters)bias->(filters,)return->(batch_size, row_pixel-kernel_row+1, column_pixel-kernel_column+1, filters)'''def ConV_2D(input, kernel, bias):''' 重构kernel '''reshape_kernel = np.empty((kernel.shape[0] * kernel.shape[1] * kernel.shape[2], kernel.shape[3]))i = 0for kernel_row in range(kernel.shape[0]):for kernel_column in range(kernel.shape[1]):for input_channels in range(kernel.shape[2]):reshape_kernel[i] = kernel[kernel_row][kernel_column][input_channels]i += 1''' 从input中提取出一个个patch '''out_row = input.shape[1] - kernel.shape[0] + 1out_column = input.shape[2] - kernel.shape[1] + 1patchs_length = reshape_kernel.shape[0]input_patchs = np.empty((input.shape[0], out_row, out_column, patchs_length))for a in range(input_patchs.shape[0]):for b in range(out_row):for c in range(out_column):j = 0for d in range(kernel.shape[0]):for e in range(kernel.shape[1]):for f in range(kernel.shape[2]):input_patchs[a][b][c][j] = input[a][d+b][e+c][f]j += 1''' 经过前面的张量处理,卷积操作转变成了向量相乘 '''output = tf.add(tf.einsum('abcd, de -> abce', input_patchs, reshape_kernel), bias)return outputdef Convolution_2D(input, kernel, bias):output = np.empty((input.shape[0], input.shape[1] - kernel.shape[0] + 1, input.shape[2] - kernel.shape[1] + 1, kernel.shape[-1]))for batch_size in range(output.shape[0]):for row in range(output.shape[1]):for column in range(output.shape[2]):for filters in range(output.shape[3]):sum = 0for kernel_row in range(kernel.shape[0]):for kernel_column in range(kernel.shape[1]):for channels in range(input.shape[-1]):sum += \input[batch_size][kernel_row + row][kernel_column + column][channels] * \kernel[kernel_row][kernel_column][channels][filters]output[batch_size][row][column][filters] = sumoutput = tf.add(output, bias)return output'''除了batch_size维度外,将一个四维张量的其它维度铺平'''def Flatten_4D(input):output = np.empty((input.shape[0], input.shape[1] * input.shape[2] * input.shape[3]))for a in range(input.shape[0]):i = 0for b in range(input.shape[1]):for c in range(input.shape[2]):for d in range(input.shape[3]):output[a][i] = input[a][b][c][d]i += 1return output''' ab, bc -> ac a is batch_sizeb is sequence_lengthc is Dense_units '''def Dense_Multiply(input, kernel):output = np.empty((input.shape[0], kernel.shape[1]))for a in range(input.shape[0]):for c in range(kernel.shape[1]):sum = 0for b in range(input.shape[1]):sum += input[a][b] * kernel[b][c]output[a][c] = sumreturn outputclass model():def __init__(self, inputs):self.inputs = inputsdef __call__(self, *args, **kwargs):x = tf.cast(self.inputs, dtype=tf.double)x = Convolution_2D(x, Conv_2D_kernel, Conv_2D_bias)x = Flatten_4D(x)x = tf.add(Dense_Multiply(x, Dense_kernel), Dense_bias)return x
最后验证成功