微调基于 torchvision 0.3的目标检测模型
"""
为数据集编写类
"""
import os
import numpy as np
import torch
from PIL import Imageclass PennFudanDataset ( object ) : def __init__ ( self, root, transforms) : self. root = rootself. transforms = transformsself. imgs = list ( sorted ( os. listdir( os. path. join( root, "PNGImages" ) ) ) ) self. masks = list ( sorted ( os. listdir( os. path. join( root, "PedMasks" ) ) ) ) def __getitem__ ( self, idx) : img_path = os. path. join( self. root, "PNGImages" , self. imgs[ idx] ) mask_path = os. path. join( self. root, "PedMasks" , self. masks[ idx] ) img = Image. open ( img_path) . convert( "RGB" ) mask = Image. open ( mask_path) mask = np. array( mask) obj_ids = np. unique( mask) obj_ids = obj_ids[ 1 : ] masks = mask == obj_ids[ : , None , None ] num_objs = len ( obj_ids) boxes = [ ] for i in range ( num_objs) : pos = np. where( masks[ i] ) xmin = np. min ( pos[ 1 ] ) xmax = np. max ( pos[ 1 ] ) ymin = np. min ( pos[ 0 ] ) ymax = np. max ( pos[ 0 ] ) boxes. append( [ xmin, ymin, xmax, ymax] ) boxes = torch. as_tensor( boxes, dtype= torch. float32) labels = torch. ones( ( num_objs, ) , dtype= torch. int64) masks = torch. as_tensor( masks, dtype= torch. uint8) image_id = torch. tensor( [ idx] ) area = ( boxes[ : , 3 ] - boxes[ : , 1 ] ) * ( boxes[ : , 2 ] - boxes[ : , 0 ] ) iscrowd = torch. zeros( ( num_objs, ) , dtype= torch. int64) target = { } target[ "boxes" ] = boxestarget[ "labels" ] = labelstarget[ "masks" ] = maskstarget[ "image_id" ] = image_idtarget[ "area" ] = areatarget[ "iscrowd" ] = iscrowdif self. transforms is not None : img, target = self. transforms( img, target) return img, targetdef __len__ ( self) : return len ( self. imgs) """
第一个是我们想要从预先训练的模型开始,然后微调最后一层。
另一种是当我们想要用不同的模型替换模型的主干时(例如,用于更快的预测)。
下面是对这两种情况的处理。
"""
"""
PennFudan 数据集的实例分割模型
"""
import torchvision
from torchvision. models. detection. faster_rcnn import FastRCNNPredictor
from torchvision. models. detection. mask_rcnn import MaskRCNNPredictordef get_model_instance_segmentation ( num_classes) : model = torchvision. models. detection. maskrcnn_resnet50_fpn( pretrained= True ) in_features = model. roi_heads. box_predictor. cls_score. in_featuresmodel. roi_heads. box_predictor = FastRCNNPredictor( in_features, num_classes) in_features_mask = model. roi_heads. mask_predictor. conv5_mask. in_channelshidden_layer = 256 model. roi_heads. mask_predictor = MaskRCNNPredictor( in_features_mask, hidden_layer, num_classes) return model"""
为数据扩充/转换编写辅助函数:
"""
import transforms as Tdef get_transform ( train) : transforms = [ ] transforms. append( T. ToTensor( ) ) if train: transforms. append( T. RandomHorizontalFlip( 0.5 ) ) return T. Compose( transforms) """
编写执行训练和验证的主要功能
"""
from engine import train_one_epoch, evaluate
import utilsdef main ( ) : device = torch. device( 'cuda' ) if torch. cuda. is_available( ) else torch. device( 'cpu' ) num_classes = 2 dataset = PennFudanDataset( 'PennFudanPed' , get_transform( train= True ) ) dataset_test = PennFudanDataset( 'PennFudanPed' , get_transform( train= False ) ) indices = torch. randperm( len ( dataset) ) . tolist( ) dataset = torch. utils. data. Subset( dataset, indices[ : - 50 ] ) dataset_test = torch. utils. data. Subset( dataset_test, indices[ - 50 : ] ) data_loader = torch. utils. data. DataLoader( dataset, batch_size= 2 , shuffle= True , num_workers= 4 , collate_fn= utils. collate_fn) data_loader_test = torch. utils. data. DataLoader( dataset_test, batch_size= 1 , shuffle= False , num_workers= 4 , collate_fn= utils. collate_fn) model = get_model_instance_segmentation( num_classes) model. to( device) params = [ p for p in model. parameters( ) if p. requires_grad] optimizer = torch. optim. SGD( params, lr= 0.005 , momentum= 0.9 , weight_decay= 0.0005 ) lr_scheduler = torch. optim. lr_scheduler. StepLR( optimizer, step_size= 3 , gamma= 0.1 ) num_epochs = 10 for epoch in range ( num_epochs) : train_one_epoch( model, optimizer, data_loader, device, epoch, print_freq= 10 ) lr_scheduler. step( ) evaluate( model, data_loader_test, device= device) print ( "That's it!" )
微调 Torchvision 模型
from __future__ import print_function
from __future__ import division
import torch
import torch. nn as nn
import torch. optim as optim
import numpy as np
import torchvision
from torchvision import datasets, models, transforms
import matplotlib. pyplot as plt
import time
import os
import copy"""
输入
"""
data_dir = "./data/hymenoptera_data"
model_name = "squeezenet"
num_classes = 2
batch_size = 8
num_epochs = 15
feature_extract = True """
辅助函数
"""
def train_model ( model, dataloaders, criterion, optimizer, num_epochs= 25 , is_inception= False ) : since = time. time( ) val_acc_history = [ ] best_model_wts = copy. deepcopy( model. state_dict( ) ) best_acc = 0.0 for epoch in range ( num_epochs) : print ( 'Epoch {}/{}' . format ( epoch, num_epochs - 1 ) ) print ( '-' * 10 ) for phase in [ 'train' , 'val' ] : if phase == 'train' : model. train( ) else : model. eval ( ) running_loss = 0.0 running_corrects = 0 for inputs, labels in dataloaders[ phase] : inputs = inputs. to( device) labels = labels. to( device) optimizer. zero_grad( ) with torch. set_grad_enabled( phase == 'train' ) : if is_inception and phase == 'train' : outputs, aux_outputs = model( inputs) loss1 = criterion( outputs, labels) loss2 = criterion( aux_outputs, labels) loss = loss1 + 0.4 * loss2else : outputs = model( inputs) loss = criterion( outputs, labels) _, preds = torch. max ( outputs, 1 ) if phase == 'train' : loss. backward( ) optimizer. step( ) running_loss += loss. item( ) * inputs. size( 0 ) running_corrects += torch. sum ( preds == labels. data) epoch_loss = running_loss / len ( dataloaders[ phase] . dataset) epoch_acc = running_corrects. double( ) / len ( dataloaders[ phase] . dataset) print ( '{} Loss: {:.4f} Acc: {:.4f}' . format ( phase, epoch_loss, epoch_acc) ) if phase == 'val' and epoch_acc > best_acc: best_acc = epoch_accbest_model_wts = copy. deepcopy( model. state_dict( ) ) if phase == 'val' : val_acc_history. append( epoch_acc) print ( ) time_elapsed = time. time( ) - sinceprint ( 'Training complete in {:.0f}m {:.0f}s' . format ( time_elapsed // 60 , time_elapsed % 60 ) ) print ( 'Best val Acc: {:4f}' . format ( best_acc) ) model. load_state_dict( best_model_wts) return model, val_acc_historydef set_parameter_requires_grad ( model, feature_extracting) : if feature_extracting: for param in model. parameters( ) : param. requires_grad = False """
初始化和重塑网络
当进行特征提取时,我们只想更新最后一层的参数,换句话说,我们只想更新我们正在重塑层的参数。
因此,我们不需要计算不需要改变 的参数的梯度,因此为了提高效率,我们将其它层的.requires_grad属性设置为False。
这很重要,因为默认情况下,此属性设置为True。 然后,当我们初始化新层时,默认情况下新参数.requires_grad = True,因此只更新新层的参数。
当我们进行微调时,我们可以将所有 .required_grad设置为默认值True。
"""
"""
重塑代码
""" def initialize_model ( model_name, num_classes, feature_extract, use_pretrained= True ) : model_ft = None input_size = 0 if model_name == "resnet" : """ Resnet18""" model_ft = models. resnet18( pretrained= use_pretrained) set_parameter_requires_grad( model_ft, feature_extract) num_ftrs = model_ft. fc. in_featuresmodel_ft. fc = nn. Linear( num_ftrs, num_classes) input_size = 224 elif model_name == "alexnet" : """ Alexnet""" model_ft = models. alexnet( pretrained= use_pretrained) set_parameter_requires_grad( model_ft, feature_extract) num_ftrs = model_ft. classifier[ 6 ] . in_featuresmodel_ft. classifier[ 6 ] = nn. Linear( num_ftrs, num_classes) input_size = 224 elif model_name == "vgg" : """ VGG11_bn""" model_ft = models. vgg11_bn( pretrained= use_pretrained) set_parameter_requires_grad( model_ft, feature_extract) num_ftrs = model_ft. classifier[ 6 ] . in_featuresmodel_ft. classifier[ 6 ] = nn. Linear( num_ftrs, num_classes) input_size = 224 elif model_name == "squeezenet" : """ Squeezenet""" model_ft = models. squeezenet1_0( pretrained= use_pretrained) set_parameter_requires_grad( model_ft, feature_extract) model_ft. classifier[ 1 ] = nn. Conv2d( 512 , num_classes, kernel_size= ( 1 , 1 ) , stride= ( 1 , 1 ) ) model_ft. num_classes = num_classesinput_size = 224 elif model_name == "densenet" : """ Densenet""" model_ft = models. densenet121( pretrained= use_pretrained) set_parameter_requires_grad( model_ft, feature_extract) num_ftrs = model_ft. classifier. in_featuresmodel_ft. classifier = nn. Linear( num_ftrs, num_classes) input_size = 224 elif model_name == "inception" : """ Inception v3Be careful, expects (299,299) sized images and has auxiliary output""" model_ft = models. inception_v3( pretrained= use_pretrained) set_parameter_requires_grad( model_ft, feature_extract) num_ftrs = model_ft. AuxLogits. fc. in_featuresmodel_ft. AuxLogits. fc = nn. Linear( num_ftrs, num_classes) num_ftrs = model_ft. fc. in_featuresmodel_ft. fc = nn. Linear( num_ftrs, num_classes) input_size = 299 else : print ( "Invalid model name, exiting..." ) exit( ) return model_ft, input_size
model_ft, input_size = initialize_model( model_name, num_classes, feature_extract, use_pretrained= True )
print ( model_ft) """
加载模型
"""
data_transforms = { 'train' : transforms. Compose( [ transforms. RandomResizedCrop( input_size) , transforms. RandomHorizontalFlip( ) , transforms. ToTensor( ) , transforms. Normalize( [ 0.485 , 0.456 , 0.406 ] , [ 0.229 , 0.224 , 0.225 ] ) ] ) , 'val' : transforms. Compose( [ transforms. Resize( input_size) , transforms. CenterCrop( input_size) , transforms. ToTensor( ) , transforms. Normalize( [ 0.485 , 0.456 , 0.406 ] , [ 0.229 , 0.224 , 0.225 ] ) ] ) ,
} print ( "Initializing Datasets and Dataloaders..." )
image_datasets = { x: datasets. ImageFolder( os. path. join( data_dir, x) , data_transforms[ x] ) for x in [ 'train' , 'val' ] }
dataloaders_dict = { x: torch. utils. data. DataLoader( image_datasets[ x] , batch_size= batch_size, shuffle= True , num_workers= 4 ) for x in [ 'train' , 'val' ] }
device = torch. device( "cuda:0" if torch. cuda. is_available( ) else "cpu" ) """
创建优化器
"""
model_ft = model_ft. to( device)
params_to_update = model_ft. parameters( )
print ( "Params to learn:" )
if feature_extract: params_to_update = [ ] for name, param in model_ft. named_parameters( ) : if param. requires_grad == True : params_to_update. append( param) print ( "\t" , name)
else : for name, param in model_ft. named_parameters( ) : if param. requires_grad == True : print ( "\t" , name)
optimizer_ft = optim. SGD( params_to_update, lr= 0.001 , momentum= 0.9 ) """
运行训练和验证
"""
criterion = nn. CrossEntropyLoss( )
model_ft, hist = train_model( model_ft, dataloaders_dict, criterion, optimizer_ft, num_epochs= num_epochs, is_inception= ( model_name== "inception" ) ) """
对比从头开始模型
"""
scratch_model, _ = initialize_model( model_name, num_classes, feature_extract= False , use_pretrained= False )
scratch_model = scratch_model. to( device)
scratch_optimizer = optim. SGD( scratch_model. parameters( ) , lr= 0.001 , momentum= 0.9 )
scratch_criterion = nn. CrossEntropyLoss( )
_, scratch_hist = train_model( scratch_model, dataloaders_dict, scratch_criterion, scratch_optimizer, num_epochs= num_epochs, is_inception= ( model_name== "inception" ) )
ohist = [ ]
shist = [ ] ohist = [ h. cpu( ) . numpy( ) for h in hist]
shist = [ h. cpu( ) . numpy( ) for h in scratch_hist] plt. title( "Validation Accuracy vs. Number of Training Epochs" )
plt. xlabel( "Training Epochs" )
plt. ylabel( "Validation Accuracy" )
plt. plot( range ( 1 , num_epochs+ 1 ) , ohist, label= "Pretrained" )
plt. plot( range ( 1 , num_epochs+ 1 ) , shist, label= "Scratch" )
plt. ylim( ( 0 , 1 . ) )
plt. xticks( np. arange( 1 , num_epochs+ 1 , 1.0 ) )
plt. legend( )
plt. show( )
空间变换器网络
from __future__ import print_function
import torch
import torch. nn as nn
import torch. nn. functional as F
import torch. optim as optim
import torchvision
from torchvision import datasets, transforms
import matplotlib. pyplot as plt
import numpy as npplt. ion( ) """
加载数据
""" device = torch. device( "cuda" if torch. cuda. is_available( ) else "cpu" )
train_loader = torch. utils. data. DataLoader( datasets. MNIST( root= './data/mnist/MNIST' , train= True , download= False , transform= transforms. Compose( [ transforms. ToTensor( ) , transforms. Normalize( ( 0.1307 , ) , ( 0.3081 , ) ) ] ) ) , batch_size= 64 , shuffle= True , num_workers= 4 )
test_loader = torch. utils. data. DataLoader( datasets. MNIST( root= './data/mnist/MNIST' , train= False , transform= transforms. Compose( [ transforms. ToTensor( ) , transforms. Normalize( ( 0.1307 , ) , ( 0.3081 , ) ) ] ) ) , batch_size= 64 , shuffle= True , num_workers= 4 ) """
空间变换器网络:
结构:
本地网络(Localisation Network)是常规CNN,其对变换参数进行回归。不会从该数据集中明确地学习转换,而是网络自动学习增强 全局准确性的空间变换。
网格生成器( Grid Genator)在输入图像中生成与输出图像中的每个像素相对应的坐标网格。
采样器(Sampler)使用变换的参数并将其应用于输入图像。
"""
class Net ( nn. Module) : def __init__ ( self) : super ( Net, self) . __init__( ) self. conv1 = nn. Conv2d( 1 , 10 , kernel_size= 5 ) self. conv2 = nn. Conv2d( 10 , 20 , kernel_size= 5 ) self. conv2_drop = nn. Dropout2d( ) self. fc1 = nn. Linear( 320 , 50 ) self. fc2 = nn. Linear( 50 , 10 ) self. localization = nn. Sequential( nn. Conv2d( 1 , 8 , kernel_size= 7 ) , nn. MaxPool2d( 2 , stride= 2 ) , nn. ReLU( True ) , nn. Conv2d( 8 , 10 , kernel_size= 5 ) , nn. MaxPool2d( 2 , stride= 2 ) , nn. ReLU( True ) ) self. fc_loc = nn. Sequential( nn. Linear( 10 * 3 * 3 , 32 ) , nn. ReLU( True ) , nn. Linear( 32 , 3 * 2 ) ) self. fc_loc[ 2 ] . weight. data. zero_( ) self. fc_loc[ 2 ] . bias. data. copy_( torch. tensor( [ 1 , 0 , 0 , 0 , 1 , 0 ] , dtype= torch. float ) ) def stn ( self, x) : xs = self. localization( x) xs = xs. view( - 1 , 10 * 3 * 3 ) theta = self. fc_loc( xs) theta = theta. view( - 1 , 2 , 3 ) grid = F. affine_grid( theta, x. size( ) ) x = F. grid_sample( x, grid) return xdef forward ( self, x) : x = self. stn( x) x = F. relu( F. max_pool2d( self. conv1( x) , 2 ) ) x = F. relu( F. max_pool2d( self. conv2_drop( self. conv2( x) ) , 2 ) ) x = x. view( - 1 , 320 ) x = F. relu( self. fc1( x) ) x = F. dropout( x, training= self. training) x = self. fc2( x) return F. log_softmax( x, dim= 1 ) model = Net( ) . to( device) """
训练模型
""" optimizer = optim. SGD( model. parameters( ) , lr= 0.01 ) def train ( epoch) : model. train( ) for batch_idx, ( data, target) in enumerate ( train_loader) : data, target = data. to( device) , target. to( device) optimizer. zero_grad( ) output = model( data) loss = F. nll_loss( output, target) loss. backward( ) optimizer. step( ) if batch_idx % 500 == 0 : print ( 'Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}' . format ( epoch, batch_idx * len ( data) , len ( train_loader. dataset) , 100 . * batch_idx / len ( train_loader) , loss. item( ) ) )
def test ( ) : with torch. no_grad( ) : model. eval ( ) test_loss = 0 correct = 0 for data, target in test_loader: data, target = data. to( device) , target. to( device) output = model( data) test_loss += F. nll_loss( output, target, size_average= False ) . item( ) pred = output. max ( 1 , keepdim= True ) [ 1 ] correct += pred. eq( target. view_as( pred) ) . sum ( ) . item( ) test_loss /= len ( test_loader. dataset) print ( '\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n' . format ( test_loss, correct, len ( test_loader. dataset) , 100 . * correct / len ( test_loader. dataset) ) ) """
可视化 STN 结果
"""
def convert_image_np ( inp) : """Convert a Tensor to numpy image.""" inp = inp. numpy( ) . transpose( ( 1 , 2 , 0 ) ) mean = np. array( [ 0.485 , 0.456 , 0.406 ] ) std = np. array( [ 0.229 , 0.224 , 0.225 ] ) inp = std * inp + meaninp = np. clip( inp, 0 , 1 ) return inp
def visualize_stn ( ) : with torch. no_grad( ) : data = next ( iter ( test_loader) ) [ 0 ] . to( device) input_tensor = data. cpu( ) transformed_input_tensor = model. stn( data) . cpu( ) in_grid = convert_image_np( torchvision. utils. make_grid( input_tensor) ) out_grid = convert_image_np( torchvision. utils. make_grid( transformed_input_tensor) ) f, axarr = plt. subplots( 1 , 2 ) axarr[ 0 ] . imshow( in_grid) axarr[ 0 ] . set_title( 'Dataset Images' ) axarr[ 1 ] . imshow( out_grid) axarr[ 1 ] . set_title( 'Transformed Images' ) for epoch in range ( 1 , 20 + 1 ) : train( epoch) test( )
visualize_stn( ) plt. ioff( )
plt. show( )
使用 PyTorch 进行 Neural-Transfer
from __future__ import print_functionimport torch
import torch. nn as nn
import torch. nn. functional as F
import torch. optim as optimfrom PIL import Image
import matplotlib. pyplot as pltimport torchvision. transforms as transforms
import torchvision. models as modelsimport copy"""
基本原理
我们定义两个间距,一个用于内容D_C,另一个用于风格D_S。D_C测量两张图片内容的不同,而D_S用来测量两张图片风格的不同。
然后,我们输入第三张图片,并改变这张图片,使其与内容图片的内容间距和风格图片的风格间距最小化。
开始图像风格转换。
""" device = torch. device( "cuda" if torch. cuda. is_available( ) else "cpu" ) """
加载图片
原始的PIL图片的值介于0到255之间,但是当转换成torch张量时,它们的值被转换成0到1之间。
图片也需要被重设成相同的维度。
一个重要的细节是,注意torch库中的神经网络用来训练的张量的值为0到1之间。
如果你尝试将0到255的张量图片加载到神经网络,然后激活的特征映射将不能侦测到目标内容和风格。
然而,Caffe库中的预训练网络用来训练的张量值为0到255之间的图片。
"""
imsize = 512 if torch. cuda. is_available( ) else 128 loader = transforms. Compose( [ transforms. Resize( imsize) , transforms. ToTensor( ) ] ) def image_loader ( image_name) : image = Image. open ( image_name) image = loader( image) . unsqueeze( 0 ) return image. to( device, torch. float ) style_img = image_loader( "./data/images//neural-style/picasso.jpg" )
content_img = image_loader( "./data/images//neural-style/dancing.jpg" ) assert style_img. size( ) == content_img. size( ) , \"我们需要导入相同大小的样式和内容图像"
unloader = transforms. ToPILImage( ) plt. ion( ) def imshow ( tensor, title= None ) : image = tensor. cpu( ) . clone( ) image = image. squeeze( 0 ) image = unloader( image) plt. imshow( image) if title is not None : plt. title( title) plt. pause( 0.001 ) plt. figure( )
imshow( style_img, title= 'Style Image' ) plt. figure( )
imshow( content_img, title= 'Content Image' ) """
内容损失
内容损失是一个表示一层内容间距的加权版本。
""" class ContentLoss ( nn. Module) : def __init__ ( self, target, ) : super ( ContentLoss, self) . __init__( ) self. target = target. detach( ) def forward ( self, input ) : self. loss = F. mse_loss( input , self. target) return input """
风格损失
它要作为一个网络中的透明层,来计算相应层的风格损失
""" def gram_matrix ( input ) : a, b, c, d = input . size( ) features = input . view( a * b, c * d) G = torch. mm( features, features. t( ) ) return G. div( a * b * c * d) class StyleLoss ( nn. Module) : def __init__ ( self, target_feature) : super ( StyleLoss, self) . __init__( ) self. target = gram_matrix( target_feature) . detach( ) def forward ( self, input ) : G = gram_matrix( input ) self. loss = F. mse_loss( G, self. target) return input """
导入模型
""" cnn = models. vgg19( pretrained= True ) . features. to( device) . eval ( ) """
图片预处理
"""
cnn_normalization_mean = torch. tensor( [ 0.485 , 0.456 , 0.406 ] ) . to( device)
cnn_normalization_std = torch. tensor( [ 0.229 , 0.224 , 0.225 ] ) . to( device)
class Normalization ( nn. Module) : def __init__ ( self, mean, std) : super ( Normalization, self) . __init__( ) self. mean = torch. tensor( mean) . view( - 1 , 1 , 1 ) self. std = torch. tensor( std) . view( - 1 , 1 , 1 ) def forward ( self, img) : return ( img - self. mean) / self. std"""
创建一个新的Sequential模型,并正确的插入内容损失和风格损失模型。
"""
content_layers_default = [ 'conv_4' ]
style_layers_default = [ 'conv_1' , 'conv_2' , 'conv_3' , 'conv_4' , 'conv_5' ] def get_style_model_and_losses ( cnn, normalization_mean, normalization_std, style_img, content_img, content_layers= content_layers_default, style_layers= style_layers_default) : cnn = copy. deepcopy( cnn) normalization = Normalization( normalization_mean, normalization_std) . to( device) content_losses = [ ] style_losses = [ ] model = nn. Sequential( normalization) i = 0 for layer in cnn. children( ) : if isinstance ( layer, nn. Conv2d) : i += 1 name = 'conv_{}' . format ( i) elif isinstance ( layer, nn. ReLU) : name = 'relu_{}' . format ( i) layer = nn. ReLU( inplace= False ) elif isinstance ( layer, nn. MaxPool2d) : name = 'pool_{}' . format ( i) elif isinstance ( layer, nn. BatchNorm2d) : name = 'bn_{}' . format ( i) else : raise RuntimeError( 'Unrecognized layer: {}' . format ( layer. __class__. __name__) ) model. add_module( name, layer) if name in content_layers: target = model( content_img) . detach( ) content_loss = ContentLoss( target) model. add_module( "content_loss_{}" . format ( i) , content_loss) content_losses. append( content_loss) if name in style_layers: target_feature = model( style_img) . detach( ) style_loss = StyleLoss( target_feature) model. add_module( "style_loss_{}" . format ( i) , style_loss) style_losses. append( style_loss) for i in range ( len ( model) - 1 , - 1 , - 1 ) : if isinstance ( model[ i] , ContentLoss) or isinstance ( model[ i] , StyleLoss) : break model = model[ : ( i + 1 ) ] return model, style_losses, content_losses"""
选择输入图片
""" input_img = content_img. clone( )
plt. figure( )
imshow( input_img, title= 'Input Image' ) """
梯度下降
""" def get_input_optimizer ( input_img) : optimizer = optim. LBFGS( [ input_img. requires_grad_( ) ] ) return optimizer"""
每次网络运行的时候将输 入的值矫正到0到1之间
""" def run_style_transfer ( cnn, normalization_mean, normalization_std, content_img, style_img, input_img, num_steps= 300 , style_weight= 1000000 , content_weight= 1 ) : """Run the style transfer.""" print ( 'Building the style transfer model..' ) model, style_losses, content_losses = get_style_model_and_losses( cnn, normalization_mean, normalization_std, style_img, content_img) optimizer = get_input_optimizer( input_img) print ( 'Optimizing..' ) run = [ 0 ] while run[ 0 ] <= num_steps: def closure ( ) : input_img. data. clamp_( 0 , 1 ) optimizer. zero_grad( ) model( input_img) style_score = 0 content_score = 0 for sl in style_losses: style_score += sl. lossfor cl in content_losses: content_score += cl. lossstyle_score *= style_weightcontent_score *= content_weightloss = style_score + content_scoreloss. backward( ) run[ 0 ] += 1 if run[ 0 ] % 50 == 0 : print ( "run {}:" . format ( run) ) print ( 'Style Loss : {:4f} Content Loss: {:4f}' . format ( style_score. item( ) , content_score. item( ) ) ) print ( ) return style_score + content_scoreoptimizer. step( closure) input_img. data. clamp_( 0 , 1 ) return input_img"""
运行这个算法。
""" output = run_style_transfer( cnn, cnn_normalization_mean, cnn_normalization_std, content_img, style_img, input_img) plt. figure( )
imshow( output, title= 'Output Image' )
plt. ioff( )
plt. show( )
生成对抗示例
from __future__ import print_function
import torch
import torch. nn as nn
import torch. nn. functional as F
import torch. optim as optim
from torchvision import datasets, transforms
import numpy as np
import matplotlib. pyplot as plt"""
输入
"""
epsilons = [ 0 , .05 , .1 , .15 , .2 , .25 , .3 ]
pretrained_model = "data/lenet_mnist_model.pth"
use_cuda= True """
被攻击的模型
"""
class Net ( nn. Module) : def __init__ ( self) : super ( Net, self) . __init__( ) self. conv1 = nn. Conv2d( 1 , 10 , kernel_size= 5 ) self. conv2 = nn. Conv2d( 10 , 20 , kernel_size= 5 ) self. conv2_drop = nn. Dropout2d( ) self. fc1 = nn. Linear( 320 , 50 ) self. fc2 = nn. Linear( 50 , 10 ) def forward ( self, x) : x = F. relu( F. max_pool2d( self. conv1( x) , 2 ) ) x = F. relu( F. max_pool2d( self. conv2_drop( self. conv2( x) ) , 2 ) ) x = x. view( - 1 , 320 ) x = F. relu( self. fc1( x) ) x = F. dropout( x, training= self. training) x = self. fc2( x) return F. log_softmax( x, dim= 1 )
test_loader = torch. utils. data. DataLoader( datasets. MNIST( '../data' , train= False , download= True , transform= transforms. Compose( [ transforms. ToTensor( ) , ] ) ) , batch_size= 1 , shuffle= True )
print ( "CUDA Available: " , torch. cuda. is_available( ) )
device = torch. device( "cuda" if ( use_cuda and torch. cuda. is_available( ) ) else "cpu" )
model = Net( ) . to( device)
model. load_state_dict( torch. load( pretrained_model, map_location= 'cpu' ) )
model. eval ( ) """
FGSM算法攻击
"""
def fgsm_attack ( image, epsilon, data_grad) : sign_data_grad = data_grad. sign( ) perturbed_image = image + epsilon* sign_data_gradperturbed_image = torch. clamp( perturbed_image, 0 , 1 ) return perturbed_image"""
测试函数
"""
def test ( model, device, test_loader, epsilon ) : correct = 0 adv_examples = [ ] for data, target in test_loader: data, target = data. to( device) , target. to( device) data. requires_grad = True output = model( data) init_pred = output. max ( 1 , keepdim= True ) [ 1 ] if init_pred. item( ) != target. item( ) : continue loss = F. nll_loss( output, target) model. zero_grad( ) loss. backward( ) data_grad = data. grad. dataperturbed_data = fgsm_attack( data, epsilon, data_grad) output = model( perturbed_data) final_pred = output. max ( 1 , keepdim= True ) [ 1 ] if final_pred. item( ) == target. item( ) : correct += 1 if ( epsilon == 0 ) and ( len ( adv_examples) < 5 ) : adv_ex = perturbed_data. squeeze( ) . detach( ) . cpu( ) . numpy( ) adv_examples. append( ( init_pred. item( ) , final_pred. item( ) , adv_ex) ) else : if len ( adv_examples) < 5 : adv_ex = perturbed_data. squeeze( ) . detach( ) . cpu( ) . numpy( ) adv_examples. append( ( init_pred. item( ) , final_pred. item( ) , adv_ex) ) final_acc = correct/ float ( len ( test_loader) ) print ( "Epsilon: {}\tTest Accuracy = {} / {} = {}" . format ( epsilon, correct, len ( test_loader) , final_acc) ) return final_acc, adv_examples"""
运行攻击
"""
accuracies = [ ]
examples = [ ]
for eps in epsilons: acc, ex = test( model, device, test_loader, eps) accuracies. append( acc) examples. append( ex) """
准确度
"""
plt. figure( figsize= ( 5 , 5 ) )
plt. plot( epsilons, accuracies, "*-" )
plt. yticks( np. arange( 0 , 1.1 , step= 0.1 ) )
plt. xticks( np. arange( 0 , .35 , step= 0.05 ) )
plt. title( "Accuracy vs Epsilon" )
plt. xlabel( "Epsilon" )
plt. ylabel( "Accuracy" )
plt. show( ) """
样本对抗性示例
"""
cnt = 0
plt. figure( figsize= ( 8 , 10 ) )
for i in range ( len ( epsilons) ) : for j in range ( len ( examples[ i] ) ) : cnt += 1 plt. subplot( len ( epsilons) , len ( examples[ 0 ] ) , cnt) plt. xticks( [ ] , [ ] ) plt. yticks( [ ] , [ ] ) if j == 0 : plt. ylabel( "Eps: {}" . format ( epsilons[ i] ) , fontsize= 14 ) orig, adv, ex = examples[ i] [ j] plt. title( "{} -> {}" . format ( orig, adv) ) plt. imshow( ex, cmap= "gray" )
plt. tight_layout( )
plt. show( )