Notes on PyTorch GPU Training Methods


Table of Contents

- GPU Parallel Training
- Single-Machine Multi-GPU / Distributed Training
- torch.distributed Package Support
- Example Code: a Multi-GPU Training Script

GPU Parallel Training

Moving the model to CUDA

gpus = [0]                            # GPUs to train on
cuda_gpu = torch.cuda.is_available()  # check whether a usable GPU exists
net = Net()                           # initialize the model
if cuda_gpu:
    net = torch.nn.DataParallel(net, device_ids=gpus).cuda()  # device_ids selects the GPUs

Moving data to CUDA

(minibatchX, minibatchY) = minibatch
minibatchX = minibatchX.astype(np.float32).T  # cast to float32
minibatchY = minibatchY.astype(np.float32).T
if cuda_gpu:
    b_x = Variable(torch.from_numpy(minibatchX).cuda())  # move the tensor to CUDA
    b_y = Variable(torch.from_numpy(minibatchY).cuda())
else:
    b_x = Variable(torch.from_numpy(minibatchX))
    b_y = Variable(torch.from_numpy(minibatchY))

Moving outputs off CUDA and converting to NumPy

correct_prediction = sum(torch.max(output, 1)[1].data.squeeze() == torch.max(b_y, 1)[1].data.squeeze())
if cuda_gpu:
    correct_prediction = correct_prediction.cpu().numpy()  # .cpu() brings the CUDA tensor back to host memory, .numpy() converts the tensor to a NumPy array
else:
    correct_prediction = correct_prediction.numpy()
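The snippets above use the legacy Variable API. For reference, a minimal sketch of the same host-to-device round trip in current PyTorch style, assuming the Net model and the NumPy minibatch from the snippets above:

import numpy as np
import torch

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
net = Net().to(device)                        # Net is the user-defined model from above
if torch.cuda.device_count() > 1:
    net = torch.nn.DataParallel(net)          # replicate across all visible GPUs

b_x = torch.from_numpy(minibatchX.astype(np.float32)).to(device)   # host -> device
output = net(b_x)
pred = output.argmax(dim=1).detach().cpu().numpy()                 # device -> host -> NumPy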

Single-Machine Multi-GPU / Distributed Training

Further reading: a DDP getting-started tutorial, a walkthrough of DDP internals and source code, and practical DDP tips.

torch.distributed Package Support

Distributed training is supported on both GPU and CPU. PyTorch distributed currently only supports Linux. Advantages of torch.distributed training:

1. Each process runs its own independent training loop and only exchanges a small amount of data, such as gradients.

In each iteration, every process has its own optimizer and performs all optimization steps independently; inside a process, training looks the same as ordinary training. After each process has computed its gradients, the gradients are averaged across processes and the result is broadcast from the rank=0 process to all processes; each process then uses this gradient to update its parameters. Because the model parameters are identical in every process at initialization (one broadcast is done at the start) and the gradient used for each update is also identical, the model parameters stay in sync across processes. In DataParallel, by contrast, a single optimizer is maintained for the whole job: gradients from all GPUs are summed, the parameters are updated on the main GPU, and the updated parameters are then broadcast to the other GPUs. Distributed training therefore transfers less parameter data, so it is faster and more efficient.

Basic concepts:

- group: a process group. By default there is only one group; one job is one group, i.e. one world. For finer-grained communication, new groups can be created from subsets of the world with the new_group interface and used for collective communication.
- world size: the total number of processes.
- rank: the index of a process, used for inter-process communication and to identify its priority. The host with rank=0 is the master.
- local_rank: the GPU index used by a process on its node. It is not an explicit argument; it is assigned internally by torch.distributed.launch. For example, rank = 3 with local_rank = 0 refers to the first GPU used by the process with rank 3.

Basic usage flow:

1. Before calling any other function in the distributed package, initialize the process group (and the package) with init_process_group, e.g. dist.init_process_group(backend='nccl', init_method='env://', rank=rank, world_size=world_size) for the Env initialization method.
2. If collective communication within a subgroup is needed, create the subgroup with new_group.
3. Create the distributed data-parallel model: DDP(model, device_ids=device_ids).
4. Create a sampler for the dataset: train_sampler = DistributedSampler(train_dataset).
5. Use the launch utility torch.distributed.launch, running the script once on each host, to start training.
6. Destroy the process group with destroy_process_group() when finished.
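A minimal skeleton of this flow, assuming a user-defined Net model and train_dataset, and that the script is started by torch.distributed.launch so that --local_rank is passed in:

import argparse
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

parser = argparse.ArgumentParser()
parser.add_argument('--local_rank', type=int, default=0)       # filled in by torch.distributed.launch
args = parser.parse_args()

dist.init_process_group(backend='nccl', init_method='env://')  # 1. initialize the process group
torch.cuda.set_device(args.local_rank)                         # pin this process to one GPU

model = Net().cuda(args.local_rank)                            # Net / train_dataset are assumed to exist
model = DDP(model, device_ids=[args.local_rank])               # 3. wrap with DDP

sampler = DistributedSampler(train_dataset)                    # 4. shard the data across processes
loader = DataLoader(train_dataset, batch_size=64, sampler=sampler)

for epoch in range(10):
    sampler.set_epoch(epoch)       # reshuffle differently every epoch
    for x, y in loader:
        ...                        # forward / backward / optimizer.step() as usual

dist.destroy_process_group()                                   # 6. clean up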

Programming templates

1. Env initialization method

import torch.distributed as dist
import torch.utils.data.distributed
# ......
import argparse

parser = argparse.ArgumentParser()
# This argument must be declared exactly like this, even if the code never reads it,
# because the launch utility passes it by default.
parser.add_argument("--local_rank", type=int)
args = parser.parse_args()
# ......
dist.init_process_group(backend='nccl', init_method='env://')
# ......
trainset = torchvision.datasets.CIFAR10(root='./data', train=True,
                                        download=download, transform=transform)
train_sampler = torch.utils.data.distributed.DistributedSampler(trainset)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size,
                                          sampler=train_sampler)
# ......
# Pick the GPU for the current process according to local_rank
net = Net()
device = torch.device('cuda', args.local_rank)
net = net.to(device)
net = torch.nn.parallel.DistributedDataParallel(net, device_ids=[args.local_rank],
                                                output_device=args.local_rank)

How to launch

python -m torch.distributed.launch --nproc_per_node=2 --nnodes=3 --node_rank=0 --master_addr="192.168.1.201" --master_port=23456 env_init.py
python -m torch.distributed.launch --nproc_per_node=2 --nnodes=3 --node_rank=1 --master_addr="192.168.1.201" --master_port=23456 env_init.py
python -m torch.distributed.launch --nproc_per_node=2 --nnodes=3 --node_rank=2 --master_addr="192.168.1.201" --master_port=23456 env_init.py

Here the number of nodes is 3 and each node runs 2 processes; node_rank is the priority (index) of each node, and --master_addr specifies the master node.
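Under the hood, env:// initialization reads its connection information from environment variables that the launch utility sets for every process it spawns. A small sketch of inspecting them (the values in the comments match the commands above):

import os

# Variables that env:// initialization reads; torch.distributed.launch sets them per process.
for key in ('MASTER_ADDR', 'MASTER_PORT', 'WORLD_SIZE', 'RANK'):
    print(key, os.environ.get(key))
# e.g. MASTER_ADDR 192.168.1.201, MASTER_PORT 23456,
#      WORLD_SIZE 6 (nnodes * nproc_per_node = 3 * 2), RANK 0..5.
# local_rank (0 or 1 on each host here) arrives as the --local_rank argument instead.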

Notes

With the Env method:

- No arguments need to be specified explicitly in init_process_group.
- Checkpoints must be saved only in the rank == 0 process.
- torch.distributed.launch is run on every host and creates multiple processes on it, where:
  - nproc_per_node is the number of processes to create on the current host, normally the number of GPUs on that host;
  - nnodes is the number of nodes in the job;
  - node_rank is the priority (index) of the current node;
  - master_addr and master_port are the ip:port of the master node.
- If GPUs are not assigned to processes sensibly, each process defaults to using all GPUs on the host, so multiple processes on one host will share GPUs.
- torch.distributed.launch creates nproc_per_node processes on the current host, each running the training script independently. It also passes each process a local_rank argument, the index of that process on the current host. For example, rank=2 with local_rank=0 is the first process on the third node. local_rank should be used to assign local GPU resources appropriately.
- Each command corresponds to one process. Until world_size processes have been started, all started processes keep waiting.

2. TCP initialization method

Code:

import torch.distributed as dist
import torch.utils.data.distributed
# ......
parser = argparse.ArgumentParser(description='PyTorch distributed training on cifar-10')
parser.add_argument('--rank', default=0, type=int, help='rank of current process')
parser.add_argument('--world_size', default=2, type=int, help='world size')
parser.add_argument('--init_method', default='tcp://127.0.0.1:23456', help='init-method')
args = parser.parse_args()
# ......
dist.init_process_group(backend='nccl', init_method=args.init_method,
                        rank=args.rank, world_size=args.world_size)
# ......
trainset = torchvision.datasets.CIFAR10(root='./data', train=True,
                                        download=download, transform=transform)
train_sampler = torch.utils.data.distributed.DistributedSampler(trainset)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size,
                                          sampler=train_sampler)
# ......
net = Net()
net = net.cuda()
net = torch.nn.parallel.DistributedDataParallel(net)
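Note that the TCP template above calls net.cuda() without pinning a device. A minimal sketch of pinning one GPU per process based on the global rank, assuming every host runs one process per GPU (the modulo mapping is an assumption, not part of the original template):

import torch

ngpus_per_node = torch.cuda.device_count()
local_rank = args.rank % ngpus_per_node   # args.rank comes from the TCP template above
torch.cuda.set_device(local_rank)

net = net.cuda(local_rank)
net = torch.nn.parallel.DistributedDataParallel(net, device_ids=[local_rank])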

How to launch:

# Node 1 : ip 192.168.1.201  port : 12345
python tcp_init.py --init_method tcp://192.168.1.201:12345 --rank 0 --world_size 3
# Node 2 :
python tcp_init.py --init_method tcp://192.168.1.201:12345 --rank 1 --world_size 3
# Node 3 :
python tcp_init.py --init_method tcp://192.168.1.201:12345 --rank 2 --world_size 3

Notes:

With the TCP method, the following must be specified manually in init_process_group:

- rank: the rank of the current process;
- world_size: the total number of processes in the job;
- init_method: set to tcp mode; the ip:port must be the same for all processes and must be the ip:port of the master process.

Other notes:

- Checkpoints must be saved only in the rank == 0 process.
- If the program does not select GPUs for the current process based on rank, it defaults to using all GPUs, in data-parallel fashion.
- Each command corresponds to one process. Until world_size processes have been started, all started processes keep waiting.
- Multiple processes can be started on each host, but if GPUs are not assigned appropriately, different processes on the same machine may share GPUs; this must be strictly avoided.
- Training on GPUs with the gloo backend raises an error.
- If each process is responsible for several GPUs, those GPUs can be used for model parallelism, as in the code below (a usage sketch follows it):

class ToyMpModel(nn.Module):
    def __init__(self, dev0, dev1):
        super(ToyMpModel, self).__init__()
        self.dev0 = dev0
        self.dev1 = dev1
        self.net1 = torch.nn.Linear(10, 10).to(dev0)
        self.relu = torch.nn.ReLU()
        self.net2 = torch.nn.Linear(10, 5).to(dev1)

    def forward(self, x):
        x = x.to(self.dev0)
        x = self.relu(self.net1(x))
        x = x.to(self.dev1)
        return self.net2(x)

# ......
dev0 = rank * 2
dev1 = rank * 2 + 1
mp_model = ToyMpModel(dev0, dev1)   # model parallelism across two GPUs
ddp_mp_model = DDP(mp_model)        # no device_ids: the module spans several GPUs
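A hedged sketch of how such a hybrid model-parallel plus DDP setup could be driven from a single script with torch.multiprocessing.spawn; the demo_fn name and the two-GPUs-per-process layout are assumptions for illustration, not part of the original:

import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP

def demo_fn(rank, world_size):
    # One process per pair of GPUs; NCCL backend, env:// rendezvous.
    dist.init_process_group('nccl', init_method='env://', rank=rank, world_size=world_size)
    dev0, dev1 = rank * 2, rank * 2 + 1
    model = DDP(ToyMpModel(dev0, dev1))          # ToyMpModel from the snippet above
    out = model(torch.randn(20, 10))
    out.sum().backward()
    dist.destroy_process_group()

if __name__ == '__main__':
    world_size = torch.cuda.device_count() // 2  # two GPUs per process
    # MASTER_ADDR / MASTER_PORT must be set in the environment for env:// to work
    mp.spawn(demo_fn, args=(world_size,), nprocs=world_size, join=True)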

Process group details

1. Initializing a process group

torch.distributed.init_process_group(backend, init_method=None, timeout=datetime.timedelta(0, 1800), world_size=-1, rank=-1, store=None)

2. Parameter descriptions

backend: the communication backend the current process should use. A lowercase string; supported backends are gloo, mpi, and nccl. nccl is recommended. The backends are explained in more detail below.

init_method: how the current process group is initialized.

rank: the priority (index) of the current process.

world_size: the total number of processes in the job. Required if the store parameter is specified.

timeout: the timeout for operations in each process. Optional; a datetime.timedelta object, 30 minutes by default. This parameter only applies to the gloo backend.

store: a key/value store accessible to all workers, used to exchange connection and address information. Mutually exclusive with init_method.
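As an illustration of the store parameter, a minimal sketch using torch.distributed.TCPStore; the host, port, and world size values are placeholders, and rank is assumed to be the current process's rank:

import datetime
import torch.distributed as dist

# The rank-0 process hosts the store; all other processes connect to it.
store = dist.TCPStore('192.168.1.201', 23456, world_size=2,
                      is_master=(rank == 0),
                      timeout=datetime.timedelta(seconds=300))
dist.init_process_group(backend='nccl', store=store, rank=rank, world_size=2)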

new_group

torch.distributed.new_group(ranks=None, timeout=datetime.timedelta(0, 1800), backend=None)

Purpose: new_group() creates a new group from an arbitrary subset of all processes. It returns a group handle that can be passed as the group argument to collective functions. Collectives are distributed functions used to exchange information in specific programming patterns.

Parameters:

- ranks: the list of ranks of the members of the new group;
- timeout: the timeout for operations within this group;
- backend: the communication backend to use, a lowercase string; supported backends are gloo and nccl, and it must match the one passed to init_process_group().

Querying process group properties: get_backend, get_rank, get_world_size, is_initialized, is_mpi_available, is_nccl_available.
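A small sketch of creating a subgroup and running a collective inside it, assuming a four-process job whose process group is already initialized and whose current GPU has been set with torch.cuda.set_device:

import torch
import torch.distributed as dist

# Subgroup containing only ranks 0 and 1. Every process must call new_group,
# even the ones that are not members of the new group.
group01 = dist.new_group(ranks=[0, 1])

t = torch.ones(1).cuda()
if dist.get_rank() in (0, 1):
    dist.all_reduce(t, op=dist.ReduceOp.SUM, group=group01)  # t becomes 2 on ranks 0 and 1 only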

Communication backends

Gloo backend
The gloo backend supports both CPU and GPU, and its collective communication is optimized. Because GPUs can exchange data directly without going through the CPU and host memory, the gloo backend is faster when used on GPUs. torch.distributed supports gloo natively; no extra setup is required.

NCCL backend
NCCL (NVIDIA Collective Communications Library) implements collective communication across multiple GPUs and multiple nodes, and achieves high communication speed over PCIe, NVLink, and InfiniBand. NCCL is highly optimized for and compatible with MPI, is aware of GPU topology, accelerates multi-GPU and multi-node training, and maximizes GPU bandwidth utilization, so deep learning framework developers can exploit all available GPUs within a node or across nodes. NCCL is well supported for GPU communication, and torch.distributed also supports it natively. When every host runs multiple processes, NCCL delivers the best performance. Each process must have exclusive access to the GPUs it uses; sharing GPUs between processes may cause deadlocks.

MPI backend
MPI (Message Passing Interface) is a standard tool from the high-performance computing world. It supports point-to-point as well as collective communication and was the inspiration for the torch.distributed API. The advantage of the MPI backend is that MPI is widely deployed and highly optimized on large compute clusters. However, torch.distributed does not support MPI out of the box: to use MPI, PyTorch must be compiled from source, and GPU support depends on the installed MPI implementation.

How to choose a backend
Basic rules:
- Use NCCL for distributed GPU training.
- Use Gloo for distributed CPU training.

More detailed tutorials:
https://zhuanlan.zhihu.com/p/76638962
https://www.jiqizhixin.com/articles/2019-04-30-8
https://zhuanlan.zhihu.com/p/113694038
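A small sketch of applying these selection rules at initialization time, assuming env:// initialization:

import torch
import torch.distributed as dist

# NCCL for GPU training, gloo for CPU-only training.
backend = 'nccl' if torch.cuda.is_available() else 'gloo'
dist.init_process_group(backend=backend, init_method='env://')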

Example Code: a Multi-GPU Training Script

# -*- coding=utf-8 -*-
import pdb
import argparse
import os
import os.path as osp
import random
import shutil
import time
import warnings
import sys

import torch
import torch.nn as nn
import torch.nn.parallel
import torch.nn.functional as F
import torch.backends.cudnn as cudnn
# import torch.optim
# from torch.optim.lr_scheduler import MultiStepLR
from torch.utils import data
import torch.utils.data.distributed
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from collections import OrderedDict
from easydict import EasyDict as edict

# add project-local packages to the import path
cur_dir = osp.split(osp.realpath(__file__))[0]
src_dir = osp.join(cur_dir, '../')
if src_dir not in sys.path:
    sys.path.append(src_dir)

from src.dataset.sampler import DistributedTestSampler
from src.dataset.datasets_3d import video_data
from src.model.inceptionv2_3D_len8 import inceptionv2_3D
from src.model.criterion import center_loss
from src.utils.log import Log
from src.dataset import group_transform
from src.utils.utils import save_checkpoint, AverageMeter, accuracy, data_prefetcher, group_weight_3d
from src.utils.acc import acc_com, acc_thr
from src.utils.model_load import model_load_p3d, model_load_len8, model_load, model_load_test
import re

C = edict()
config = C  # alias (the right-hand side was truncated in the original; "C" is the usual edict pattern)
# =======================================================================================================================
# common config
exp_time = time.strftime('%Y_%m_%d_%H_%M_%S', time.localtime())
C.file_name = osp.splitext(osp.split(__file__)[1])[0]
C.snapshot_dir = r'/data1/video_classification/2d/scripts/model_3d_gv2_base/'

# train and test file config
C.mean = [0.485, 0.456, 0.406]
C.std = [0.229, 0.224, 0.225]
C.mean_cv = [123, 117, 104]  # [104, 117, 123]
C.std_cv = [0.017, 0.017, 0.017]
C.test_file = '/data1/video_classification/2d/list/video_cls_test_list_20190520_3c.txt'
C.train_file = '/data1/video_classification/2d/list/train_list_len8_20190716_new_3c.txt'
C.train_img_path = r'/dataset/zhangruixuan3/'
C.test_img_path = r'/dataset/zhangruixuan3/'

# Setting for Network
C.pretrained = True
# C.pretrained_model_inceptionv2 = r'../src/pre_model/inception_maxpooling_len8_iter_80000.pth'
C.pretrained_model_inceptionv2 = r'/data1/model_analysis/video/4L_32F_Att5/26pth/checkpoint_E26.pth'

# Train Config
C.lr = 0.001
C.momentum = 0.9
C.weight_decay = 0.00005
C.batch_size = 64  # 12  # all gpu
C.num_workers = 5
C.start_epoch = 1
C.epochs = 40
# C.mile_stones = [20, 40]
C.snapshot_epoch = list(range(1, C.epochs)) + [C.epochs]  # [5, 10, 15, 20, 25, 30]
print(C.snapshot_epoch)
C.print_freq = 100


def main():
    parser = argparse.ArgumentParser()
    cudnn.benchmark = True
    parser.add_argument('--local_rank', default=0, type=int, help='process rank on node')
    parser.add_argument('--center_loss', default=1, type=int, help='whether to use center loss')
    parser.add_argument('--weight_center_loss', default=0.5, type=float, help='center_loss weight')
    args = parser.parse_args()

    seed = args.local_rank
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)

    # initialize the distributed environment
    torch.cuda.set_device(args.local_rank)
    os.environ['MASTER_ADDR'] = '10.3.67.12'
    os.environ['MASTER_PORT'] = '58397'
    dist.init_process_group(backend='nccl', rank=0, world_size=1, init_method='env://')

    if args.local_rank == 0:
        os.makedirs(C.snapshot_dir, exist_ok=True)
    log = Log(osp.join(C.snapshot_dir, 'log_DDP.txt'))
    args.log = log
    args.file_name = C.file_name
    args.ckpt_dir = C.snapshot_dir
    args.world_size = dist.get_world_size()

    ##### inceptionv3
    train_transform = transforms.Compose([
        group_transform.GroupScale((224, 224), cv=True),  # cv=False
        group_transform.GroupRandomHorizontalFlip(),      # randomly flip left to right
        group_transform.Stack(),
        group_transform.ToTorchFormatTensor(div=False),   # True
        group_transform.GroupNormalize(mean=C.mean_cv, std=C.std_cv, cv=True)
    ])
    test_transform = transforms.Compose([
        # transforms.Resize((256, 256)),
        # transforms.CenterCrop((224, 224)),
        group_transform.GroupScale((224, 224), cv=True),
        group_transform.Stack(),
        group_transform.ToTorchFormatTensor(div=False),
        group_transform.GroupNormalize(mean=C.mean_cv, std=C.std_cv, cv=True)
    ])

    train_set = video_data(
        img_path=C.train_img_path,
        txt_path=C.train_file,
        num_segments=1,
        length=8,
        modality='RGB',
        img_tmpl='image_{:04d}.jpg',
        step=2,
        length_first=True,
        data_transforms=train_transform)
    train_sampler = torch.utils.data.distributed.DistributedSampler(train_set)
    train_loader = torch.utils.data.DataLoader(
        dataset=train_set,
        batch_size=C.batch_size,
        shuffle=False,
        num_workers=C.num_workers,
        pin_memory=True,
        drop_last=False,
        sampler=train_sampler)

    # test
    test_set = video_data(
        img_path=C.test_img_path,
        txt_path=C.test_file,
        num_segments=1,
        length=8,
        modality='RGB',
        img_tmpl='image_{:04d}.jpg',
        step=2,
        length_first=True,
        data_transforms=test_transform)
    test_sampler = DistributedTestSampler(test_set)
    test_loader = torch.utils.data.DataLoader(
        dataset=test_set,
        batch_size=C.batch_size,
        shuffle=False,
        num_workers=C.num_workers,
        pin_memory=True,
        drop_last=False,
        sampler=test_sampler)

    model = inceptionv2_3D(num_classes=3, use_attention=True)
    if C.pretrained:
        # model = model_load_len8(C.pretrained_model_inceptionv2, model)
        model = model_load_test(C.pretrained_model_inceptionv2, model)

    # move to the corresponding GPU
    model.cuda()
    model = DDP(model, device_ids=[args.local_rank])
    print('rank: {}, model scattering done'.format(args.local_rank))

    criterion_CE = nn.CrossEntropyLoss(ignore_index=-1).cuda(device=args.local_rank)

    weight_group = []
    group_weight_3d(weight_group, module=model, norm_layer=None, lr=C.lr)
    ##
    total_params_id = list(map(id, model.module.parameters()))
    fc_id = list(map(id, model.module.fc_new.parameters()))
    base_params_id = [x for x in total_params_id if x not in fc_id]
    total_params = list(model.module.parameters())
    fc_params = [x for x in total_params if id(x) in fc_id]
    base_params = [x for x in total_params if id(x) in base_params_id]
    optimizer = torch.optim.SGD([
        {'params': base_params, 'lr': C.lr},
        {'params': fc_params, 'lr': C.lr * 10}],
        momentum=C.momentum, weight_decay=C.weight_decay)

    # set center loss
    # if args.center_loss:
    #     center_criterion = center_loss.CenterLoss(num_classes=2, feat_dim=1024, use_gpu=True,
    #                                               pretrained_path=C.pretrained_model_inceptionv2)
    #     optimizer_center_loss = torch.optim.SGD(center_criterion.parameters(), lr=C.lr)

    # pdb.set_trace()
    # init_lr = [x['lr'] for x in optimizer.state_dict()['param_groups']]
    # per_iter_num = len(train_loader.batch_sampler)

    # # set different learning rates for different parameter groups
    # optimizer = torch.optim.SGD(
    #     model.module.parameters(), lr=0.1,
    #     momentum=C.momentum, weight_decay=C.weight_decay)

    # learning-rate schedule
    # scheduler = MultiStepLR(optimizer, milestones=mile_stones)
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)

    # start training
    for epoch in range(C.start_epoch, C.epochs + 1):
        train_sampler.set_epoch(epoch)
        # train for one epoch
        train_with_prefeatch(train_loader, model, criterion_CE, optimizer, epoch, scheduler, args)
        # evaluate on validation set
        validate_DDP(test_loader, model, epoch, args)

        if (args.local_rank == 0) and (epoch in C.snapshot_epoch):
            args.log('saving checkpoint...')
            save_checkpoint({
                'epoch': epoch,
                'arch': 'InceptionV2',
                'state_dict': model.state_dict(),
                'optimizer': optimizer.state_dict(),
                # 'center_dict': center_criterion._parameters,
            }, False, filepath=C.snapshot_dir)


def train_with_prefeatch(train_loader, model, criterion, optimizer, epoch, scheduler, args):
    cudnn.benchmark = True
    batch_time = AverageMeter()
    data_time = AverageMeter()
    losses = AverageMeter()
    # center_losses = AverageMeter()
    top1 = AverageMeter()
    top5 = AverageMeter()

    # adjust the learning rate as scheduled
    scheduler.step()
    # max_iter_num = per_iter_num * C.epochs

    # switch to train mode
    model.train()

    end = time.time()
    epoch_start = time.time()
    prefetcher = data_prefetcher(train_loader, device=args.local_rank)
    input, target = prefetcher.next()
    if args.local_rank == 0:
        print(len(target), target[0].size())
    i = -1
    while input is not None:
        i += 1
        # print(i)
        data_time.update(time.time() - end)

        output, feat = model(input[0])
        assert (len(output) == len(target))
        loss = torch.zeros([1]).cuda(args.local_rank)
        prec1_mean = torch.zeros([1]).cuda(args.local_rank)
        loss_CE = criterion(output, target)
        loss += loss_CE
        # loss_center, loss_others_to_centers, loss_center_to_center = center_criterion(feat, target)
        # print('loss', loss_center, loss_center_to_center, loss_others_to_centers)
        # loss += loss_CE + args.weight_center_loss * (loss_center + loss_others_to_centers + loss_center_to_center)

        acc1, acc5 = accuracy(output, target, topk=(1, 2))
        losses.update(loss.item(), input[0].size(0))
        # center_losses.update(loss_center.item(), input[0].size(0))
        # others_to_centers_losses.update(loss_others_to_centers.item(), input[0].size(0))
        # center_to_center_losses.update(loss_center_to_center.item(), input[0].size(0))
        # prec1_mean, _ = accuracy(output[0].data, target.data, topk=(1, 2))

        reduced_loss = reduce_tensor(loss.data)
        acc1 = reduce_tensor(acc1)
        acc5 = reduce_tensor(acc5)
        # reduced_center_loss = reduce_tensor(loss_center.data)
        # reduced_others_to_center_loss = reduce_tensor(loss_others_to_centers.data)
        # reduced_center_to_center_loss = reduce_tensor(loss_center_to_center.data)

        losses.update(reduced_loss.data.item(), input[0].size(0))
        top1.update(acc1[0], input[0].size(0))
        top5.update(acc5[0], input[0].size(0))
        # center_losses.update(reduced_center_loss.data.item(), input[0].size(0))
        # others_to_centers_losses.update(reduced_others_to_center_loss.data.item(), input[0].size(0))
        # center_to_center_losses.update(reduced_center_to_center_loss.item(), input[0].size(0))

        # optimizer = poly_lr_scheduler(optimizer, (epoch - 1) * per_iter_num + i, max_iter_num, init_lr, power=1)

        # compute gradient and do SGD step
        optimizer.zero_grad()
        # optimizer_center_loss.zero_grad()
        loss.backward()
        optimizer.step()
        torch.cuda.synchronize()
        # for param in center_criterion.parameters():
        #     param.grad.data *= (1. / args.weight_center_loss)
        # optimizer_center_loss.step()

        # measure elapsed time
        batch_time.update(time.time() - end)
        end = time.time()

        # fetch next batch
        input, target = prefetcher.next()

        if (args.local_rank == 0) and (i % C.print_freq == 0):
            args.log('Rank: {0}\t'
                     'Epoch: [{1}][{2}/{3}]\t'
                     'Time {batch_time.val:.3f} ({batch_time.avg:.3f})\t'
                     'Data {data_time.val:.3f} ({data_time.avg:.3f})\t'
                     'Loss {loss.val:.4f} ({loss.avg:.4f})\t'
                     'Acc@1 {top1.val:.3f} ({top1.avg:.3f})\t'
                     'Acc@5 {top5.val:.3f} ({top5.avg:.3f})\t'.format(
                         args.local_rank, epoch, i, len(train_loader),
                         batch_time=batch_time, data_time=data_time,
                         loss=losses, top1=top1, top5=top5),
                     ['{:.7f}'.format(x['lr']) for x in optimizer.state_dict()['param_groups']])

    epoch_end = time.time()
    if args.local_rank == 0:
        args.log('epoch time cost: {:.2f}s'.format(epoch_end - epoch_start))


def validate_DDP(val_loader, model, epoch, args):
    # switch to evaluate mode
    model.eval()
    end = time.time()
    cudnn.benchmark = False

    probs = torch.FloatTensor().cuda(args.local_rank)  # gathered manually across ranks below
    label = torch.LongTensor().cuda(args.local_rank)
    if args.local_rank == 0:
        args.log('testing {} with epoch {}'.format(osp.split(C.test_file)[-1], epoch))

    with torch.no_grad():
        for i, (input, target) in enumerate(val_loader):
            if (args.local_rank == 0) and (i % C.print_freq == 0):
                args.log('testing {}th sample, time cost: {:.2f}s'.format(i * C.batch_size, time.time() - end))

            input = input[0].cuda(device=args.local_rank, non_blocking=True)
            if isinstance(target, list) or isinstance(target, tuple):
                target = [x.cuda(device=args.local_rank, non_blocking=True) for x in target]
                target = [x.unsqueeze(dim=1) for x in target]
                target = torch.cat(target, dim=1)
            elif isinstance(target, torch.Tensor):
                target = target.cuda(device=args.local_rank, non_blocking=True)

            output, feat = model(input)
            # convert logits to probabilities first
            output = F.softmax(output, dim=1)
            # save the current probabilities and the corresponding targets
            probs = torch.cat((probs, output), dim=0)
            label = torch.cat((label, target), dim=0)

    # gather probs and targets from all ranks
    rt_probs_list = [torch.zeros_like(probs) for i in range(dist.get_world_size())]
    rt_label_list = [torch.zeros_like(label) for i in range(dist.get_world_size())]
    dist.all_gather(rt_probs_list, probs)
    dist.all_gather(rt_label_list, label)
    probs_total = torch.cat(rt_probs_list, dim=0)
    label_total = torch.cat(rt_label_list, dim=0)
    probs_total = probs_total.data.cpu()
    label_total = label_total.data.cpu()

    if args.local_rank == 0:
        args.log('before probs_total.shape: ', probs_total.shape)
        args.log('before label_total.shape: ', label_total.shape)

    # drop the duplicates introduced by DistributedTestSampler
    sample_num = len(val_loader.dataset)
    probs_total = probs_total[0:sample_num, :]
    label_total = label_total[0:sample_num]
    if args.local_rank == 0:
        args.log('after probs_total.shape: ', probs_total.shape)
        args.log('after label_total.shape: ', label_total.shape)

    acc_com(probs_total, label_total, args)


def reduce_tensor(tensor, gpu_num=8):
    gpu_num = dist.get_world_size()
    rt = tensor.clone()
    dist.all_reduce(rt, op=dist.ReduceOp.SUM)
    rt /= gpu_num
    return rt


if __name__ == '__main__':
    main()