Step 3: Build a Recognizer
    print('Scores of Sample[0]', predictions[0].pred_scores.item)

(Note that .item here is the item field of the LabelData object that ClsHeadZelda.predict attaches to each sample, not Tensor.item().)
Step 4: Build an Evaluation Metric
    scores = data_sample['pred_scores']['item']
    scores = np.array(scores)

(Inside AccuracyMetric.process the data samples arrive as plain dicts, so the scores are read by key and converted to a NumPy array before being passed to top_k_accuracy.)
Step 5: Train and Test with Native PyTorch
Change this:

    for data_batch in track_iter_progress(val_data_loader):

to this:

    task_num = len(val_data_loader)
    for data_batch in track_iter_progress((val_data_loader, task_num)):

(The same change is applied to the train_data_loader loop in the full code below.)
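A minimal sketch of the tuple form, assuming only that track_iter_progress accepts an (iterable, task_num) pair, which is what the fix above relies on:

    from mmengine import track_iter_progress

    # A bare generator has no len(), so the task count must be supplied
    # explicitly via the (iterable, task_num) tuple form:
    squares = (i * i for i in range(100))
    for x in track_iter_progress((squares, 100)):
        pass  # the progress bar advances once per item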
The complete modified code:
import mmaction
from mmaction.utils import register_all_modules

register_all_modules(init_default_scope=True)
print('**************************Step 0: Prepare the Data*****************************')

print('**************************Step 1: Build a Data Pipeline*****************************')
import mmcv
import decord
import numpy as np
from mmcv.transforms import TRANSFORMS, BaseTransform, to_tensor
from mmaction.structures import ActionDataSample


@TRANSFORMS.register_module()
class VideoInit(BaseTransform):
    def transform(self, results):
        container = decord.VideoReader(results['filename'])
        results['total_frames'] = len(container)
        results['video_reader'] = container
        return results


@TRANSFORMS.register_module()
class VideoSample(BaseTransform):
    def __init__(self, clip_len, num_clips, test_mode=False):
        self.clip_len = clip_len
        self.num_clips = num_clips
        self.test_mode = test_mode

    def transform(self, results):
        total_frames = results['total_frames']
        interval = total_frames // self.clip_len

        if self.test_mode:
            np.random.seed(42)

        inds_of_all_clips = []
        for i in range(self.num_clips):
            bids = np.arange(self.clip_len) * interval
            offset = np.random.randint(interval, size=bids.shape)
            inds = bids + offset
            inds_of_all_clips.append(inds)

        results['frame_inds'] = np.concatenate(inds_of_all_clips)
        results['clip_len'] = self.clip_len
        results['num_clips'] = self.num_clips
        return results


@TRANSFORMS.register_module()
class VideoDecode(BaseTransform):
    def transform(self, results):
        frame_inds = results['frame_inds']
        container = results['video_reader']

        imgs = container.get_batch(frame_inds).asnumpy()
        imgs = list(imgs)

        results['video_reader'] = None
        del container

        results['imgs'] = imgs
        results['img_shape'] = imgs[0].shape[:2]
        return results


@TRANSFORMS.register_module()
class VideoResize(BaseTransform):
    def __init__(self, r_size):
        self.r_size = (np.inf, r_size)

    def transform(self, results):
        img_h, img_w = results['img_shape']
        new_w, new_h = mmcv.rescale_size((img_w, img_h), self.r_size)

        imgs = [mmcv.imresize(img, (new_w, new_h)) for img in results['imgs']]
        results['imgs'] = imgs
        results['img_shape'] = imgs[0].shape[:2]
        return results


@TRANSFORMS.register_module()
class VideoCrop(BaseTransform):
    def __init__(self, c_size):
        self.c_size = c_size

    def transform(self, results):
        img_h, img_w = results['img_shape']
        center_x, center_y = img_w // 2, img_h // 2
        x1, x2 = center_x - self.c_size // 2, center_x + self.c_size // 2
        y1, y2 = center_y - self.c_size // 2, center_y + self.c_size // 2
        imgs = [img[y1:y2, x1:x2] for img in results['imgs']]
        results['imgs'] = imgs
        results['img_shape'] = imgs[0].shape[:2]
        return results


@TRANSFORMS.register_module()
class VideoFormat(BaseTransform):
    def transform(self, results):
        num_clips = results['num_clips']
        clip_len = results['clip_len']
        imgs = results['imgs']

        imgs = np.array(imgs)
        imgs = imgs.reshape((num_clips, clip_len) + imgs.shape[1:])
        imgs = imgs.transpose(0, 4, 1, 2, 3)

        results['imgs'] = imgs
        return results


@TRANSFORMS.register_module()
class VideoPack(BaseTransform):
    def __init__(self, meta_keys=('img_shape', 'num_clips', 'clip_len')):
        self.meta_keys = meta_keys

    def transform(self, results):
        packed_results = dict()
        inputs = to_tensor(results['imgs'])
        data_sample = ActionDataSample().set_gt_label(results['label'])
        metainfo = {k: results[k] for k in self.meta_keys if k in results}
        data_sample.set_metainfo(metainfo)
        packed_results['inputs'] = inputs
        packed_results['data_samples'] = data_sample
        return packed_results
import os.path as osp
from mmengine.dataset import Compose

pipeline_cfg = [
    dict(type='VideoInit'),
    dict(type='VideoSample', clip_len=16, num_clips=1, test_mode=False),
    dict(type='VideoDecode'),
    dict(type='VideoResize', r_size=256),
    dict(type='VideoCrop', c_size=224),
    dict(type='VideoFormat'),
    dict(type='VideoPack')
]

pipeline = Compose(pipeline_cfg)
data_prefix = 'data/kinetics400_tiny/train'
results = dict(filename=osp.join(data_prefix, 'D32_1gwq35E.mp4'), label=0)

packed_results = pipeline(results)

inputs = packed_results['inputs']
data_sample = packed_results['data_samples']

print('shape of the inputs: ', inputs.shape)
print('image_shape: ', data_sample.img_shape)
print('num_clips: ', data_sample.num_clips)
print('clip_len: ', data_sample.clip_len)
print('label: ', data_sample.gt_label)
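# Expected output, given the pipeline above: inputs should have shape
# (1, 3, 16, 224, 224), i.e. (num_clips, C, clip_len, H, W), with
# img_shape (224, 224), num_clips 1, clip_len 16 and label 0.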
print('**************************Step 2: Build a Dataset and DataLoader*****************************')
import os.path as osp
from mmengine.fileio import list_from_file
from mmengine.dataset import BaseDataset
from mmaction.registry import DATASETS


@DATASETS.register_module()
class DatasetZelda(BaseDataset):
    def __init__(self, ann_file, pipeline, data_root, data_prefix=dict(video=''),
                 test_mode=False, modality='RGB', **kwargs):
        self.modality = modality
        super(DatasetZelda, self).__init__(ann_file=ann_file, pipeline=pipeline,
                                           data_root=data_root,
                                           data_prefix=data_prefix,
                                           test_mode=test_mode,
                                           **kwargs)

    def load_data_list(self):
        data_list = []
        fin = list_from_file(self.ann_file)
        for line in fin:
            line_split = line.strip().split()
            filename, label = line_split
            label = int(label)
            filename = osp.join(self.data_prefix['video'], filename)
            data_list.append(dict(filename=filename, label=label))
        return data_list

    def get_data_info(self, idx: int) -> dict:
        data_info = super().get_data_info(idx)
        data_info['modality'] = self.modality
        return data_info


train_pipeline_cfg = [
    dict(type='VideoInit'),
    dict(type='VideoSample', clip_len=16, num_clips=1, test_mode=False),
    dict(type='VideoDecode'),
    dict(type='VideoResize', r_size=256),
    dict(type='VideoCrop', c_size=224),
    dict(type='VideoFormat'),
    dict(type='VideoPack')
]

val_pipeline_cfg = [
    dict(type='VideoInit'),
    dict(type='VideoSample', clip_len=16, num_clips=5, test_mode=True),
    dict(type='VideoDecode'),
    dict(type='VideoResize', r_size=256),
    dict(type='VideoCrop', c_size=224),
    dict(type='VideoFormat'),
    dict(type='VideoPack')
]

train_dataset_cfg = dict(
    type='DatasetZelda',
    ann_file='kinetics_tiny_train_video.txt',
    pipeline=train_pipeline_cfg,
    data_root='data/kinetics400_tiny/',
    data_prefix=dict(video='train'))

val_dataset_cfg = dict(
    type='DatasetZelda',
    ann_file='kinetics_tiny_val_video.txt',
    pipeline=val_pipeline_cfg,
    data_root='data/kinetics400_tiny/',
    data_prefix=dict(video='val'))

train_dataset = DATASETS.build(train_dataset_cfg)

packed_results = train_dataset[0]

inputs = packed_results['inputs']
data_sample = packed_results['data_samples']

print('shape of the inputs: ', inputs.shape)
print('image_shape: ', data_sample.img_shape)
print('num_clips: ', data_sample.num_clips)
print('clip_len: ', data_sample.clip_len)
print('label: ', data_sample.gt_label)

from mmengine.runner import Runner

BATCH_SIZE = 2

train_dataloader_cfg = dict(
    batch_size=BATCH_SIZE,
    num_workers=0,
    persistent_workers=False,
    sampler=dict(type='DefaultSampler', shuffle=True),
    dataset=train_dataset_cfg)

val_dataloader_cfg = dict(
    batch_size=BATCH_SIZE,
    num_workers=0,
    persistent_workers=False,
    sampler=dict(type='DefaultSampler', shuffle=False),
    dataset=val_dataset_cfg)

train_data_loader = Runner.build_dataloader(dataloader=train_dataloader_cfg)
val_data_loader = Runner.build_dataloader(dataloader=val_dataloader_cfg)

batched_packed_results = next(iter(train_data_loader))

batched_inputs = batched_packed_results['inputs']
batched_data_sample = batched_packed_results['data_samples']

assert len(batched_inputs) == BATCH_SIZE
assert len(batched_data_sample) == BATCH_SIZE
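# Note: mmengine's default collate function here (pseudo_collate) does not stack
# the batch into one tensor; batched_inputs is a list of BATCH_SIZE tensors, so
# the asserts above count samples. Stacking happens in the data preprocessor.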
print('**************************Step 3: Build a Recognizer*****************************')
import torch
from mmengine.model import BaseDataPreprocessor, stack_batch
from mmaction.registry import MODELS


@MODELS.register_module()
class DataPreprocessorZelda(BaseDataPreprocessor):
    def __init__(self, mean, std):
        super().__init__()

        self.register_buffer(
            'mean',
            torch.tensor(mean, dtype=torch.float32).view(-1, 1, 1, 1),
            False)
        self.register_buffer(
            'std',
            torch.tensor(std, dtype=torch.float32).view(-1, 1, 1, 1),
            False)

    def forward(self, data, training=False):
        data = self.cast_data(data)
        inputs = data['inputs']
        batch_inputs = stack_batch(inputs)
        batch_inputs = (batch_inputs - self.mean) / self.std
        data['inputs'] = batch_inputs
        return data


data_preprocessor_cfg = dict(
    type='DataPreprocessorZelda',
    mean=[123.675, 116.28, 103.53],
    std=[58.395, 57.12, 57.375])

data_preprocessor = MODELS.build(data_preprocessor_cfg)

preprocessed_inputs = data_preprocessor(batched_packed_results)
print(preprocessed_inputs['inputs'].shape)
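# This should print torch.Size([2, 1, 3, 16, 224, 224]): the per-sample tensors
# are stacked into (BATCH_SIZE, num_clips, C, clip_len, H, W), and the
# (C, 1, 1, 1) mean/std buffers broadcast over the last four dimensions.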
import torch.nn as nn
import torch.nn.functional as F
from mmengine.model import BaseModel, BaseModule, Sequential
from mmengine.structures import LabelData


@MODELS.register_module()
class BackBoneZelda(BaseModule):
    def __init__(self, init_cfg=None):
        if init_cfg is None:
            init_cfg = [
                dict(type='Kaiming', layer='Conv3d', mode='fan_out',
                     nonlinearity='relu'),
                dict(type='Constant', layer='BatchNorm3d', val=1, bias=0)
            ]

        super(BackBoneZelda, self).__init__(init_cfg=init_cfg)

        self.conv1 = Sequential(
            nn.Conv3d(3, 64, kernel_size=(3, 7, 7), stride=(1, 2, 2),
                      padding=(1, 3, 3)),
            nn.BatchNorm3d(64), nn.ReLU())
        self.maxpool = nn.MaxPool3d(kernel_size=(1, 3, 3), stride=(1, 2, 2),
                                    padding=(0, 1, 1))
        self.conv = Sequential(
            nn.Conv3d(64, 128, kernel_size=3, stride=2, padding=1),
            nn.BatchNorm3d(128), nn.ReLU())

    def forward(self, imgs):
        features = self.conv(self.maxpool(self.conv1(imgs)))
        return features


@MODELS.register_module()
class ClsHeadZelda(BaseModule):
    def __init__(self, num_classes, in_channels, dropout=0.5,
                 average_clips='prob', init_cfg=None):
        if init_cfg is None:
            init_cfg = dict(type='Normal', layer='Linear', std=0.01)

        super(ClsHeadZelda, self).__init__(init_cfg=init_cfg)

        self.num_classes = num_classes
        self.in_channels = in_channels
        self.average_clips = average_clips

        if dropout != 0:
            self.dropout = nn.Dropout(dropout)
        else:
            self.dropout = None

        self.fc = nn.Linear(self.in_channels, self.num_classes)
        self.pool = nn.AdaptiveAvgPool3d(1)
        self.loss_fn = nn.CrossEntropyLoss()

    def forward(self, x):
        N, C, T, H, W = x.shape
        x = self.pool(x)
        x = x.view(N, C)
        assert x.shape[1] == self.in_channels

        if self.dropout is not None:
            x = self.dropout(x)

        cls_scores = self.fc(x)
        return cls_scores

    def loss(self, feats, data_samples):
        cls_scores = self(feats)
        labels = torch.stack([x.gt_label for x in data_samples])
        labels = labels.squeeze()

        if labels.shape == torch.Size([]):
            labels = labels.unsqueeze(0)

        loss_cls = self.loss_fn(cls_scores, labels)
        return dict(loss_cls=loss_cls)

    def predict(self, feats, data_samples):
        cls_scores = self(feats)
        num_views = cls_scores.shape[0] // len(data_samples)
        cls_scores = self.average_clip(cls_scores, num_views)

        for ds, sc in zip(data_samples, cls_scores):
            pred = LabelData(item=sc)
            ds.pred_scores = pred
        return data_samples

    def average_clip(self, cls_scores, num_views):
        if self.average_clips not in ['score', 'prob', None]:
            raise ValueError(f'{self.average_clips} is not supported. '
                             f'Currently supported ones are '
                             f'["score", "prob", None]')

        total_views = cls_scores.shape[0]
        cls_scores = cls_scores.view(total_views // num_views, num_views, -1)

        if self.average_clips is None:
            return cls_scores
        elif self.average_clips == 'prob':
            cls_scores = F.softmax(cls_scores, dim=2).mean(dim=1)
        elif self.average_clips == 'score':
            cls_scores = cls_scores.mean(dim=1)

        return cls_scores


@MODELS.register_module()
class RecognizerZelda(BaseModel):
    def __init__(self, backbone, cls_head, data_preprocessor):
        super().__init__(data_preprocessor=data_preprocessor)

        self.backbone = MODELS.build(backbone)
        self.cls_head = MODELS.build(cls_head)

    def extract_feat(self, inputs):
        inputs = inputs.view((-1, ) + inputs.shape[2:])
        return self.backbone(inputs)

    def loss(self, inputs, data_samples):
        feats = self.extract_feat(inputs)
        loss = self.cls_head.loss(feats, data_samples)
        return loss

    def predict(self, inputs, data_samples):
        feats = self.extract_feat(inputs)
        predictions = self.cls_head.predict(feats, data_samples)
        return predictions

    def forward(self, inputs, data_samples=None, mode='tensor'):
        if mode == 'tensor':
            return self.extract_feat(inputs)
        elif mode == 'loss':
            return self.loss(inputs, data_samples)
        elif mode == 'predict':
            return self.predict(inputs, data_samples)
        else:
            raise RuntimeError(f'Invalid mode: {mode}')
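# The tensor/loss/predict modes of forward() follow mmengine's BaseModel
# convention: the caller (a Runner, or the hand-written loop in Step 5 below)
# selects the mode, and the data preprocessor is applied to each batch before
# the model sees it.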
import copy

model_cfg = dict(
    type='RecognizerZelda',
    backbone=dict(type='BackBoneZelda'),
    cls_head=dict(
        type='ClsHeadZelda',
        num_classes=2,
        in_channels=128,
        average_clips='prob'),
    data_preprocessor=dict(
        type='DataPreprocessorZelda',
        mean=[123.675, 116.28, 103.53],
        std=[58.395, 57.12, 57.375]))

model = MODELS.build(model_cfg)

model.train()
model.init_weights()
data_batch_train = copy.deepcopy(batched_packed_results)
data = model.data_preprocessor(data_batch_train, training=True)
loss = model(**data, mode='loss')
print('loss dict: ', loss)
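# By mmengine convention, mode='loss' returns a dict of named loss terms (here
# just 'loss_cls'); the training loop in Step 5 picks the term out by key.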
with torch.no_grad():
    model.eval()
    data_batch_test = copy.deepcopy(batched_packed_results)
    data = model.data_preprocessor(data_batch_test, training=False)
    predictions = model(**data, mode='predict')

print('Label of Sample[0]', predictions[0].gt_label)
print('Scores of Sample[0]', predictions[0].pred_scores.item)
print('**************************Step 4: Build an Evaluation Metric*****************************')
from collections import OrderedDict
from mmengine.evaluator import BaseMetric
from mmaction.evaluation import top_k_accuracy
from mmaction.registry import METRICS


@METRICS.register_module()
class AccuracyMetric(BaseMetric):
    def __init__(self, topk=(1, 5), collect_device='cpu', prefix='acc'):
        super().__init__(collect_device=collect_device, prefix=prefix)
        self.topk = topk

    def process(self, data_batch, data_samples):
        data_samples = copy.deepcopy(data_samples)
        for data_sample in data_samples:
            result = dict()
            scores = data_sample['pred_scores']['item']
            scores = np.array(scores)
            label = data_sample['gt_label'].item()
            result['scores'] = scores
            result['label'] = label
            self.results.append(result)

    def compute_metrics(self, results: list) -> dict:
        eval_results = OrderedDict()
        labels = [res['label'] for res in results]
        scores = [res['scores'] for res in results]
        topk_acc = top_k_accuracy(scores, labels, self.topk)
        for k, acc in zip(self.topk, topk_acc):
            eval_results[f'topk{k}'] = acc
        return eval_results


metric_cfg = dict(type='AccuracyMetric', topk=(1, 5))

metric = METRICS.build(metric_cfg)

data_samples = [d.to_dict() for d in predictions]

metric.process(batched_packed_results, data_samples)
acc = metric.compute_metrics(metric.results)
print(acc)
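# With only two classes, top-5 accuracy is trivially 1.0 here; top-1 accuracy
# is the meaningful number on this tiny dataset.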
print('**************************Step 5: Train and Test with Native PyTorch*****************************')
import torch.optim as optim
from mmengine import track_iter_progress

device = 'cuda'  # or 'cpu'
max_epochs = 10

optimizer = optim.Adam(model.parameters(), lr=0.01)

for epoch in range(max_epochs):
    model.train()
    losses = []
    task_num = len(train_data_loader)
    for data_batch in track_iter_progress((train_data_loader, task_num)):
        data = model.data_preprocessor(data_batch, training=True)
        loss_dict = model(**data, mode='loss')
        loss = loss_dict['loss_cls']

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        losses.append(loss.item())

    print(f'Epoch[{epoch}]: loss ', sum(losses) / len(train_data_loader))

    with torch.no_grad():
        model.eval()
        task_num = len(val_data_loader)
        for data_batch in track_iter_progress((val_data_loader, task_num)):
            data = model.data_preprocessor(data_batch, training=False)
            predictions = model(**data, mode='predict')
            data_samples = [d.to_dict() for d in predictions]
            metric.process(data_batch, data_samples)

        acc = metric.compute_metrics(metric.results)
        for name, topk in acc.items():
            print(f'{name}: ', topk)
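# Note: as written, nothing moves the model onto `device`, so this loop runs on
# the CPU; for GPU training, calling model.to(device) before the loop should be
# enough, since the data preprocessor casts each batch to the model's device.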
print('**************************Step 6: Train and Test with MMEngine (Recommended)*****************************')