import osimport torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoaderfrom torchvision import transforms, datasets
from torchvision.utils import save_imagefrom tqdm import tqdmclass VAE(nn.Module): # 定义VAE模型def __init__(self, img_size, latent_dim): # 初始化方法super(VAE, self).__init__() # 继承初始化方法self.in_channel, self.img_h, self.img_w = img_size # 由输入图片形状得到图片通道数C、图片高度H、图片宽度Wself.h = self.img_h // 32 # 经过5次卷积后,最终特征层高度变为原图片高度的1/32self.w = self.img_w // 32 # 经过5次卷积后,最终特征层宽度变为原图片高度的1/32hw = self.h * self.w # 最终特征层的尺寸hxwself.latent_dim = latent_dim # 采样变量Z的长度self.hidden_dims = [32, 64, 128, 256, 512] # 特征层通道数列表# 开始构建编码器Encoderlayers = [] # 用于存放模型结构for hidden_dim in self.hidden_dims: # 循环特征层通道数列表layers += [nn.Conv2d(self.in_channel, hidden_dim, 3, 2, 1), # 添加convnn.BatchNorm2d(hidden_dim), # 添加bnnn.LeakyReLU()] # 添加leakyreluself.in_channel = hidden_dim # 将下次循环的输入通道数设为本次循环的输出通道数self.encoder = nn.Sequential(*layers) # 解码器Encoder模型结构self.fc_mu = nn.Linear(self.hidden_dims[-1] * hw, self.latent_dim) # linaer,将特征向量转化为分布均值muself.fc_var = nn.Linear(self.hidden_dims[-1] * hw, self.latent_dim) # linear,将特征向量转化为分布方差的对数log(var)# 开始构建解码器Decoderlayers = [] # 用于存放模型结构self.decoder_input = nn.Linear(self.latent_dim, self.hidden_dims[-1] * hw) # linaer,将采样变量Z转化为特征向量self.hidden_dims.reverse() # 倒序特征层通道数列表for i in range(len(self.hidden_dims) - 1): # 循环特征层通道数列表layers += [nn.ConvTranspose2d(self.hidden_dims[i], self.hidden_dims[i + 1], 3, 2, 1, 1), # 添加transconvnn.BatchNorm2d(self.hidden_dims[i + 1]), # 添加bnnn.LeakyReLU()] # 添加leakyrelulayers += [nn.ConvTranspose2d(self.hidden_dims[-1], self.hidden_dims[-1], 3, 2, 1, 1), # 添加transconvnn.BatchNorm2d(self.hidden_dims[-1]), # 添加bnnn.LeakyReLU(), # 添加leakyrelunn.Conv2d(self.hidden_dims[-1], img_size[0], 3, 1, 1), # 添加convnn.Tanh()] # 添加tanhself.decoder = nn.Sequential(*layers) # 编码器Decoder模型结构def encode(self, x): # 定义编码过程result = self.encoder(x) # Encoder结构,(n,1,32,32)-->(n,512,1,1)result = torch.flatten(result, 1) # 将特征层转化为特征向量,(n,512,1,1)-->(n,512)mu = self.fc_mu(result) # 计算分布均值mu,(n,512)-->(n,128)log_var = self.fc_var(result) # 计算分布方差的对数log(var),(n,512)-->(n,128)return [mu, log_var] # 返回分布的均值和方差对数def decode(self, z): # 定义解码过程y = self.decoder_input(z).view(-1, self.hidden_dims[0], self.h,self.w) # 将采样变量Z转化为特征向量,再转化为特征层,(n,128)-->(n,512)-->(n,512,1,1)y = self.decoder(y) # decoder结构,(n,512,1,1)-->(n,1,32,32)return y # 返回生成样本Ydef reparameterize(self, mu, log_var): # 重参数技巧std = torch.exp(0.5 * log_var) # 分布标准差stdeps = torch.randn_like(std) # 从标准正态分布中采样,(n,128)return mu + eps * std # 返回对应正态分布中的采样值def forward(self, x): # 前传函数mu, log_var = self.encode(x) # 经过编码过程,得到分布的均值mu和方差对数log_varz = self.reparameterize(mu, log_var) # 经过重参数技巧,得到分布采样变量Zy = self.decode(z) # 经过解码过程,得到生成样本Yreturn [y, x, mu, log_var] # 返回生成样本Y,输入样本X,分布均值mu,分布方差对数log_vardef sample(self, n, cuda): # 定义生成过程z = torch.randn(n, self.latent_dim) # 从标准正态分布中采样得到n个采样变量Z,长度为latent_dimif cuda: # 如果使用cudaz = z.cuda() # 将采样变量Z加载到GPUimages = self.decode(z) # 经过解码过程,得到生成样本Yreturn images # 返回生成样本Ydef loss_fn(y, x, mu, log_var): # 定义损失函数recons_loss = F.mse_loss(y, x) # 重建损失,MSEkld_loss = torch.mean(0.5 * torch.sum(mu ** 2 + torch.exp(log_var) - log_var - 1, 1), 0) # 分布损失,正态分布与标准正态分布的KL散度return recons_loss + w * kld_loss # 最终损失由两部分组成,其中分布损失需要乘上一个系数wif __name__ == "__main__":total_epochs = 100 # epochsbatch_size = 64 # batch sizelr = 5e-4 # lrw = 0.00025 # kld_loss的系数wnum_workers = 8 # 数据加载线程数image_size = 32 # 图片尺寸image_channel = 1 # 图片通道latent_dim = 128 # 采样变量Z长度sample_images_dir = "sample_images" # 生成样本示例存放路径train_dataset_dir = "../dataset/mnist" # 训练样本存放路径os.makedirs(sample_images_dir, exist_ok=True) # 创建生成样本示例存放路径os.makedirs(train_dataset_dir, exist_ok=True) # 创建训练样本存放路径cuda = True if torch.cuda.is_available() else False # 如果cuda可用,则使用cudaimg_size = (image_channel, image_size, image_size) # 输入样本形状(1,32,32)vae = VAE(img_size, latent_dim) # 实例化VAE模型,传入输入样本形状与采样变量长度if cuda: # 如果使用cudavae = vae.cuda() # 将模型加载到GPU# dataset and dataloadertransform = transforms.Compose( # 图片预处理方法[transforms.Resize(image_size), # 图片resize,(28x28)-->(32,32)transforms.ToTensor(), # 转化为tensortransforms.Normalize([0.5], [0.5])] # 标准化)dataloader = DataLoader( # 定义dataloaderdataset=datasets.MNIST(root=train_dataset_dir, # 使用mnist数据集,选择数据路径train=True, # 使用训练集transform=transform, # 图片预处理download=True), # 自动下载batch_size=batch_size, # batch sizenum_workers=num_workers, # 数据加载线程数shuffle=True # 打乱数据)# optimizeroptimizer = torch.optim.Adam(vae.parameters(), lr=lr) # 使用Adam优化器# train loopfor epoch in range(total_epochs): # 循环epochtotal_loss = 0 # 记录总损失pbar = tqdm(total=len(dataloader), desc=f"Epoch {epoch + 1}/{total_epochs}", postfix=dict,miniters=0.3) # 设置当前epoch显示进度for i, (img, _) in enumerate(dataloader): # 循环iterif cuda: # 如果使用cudaimg = img.cuda() # 将训练数据加载到GPUvae.train() # 模型开始训练optimizer.zero_grad() # 模型清零梯度y, x, mu, log_var = vae(img) # 输入训练样本X,得到生成样本Y,输入样本X,分布均值mu,分布方差对数log_varloss = loss_fn(y, x, mu, log_var) # 计算lossloss.backward() # 反向传播,计算当前梯度optimizer.step() # 根据梯度,更新网络参数total_loss += loss.item() # 累计losspbar.set_postfix(**{"Loss": loss.item()}) # 显示当前iter的losspbar.update(1) # 步进长度pbar.close() # 关闭当前epoch显示进度print("total_loss:%.4f" %(total_loss / len(dataloader))) # 显示当前epoch训练完成后,模型的总损失vae.eval() # 模型开始验证sample_images = vae.sample(25, cuda) # 获得25个生成样本save_image(sample_images.data, "%s/ep%d.png" % (sample_images_dir, (epoch + 1)), nrow=5,normalize=True) # 保存生成样本示例(5x5)
其中计算KLloss的代码的解释如下:
代码的目标是计算变分自编码器(VAE)中近似后验分布q(z∣x) 和标准正态分布 p(z) 之间的KL散度。KL散度公式的具体计算步骤如下:
1. mu ** 2
计算均值的平方项: μ2 这个项是为了衡量均值偏离零的程度。
2. torch.exp(log_var)
对数方差取指数,以获得实际的方差: exp(log(σ2))=σ2 这个项衡量方差的大小。
3. - log_var
减去对数方差: −log(σ2) 这个项衡量分布的扩展程度。
4. - 1
减去 1,是KL散度公式中的常数项,用于归一化。
将这些项加在一起:
μ2+exp(log(σ2))−log(σ2)−1
5. torch.sum(..., 1)
对所有维度求和,计算单个样本的KL散度: ∑(μ2+σ2−log(σ2)−1) 这一步是将每个样本的所有维度的KL散度加起来。
6. 0.5 * ...
乘以 0.5,因KL散度公式中有系数 0.5: 0.5×∑(μ2+σ2−log(σ2)−1)
7. torch.mean(..., 0)
对所有样本取平均,得到最终的KL散度损失: mean(0.5×∑(μ2+σ2−log(σ2)−1))
整个公式的作用是计算出近似后验分布 q(z∣x) 和标准正态分布 p(z) 之间的KL散度,该散度表示了两个分布之间的差异。这种损失通常用于变分自编码器(VAE)训练中,确保生成的潜在变量分布接近标准正态分布。