Pytorch实现图片异常检测

图片异常检测

异常检测指的是在正常的图片中找到异常的数据,由于无法通过规则进行识别判断,这样的应用场景通常都是需要人工进行识别,比如残次品的识别,图片异常识别模型的目标是可以代替或者辅助人工进行识别异常图片。

AnoGAN 模型

由于正常图片的数据量远大于异常图片,可能只有 1/100 的图片是异常图片,甚至更小。通过图片分类模型很难实现异常图片的识别,因为无法找到足够的异常数据进行训练。因此,只能通过正常图片去构建异常检测模型。如何通过正常的图片实现检测异常图片的模型,可以使用之前用的对抗网络,通过识别网络进行检测,图片是正常数据还是伪造数据。AnoGAN 模型是用于识别异常图片的模型,如果只用GAN 模型中的识别网络进行判别,效果并不好,所以 AnoGAN 网络不光依靠识别网络,生成网络在其中也发挥重要的作用。

对于AnoGAN,对于输入的数据,AnoGAN 网络首先会对图片生成噪声 Z。通过噪声 Z 输入生成网络生成可以被识别的图片,如果训练集中不存在这样的图片,例如异常图片,那么生成网络是无法生成的,这类图片就是异常图片。

噪声 Z 的生成方式,初始状态噪声是随机生成的,随后噪声通过网络生成图片,把生成的图片训练集作比较,比较的方式是通过像素差值的绝对值求和,最后算出损失值,最后通过网络进行训练以减少损失值。

上述的这种损失值在AutoGen 中被称为 Residual Loss,如果只有 Residual Loss,模型效果有限。所以,AnoGAN 这里也利用了判别网络,将测试图像和生成图像输入到判别网络,并对判别网络的输出特征进行差值计算,这个差值称为 Discrimination loss。最后通过 Discrimination Loss 和 Residual Loss 合并组成损失函数。

数据准备
import os
import urllib.request
import zipfile
import tarfileimport matplotlib.pyplot as plt
%matplotlib inline
from PIL import Image
import numpy as np#不存在“data”文件夹时创建
data_dir = "./data/"
if not os.path.exists(data_dir):os.mkdir(data_dir)import sklearn
# 下载并读取MNIST的手写数字图像。
from sklearn.datasets import fetch_openmlmnist = fetch_openml('mnist_784', version=1, data_home="./data/")  #data_home指定保存地址。# 数据的提取
X = mnist.data
y = mnist.target# 将MNIST的第一个数据可视化
plt.imshow(np.array(X.iloc[0]).reshape(28, 28), cmap='gray')
print("这个图像数据的标签是{}".format(y[0]))#在文件夹“data”下创建文件夹“img_78”
data_dir_path = "./data/img_78/"
if not os.path.exists(data_dir_path):os.mkdir(data_dir_path)#从MNIST将数字7、8的图像作为图像保存到“img_78”文件夹中
count7=0
count8=0
max_num=200  # 每制作200张图片for i in range(len(X)):# 图像7的制作if (y[i] == "7") and (count7<max_num):file_path="./data/img_78/img_7_"+str(count7)+".jpg"im_f=(np.array(X.iloc[i]).reshape(28, 28))  # 将图像变形为28×28pil_img_f = Image.fromarray(im_f.astype(np.uint8))  # 把图像变成PILpil_img_f = pil_img_f.resize((64, 64), Image.BICUBIC)  # 扩大到64×64pil_img_f.save(file_path)  # 保存count7+=1 #图像8的制作if (y[i] == "8") and (count8<max_num):file_path="./data/img_78/img_8_"+str(count8)+".jpg"im_f=(np.array(X.iloc[i]).reshape(28, 28))  # 将图像变形为28*28pil_img_f = Image.fromarray(im_f.astype(np.uint8))  # 把图像变成PILpil_img_f = pil_img_f.resize((64, 64), Image.BICUBIC)  # 扩大到64×64pil_img_f.save(file_path)  # 保存count8+=1# 制作200张7和8之后,breakif (count7>=max_num) and (count8>=max_num):break# 在文件夹“data”下面创建文件夹“test”
data_dir_path = "./data/test/"
if not os.path.exists(data_dir_path):os.mkdir(data_dir_path)# 在上述制作7,8图像时使用的index的最终值
i_start = i+1
print(i_start)# 从MNIST将数字7、8的图像作为图像保存到“img_78”文件夹中
count2=0
count7=0
count8=0
max_num=5  #每制作五张图片for i in range(i_start,len(X)):  # 从i_start开始#图像2的制作if (y[i] == "2") and (count2<max_num):file_path="./data/test/img_2_"+str(count2)+".jpg"im_f=(np.array(X.iloc[i]).reshape(28, 28))  # 将图像变形为28×28pil_img_f = Image.fromarray(im_f.astype(np.uint8))  # 把图像变成PILpil_img_f = pil_img_f.resize((64, 64), Image.BICUBIC)  # 扩大到64×64pil_img_f.save(file_path)  # 保存count2+=1# 图像7的制作if (y[i] == "7") and (count7<max_num):file_path="./data/test/img_7_"+str(count7)+".jpg"im_f=(np.array(X.iloc[i]).reshape(28, 28))  #将图像变形为28×28pil_img_f = Image.fromarray(im_f.astype(np.uint8))  # 把图像变成PILpil_img_f = pil_img_f.resize((64, 64), Image.BICUBIC)  # 6扩大到64×64pil_img_f.save(file_path)  # 保存count7+=1 # 图像8的制作if (y[i] == "8") and (count8<max_num):file_path="./data/test/img_8_"+str(count8)+".jpg"im_f=(np.array(X.iloc[i]).reshape(28, 28))  # 将图像变形为28*28pil_img_f = Image.fromarray(im_f.astype(np.uint8))  # 把图像变成PILpil_img_f = pil_img_f.resize((64, 64), Image.BICUBIC)  # 扩大到64×64pil_img_f.save(file_path)  # 保存count8+=1 # 在文件夹“data”下创建文件夹“img_78_28size”
data_dir_path = "./data/img_78_28size/"
if not os.path.exists(data_dir_path):os.mkdir(data_dir_path)# 从MNIST将数字7、8的图像作为图像保存到“img_78_28size”文件夹中
count7=0
count8=0
max_num=200  # 每制作200张图片for i in range(len(X)):# 图像7的制作if (y[i] == "7") and (count7<max_num):file_path="./data/img_78_28size/img_7_"+str(count7)+".jpg"im_f=(np.array(X.iloc[i]).reshape(28, 28))  # 将图像变形为28×28pil_img_f = Image.fromarray(im_f.astype(np.uint8))  # 把图像变成PILpil_img_f.save(file_path)  # 保存count7+=1 # 图像8的制作if (y[i] == "8") and (count8<max_num):file_path="./data/img_78_28size/img_8_"+str(count8)+".jpg"im_f=(np.array(X.iloc[i]).reshape(28, 28))  # 将图像变形为28*28pil_img_f = Image.fromarray(im_f.astype(np.uint8))  # 画像变成PILpil_img_f.save(file_path)  # 保存count8+=1if (count7>=max_num) and (count8>=max_num):break# 在文件夹“data”下面创建文件夹“test”
data_dir_path = "./data/test_28size/"
if not os.path.exists(data_dir_path):os.mkdir(data_dir_path)# 在上述制作7,8图像时使用的index的最终值
i_start = i+1
print(i_start)# 从MNIST将数字7、8的图像作为图像保存到“img_78”文件夹中
count2=0
count7=0
count8=0
max_num=5  # 每制作五张图片for i in range(i_start,len(X)):  #从i_start开始# 图像2的制作if (y[i] == "2") and (count2<max_num):file_path="./data/test_28size/img_2_"+str(count2)+".jpg"im_f=(np.array(X.iloc[i]).reshape(28, 28))  # 将图像变形为28×28pil_img_f = Image.fromarray(im_f.astype(np.uint8))  # 把图像变成PILpil_img_f.save(file_path)  # 保存count2+=1 # 画像7的制作if (y[i] == "7") and (count7<max_num):file_path="./data/test_28size/img_7_"+str(count7)+".jpg"im_f=(np.array(X.iloc[i]).reshape(28, 28))  # 将图像变形为28×28pil_img_f = Image.fromarray(im_f.astype(np.uint8))  # 把图像变成PILpil_img_f.save(file_path)  # 保存count7+=1 # 图像8的制作if (y[i] == "8") and (count8<max_num):file_path="./data/test_28size/img_8_"+str(count8)+".jpg"im_f=(np.array(X.iloc[i]).reshape(28, 28))  # 将图像变形为28*28pil_img_f = Image.fromarray(im_f.astype(np.uint8))  # 把图像变成PILpil_img_f.save(file_path)  # 保存count8+=1 
AnoGAN 实现

AnoGAN 网络实现以及训练、验证

# 导入软件包
import random
import math
import time
import pandas as pd
import numpy as np
from PIL import Imageimport torch
import torch.utils.data as data
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optimfrom torchvision import transforms# Setup seeds
torch.manual_seed(1234)
np.random.seed(1234)
random.seed(1234)class Generator(nn.Module):def __init__(self, z_dim=20, image_size=64):super(Generator, self).__init__()self.layer1 = nn.Sequential(nn.ConvTranspose2d(z_dim, image_size * 8,kernel_size=4, stride=1),nn.BatchNorm2d(image_size * 8),nn.ReLU(inplace=True))self.layer2 = nn.Sequential(nn.ConvTranspose2d(image_size * 8, image_size * 4,kernel_size=4, stride=2, padding=1),nn.BatchNorm2d(image_size * 4),nn.ReLU(inplace=True))self.layer3 = nn.Sequential(nn.ConvTranspose2d(image_size * 4, image_size * 2,kernel_size=4, stride=2, padding=1),nn.BatchNorm2d(image_size * 2),nn.ReLU(inplace=True))self.layer4 = nn.Sequential(nn.ConvTranspose2d(image_size * 2, image_size,kernel_size=4, stride=2, padding=1),nn.BatchNorm2d(image_size),nn.ReLU(inplace=True))self.last = nn.Sequential(nn.ConvTranspose2d(image_size, 1, kernel_size=4,stride=2, padding=1),nn.Tanh())#注意 :由于是黑白图像,因此输出通道数量为1def forward(self, z):out = self.layer1(z)out = self.layer2(out)out = self.layer3(out)out = self.layer4(out)out = self.last(out)return out# 动作确认
import matplotlib.pyplot as plt
%matplotlib inlineG = Generator(z_dim=20, image_size=64)# 输入的随机数
input_z = torch.randn(1, 20)# 将张量尺寸变形为(1,20,1,1)
input_z = input_z.view(input_z.size(0), input_z.size(1), 1, 1)# 输出假图像
fake_images = G(input_z)img_transformed = fake_images[0][0].detach().numpy()
plt.imshow(img_transformed, 'gray')
plt.show()class Discriminator(nn.Module):def __init__(self, z_dim=20, image_size=64):super(Discriminator, self).__init__()self.layer1 = nn.Sequential(nn.Conv2d(1, image_size, kernel_size=4,stride=2, padding=1),nn.LeakyReLU(0.1, inplace=True))#注意 :由于是黑白图像,因此输出通道数量为1self.layer2 = nn.Sequential(nn.Conv2d(image_size, image_size*2, kernel_size=4,stride=2, padding=1),nn.LeakyReLU(0.1, inplace=True))self.layer3 = nn.Sequential(nn.Conv2d(image_size*2, image_size*4, kernel_size=4,stride=2, padding=1),nn.LeakyReLU(0.1, inplace=True))self.layer4 = nn.Sequential(nn.Conv2d(image_size*4, image_size*8, kernel_size=4,stride=2, padding=1),nn.LeakyReLU(0.1, inplace=True))self.last = nn.Conv2d(image_size*8, 1, kernel_size=4, stride=1)def forward(self, x):out = self.layer1(x)out = self.layer2(out)out = self.layer3(out)out = self.layer4(out)feature = out  #最后将通道集中到一个特征量中feature = feature.view(feature.size()[0], -1)  #转换为二维out = self.last(out)return out, feature# 确认程序执
D = Discriminator(z_dim=20, image_size=64)#生成伪造图像
input_z = torch.randn(1, 20)
input_z = input_z.view(input_z.size(0), input_z.size(1), 1, 1)
fake_images = G(input_z)#将伪造的图像输入判别器D中
d_out = D(fake_images)# 将输出值d_out乘以Sigmoid函数,将其转换成0~1的值
print(nn.Sigmoid()(d_out[0]))# feature
print(d_out[1].shape)def make_datapath_list():"""制作用于学习、验证的图像数据和标注数据的文件路径表。 """train_img_list = list()  # 保存图像文件路径for img_idx in range(200):img_path = "./data/img_78/img_7_" + str(img_idx)+'.jpg'train_img_list.append(img_path)img_path = "./data/img_78/img_8_" + str(img_idx)+'.jpg'train_img_list.append(img_path)return train_img_listclass ImageTransform():"""图像的预处理类"""def __init__(self, mean, std):self.data_transform = transforms.Compose([transforms.ToTensor(),transforms.Normalize(mean, std)])def __call__(self, img):return self.data_transform(img)
class GAN_Img_Dataset(data.Dataset):"""图像的Dataset类。继承PyTorch的Dataset类"""def __init__(self, file_list, transform):self.file_list = file_listself.transform = transformdef __len__(self):'''返回图像的张数'''return len(self.file_list)def __getitem__(self, index):'''获取经过预处理后的图像的张量格式的数据'''img_path = self.file_list[index]img = Image.open(img_path)  # [高][宽]黑白#图像的预处理img_transformed = self.transform(img)return img_transformed#创建DataLoader并确认操作#创建文件列表
train_img_list=make_datapath_list()# 创建Dataset
mean = (0.5,)
std = (0.5,)
train_dataset = GAN_Img_Dataset(file_list=train_img_list, transform=ImageTransform(mean, std))# 创建DataLoader
batch_size = 64train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)#确认执行结果
batch_iterator = iter(train_dataloader)  # 转换成迭代器
imges = next(batch_iterator)   #取出位于第一位的元素
print(imges.size())  # torch.Size([64, 1, 64, 64])#网络的初始化
def weights_init(m):classname = m.__class__.__name__if classname.find('Conv') != -1:#Conv2d和ConvTranspose2d的初始化nn.init.normal_(m.weight.data, 0.0, 0.02)nn.init.constant_(m.bias.data, 0)elif classname.find('BatchNorm') != -1:# BatchNorm2d的初始化nn.init.normal_(m.weight.data, 1.0, 0.02)nn.init.constant_(m.bias.data, 0)# 初始化的实施
G.apply(weights_init)
D.apply(weights_init)print("网络已经成功地完成了初始化")# 创建一个函数来学习模型def train_model(G, D, dataloader, num_epochs):#确认是否可以使用GPUdevice = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")print("使用设备:", device)# 优化方法的设定g_lr, d_lr = 0.0001, 0.0004beta1, beta2 = 0.0, 0.9g_optimizer = torch.optim.Adam(G.parameters(), g_lr, [beta1, beta2])d_optimizer = torch.optim.Adam(D.parameters(), d_lr, [beta1, beta2])# 定义误差函数criterion = nn.BCEWithLogitsLoss(reduction='mean')# 使用硬编码的参数z_dim = 20mini_batch_size = 64#将网络变成GPUG.to(device)D.to(device)G.train()  # 将模型转换为训练模式D.train()  # 将模型转换为训练模式#如果网络相对固定,则开启加速torch.backends.cudnn.benchmark = True# 图像的张数num_train_imgs = len(dataloader.dataset)batch_size = dataloader.batch_size# 设置了迭代计数器iteration = 1logs = []#epoch循环for epoch in range(num_epochs):# 保存开始时间t_epoch_start = time.time()epoch_g_loss = 0.0  # epoch损失总和epoch_d_loss = 0.0  # epoch损失总和print('-------------')print('Epoch {}/{}'.format(epoch, num_epochs))print('-------------')print('(train)')# 以minibatch为单位从数据加载器中读取数据的循环for imges in dataloader:# --------------------# 1. 判别器D的学习# --------------------# 如果小批次的尺寸设置为1,会导致批次归一化处理产生错误,因此需要避免if imges.size()[0] == 1:continue#如果能使用GPU,则将数据送入GPU中imges = imges.to(device)#创建正确答案标签和伪造数据标签#在epoch最后的迭代中,小批次的数量会减少mini_batch_size = imges.size()[0]label_real = torch.full((mini_batch_size,), 1).to(device)label_fake = torch.full((mini_batch_size,), 0).to(device)#对真正的图像进行判定d_out_real, _ = D(imges)#生成伪造图像并进行判定input_z = torch.randn(mini_batch_size, z_dim).to(device)input_z = input_z.view(input_z.size(0), input_z.size(1), 1, 1)fake_images = G(input_z)d_out_fake, _ = D(fake_images)#计算误差d_loss_real = criterion(d_out_real.view(-1), label_real.float())d_loss_fake = criterion(d_out_fake.view(-1), label_fake.float())d_loss = d_loss_real + d_loss_fake#反向传播处理g_optimizer.zero_grad()d_optimizer.zero_grad()d_loss.backward()d_optimizer.step()# --------------------# 2.生成器G的学习# --------------------#生成伪造图像并进行判定input_z = torch.randn(mini_batch_size, z_dim).to(device)input_z = input_z.view(input_z.size(0), input_z.size(1), 1, 1)fake_images = G(input_z)d_out_fake, _ = D(fake_images)#计算误差g_loss = criterion(d_out_fake.view(-1), label_real.float())#反向传播处理g_optimizer.zero_grad()d_optimizer.zero_grad()g_loss.backward()g_optimizer.step()# --------------------# 3. 记录结果# --------------------epoch_d_loss += d_loss.item()epoch_g_loss += g_loss.item()iteration += 1#epoch的每个phase的loss和准确率t_epoch_finish = time.time()print('-------------')print('epoch {} || Epoch_D_Loss:{:.4f} ||Epoch_G_Loss:{:.4f}'.format(epoch, epoch_d_loss/batch_size, epoch_g_loss/batch_size))print('timer:  {:.4f} sec.'.format(t_epoch_finish - t_epoch_start))t_epoch_start = time.time()print("总迭代次数:", iteration)return G, D# 进行训练和验证
num_epochs = 300
G_update, D_update = train_model(G, D, dataloader=train_dataloader, num_epochs=num_epochs)# 将生成图像和训练数据可视化device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")# 输入的随机数生成
batch_size = 8
z_dim = 20
fixed_z = torch.randn(batch_size, z_dim)
fixed_z = fixed_z.view(fixed_z.size(0), fixed_z.size(1), 1, 1)
fake_images = G_update(fixed_z.to(device))# 训练数据
batch_iterator = iter(train_dataloader) #转换成迭代器
imges = next(batch_iterator) #提取第1个要素#图像的可视化处理
fig = plt.figure(figsize=(15, 6))
for i in range(0, 5):#上层显示测试图像plt.subplot(2, 5, i+1)plt.imshow(imges[i][0].cpu().detach().numpy(), 'gray')#下层显示生成图像plt.subplot(2, 5, 5+i+1)plt.imshow(fake_images[i][0].cpu().detach().numpy(), 'gray')def Anomaly_score(x, fake_img, D, Lambda=0.1):#求测试图像x和生成图像fake_img的像素级差的绝对值,并对每个迷你批求和residual_loss = torch.abs(x-fake_img)residual_loss = residual_loss.view(residual_loss.size()[0], -1)residual_loss = torch.sum(residual_loss, dim=1)# 将测试图像x和生成图像fake_img输入到识别器D,取出特征量_, x_feature = D(x)_, G_feature = D(fake_img)# 求测试图像x和生成图像fake_img的特征量之差的绝对值,对每个迷你批次求和discrimination_loss = torch.abs(x_feature-G_feature)discrimination_loss = discrimination_loss.view(discrimination_loss.size()[0], -1)discrimination_loss = torch.sum(discrimination_loss, dim=1)# 将两种损失对每个迷你批进行加法运算loss_each = (1-Lambda)*residual_loss + Lambda*discrimination_loss#求迷你批的全部损失total_loss = torch.sum(loss_each)return total_loss, loss_each, residual_loss# 创建测试用的DataLoaderdef make_test_datapath_list():"""制作用于学习、验证的图像数据和标注数据的文件路径表。 """train_img_list = list()  # 保存图像文件路径for img_idx in range(5):img_path = "./data/test/img_7_" + str(img_idx)+'.jpg'train_img_list.append(img_path)img_path = "./data/test/img_8_" + str(img_idx)+'.jpg'train_img_list.append(img_path)img_path = "./data/test/img_2_" + str(img_idx)+'.jpg'train_img_list.append(img_path)return train_img_list# 制作文件列表
test_img_list = make_test_datapath_list()# 制作Dataset
mean = (0.5,)
std = (0.5,)
test_dataset = GAN_Img_Dataset(file_list=test_img_list, transform=ImageTransform(mean, std))# 制作DataLoader
batch_size = 5test_dataloader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False)# 测试数据的确认
batch_iterator = iter(test_dataloader)  # 转换成迭代器
imges = next(batch_iterator)  # 取出第一个迷你批次fig = plt.figure(figsize=(15, 6))
for i in range(0, 5):plt.subplot(2, 5, i+1)plt.imshow(imges[i][0].cpu().detach().numpy(), 'gray')# 想检测异常的图像
x = imges[0:5]
x = x.to(device)# 用于生成想要异常检测的图像的初始随机数
z = torch.randn(5, 20).to(device)
z = z.view(z.size(0), z.size(1), 1, 1)# 変将requires_grad设为True,使得变量z可以求导数
z.requires_grad = True#求z的优化函数,以便能够更新变量z
z_optimizer = torch.optim.Adam([z], lr=1e-3)#求z
for epoch in range(5000+1):fake_img = G_update(z)loss, _, _ = Anomaly_score(x, fake_img, D_update, Lambda=0.1)z_optimizer.zero_grad()loss.backward()z_optimizer.step()if epoch % 1000 == 0:print('epoch {} || loss_total:{:.0f} '.format(epoch, loss.item()))# 生成图像
fake_img = G_update(z)# 要求损失
loss, loss_each, residual_loss_each = Anomaly_score(x, fake_img, D_update, Lambda=0.1)#损失的计算总损失
loss_each = loss_each.cpu().detach().numpy()
print("total loss:", np.round(loss_each, 0))# 图像可视化
fig = plt.figure(figsize=(15, 6))
for i in range(0, 5):# 把测试数据放在上层plt.subplot(2, 5, i+1)plt.imshow(imges[i][0].cpu().detach().numpy(), 'gray')# 在下层显示生成数据plt.subplot(2, 5, 5+i+1)plt.imshow(fake_img[i][0].cpu().detach().numpy(), 'gray')

可以看 2 的损失值最高,由此可判断 2 为异常图片。
在这里插入图片描述

Efficient GAN

AnoGAN 模型中,最重要的是 z 的取值,对z 的取值也有新的方法,其中一种就是 Efficient GAN,它优化了z 值的更新和学习时间。Efficient GAN是通过编码器的方式来对 z 值进行计算,Encoder通过 BiGAN 机制将图像于其关联在一起。

Efficient GAN 实现

通过 Efficient GAN 实现网络,并进行训练和验证。

# 导入软件包
import random
import math
import time
import pandas as pd
import numpy as np
from PIL import Imageimport torch
import torch.utils.data as data
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optimfrom torchvision import transforms# Setup seeds
torch.manual_seed(1234)
torch.cuda.manual_seed(1234)
np.random.seed(1234)
random.seed(1234)class Generator(nn.Module):def __init__(self, z_dim=20):super(Generator, self).__init__()self.layer1 = nn.Sequential(nn.Linear(z_dim, 1024),nn.BatchNorm1d(1024),nn.ReLU(inplace=True))self.layer2 = nn.Sequential(nn.Linear(1024, 7*7*128),nn.BatchNorm1d(7*7*128),nn.ReLU(inplace=True))self.layer3 = nn.Sequential(nn.ConvTranspose2d(in_channels=128, out_channels=64,kernel_size=4, stride=2, padding=1),nn.BatchNorm2d(64),nn.ReLU(inplace=True))self.last = nn.Sequential(nn.ConvTranspose2d(in_channels=64, out_channels=1,kernel_size=4, stride=2, padding=1),nn.Tanh())#注意 :由于是黑白图像,因此输出通道数量为 1def forward(self, z):out = self.layer1(z)out = self.layer2(out)#为了能置入卷积层中,需要对张量进行变形out = out.view(z.shape[0], 128, 7, 7)out = self.layer3(out)out = self.last(out)return out#确认执行结果
import matplotlib.pyplot as plt
%matplotlib inlineG = Generator(z_dim=20)
G.train()#输入的随机数
#由于要进行批次归一化处理,因此将小批次数设置为 2 以上
input_z = torch.randn(2, 20)#输出伪造图像
fake_images = G(input_z)  # torch.Size([2, 1, 28, 28])
img_transformed = fake_images[0][0].detach().numpy()
plt.imshow(img_transformed, 'gray')
plt.show()class Discriminator(nn.Module):def __init__(self, z_dim=20):super(Discriminator, self).__init__()#图像这边的输入处理self.x_layer1 = nn.Sequential(nn.Conv2d(1, 64, kernel_size=4,stride=2, padding=1),nn.LeakyReLU(0.1, inplace=True))#注意 :由于是黑白图像,因此输入通道数量为 1self.x_layer2 = nn.Sequential(nn.Conv2d(64, 64, kernel_size=4,stride=2, padding=1),nn.BatchNorm2d(64),nn.LeakyReLU(0.1, inplace=True))#随机数这边的输入处理self.z_layer1 = nn.Linear(z_dim, 512)#最终的判定self.last1 = nn.Sequential(nn.Linear(3648, 1024),nn.LeakyReLU(0.1, inplace=True))self.last2 = nn.Linear(1024, 1)def forward(self, x, z):#图像这边的输入处理x_out = self.x_layer1(x)x_out = self.x_layer2(x_out)#随机数这边的输入处理z = z.view(z.shape[0], -1)z_out = self.z_layer1(z)#将x_out与z_out连接在一起,交给全连接层进行判定x_out = x_out.view(-1, 64 * 7 * 7)out = torch.cat([x_out, z_out], dim=1)out = self.last1(out)feature = out  #最后将通道集中到一个特征量中feature = feature.view(feature.size()[0], -1)   #转换为二维out = self.last2(out)return out, feature#确认执行结果
D = Discriminator(z_dim=20)#生成伪造图像
input_z = torch.randn(2, 20)
fake_images = G(input_z)#将伪造图像输入判定器D中
d_out, _ = D(fake_images, input_z)#将输出结果d_out乘以Sigmoid,以将其转换为0~1的值
print(nn.Sigmoid()(d_out))class Encoder(nn.Module):def __init__(self, z_dim=20):super(Encoder, self).__init__()self.layer1 = nn.Sequential(nn.Conv2d(1, 32, kernel_size=3,stride=1),nn.LeakyReLU(0.1, inplace=True))#把图像转换成zself.layer2 = nn.Sequential(nn.Conv2d(32, 64, kernel_size=3,stride=2, padding=1),nn.BatchNorm2d(64),nn.LeakyReLU(0.1, inplace=True))self.layer3 = nn.Sequential(nn.Conv2d(64, 128, kernel_size=3,stride=2, padding=1),nn.BatchNorm2d(128),nn.LeakyReLU(0.1, inplace=True))#到这里为止,图像的尺寸为7像素×7像素self.last = nn.Linear(128 * 7 * 7, z_dim)def forward(self, x):out = self.layer1(x)out = self.layer2(out)out = self.layer3(out)#为了能放入FC中,对张量进行变形out = out.view(-1, 128 * 7 * 7)out = self.last(out)return out#确认执行结果
E = Encoder(z_dim=20)#输入的图像数据
x = fake_images  #fake_images是由上面的生成器G生成的#将图像编码为z
z = E(x)print(z.shape)
print(z)def make_datapath_list():"""制作用于学习、验证的图像数据和标注数据的文件路径表。 """train_img_list = list()  # 保存图像文件路径for img_idx in range(200):img_path = "./data/img_78_28size/img_7_" + str(img_idx)+'.jpg'train_img_list.append(img_path)img_path = "./data/img_78_28size/img_8_" + str(img_idx)+'.jpg'train_img_list.append(img_path)return train_img_listclass ImageTransform():"""图像的预处理类"""def __init__(self, mean, std):self.data_transform = transforms.Compose([transforms.ToTensor(),transforms.Normalize(mean, std)])def __call__(self, img):return self.data_transform(img)class GAN_Img_Dataset(data.Dataset):"""图像的Dataset类。继承PyTorch的Dataset类"""def __init__(self, file_list, transform):self.file_list = file_listself.transform = transformdef __len__(self):'''返回图像的张数'''return len(self.file_list)def __getitem__(self, index):'''获取预处理图像的Tensor格式数据'''img_path = self.file_list[index]img = Image.open(img_path)  # [高][宽]黑白# 图像的预处理img_transformed = self.transform(img)return img_transformed# 创建DataLoader并确认操作#制作文件列表
train_img_list=make_datapath_list()# Datasetを作成
mean = (0.5,)
std = (0.5,)
train_dataset = GAN_Img_Dataset(file_list=train_img_list, transform=ImageTransform(mean, std))# 制作DataLoader
batch_size = 64train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)# 动作的确认
batch_iterator = iter(train_dataloader)  # 转换成迭代器
imges = next(batch_iterator)  # 找出第一个要素
print(imges.size())  # torch.Size([64, 1, 64, 64])#创建用于训练模型的函数def train_model(G, D, E, dataloader, num_epochs):#确认是否可以使用GPU加速device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")print("使用设备:", device)#设置最优化算法lr_ge = 0.0001lr_d = 0.0001/4beta1, beta2 = 0.5, 0.999g_optimizer = torch.optim.Adam(G.parameters(), lr_ge, [beta1, beta2])e_optimizer = torch.optim.Adam(E.parameters(), lr_ge, [beta1, beta2])d_optimizer = torch.optim.Adam(D.parameters(), lr_d, [beta1, beta2])#定义误差函数#BCEWithLogitsLoss是先将输入数据乘以Logistic,# 再计算二进制交叉熵criterion = nn.BCEWithLogitsLoss(reduction='mean')#对参数进行硬编码z_dim = 20mini_batch_size = 64#将网络载入GPU中G.to(device)E.to(device)D.to(device)G.train()  #将模型设置为训练模式E.train()  #将模型设置为训练模式D.train()  #将模型设置为训练模式#如果网络相对固定,则开启加速torch.backends.cudnn.benchmark = True#图像的张数num_train_imgs = len(dataloader.dataset)batch_size = dataloader.batch_size#设置迭代计数器iteration = 1logs = []# epoch循环for epoch in range(num_epochs):#保存开始时间t_epoch_start = time.time()epoch_g_loss = 0.0  #epoch的损失总和epoch_e_loss = 0.0  #epoch的损失总和epoch_d_loss = 0.0  #epoch的损失总和print('-------------')print('Epoch {}/{}'.format(epoch, num_epochs))print('-------------')print('(train)')#以minibatch为单位从数据加载器中读取数据的循环for imges in dataloader:#如果小批次的尺寸设置为1,会导致批次归一化处理产生错误,因此需要避免if imges.size()[0] == 1:continue#创建用于表示小批次尺寸为1和0的标签#创建正确答案标签和伪造数据标签#在epoch最后的迭代中,小批次的数量会减少mini_batch_size = imges.size()[0]label_real = torch.full((mini_batch_size,), 1).to(device)label_fake = torch.full((mini_batch_size,), 0).to(device)#如果能使用GPU,则将数据送入GPU中imges = imges.to(device)# --------------------# 1. 判别器D的学习# --------------------# 对真实的图像进行判定 z_out_real = E(imges)d_out_real, _ = D(imges, z_out_real)# 生成伪造图像并进行判定input_z = torch.randn(mini_batch_size, z_dim).to(device)fake_images = G(input_z)d_out_fake, _ = D(fake_images, input_z)#计算误差d_loss_real = criterion(d_out_real.view(-1), label_real.float())d_loss_fake = criterion(d_out_fake.view(-1), label_fake.float())d_loss = d_loss_real + d_loss_fake#反向传播d_optimizer.zero_grad()d_loss.backward()d_optimizer.step()# --------------------# 2. 生成器G的学习# --------------------#生成伪造图像并进行判定input_z = torch.randn(mini_batch_size, z_dim).to(device)fake_images = G(input_z)d_out_fake, _ = D(fake_images, input_z)#计算误差g_loss = criterion(d_out_fake.view(-1), label_real.float())#反向传播g_optimizer.zero_grad()g_loss.backward()g_optimizer.step()# --------------------# 3. 编码器E的学习# --------------------#对真实图像的z进行推定z_out_real = E(imges)d_out_real, _ = D(imges, z_out_real)#计算误差e_loss = criterion(d_out_real.view(-1), label_fake.float())#反向传播e_optimizer.zero_grad()e_loss.backward()e_optimizer.step()# --------------------#4.记录# --------------------epoch_d_loss += d_loss.item()epoch_g_loss += g_loss.item()epoch_e_loss += e_loss.item()iteration += 1#epoch的每个phase的loss和准确率t_epoch_finish = time.time()print('-------------')print('epoch {} || Epoch_D_Loss:{:.4f} ||Epoch_G_Loss:{:.4f} ||Epoch_E_Loss:{:.4f}'.format(epoch, epoch_d_loss/batch_size, epoch_g_loss/batch_size, epoch_e_loss/batch_size))print('timer:  {:.4f} sec.'.format(t_epoch_finish - t_epoch_start))t_epoch_start = time.time()print("总迭代次数:", iteration)return G, D, E#网络的初始化
def weights_init(m):classname = m.__class__.__name__if classname.find('Conv') != -1:#Conv2d和ConvTranspose2d的初始化nn.init.normal_(m.weight.data, 0.0, 0.02)nn.init.constant_(m.bias.data, 0)elif classname.find('BatchNorm') != -1:# BatchNorm2d的初始化nn.init.normal_(m.weight.data, 0.0, 0.02)nn.init.constant_(m.bias.data, 0)elif classname.find('Linear') != -1:#全连接层Linear的初始化m.bias.data.fill_(0)#开始初始化
G.apply(weights_init)
E.apply(weights_init)
D.apply(weights_init)print("网络已经成功地完成了初始化")# 进行训练和验证
num_epochs = 1500
G_update, D_update, E_update = train_model(G, D, E, dataloader=train_dataloader, num_epochs=num_epochs)#对生成图像与训练数据的可视化处理
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")#生成输入的随机数
batch_size = 8
z_dim = 20
fixed_z = torch.randn(batch_size, z_dim)
fake_images = G_update(fixed_z.to(device))#训练数据
batch_iterator = iter(train_dataloader)  #转换成迭代器
imges = next(batch_iterator)  #取出最开头的元素#输出
fig = plt.figure(figsize=(15, 6))
for i in range(0, 5):#在上层中显示训练数据plt.subplot(2, 5, i+1)plt.imshow(imges[i][0].cpu().detach().numpy(), 'gray')#在下层中显示生成数据plt.subplot(2, 5, 5+i+1)plt.imshow(fake_images[i][0].cpu().detach().numpy(), 'gray')# ·制作测试用的Dataloaderdef make_test_datapath_list():"""制作用于学习、验证的图像数据和标注数据的文件路径表。 """train_img_list = list()  # ·保存图像文件路径for img_idx in range(5):img_path = "./data/test_28size/img_7_" + str(img_idx)+'.jpg'train_img_list.append(img_path)img_path = "./data/test_28size/img_8_" + str(img_idx)+'.jpg'train_img_list.append(img_path)img_path = "./data/test_28size/img_2_" + str(img_idx)+'.jpg'train_img_list.append(img_path)return train_img_list#制作文件列表
test_img_list = make_test_datapath_list()# 创建Dataset
mean = (0.5,)
std = (0.5,)
test_dataset = GAN_Img_Dataset(file_list=test_img_list, transform=ImageTransform(mean, std))# 制作DataLoader
batch_size = 5test_dataloader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False)#训练数据
batch_iterator = iter(test_dataloader)  #转换成迭代器
imges = next(batch_iterator)  #取出最开头的元素fig = plt.figure(figsize=(15, 6))
for i in range(0, 5):#在下层中显示生成数据plt.subplot(2, 5, i+1)plt.imshow(imges[i][0].cpu().detach().numpy(), 'gray')def Anomaly_score(x, fake_img, z_out_real, D, Lambda=0.1):#计算测试图像x与生成图像fake_img在像素层次上的差值的绝对值,并以小批次为单位进行求和计算residual_loss = torch.abs(x-fake_img)residual_loss = residual_loss.view(residual_loss.size()[0], -1)residual_loss = torch.sum(residual_loss, dim=1)# 将测试图像x和生成图像fake_img输入判别器D中,并取出特征量图_, x_feature = D(x, z_out_real)_, G_feature = D(fake_img, z_out_real)# 计算测试图像x与生成图像fake_img的特征量的差的绝对值,并以小批次为单位进行求和计算discrimination_loss = torch.abs(x_feature-G_feature)discrimination_loss = discrimination_loss.view(discrimination_loss.size()[0], -1)discrimination_loss = torch.sum(discrimination_loss, dim=1)#将每个小批次中的两种损失相加loss_each = (1-Lambda)*residual_loss + Lambda*discrimination_loss#对所有批次中的损失进行计算total_loss = torch.sum(loss_each)return total_loss, loss_each, residual_loss#需要检测异常的图像
x = imges[0:5]
x = x.to(device)#对监督数据的图像进行编码,转换成z,再用生成器G生成图像
z_out_real = E_update(imges.to(device))
imges_reconstract = G_update(z_out_real)#计算损失值
loss, loss_each, residual_loss_each = Anomaly_score(x, imges_reconstract, z_out_real, D_update, Lambda=0.1)#计算损失值,损失总和
loss_each = loss_each.cpu().detach().numpy()
print("total loss:", np.round(loss_each, 0))#图像的可视化
fig = plt.figure(figsize=(15, 6))
for i in range(0, 5):#在上层中显示训练数据plt.subplot(2, 5, i+1)plt.imshow(imges[i][0].cpu().detach().numpy(), 'gray')#在下层中显示生成数据plt.subplot(2, 5, 5+i+1)plt.imshow(imges_reconstract[i][0].cpu().detach().numpy(), 'gray')

在这里插入图片描述

AnoGAN 模型可以进行异常图片的识别,这个例子比较简单,由于是单通道训练,随意模型训练比较快。如果是彩色图片,训练改时间会更久,在业务场景中可以调整阈值,例如Loss高于 250 为异常图片。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mzph.cn/diannao/7202.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

存储故障后oracle报—ORA-01122/ORA-01207故障处理---惜分飞

客户存储异常,通过硬件恢复解决存储故障之后,oracle数据库无法正常启动(存储cache丢失),尝试recover数据库报ORA-00283 ORA-01122 ORA-01110 ORA-01207错误 以前处理过比较类似的存储故障case:又一起存储故障导致ORA-00333 ORA-00312恢复存储故障,强制拉库报ORA-600 kcbzib_kcr…

零基础入门篇①② Python标准数据类型--数字

Python从入门到精通系列专栏面向零基础以及需要进阶的读者倾心打造,9.9元订阅即可享受付费专栏权益,一个专栏带你吃透Python,专栏分为零基础入门篇、模块篇、网络爬虫篇、Web开发篇、办公自动化篇、数据分析篇…学习不断,持续更新,火热订阅中🔥专栏订阅地址 👉Python从…

接口测试及常用的接口测试工具(Postman/Jmeter)

&#x1f345; 视频学习&#xff1a;文末有免费的配套视频可观看 &#x1f345; 点击文末小卡片 &#xff0c;免费获取软件测试全套资料&#xff0c;资料在手&#xff0c;涨薪更快 首先&#xff0c;什么是接口呢&#xff1f; 接口一般来说有两种&#xff0c;一种是程序内部的接…

[Flutter]创建一个私有包并使用

在Flutter中创建一个自己的私有组件&#xff08;通常称为包或库&#xff09;&#xff0c;并通过Dart的包管理工具pub进行使用。 一、创建一个新的Flutter包 1.使用命令行创建 使用Flutter命令行工具来创建一个新的包&#xff1a; $ flutter create --templatepackage my_pri…

软件公司为什么很少接二开项目?

前言 很多企业由于原有项目还在继续运营&#xff0c;但原有技术公司不想再合作或者不想再维持整个技术团队等原因&#xff0c;就需要找一个新的软件公司继续维护原有软件系统。但是一接触往往发现很多软件公司拒绝接手第三方的软件项目&#xff0c;这究竟是什么原因呢&#xff…

某东抢购某台脚本-低调

某东抢购某台脚本 小白操作-学习使用 注意&#xff1a; 本文部分变量已做脱敏处理&#xff0c;仅用于测试和学习研究&#xff0c;禁止用于商业用途&#xff0c;不能保证其合法性&#xff0c;准确性&#xff0c;完整性和有效性&#xff0c;请根据情况自行判断。技术层面需要提…

C语言动态内存管理malloc、calloc、realloc、free函数、内存泄漏、动态内存开辟的位置等的介绍

文章目录 前言一、为什么存在动态内存管理二、动态内存函数的介绍1. malloc函数2. 内存泄漏3. 动态内存开辟位置4. free函数5. calloc 函数6. realloc 函数7. realloc 传空指针 总结 前言 C语言动态内存管理malloc、calloc、realloc、free函数、内存泄漏、动态内存开辟的位置等…

JavaScript this 上下文深度探索:综合指南涵盖隐式与显式call、apply、bind、箭头函数、构造函数等用法于多样场景

JavaScript中的this关键字代表函数执行的上下文环境&#xff0c;核心在于确定函数内部访问的当前对象。它根据函数调用方式动态变化&#xff0c;对事件处理、对象方法调用等至关重要。通过.call(), .apply(), .bind()或箭头函数控制this&#xff0c;可确保代码逻辑正确绑定对象…

ROS 2边学边练(43)-- 利用GTest写一个基本测试(C++)

前言 在ROS&#xff08;Robot Operating System&#xff09;中&#xff0c;gtest&#xff08;Google Test&#xff09;是一个广泛使用的C测试框架&#xff0c;用于编写和执行单元测试。这些测试可以验证ROS节点、服务和消息等的正确性和性能。 如果我们需要在写的包中添加测试&…

[redis] redis为什么快

1. Redis与Memcached的区别 两者都是非关系型内存键值数据库&#xff0c;现在公司一般都是用 Redis 来实现缓存&#xff0c;而且 Redis 自身也越来越强大了&#xff01;Redis 与 Memcached 主要有以下不同&#xff1a; (1) memcached所有的值均是简单的字符串&#xff0c;red…

保持 Hiti 证卡打印机清洁的重要性和推荐的清洁用品

在证卡印刷业务中&#xff0c;保持印刷设备的清洁至关重要。特别是对于 Hiti 证卡打印机来说&#xff0c;它们是生产高质量证卡的关键工具。保持设备清洁不仅可以保证打印质量和效率&#xff0c;还可以延长其使用寿命。本文将探讨保持 Hiti 证卡打印机清洁卡的重要性&#xff0…

首届云原生编程挑战赛总决赛亚军比赛攻略(ONE PIECE团队)

关联比赛: 首届云原生编程挑战赛【复赛】实现一个 Serverless 计算服务调度系统 比赛攻略—ONE PIECE团队 代码链接&#xff1a; 初赛&#xff1a;GitHub - czy-gm/containerScheduler: 2020天池首届云原生编程挑战赛亚军-初赛赛道二&#xff08;实现规模化容器静态布局和动…

高项-案例分析万能答案(作业分享)

项目管理&#xff1a;每天进步一点点~ 活到老&#xff0c;学到老 ヾ(◍∇◍)&#xff89;&#xff9e; 何时学习都不晚&#xff0c;加油 一、通用问题原因: 1.项目经理管理经验不足&#xff0c;没有及时发现和解决xx方面的问题。 2.项目管理计划没有得到关键干系人的评审确…

yum常用命令与lrzsz的在线安装

yum命令 yum&#xff08; Yellow dog Updater, Modified&#xff09;是一个在 Fedora 和 RedHat 以及 SUSE 中的 Shell 前端软件包管理器。 基于 RPM 包管理&#xff0c;能够从指定的服务器自动下载 RPM 包并且安装&#xff0c;可以自动处理依赖性关系&#xff0c;并且一次安装…

php基础知识快速入门

一、PHP基本知识 1、php介绍&#xff1a; php是一种创建动态交互性的强有力的服务器脚本语言&#xff0c;PHP是开源免费的&#xff0c;并且使用广泛。PHP是解释性语言&#xff0c;按顺序从上往下执行&#xff0c;无需编译&#xff0c;直接运行。PHP脚本在服务器上运行。 2、ph…

动态规划(dp)(二)

按摩师 按摩师 1.状态表示 dp【i】表示&#xff1a;到i位置时&#xff0c;此时最长时长 继续细化&#xff1a;在i位置选预约&#xff0c;或不选预约 f【i】&#xff1a;到i位置时&#xff0c;nums【i】必选的&#xff0c;最长时长 g【i】&#xff1a;到i位置时&#xff0c…

仅为娱乐,Python中如何重定义True为False?

在Python中&#xff0c;True 和 False 是内建的布尔常量&#xff0c;分别代表逻辑上的真和假。它们是不可变的&#xff0c;且在Python语言规范中具有特殊地位&#xff0c;不能被用户直接重定义。尝试给 True 或 False 赋予新的值是违反Python语言规则的&#xff0c;这样的操作会…

JS基础:变量的详解

你好&#xff0c;我是云桃桃。 一个希望帮助更多朋友快速入门 WEB 前端的程序媛。 云桃桃&#xff0c;大专生&#xff0c;一枚程序媛&#xff0c;感谢关注。回复 “前端基础题”&#xff0c;可免费获得前端基础 100 题汇总&#xff0c;回复 “前端基础路线”&#xff0c;可获取…

零基础入门学习Python第二阶01生成式(推导式),数据结构

Python语言进阶 重要知识点 生成式&#xff08;推导式&#xff09;的用法 prices {AAPL: 191.88,GOOG: 1186.96,IBM: 149.24,ORCL: 48.44,ACN: 166.89,FB: 208.09,SYMC: 21.29}# 用股票价格大于100元的股票构造一个新的字典prices2 {key: value for key, value in prices.i…

Java的java.util.concurrent.ExecutorService简介

在Java并发编程的璀璨星空中&#xff0c;ExecutorService无疑是那颗最耀眼的明星。它不仅是Java并发编程的核心组件之一&#xff0c;更是构建高并发、高性能应用的秘密武器。今天&#xff0c;我们就来一场说走就走的探索之旅&#xff0c;揭开它的神秘面纱&#xff01; &#x1…