无条件图像生成是扩散模型的一种流行应用,它生成的图像看起来像用于训练的数据集中的图像。与文本或图像到图像模型不同,无条件图像生成不依赖于任何文本或图像。它只生成与其训练数据分布相似的图像。通常,通过在特定数据集上微调预训练模型可以获得最佳结果。
本教程主要来自huggingface官方教程,结合一些自己的修改,以支持训练本地数据集。我们首先依据官方教程,利用史密森尼蝴蝶数据集的子集上从头开始训练UNet2DModel
,以生我们自己的的🦋蝴蝶🦋。最后因为我是搞遥感方向的(测绘小卡拉米),所以利用遥感数据进行训练尝试,遥感影像使用的是煤矿区的无人机遥感影像,主要就是裸地和枯草,有的还有一些因为煤矿开采导致的地裂缝。
1、Train配置
为方便起见,创建一个包含训练超参数的TrainingConfig
类(请随意调整它们):
from dataclasses import dataclass@dataclass
class TrainingConfig:image_size = 128 # the generated image resolutiontrain_batch_size = 16eval_batch_size = 16 # how many images to sample during evaluationnum_epochs = 50gradient_accumulation_steps = 1learning_rate = 1e-4lr_warmup_steps = 500save_image_epochs = 10save_model_epochs = 30mixed_precision = "fp16" # `no` for float32, `fp16` for automatic mixed precisionoutput_dir = "ddpm-butterflies-128" # the model name locally and on the HF Hubpush_to_hub = True # whether to upload the saved model to the HF Hubhub_private_repo = Falseoverwrite_output_dir = True # overwrite the old model when re-running the notebookseed = 0config = TrainingConfig()
2、加载数据集
对于在hug 仓库空开的数据集可以使用🤗 Datasets依赖库轻松加载,比如本次的Smithsonian Butterflies:
from datasets import load_datasetconfig.dataset_name = "huggan/smithsonian_butterflies_subset"
dataset = load_dataset(config.dataset_name, split="train")
对于本地数据请用一下代码进行加载(请根据自己情况进行修改):
from datasets import load_datasetdata_dir = "/home/diffusers/datasets/isprsdataset"
dataset = load_dataset('imagefolder', data_dir=data_dir, split='train')
🤗 Datasets使用图像功能自动解码图像数据并将其加载为PIL. Image,我们可以将其可视化:
import matplotlib.pyplot as pltfig, axs = plt.subplots(1, 4, figsize=(16, 4))
for i, image in enumerate(dataset[:4]["image"]):axs[i].imshow(image)axs[i].set_axis_off()
fig.show()
3、图像预处理
由于图像大小不同,所以需要先对其进行预处理,也就是常规的图像增强:
- 调整大小将图像大小更改为配置文件中定义的图像大小—
image_size
。 RandomHorizontalFlip
通过随机镜像图像来增强数据集。Normalize
对于将像素值重新缩放到[-1,1]范围内很重要,这是模型所期望的。
from torchvision import transformspreprocess = transforms.Compose([transforms.Resize((config.image_size, config.image_size)),transforms.RandomHorizontalFlip(),transforms.ToTensor(),transforms.Normalize([0.5835, 0.5820, 0.5841], [0.1149, 0.1111, 0.1064]), # isprs# transforms.Normalize([0.5], [0.5]),]
)
这里使用的是Pytorch
自带的数据增强接口,这里我推荐大家使用albumentations
数据增强库。
使用🤗Datasets
的set_transform
方法在训练期间动态应用预处理函数:
def transform(examples):images = [preprocess(image.convert("RGB")) for image in examples["image"]]return {"images": images}dataset.set_transform(transform)
现在将数据集包装在DataLoader
中进行训练:
import torch
python
train_dataloader = torch.utils.data.DataLoader(dataset, batch_size=config.train_batch_size, shuffle=True)
4、创建UNet2DModel
🧨 Diffusers 中的预训练模型可以使用您想要的参数从它们的模型类轻松创建。例如,要创建UNet2DModel
:
from diffusers import UNet2DModelmodel = UNet2DModel(sample_size=config.image_size, # the target image resolutionin_channels=3, # the number of input channels, 3 for RGB imagesout_channels=3, # the number of output channelslayers_per_block=2, # how many ResNet layers to use per UNet blockblock_out_channels=(128, 128, 256, 256, 512, 512), # the number of output channels for each UNet blockdown_block_types=("DownBlock2D", # a regular ResNet downsampling block"DownBlock2D","DownBlock2D","DownBlock2D","AttnDownBlock2D", # a ResNet downsampling block with spatial self-attention"DownBlock2D",),up_block_types=("UpBlock2D", # a regular ResNet upsampling block"AttnUpBlock2D", # a ResNet upsampling block with spatial self-attention"UpBlock2D","UpBlock2D","UpBlock2D","UpBlock2D",),
)
检查样本图像形状与模型输出形状是否匹配:
sample_image = dataset[0]["images"].unsqueeze(0)
print("Input shape:", sample_image.shape)print("Output shape:", model(sample_image, timestep=0).sample.shape)
接下来创建一个scheduler
为图像添加一些噪点。
5、创建scheduler
根据您是使用模型进行训练还是推理,scheduler的行为会有所不同。在推理期间,scheduler从噪声中生成图像。在训练期间,scheduler从扩散过程中的特定点获取模型输出或样本,并根据噪声时间表和更新规则(比如我们本系列第一张所说的step)将噪声应用于图像。(我们可以看到,遥感影像生成的结果还行,已经能明显的看清楚地表和枯草,甚至能够出现可看清的地裂缝!)
让我们看看DDPMScheduler
并使用add_noise
方法向之前的sample_image
添加一些随机噪声:
import torch
from PIL import Image
from diffusers import DDPMSchedulernoise_scheduler = DDPMScheduler(num_train_timesteps=1000)
noise = torch.randn(sample_image.shape)
timesteps = torch.LongTensor([50])
noisy_image = noise_scheduler.add_noise(sample_image, noise, timesteps)Image.fromarray(((noisy_image.permute(0, 2, 3, 1) + 1.0) * 127.5).type(torch.uint8).numpy()[0])
模型的训练目标是预测添加到图像中的噪声。该步骤的损失可以通过以下方式计算,这里官方教程使用的是mse损失函数:
import torch.nn.functional as Fnoise_pred = model(noisy_image, timesteps).sample
loss = F.mse_loss(noise_pred, noise)
6、训练模型
到目前为止,已经有了开始训练模型的大部分部分,剩下的就是把所有东西放在一起。 首先,您需要一个优化器和一个学习率调度器:
from diffusers.optimization import get_cosine_schedule_with_warmupoptimizer = torch.optim.AdamW(model.parameters(), lr=config.learning_rate)
lr_scheduler = get_cosine_schedule_with_warmup(optimizer=optimizer,num_warmup_steps=config.lr_warmup_steps,num_training_steps=(len(train_dataloader) * config.num_epochs),
)
然后,您需要一种评估模型的方法。对于评估,您可以使用DDPMPipeline
生成一批示例图像并将其保存为网格格式(官方输出为格网,大家也可自行修改为单张保存):
from diffusers import DDPMPipeline
import math
import osdef make_grid(images, rows, cols):w, h = images[0].sizegrid = Image.new("RGB", size=(cols * w, rows * h))for i, image in enumerate(images):grid.paste(image, box=(i % cols * w, i // cols * h))return griddef evaluate(config, epoch, pipeline):# Sample some images from random noise (this is the backward diffusion process).# The default pipeline output type is `List[PIL.Image]`images = pipeline(batch_size=config.eval_batch_size,generator=torch.manual_seed(config.seed),).images# Make a grid out of the imagesimage_grid = make_grid(images, rows=2, cols=3)# Save the imagestest_dir = os.path.join(config.output_dir, "samples")os.makedirs(test_dir, exist_ok=True)image_grid.save(f"{test_dir}/{epoch + 1:04d}.png")
现在,可以使用🤗Accelerate将所有这些组件包装在一个训练循环中,以便于TensorBoard日志记录、梯度累积和混合精度训练。
def train_loop(config, model, noise_scheduler, optimizer, train_dataloader, lr_scheduler):# Initialize accelerator and tensorboard loggingaccelerator = Accelerator(mixed_precision=config.mixed_precision,gradient_accumulation_steps=config.gradient_accumulation_steps,log_with="tensorboard",project_dir=os.path.join(config.output_dir, "logs"),)if accelerator.is_main_process:if config.push_to_hub:repo_name = get_full_repo_name(Path(config.output_dir).name)repo = Repository(config.output_dir, clone_from=repo_name)elif config.output_dir is not None:os.makedirs(config.output_dir, exist_ok=True)accelerator.init_trackers("train_example")# Prepare everything# There is no specific order to remember, you just need to unpack the# objects in the same order you gave them to the prepare method.model, optimizer, train_dataloader, lr_scheduler = accelerator.prepare(model, optimizer, train_dataloader, lr_scheduler)global_step = 0# Now you train the modelfor epoch in range(config.num_epochs):progress_bar = tqdm(total=len(train_dataloader), disable=not accelerator.is_local_main_process)progress_bar.set_description(f"Epoch {epoch + 1}")for step, batch in enumerate(train_dataloader):clean_images = batch["images"]# Sample noise to add to the imagesnoise = torch.randn(clean_images.shape).to(clean_images.device)bs = clean_images.shape[0]# Sample a random timestep for each imagetimesteps = torch.randint(0, noise_scheduler.config.num_train_timesteps, (bs,), device=clean_images.device).long()# Add noise to the clean images according to the noise magnitude at each timestep# (this is the forward diffusion process)noisy_images = noise_scheduler.add_noise(clean_images, noise, timesteps)with accelerator.accumulate(model):# Predict the noise residualnoise_pred = model(noisy_images, timesteps, return_dict=False)[0]loss = F.mse_loss(noise_pred, noise)accelerator.backward(loss)accelerator.clip_grad_norm_(model.parameters(), 1.0)optimizer.step()lr_scheduler.step()optimizer.zero_grad()progress_bar.update(1)logs = {"loss": loss.detach().item(), "lr": lr_scheduler.get_last_lr()[0], "step": global_step}progress_bar.set_postfix(**logs)accelerator.log(logs, step=global_step)global_step += 1# After each epoch you optionally sample some demo images with evaluate() and save the modelif accelerator.is_main_process:pipeline = DDPMPipeline(unet=accelerator.unwrap_model(model), scheduler=noise_scheduler)if (epoch + 1) % config.save_image_epochs == 0 or epoch == config.num_epochs - 1:print(f'----------------------------------------------------- Evaluate Iter [{(epoch + 1) // config.save_image_epochs}] ------------------------------------------------------------------')evaluate(config, epoch, pipeline)if (epoch + 1) % config.save_model_epochs == 0 or epoch == config.num_epochs - 1:pipeline.save_pretrained(config.output_dir)
接下来使用🤗Accelerate的notebook_launcher
函数启动训练了。将训练循环、所有训练参数和进程数(可以将此值更改为可用于训练的GPU数)传递给该函数:
from accelerate import notebook_launcherargs = (config, model, noise_scheduler, optimizer, train_dataloader, lr_scheduler)notebook_launcher(train_loop, args, num_processes=1)
训练完成后,看看扩散模型生成的最终🦋图像(🦋我隔10个epoch生成一次,在下面给大家瞅瞅)和遥感影像(因为我电脑的原因,遥感影像跑了一半停了,不过也保存了一些,感慨一下,扩散模型太吃显存了,比之前跑分割检测啥的更加依赖,可能是我图像整的太大了,之后裁小一点试一试,感觉生成模型用于遥感领域,又困难,也有无限可能!这只是一个简单的扩散生成示例模型,还得再深入研究研究,以后再和大家分享其他更新又有意思的生成模型。
import globsample_images = sorted(glob.glob(f"{config.output_dir}/samples/*.png"))
Image.open(sample_images[-1])