昇思25天学习打卡营第15天|探索 Diffusion 扩散模型:从构建到应用的全过程

目录

环境配置

构建Diffusion模型

位置向量

ResNet/ConvNeXT块

Attention模块

组规一化

条件U-Net

正向扩散

数据准备与处理

训练过程

推理过程


环境配置


        首先进行环境配置、库的导入和一些设置操作,具体代码如下

%%capture captured_output  
# 这行可能是用于捕获后续代码块的输出  
# 卸载已安装的 MindSpore 库  
!pip uninstall mindspore -y  
# 从指定源安装特定版本(2.2.14)的 MindSpore 库  
!pip install -i https://pypi.mirrors.ustc.edu.cn/simple mindspore==2.2.14  
# 查看当前安装的 MindSpore 版本  
!pip show mindspore  
# 导入一些必要的数学和绘图相关的库和模块  
import math  
from functools import partial  
%matplotlib inline  # 使 matplotlib 绘图可以在当前 notebook 中直接显示  
import matplotlib.pyplot as plt  
from tqdm.auto import tqdm  # 用于显示进度条  
import numpy as np  
from multiprocessing import cpu_count  # 获取 CPU 核心数量  
from download import download  # 导入自定义的下载模块  import mindspore as ms  # 导入 MindSpore 库  
import mindspore.nn as nn  # 导入 MindSpore 中的神经网络模块  
import mindspore.ops as ops  # 导入 MindSpore 中的操作模块  
from mindspore import Tensor, Parameter  # 导入 MindSpore 中的张量和参数类  
from mindspore import dtype as mstype  # 导入 MindSpore 中的数据类型  
from mindspore.dataset.vision import Resize, Inter, CenterCrop, ToTensor, RandomHorizontalFlip, ToPIL  # 导入图像相关的操作  
from mindspore.common.initializer import initializer  # 导入初始化器  
from mindspore.amp import DynamicLossScaler  # 导入动态损失缩放器  ms.set_seed(0)  # 设置 MindSpore 的随机数种子为 0,以确保结果的可重复性  

        运行结果:

Name: mindspore  
Version: 2.2.14  
Summary: MindSpore is a new open source deep learning training/inference framework that could be used for mobile, edge and cloud scenarios.  
Home-page: https://www.mindspore.cn  
Author: The MindSpore Authors  
Author-email: contact@mindspore.cn  
License: Apache 2.0  
Location: /home/nginx/miniconda/envs/jupyter/lib/python3.9/site-packages  
Requires: asttokens, astunparse, numpy, packaging, pillow, protobuf, psutil, scipy  
Required-by:   

构建Diffusion模型


        首先,我们精心定义了一系列的辅助函数以及类,而这些函数和类将会在神经网络的实现过程中得以运用。与此同时,我们还特意为上采样和下采样操作设定了相应的别名。具体代码如下

def rearrange(head, inputs):  """ 这个函数接收输入 `inputs` 的形状信息和 `head` 值,将 `inputs` 重新整形 具体是将其从原始形状 `(b, hc, x, y)` 转变为 `(b, head, c, x * y)`,其中 `c = hc // head` """  b, hc, x, y = inputs.shape  c = hc // head  return inputs.reshape((b, head, c, x * y))  def rsqrt(x):  """ 这个函数计算输入 `x` 的平方根的倒数 首先计算平方根,然后取其倒数 """  res = ops.sqrt(x)  return ops.inv(res)  def randn_like(x, dtype=None):  """ 此函数生成与输入 `x` 形状相同的随机数数组 如果未指定 `dtype`,则使用 `x` 的数据类型;否则,按照指定的 `dtype` 生成 """  if dtype is None:  dtype = x.dtype  res = ops.standard_normal(x.shape).astype(dtype)  return res  def randn(shape, dtype=None):  """ 这个函数生成指定形状 `shape` 的随机数数组 如果未指定 `dtype`,默认为 `ms.float32`;否则,按照指定的 `dtype` 生成 """  if dtype is None:  dtype = ms.float32  res = ops.standard_normal(shape).astype(dtype)  return res  def randint(low, high, size, dtype=ms.int32):  """ 生成指定范围内的随机整数数组 范围是从 `low` 到 `high`(不包括 `high`),数组大小为 `size`,数据类型为 `dtype` """  res = ops.uniform(size, Tensor(low, dtype), Tensor(high, dtype), dtype=dtype)  return res  def exists(x):  """ 检查输入 `x` 是否不为 `None` """  return x is not None  def default(val, d):  """ 如果 `val` 存在(不为 `None`),则返回 `val`;否则,如果 `d` 是可调用的,则调用 `d` 并返回结果,否则直接返回 `d` """  if exists(val):  return val  return d() if callable(d) else d  def _check_dtype(d1, d2):  """ 检查两个数据类型 `d1` 和 `d2` 如果其中一个是 `ms.float32`,则返回 `ms.float32` 如果两者相同,则返回该数据类型 否则抛出 `ValueError` 异常 """  if ms.float32 in (d1, d2):  return ms.float32  if d1 == d2:  return d1  raise ValueError('dtype is not supported.')  class Residual(nn.Cell):  """ 定义了一个名为 `Residual` 的类,继承自 `nn.Cell` 在其构造函数中初始化了一个函数 `fn` 在 `construct` 方法中执行 `fn` 函数对输入 `x` 及其他参数的处理,并将结果与输入 `x` 相加后返回 """  def __init__(self, fn):  super().__init__()  self.fn = fn  def construct(self, x, *args, **kwargs):  return self.fn(x, *args, **kwargs) + x  def Upsample(dim):  """ 定义一个上采样函数,创建一个转置卷积层用于上采样操作,输入和输出通道数都为 `dim` """  return nn.Conv2dTranspose(dim, dim, 4, 2, pad_mode="pad", padding=1)  def Downsample(dim):  """ 定义一个下采样函数,创建一个卷积层用于下采样操作,输入和输出通道数都为 `dim` """  return nn.Conv2d(dim, dim, 4, 2, pad_mode="pad", padding=1)  

位置向量


        定义了一个被称作 SinusoidalPositionEmbeddings 的类,该类继承自 nn.Cell 。

        在 __init__ 方法(即构造函数)里:

        首先,初始化了一个属性 dim ,用于存储输入的维度数值。

        接着,计算出 half_dim ,其为 dim 的一半。

        随后,计算了一个值 emb ,并将其转换为 Tensor 类型进行保存。

        在 construct 方法(即构建方法)中:

        计算了 emb ,它是输入 x 与 self.emb 的乘积。

        通过 concat 函数将 sin(emb) 和 cos(emb) 沿着最后一个维度进行连接,最终返回所得结果。

        总体而言,这个类的主要功能是依据输入的 x 以及预先定义的参数来进行计算,并返回一种位置编码。

        代码如下:

class SinusoidalPositionEmbeddings(nn.Cell):  def __init__(self, dim):  super().__init__()  self.dim = dim  half_dim = self.dim // 2  emb = math.log(10000) / (half_dim - 1)  emb = np.exp(np.arange(half_dim) * - emb)  self.emb = Tensor(emb, ms.float32)  def construct(self, x):  emb = x[:, None] * self.emb[None, :]  emb = ops.concat((ops.sin(emb), ops.cos(emb)), axis=-1)  return emb  

ResNet/ConvNeXT块


        首先,定义了一个名为 Block 的类,它继承自 nn.Cell 。

        在 Block 类的 __init__ 方法(构造函数)中:

        初始化了一个卷积层 self.proj 用于进行维度转换。

        初始化了一个组归一化层 self.norm 。

        初始化了一个激活函数 self.act 为 SiLU 。

        在 Block 类的 construct 方法(构建方法)中:

        对输入 x 进行卷积操作。

        对卷积结果进行归一化。

        如果存在 scale_shift ,则对结果进行相应的调整。

        最后应用激活函数并返回结果。

        接下来,定义了一个名为 ConvNextBlock 的类,同样继承自 nn.Cell 。

        在 ConvNextBlock 类的 __init__ 方法中:

        如果存在 time_emb_dim ,创建一个包含 GELU 和 Dense 层的多层感知机 self.mlp 。

        创建一个深度可分离卷积层 self.ds_conv 。

        创建一个包含多个层的序列 self.net ,包括归一化、卷积、激活函数和再次归一化、卷积等操作。

        创建一个用于处理输入和输出维度不一致的卷积层 self.res_conv 。

        在 ConvNextBlock 类的 construct 方法中:

        对输入 x 进行深度可分离卷积操作。

        如果存在 mlp 和 time_emb ,将条件信息添加到卷积结果中。

        对结果进行 self.net 中的一系列操作。

        将结果与经过 self.res_conv 处理的输入相加并返回。

        总的来说,这两个类可能用于构建神经网络中的特定模块,实现特定的卷积和处理逻辑。代码如下

class Block(nn.Cell):  def __init__(self, dim, dim_out, groups=1):  super().__init__()  self.proj = nn.Conv2d(dim, dim_out, 3, pad_mode="pad", padding=1)  self.proj = c(dim, dim_out, 3, padding=1, pad_mode='pad')  self.norm = nn.GroupNorm(groups, dim_out)  self.act = nn.SiLU()  def construct(self, x, scale_shift=None):  x = self.proj(x)  x = self.norm(x)  if exists(scale_shift):  scale, shift = scale_shift  x = x * (scale + 1) + shift  x = self.act(x)  return x  class ConvNextBlock(nn.Cell):  def __init__(self, dim, dim_out, *, time_emb_dim=None, mult=2, norm=True):  super().__init__()  self.mlp = (  nn.SequentialCell(nn.GELU(), nn.Dense(time_emb_dim, dim))  if exists(time_emb_dim)  else None  )  self.ds_conv = nn.Conv2d(dim, dim, 7, padding=3, group=dim, pad_mode="pad")  self.net = nn.SequentialCell(  nn.GroupNorm(1, dim) if norm else nn.Identity(),  nn.Conv2d(dim, dim_out * mult, 3, padding=1, pad_mode="pad"),  nn.GELU(),  nn.GroupNorm(1, dim_out * mult),  nn.Conv2d(dim_out * mult, dim_out, 3, padding=1, pad_mode="pad"),  )  self.res_conv = nn.Conv2d(dim, dim_out, 1) if dim != dim_out else nn.Identity()  def construct(self, x, time_emb=None):  h = self.ds_conv(x)  if exists(self.mlp) and exists(time_emb):  assert exists(time_emb), "time embedding must be passed in"  condition = self.mlp(time_emb)  condition = condition.expand_dims(-1).expand_dims(-1)  h = h + condition  h = self.net(h)  return h + self.res_conv(x)  

Attention模块


        Attention 类:

        __init__ 方法:

        初始化一些参数,如缩放因子 scale 、头的数量 heads 、每个头的维度 dim_head 。

        创建用于将输入转换为查询(q)、键(k)和值(v)的卷积层 to_qkv ,以及将处理后的结果转换回输出维度的卷积层 to_out 。

        定义了一些操作函数 map 和 partial 。

        construct 方法:

        获取输入的形状信息。

        将 to_qkv 的输出拆分为 q、k、v 。

        对 q 进行缩放。

        计算 q 和 k 的相似度矩阵 sim ,并进行 softmax 操作得到注意力权重 attn 。

        根据 attn 计算输出 out ,并将其重新整形。

        最后通过 to_out 卷积层得到最终的输出。

        LayerNorm 类:

        __init__ 方法:创建一个可学习的参数 g 。

        construct 方法:计算输入的方差 var 和均值 mean ,进行归一化处理并乘以 g 。

        LinearAttention 类:

        与 Attention 类类似的初始化参数和层。

        construct 方法:

        获取输入形状。

        处理得到 q、k、v 。

        对 q 和 k 进行 softmax 操作。

        计算上下文 context 和输出 out 。

        将 out 重新整形,并通过 to_out 序列中的层得到最终输出。

        总的来说,定义了三种与注意力机制相关的类,用于在神经网络中进行特征的处理和转换

        代码如下:

class Attention(nn.Cell):  def __init__(self, dim, heads=4, dim_head=32):  super().__init__()  self.scale = dim_head ** -0.5  self.heads = heads  hidden_dim = dim_head * heads  self.to_qkv = nn.Conv2d(dim, hidden_dim * 3, 1, pad_mode='valid', has_bias=False)  self.to_out = nn.Conv2d(hidden_dim, dim, 1, pad_mode='valid', has_bias=True)  self.map = ops.Map()  self.partial = ops.Partial()  def construct(self, x):  b, _, h, w = x.shape  qkv = self.to_qkv(x).chunk(3, 1)  q, k, v = self.map(self.partial(rearrange, self.heads), qkv)  q = q * self.scale  # 'b h d i, b h d j -> b h i j'  sim = ops.bmm(q.swapaxes(2, 3), k)  attn = ops.softmax(sim, axis=-1)  # 'b h i j, b h d j -> b h i d'  out = ops.bmm(attn, v.swapaxes(2, 3))  out = out.swapaxes(-1, -2).reshape((b, -1, h, w))  return self.to_out(out)  class LayerNorm(nn.Cell):  def __init__(self, dim):  super().__init__()  self.g = Parameter(initializer('ones', (1, dim, 1, 1)), name='g')  def construct(self, x):  eps = 1e-5  var = x.var(1, keepdims=True)  mean = x.mean(1, keep_dims=True)  return (x - mean) * rsqrt((var + eps)) * self.g  class LinearAttention(nn.Cell):  def __init__(self, dim, heads=4, dim_head=32):  super().__init__()  self.scale = dim_head ** -0.5  self.heads = heads  hidden_dim = dim_head * heads  self.to_qkv = nn.Conv2d(dim, hidden_dim * 3, 1, pad_mode='valid', has_bias=False)  self.to_out = nn.SequentialCell(  nn.Conv2d(hidden_dim, dim, 1, pad_mode='valid', has_bias=True),  LayerNorm(dim)  )  self.map = ops.Map()  self.partial = ops.Partial()  def construct(self, x):  b, _, h, w = x.shape  qkv = self.to_qkv(x).chunk(3, 1)  q, k, v = self.map(self.partial(rearrange, self.heads), qkv)  q = ops.softmax(q, -2)  k = ops.softmax(k, -1)  q = q * self.scale  v = v / (h * w)  # 'b h d n, b h e n -> b h d e'  context = ops.bmm(k, v.swapaxes(2, 3))  # 'b h d e, b h d n -> b h e n'  out = ops.bmm(context.swapaxes(2, 3), q)  out = out.reshape((b, -1, h, w))  return self.to_out(out)  

组规一化


        定义了一个名为 PreNorm 的类,它继承自 nn.Cell 。

        在 __init__ 方法(构造函数)中:

        接收两个参数:dim(可能表示维度)和 fn(可能是一个函数或其他可调用对象)。

        初始化了一个属性 fn 来保存传入的函数。

        创建了一个组归一化层 self.norm 。

        在 construct 方法(构建方法)中:

        首先对输入 x 进行组归一化操作。

        然后将归一化后的 x 传入保存的函数 fn 并返回其结果。

        总的来说,这个类的作用是在对输入进行特定函数处理之前先进行组归一化操作。

        代码如下:

class PreNorm(nn.Cell):  def __init__(self, dim, fn):  super().__init__()  self.fn = fn  self.norm = nn.GroupNorm(1, dim)  def construct(self, x):  x = self.norm(x)  return self.fn(x)  

条件U-Net


        定义了一个名为 Unet 的类,它是一个基于卷积神经网络的模型。

        __init__ 方法用于初始化模型的参数和构建模型的各个部分,包括初始卷积层、时间嵌入处理、下采样和上采样的模块等。

        construct 方法定义了模型的前向传播过程。首先进行初始卷积,然后通过下采样、中间处理和上采样等步骤对输入进行处理,最终通过最终的卷积层得到输出。

        例如,如果输入图像为 [batch_size, channels, height, width] 的张量,并且提供了时间信息,模型会按照上述的流程对输入进行处理,逐步提取特征并生成最终的输出。

        代码如下:

# 定义 Unet 类,继承自 `nn.Cell`  
class Unet(nn.Cell):  def __init__(  self,  dim,  init_dim=None,  out_dim=None,  dim_mults=(1, 2, 4, 8),  channels=3,  with_time_emb=True,  convnext_mult=2,  ):  # 调用父类的初始化方法  super().__init__()  # 保存输入图像的通道数  self.channels = channels  # 如果未提供 init_dim,计算默认值  init_dim = default(init_dim, dim // 3 * 2)  # 初始化卷积层,用于将输入的图像通道转换为 init_dim  self.init_conv = nn.Conv2d(channels, init_dim, 7, padding=3, pad_mode="pad", has_bias=True)  # 计算不同层次的维度  dims = [init_dim, *map(lambda m: dim * m, dim_mults)]  in_out = list(zip(dims[:-1], dims[1:]))  # 创建部分应用的 ConvNextBlock 类  block_klass = partial(ConvNextBlock, mult=convnext_mult)  # 如果启用时间嵌入  if with_time_emb:  # 计算时间嵌入的维度  time_dim = dim * 4  # 创建时间嵌入的多层感知机  self.time_mlp = nn.SequentialCell(  SinusoidalPositionEmbeddings(dim),  nn.Dense(dim, time_dim),  nn.GELU(),  nn.Dense(time_dim, time_dim),  )  else:  # 否则,将时间嵌入相关设置为 None  time_dim = None  self.time_mlp = None  # 创建用于下采样的单元格列表  self.downs = nn.CellList([])  # 创建用于上采样的单元格列表  self.ups = nn.CellList([])  num_resolutions = len(in_out)  # 构建下采样部分  for ind, (dim_in, dim_out) in enumerate(in_out):  is_last = ind >= (num_resolutions - 1)  self.downs.append(  nn.CellList(  [  block_klass(dim_in, dim_out, time_emb_dim=time_dim),  block_klass(dim_out, dim_out, time_emb_dim=time_dim),  Residual(PreNorm(dim_out, LinearAttention(dim_out))),  Downsample(dim_out) if not is_last else nn.Identity(),  ]  )  )  # 中间块部分  mid_dim = dims[-1]  self.mid_block1 = block_klass(mid_dim, mid_dim, time_emb_dim=time_dim)  self.mid_attn = Residual(PreNorm(mid_dim, Attention(mid_dim)))  self.mid_block2 = block_klass(mid_dim, mid_dim, time_emb_dim=time_dim)  # 构建上采样部分  for ind, (dim_in, dim_out) in enumerate(reversed(in_out[1:])):  is_last = ind >= (num_resolutions - 1)  self.ups.append(  nn.CellList(  [  block_klass(dim_out * 2, dim_in, time_emb_dim=time_dim),  block_klass(dim_in, dim_in, time_emb_dim=time_dim),  Residual(PreNorm(dim_in, LinearAttention(dim_in))),  Upsample(dim_in) if not is_last else nn.Identity(),  ]  )  )  # 如果未提供 out_dim,计算默认值  out_dim = default(out_dim, channels)  # 最终的卷积层  self.final_conv = nn.SequentialCell(  block_klass(dim, dim), nn.Conv2d(dim, out_dim, 1)  )  def construct(self, x, time):  # 对输入进行初始卷积  x = self.init_conv(x)  # 获取时间嵌入  t = self.time_mlp(time) if exists(self.time_mlp) else None  # 存储下采样过程中的中间结果  h = []  # 下采样过程  for block1, block2, attn, downsample in self.downs:  x = block1(x, t)  x = block2(x, t)  x = attn(x)  h.append(x)  x = downsample(x)  # 中间处理  x = self.mid_block1(x, t)  x = self.mid_attn(x)  x = self.mid_block2(x, t)  len_h = len(h) - 1  # 上采样过程  for block1, block2, attn, upsample in self.ups:  x = ops.concat((x, h[len_h]), 1)  len_h -= 1  x = block1(x, t)  x = block2(x, t)  x = attn(x)  x = upsample(x)  # 最终的卷积输出  return self.final_conv(x)  

正向扩散


        定义了生成线性 beta 调度的函数,并根据指定的步数生成了 betas。

        基于 betas 计算了相关的参数,如 alphas、alphas_cumprod 等,并将一些结果转换为 Tensor 类型。

        定义了用于提取特定位置值的函数 extract。

        下载了一个包含猫猫图像的压缩文件,并打开并调整了其中一张图像的大小,最后显示了该图像。

        代码如下:

# 定义一个函数用于生成线性的 beta 调度  
def linear_beta_schedule(timesteps):  # 定义起始和结束的 beta 值  beta_start = 0.0001  beta_end = 0.02  # 使用线性插值生成指定步数的 beta 值,并转换为 float32 类型  return np.linspace(beta_start, beta_end, timesteps).astype(np.float32)  # 设置扩散的步数  
timesteps = 200  # 调用函数定义 beta 调度  
betas = linear_beta_schedule(timesteps=timesteps)  # 计算 alphas  
alphas = 1. - betas  
# 计算累积乘积的 alphas  
alphas_cumprod = np.cumprod(alphas, axis=0)  
# 对 alphas_cumprod 进行填充处理  
alphas_cumprod_prev = np.pad(alphas_cumprod[:-1], (1, 0), constant_values=1)  # 将一些计算结果转换为 Tensor 类型  
sqrt_recip_alphas = Tensor(np.sqrt(1. / alphas))  
sqrt_alphas_cumprod = Tensor(np.sqrt(alphas_cumprod))  
sqrt_one_minus_alphas_cumprod = Tensor(np.sqrt(1. - alphas_cumprod))  # 计算后验方差  
posterior_variance = betas * (1. - alphas_cumprod_prev) / (1. - alphas_cumprod)  # 计算 p2_loss_weight  
p2_loss_weight = (1 + alphas_cumprod / (1 - alphas_cumprod)) ** -0.  
p2_loss_weight = Tensor(p2_loss_weight)  # 定义一个函数用于提取特定位置的值  
def extract(a, t, x_shape):  b = t.shape[0]  # 从 a 中根据 t 提取值  out = Tensor(a).gather(t, -1)  # 对提取的值进行形状调整  return out.reshape(b, *((1,) * (len(x_shape) - 1)))  # 下载猫猫图像  
url = 'https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/datasets/image_cat.zip'  
path = download(url, './', kind="zip", replace=True)  
from PIL import Image  # 打开图像  
image = Image.open('./image_cat/jpg/000000039769.jpg')  
# 调整图像大小  
base_width = 160  
image = image.resize((base_width, int(float(image.size[1]) * float(base_width / float(image.size[0])))))  
# 显示图像  
image.show()  

        运行结果:

        定义了一系列对图像的处理变换操作,包括调整大小、裁剪、转换为张量、数值缩放以及随机水平翻转。

        从指定路径加载图像文件夹数据集,并对其进行处理和分批。

        获取一批数据并打印其形状。

        定义了反向变换的操作和组合变换的函数,对获取的数据进行反向变换并显示图像。

        例如,通过调整大小和裁剪将图像统一为 128x128 的尺寸,然后进行随机水平翻转、数值处理等,最后可以将处理后的数据再通过反向变换恢复并显示出来,以便观察处理前后的效果。

        代码如下:

from mindspore.dataset import ImageFolderDataset  # 定义图像的处理变换  
image_size = 128  
transforms = [  Resize(image_size, Inter.BILINEAR),  CenterCrop(image_size),  ToTensor(),  lambda t: (t * 2) - 1  
]  # 图像数据集的路径  
path = './image_cat'  # 创建 ImageFolderDataset 对象  
dataset = ImageFolderDataset(dataset_dir=path, num_parallel_workers=cpu_count(),  extensions=['.jpg', '.jpeg', '.png', '.tiff'],  num_shards=1, shard_id=0, shuffle=False, decode=True)  # 只选择 'image' 列  
dataset = dataset.project('image')  # 在变换列表中插入随机水平翻转  
transforms.insert(1, RandomHorizontalFlip())  # 应用变换并将数据集分批处理  
dataset_1 = dataset.map(transforms, 'image')  
dataset_2 = dataset_1.batch(1, drop_remainder=True)  # 获取一批数据中的第一个元素  
x_start = next(dataset_2.create_tuple_iterator())[0]  
print(x_start.shape)  import numpy as np  # 定义反向变换的操作  
reverse_transform = [  lambda t: (t + 1) / 2,  lambda t: ops.permute(t, (1, 2, 0)),  # 从 CHW 转换为 HWC  lambda t: t * 255.,  lambda t: t.asnumpy().astype(np.uint8),  ToPIL()  
]  # 定义组合变换的函数  
def compose(transform, x):  for d in transform:  x = d(x)  return x  # 对获取的数据进行反向变换并显示  
reverse_image = compose(reverse_transform, x_start[0])  
reverse_image.show()  

        运行结果:

        q_sample 函数用于根据输入的起始图像、时间步和可能的噪声生成带噪的样本。

        get_noisy_image 函数通过调用 q_sample 得到带噪图像,并将其转换为 PIL 图像。

        plot 函数用于绘制图像,支持设置是否包含原始图像以及行标题等。

        p_losses 函数计算模型的损失,涉及生成带噪样本、获取模型预测的噪声、计算损失并进行一些处理。

        例如,通过设置不同的时间步 t ,可以观察到不同程度噪声的图像效果。而 p_losses 函数用于在训练过程中计算损失,以优化模型。

        代码如下:

# 定义 q_sample 函数,用于根据起始图像、时间步和噪声生成带噪样本  
def q_sample(x_start, t, noise=None):  if noise is None:  noise = randn_like(x_start)  return (extract(sqrt_alphas_cumprod, t, x_start.shape) * x_start +  extract(sqrt_one_minus_alphas_cumprod, t, x_start.shape) * noise)  # 定义 get_noisy_image 函数,获取添加噪声后的图像并转换为 PIL 图像  
def get_noisy_image(x_start, t):  # 添加噪音  x_noisy = q_sample(x_start, t=t)  # 转换为 PIL 图像  noisy_image = compose(reverse_transform, x_noisy[0])  return noisy_image  # 设置时间步  
t = Tensor([40])  
noisy_image = get_noisy_image(x_start, t)  
print(noisy_image)  
noisy_image.show()  import matplotlib.pyplot as plt  # 定义 plot 函数用于绘制图像  
def plot(imgs, with_orig=False, row_title=None, **imshow_kwargs):  if not isinstance(imgs[0], list):  imgs = [imgs]  num_rows = len(imgs)  num_cols = len(imgs[0]) + with_orig  _, axs = plt.subplots(figsize=(200, 200), nrows=num_rows, ncols=num_cols, squeeze=False)  for row_idx, row in enumerate(imgs):  row = [image] + row if with_orig else row  for col_idx, img in enumerate(row):  ax = axs[row_idx, col_idx]  ax.imshow(np.asarray(img), **imshow_kwargs)  ax.set(xticklabels=[], yticklabels=[], xticks=[], yticks=[])  if with_orig:  axs[0, 0].set(title='Original image')  axs[0, 0].title.set_size(8)  if row_title is not None:  for row_idx in range(num_rows):  axs[row_idx, 0].set(ylabel=row_title[row_idx])  plt.tight_layout()  # 调用 plot 函数绘制多个时间步的噪声图像  plot([get_noisy_image(x_start, Tensor([t])) for t in [0, 50, 100, 150, 199]])  # 定义 p_losses 函数计算损失  
def p_losses(unet_model, x_start, t, noise=None):  if noise is None:  noise = randn_like(x_start)  x_noisy = q_sample(x_start=x_start, t=t, noise=noise)  predicted_noise = unet_model(x_noisy, t)  loss = nn.SmoothL1Loss()(noise, predicted_noise)  # todo  loss = loss.reshape(loss.shape[0], -1)  loss = loss * extract(p2_loss_weight, t, loss.shape)  return loss.mean()  

        运行结果:

数据准备与处理


        下载 MNIST 数据集的压缩文件。

        从 mindspore.dataset 中导入 FashionMnistDataset 类。

        定义了一些数据集的参数,如图像尺寸、通道数、批大小等。

        从指定路径创建用于训练的 FashionMnistDataset 对象,并设置了一些数据加载的参数,如是否打乱、并行处理数量等。

        定义了数据变换操作,包括随机水平翻转、转换为张量和数值处理。

        对数据集进行了选择列、打乱、应用变换和分批处理。

        获取一批数据并打印其键。

        例如,通过这些操作,对 MNIST 数据集进行了预处理和分批,为后续的模型训练或数据处理做好了准备。打印键可以查看这批数据中包含的字段信息。

        代码如下:

# 下载 MNIST 数据集  
url = 'https://mindspore-website.obs.cn-north-4.myhuaweicloud.com/notebook/datasets/dataset.zip'  
path = download(url, './', kind="zip", replace=True)  from mindspore.dataset import FashionMnistDataset  # 定义图像的尺寸、通道数和批大小  
image_size = 28  
channels = 1  
batch_size = 16  # 数据集的路径  
fashion_mnist_dataset_dir = "./dataset"  # 创建 FashionMnistDataset 对象用于训练数据  
dataset = FashionMnistDataset(dataset_dir=fashion_mnist_dataset_dir, usage="train", num_parallel_workers=cpu_count(), shuffle=True, num_shards=1, shard_id=0)  # 定义数据的变换操作  
transforms = [  RandomHorizontalFlip(),  ToTensor(),  lambda t: (t * 2) - 1  
]  # 只选择 'image' 列  
dataset = dataset.project('image')  
# 对数据集进行随机打乱  
dataset = dataset.shuffle(64)  
# 应用变换操作  
dataset = dataset.map(transforms, 'image')  
# 将数据集分批处理  
dataset = dataset.batch(16, drop_remainder=True)  # 获取一批数据并打印其键  
x = next(dataset.create_dict_iterator())  
print(x.keys())  

        运行结果:

        p_sample 函数根据模型输出、输入数据、时间步等计算采样结果,在不同的时间步索引下有不同的处理逻辑。

        p_sample_loop 函数通过一个循环多次调用 p_sample ,从纯噪声开始逐步生成采样结果,并将每次的结果保存到 imgs 列表中。

        sample 函数用于设置采样的形状参数,并调用 p_sample_loop 进行采样操作。

        例如,在模型训练完成后,可以使用 sample 函数生成采样图像,通过逐步调整时间步来观察生成的效果。

        代码如下:

# 定义 p_sample 函数,用于根据模型预测、输入图像、时间步和时间步索引生成采样结果  
def p_sample(model, x, t, t_index):  # 提取相关的参数  betas_t = extract(betas, t, x.shape)  sqrt_one_minus_alphas_cumprod_t = extract(  sqrt_one_minus_alphas_cumprod, t, x.shape  )  sqrt_recip_alphas_t = extract(sqrt_recip_alphas, t, x.shape)  # 计算模型均值  model_mean = sqrt_recip_alphas_t * (x - betas_t * model(x, t) / sqrt_one_minus_alphas_cumprod_t)  if t_index == 0:  return model_mean  # 提取后验方差  posterior_variance_t = extract(posterior_variance, t, x.shape)  # 生成噪声  noise = randn_like(x)  return model_mean + ops.sqrt(posterior_variance_t) * noise  # 定义 p_sample_loop 函数,通过循环进行多次 p_sample 操作  
def p_sample_loop(model, shape):  b = shape[0]  # 从纯噪声开始  img = randn(shape, dtype=None)  imgs = []  for i in tqdm(reversed(range(0, timesteps)), desc='sampling loop time step', total=timesteps):  img = p_sample(model, img, ms.numpy.full((b,), i, dtype=mstype.int32), i)  imgs.append(img.asnumpy())  return imgs  # 定义 sample 函数,用于调用 p_sample_loop 进行采样  
def sample(model, image_size, batch_size=16, channels=3):  return p_sample_loop(model, shape=(batch_size, channels, image_size, image_size))  

训练过程


        定义了一个动态变化的学习率 lr。

        构建了 Unet 模型,并处理了模型参数的名称。

        定义了优化器 optimizer 和损失缩放器 loss_scaler。

        定义了前向计算函数 forward_fn、梯度计算函数 grad_fn 和梯度更新函数 train_step。

        在训练循环中,每次迭代从数据集中获取一批数据,生成随机的时间步和噪声,计算损失并进行梯度更新。

        按照一定的间隔打印训练过程中的损失信息。

        在每个 epoch 结束时,计算训练时间,并展示模型的随机采样效果。

        例如,通过不断迭代训练数据,模型的参数会根据梯度进行调整,以优化模型的性能。同时,通过观察损失的变化和采样效果,可以评估模型的训练进展和效果。

        代码如下:

# 定义动态学习率  
lr = nn.cosine_decay_lr(min_lr=1e-7, max_lr=1e-4, total_step=10*3750, step_per_epoch=3750, decay_epoch=10)  # 定义 Unet 模型  
unet_model = Unet(  dim=image_size,  channels=channels,  dim_mults=(1, 2, 4,)  
)  name_list = []  
for (name, par) in list(unet_model.parameters_and_names()):  name_list.append(name)  
i = 0  
for item in list(unet_model.trainable_params()):  item.name = name_list[i]  i += 1  # 定义优化器  
optimizer = nn.Adam(unet_model.trainable_params(), learning_rate=lr)  
loss_scaler = DynamicLossScaler(65536, 2, 1000)  # 定义前向过程  
def forward_fn(data, t, noise=None):  loss = p_losses(unet_model, data, t, noise)  return loss  # 计算梯度  
grad_fn = ms.value_and_grad(forward_fn, None, optimizer.parameters, has_aux=False)  # 梯度更新  
def train_step(data, t, noise):  loss, grads = grad_fn(data, t, noise)  optimizer(grads)  return loss  import time  # 由于时间原因,epochs 设置为 1,可根据需求进行调整  
epochs = 1  for epoch in range(epochs):  begin_time = time.time()  for step, batch in enumerate(dataset.create_tuple_iterator()):  unet_model.set_train()  batch_size = batch[0].shape[0]  t = randint(0, timesteps, (batch_size,), dtype=ms.int32)  noise = randn_like(batch[0])  loss = train_step(batch[0], t, noise)  if step % 500 == 0:  print(" epoch: ", epoch, " step: ", step, " Loss: ", loss)  end_time = time.time()  times = end_time - begin_time  print("training time:", times, "s")  # 展示随机采样效果  unet_model.set_train(False)  samples = sample(unet_model, image_size=image_size, batch_size=64, channels=channels)  plt.imshow(samples[-1][5].reshape(image_size, image_size, channels), cmap="gray")  
print("Training Success!")  

        运行结果

推理过程


        首先对模型设置为评估模式,并进行采样得到 64 个图像的结果。

        展示了采样结果中最后一个图像的第 5 个图像的效果。

        然后选择了一个新的随机索引 53 。

        通过循环创建了一系列图像,并将它们存储在 ims 列表中。

        使用 matplotlib.animation 库创建了一个动画,展示每个时间步对应的采样图像。

        将动画保存为 diffusion.gif 并显示出来。

        例如,通过这个动画,可以直观地观察到随着时间步的变化,采样图像的演变过程。

        代码如下:

# 采样 64 个图片  
unet_model.set_train(False)  
samples = sample(unet_model, image_size=image_size, batch_size=64, channels=channels)  # 展示一个随机效果  
random_index = 5  
plt.imshow(samples[-1][random_index].reshape(image_size, image_size, channels), cmap="gray")  import matplotlib.animation as animation  random_index = 53  fig = plt.figure()  
ims = []  for i in range(timesteps):  # 显示每个时间步对应的图像  im = plt.imshow(samples[i][random_index].reshape(image_size, image_size, channels), cmap="gray", animated=True)  ims.append([im])  # 创建动画  
animate = animation.ArtistAnimation(fig, ims, interval=50, blit=True, repeat_delay=100)  
# 保存动画为 'diffusion.gif'  
animate.save('diffusion.gif')  
plt.show() 

        运行结果:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/news/876881.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

土体的有效应力原理

土体的有效应力原理 有效应力原则1. 总应力的测定2. 孔隙水压力的测定3. 有效应力的确定 有效应力的重要性 土体中的有效应力原理是卡尔太沙基在1936年提出的重要理论之一。它是总应力和孔隙水压力之间的差值。下面简要说明土壤中有效应力的更多特征和测定。 有效应力原则 有…

人工智能入门第一篇:简单理解GPU和CPU

目录 1,GPU就是显卡吗2,CPU和GPU到底是什么区别3,CUDA是什么4,为什么人工智能离不开GPU 1,GPU就是显卡吗 ‌不是,显卡和‌GPU是两个相关但不完全相同的概念,GPU是显卡的核心部分,但…

Google Test 学习笔记(简称GTest)

文章目录 一、介绍1.1 介绍1.2 教程 二、使用2.1 基本使用2.1.1 安装GTest (下载和编译)2.1.2 编写测试2.1.3 运行测试2.1.4 高级特性2.1.5 调试和分析 2.2 源码自带测试用例2.3 TEST 使用2.3.1 TestCase的介绍2.3.2 TEST宏demo1demo2 2.3.3 TEST_F宏2.3…

wincc 远程和PLC通讯方案

有 5个污水厂 的数据需要集中监控到1个组态软件上,软件是WINCC。每个污水厂监控系统都是独立的,已经投入运行了, 分站也是WINCC 和西门子PLC 。采用巨控远程模块的话,有两种方式:一种是从现场的PLC取数据,一种是从分站…

2019数字经济公测大赛-VMware逃逸

文章目录 环境搭建漏洞点exp 环境搭建 ubuntu :18.04.01vmware: VMware-Workstation-Full-15.5.0-14665864.x86_64.bundle 这里环境搭不成功。。patch过后就报错,不知道咋搞 发现可能是IDA加载后的patch似乎不行对原来的patch可能有影响,重新下了patch&…

【8月EI会议推荐】第四届区块链技术与信息安全国际会议

一、会议信息 大会官网:http://www.bctis.nhttp://www.icbdsme.org/ 官方邮箱:icbctis126.com 组委会联系人:杨老师 19911536763 支持单位:中原工学院、西安工程大学、齐鲁工业大学(山东省科学院)、澳门…

科大讯飞语音转写demo go语言版

上传了一个语音文件,识别效果。 package audioimport ("bytes""crypto/hmac""crypto/md5""crypto/sha1""encoding/base64""encoding/json""fmt""io/ioutil""net/http"…

【图文详解】Spring是如何解决循环依赖的?

Spring是如何解决循环依赖的呢? 很多小伙伴在面试时都被问到过这个问题,刷到过这个题的同学马上就能回答出来:“利用三级缓存”。面试官接着追问:“哪三级缓存呢?用两级行不行呢?” 这时候如果没有深入研究…

Vs2022+QT+Opencv 一些需要注意的地方

要在vs2022创建QT项目,先要安装一个插件Qt Visual Studio Tools,根据个人经验选择LEGACY Qt Visual Studio Tools好一些,看以下内容之前建议先在vs2022中配置好opencv,配置方式建议以属性表的形式保存在硬盘上。 设置QT路径 打开v…

清华计算几何-算法LowBound和ConvexHull(凸包)-GrahamScan

算法复杂度最低界限LowBound 算法求解复杂度是否存在一个最低界限,有时候想尽一切办法优化一个算法,去优化其复杂度,比如 清华计算几何-ConvexHull(凸包)-求极点InTriangle/ToLeft Test-CSDN博客 清华计算几何-ConvexHull(凸包)-求极边_计…

DeFi革命:揭秘去中心化金融的核心技术与实操指南

目录 DeFi(去中心化金融)综述 基本特点 第一,DeFi 是无许可的金融 第二,DeFi 是无门槛的金融 第三,DeFi 是无人驾驶的金融 典型商业模式 闪电贷 MakerDAO 面临的挑战 DeFi技术要点 椭圆曲线签名 EIP-712:…

模拟依赖关系和 AI 是Vue.js测试的下一个前沿领域

Vue.js 是一个流行的 JavaScript 框架,因此,确保其组件按预期工作至关重要:有效,更重要的是,可靠。模拟依赖项是最有效的测试方法之一,我们将在本文中发现。 模拟依赖项的必要性 模拟依赖项是一种对测试施加…

个人定制化形象生成,FaceChain最新模型部署

FaceChain是阿里巴巴达摩院推出的一个开源的人物写真和个人数字形象的AI生成框架。 FaceChain利用了Stable Diffusion模型的文生图功能,并结合人像风格化LoRA模型训练及人脸相关感知理解模型,将输入的图片进行训练后推理输出生成个人写真图像。 FaceCh…

Live555源码阅读笔记:哈希表的实现(C++)

😁博客主页😁:🚀https://blog.csdn.net/wkd_007🚀 🤑博客内容🤑:🍭嵌入式开发、Linux、C语言、C、数据结构、音视频🍭 🤣本文内容🤣&a…

算法日记day 20(中序后序遍历序列构造二叉树|最大、合并、搜索二叉树)

一、中序后序序列构造二叉树 题目: 给定两个整数数组 inorder 和 postorder ,其中 inorder 是二叉树的中序遍历, postorder 是同一棵树的后序遍历,请你构造并返回这颗 二叉树 。 示例 1: 输入:inorder [9,3,15,20,…

使用SpringEvent解决WebUploader大文件上传解耦问题

目录 前言 一、SpringEvent涉及的相关组件 1、 事件(Event) 2、事件监听器 3、事件发布器 二、WebUploader大文件处理的相关事件分析 1、事件发布的时机 2、事件发布的代码 三、事件监听器及实际的业务处理 1、文件上传处理枚举 2、文件上传监…

Python+selenium web自动化测试知识点合集2

选择元素 对于百度搜索页面,如果我们想自动化输入“selenium”,怎么做呢? 这就是在网页中,操控界面元素。 web界面自动化,要操控元素,首先需要 选择 界面元素 ,或者说 定位 界面元素 就是 先…

C++客户端Qt开发——界面优化(QSS)

1.QSS 如果通过QSS设置的样式和通过C代码设置的样式冲突,则QSS优先级更高 ①基本语法 选择器{属性名:属性值; } 例如: QPushButton {color: red; } 1>指定控件设置样式 #include "widget.h" #include "ui_widget.h&qu…

qt--做一个拷贝文件器

一、项目要求 使用线程完善文件拷贝器的操作 主窗口不能假死主窗口进度条必须能动改写文件大小的单位(自适应) 1TB1024GB 1GB1024MB 1MB1024KB 1KB1024字节 二、所需技术 1.QFileDialog 文件对话框 QFileDialog也继承了QDialog类,直接使用静态…

Redis缓存数据库进阶——Redis与分布式锁(6)

分布式锁简介 1. 什么是分布式锁 分布式锁是一种在分布式系统环境下,通过多个节点对共享资源进行访问控制的一种同步机制。它的主要目的是防止多个节点同时操作同一份数据,从而避免数据的不一致性。 线程锁: 也被称为互斥锁(Mu…