最近入坑 PyTorch 框架,毕竟现在 PyTorch 这么流行,怎么能不学习一波。一般来说,训练和测试神经网络的流程基本上是大同小异的。
训练过程一般要做的事情如下:
# Generic PyTorch training template.
# MyDataset, MyNet and path are placeholders the reader must supply; they are
# not defined in this snippet.
train_datasets = MyDataset()  # Step 1: build the Dataset object
train_dataloader = DataLoader(train_datasets)  # Step 2: wrap it in a DataLoader to get an iterable of batches
model = MyNet()
# Cross-entropy loss is used as the example criterion
criterion = nn.CrossEntropyLoss()
# Initialise the optimizer
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
num_epochs = 100
# Number of batches per epoch. Asking the DataLoader directly avoids depending
# on a separate batch_size variable and stays correct for any batch size.
steps_per_epoch = len(train_dataloader)
for epoch in range(num_epochs):  # Step 3: iterate over the data epoch by epoch
    model.train()
    for i, (inputs, labels) in enumerate(train_dataloader):
        optimizer.zero_grad()
        # Forward pass: compute the predictions for this batch
        pred = model(inputs)
        # Compute the loss
        loss = criterion(pred, labels)
        # Backward pass and parameter update
        loss.backward()
        optimizer.step()
        # Print the loss every 10 batches; i runs from 0 to steps_per_epoch - 1
        if (i + 1) % 10 == 0:
            print('Epoch:[%d/%d],Step:[%d/%d],Loss:%.4f'
                  % (epoch + 1, num_epochs, i + 1, steps_per_epoch, loss.item()))
    # Save the model after every epoch
    torch.save(model, path)

# Example: the code below is mainly based on the following blog posts.
# Blog 1: https://www.jianshu.com/p/1cd6333128a1
#-*- coding:utf-8 -*-
'''Example showing how to save and load a model checkpoint in PyTorch.'''
__author__ = 'puxitong from UESTC'

import torch as torch
import torchvision as tv
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as transforms
from torchvision.transforms import ToPILImage
import torch.backends.cudnn as cudnn
import datetime
import argparse

# Parameter declarations
batch_size = 32
epochs = 10
WORKERS = 0  # num_workers passed to both DataLoaders
test_flag = True  # Test switch: when True, load the saved checkpoint and only evaluate
ROOT = '/home/pxt/pytorch/cifar'  # Directory where the CIFAR-10 dataset is stored
log_dir = '/home/pxt/pytorch/logs/cifar_model.pth'  # Checkpoint file path

# Load the CIFAR-10 dataset
transform = tv.transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])])

train_data = tv.datasets.CIFAR10(root=ROOT, train=True, download=True, transform=transform)
test_data = tv.datasets.CIFAR10(root=ROOT, train=False, download=False, transform=transform)

train_load = torch.utils.data.DataLoader(train_data, batch_size=batch_size, shuffle=True, num_workers=WORKERS)
test_load = torch.utils.data.DataLoader(test_data, batch_size=batch_size, shuffle=False, num_workers=WORKERS)


# Model definition
class Net(nn.Module):
    '''Small CNN: four 3x3 conv layers, two 2x2 max-pools, three linear layers.'''

    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(3, 64, 3, padding=1)
        self.conv2 = nn.Conv2d(64, 128, 3, padding=1)
        self.conv3 = nn.Conv2d(128, 256, 3, padding=1)
        self.conv4 = nn.Conv2d(256, 256, 3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        # fc1 expects 256 channels of 8x8 feature maps, i.e. 32x32 inputs
        # after the two 2x2 poolings in forward().
        self.fc1 = nn.Linear(256 * 8 * 8, 1024)
        self.fc2 = nn.Linear(1024, 256)
        self.fc3 = nn.Linear(256, 10)

    def forward(self, x):
        '''Return the 10 raw class scores (logits) for a batch of images.'''
        x = F.relu(self.conv1(x))
        x = self.pool(F.relu(self.conv2(x)))
        x = F.relu(self.conv3(x))
        x = self.pool(F.relu(self.conv4(x)))
        # Flatten channels x height x width into one feature vector per sample
        x = x.view(-1, x.size()[1] * x.size()[2] * x.size()[3])
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x


model = Net().cuda()
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01)


# Model training
def train(model, train_loader, epoch):
    '''Train ``model`` for one epoch and print the mean batch loss.

    Uses the module-level ``optimizer`` and ``criterion``. ``epoch`` is only
    used in the printed message.
    '''
    model.train()
    train_loss = 0.0
    num_batches = 0
    for num_batches, (x, y) in enumerate(train_loader, 1):
        x = x.cuda()
        y = y.cuda()
        optimizer.zero_grad()
        y_hat = model(x)
        loss = criterion(y_hat, y)
        loss.backward()
        optimizer.step()
        # Accumulate a Python float rather than the loss tensor itself, so
        # the running total holds no reference to autograd state.
        train_loss += loss.item()
    # max(..., 1) keeps an empty loader from raising.
    loss_mean = train_loss / max(num_batches, 1)
    print('Train Epoch: {}\t Loss: {:.6f}'.format(epoch, loss_mean))


# Model evaluation
def test(model, test_loader):
    '''Evaluate ``model`` on ``test_loader`` and print mean loss and accuracy.'''
    model.eval()
    test_loss = 0.0
    correct = 0
    num_batches = 0
    with torch.no_grad():
        for num_batches, (x, y) in enumerate(test_loader, 1):
            x = x.cuda()
            y = y.cuda()
            y_hat = model(x)
            test_loss += criterion(y_hat, y).item()
            # Index of the highest score along dim 1 is the predicted class
            pred = y_hat.max(1, keepdim=True)[1]
            correct += pred.eq(y.view_as(pred)).sum().item()
    test_loss /= max(num_batches, 1)
    # Count samples from the loader actually passed in, not from the global
    # test_data, so the accuracy is right for any loader.
    num_samples = len(test_loader.dataset)
    print('Test set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, num_samples, 100. * correct / max(num_samples, 1)))


def main():
    '''Evaluate a saved checkpoint when test_flag is set, otherwise train.'''
    # If test_flag is True, load the saved checkpoint, evaluate on the test
    # set and skip everything after this branch.
    if test_flag:
        checkpoint = torch.load(log_dir)
        model.load_state_dict(checkpoint['model'])
        optimizer.load_state_dict(checkpoint['optimizer'])
        # checkpoint['epoch'] is deliberately not assigned to ``epochs``:
        # assigning it here would make ``epochs`` local to main() and raise
        # UnboundLocalError in the training branch below.
        test(model, test_load)
        return

    for epoch in range(0, epochs):
        train(model, train_load, epoch)
        test(model, test_load)
        # Save a checkpoint after every epoch
        state = {'model': model.state_dict(), 'optimizer': optimizer.state_dict(), 'epoch': epoch}
        torch.save(state, log_dir)


if __name__ == '__main__':
    main()

# Blog 2: https://blog.csdn.net/weixin_41424926/article/details/105383064#12__12
if __name__ == '__main__':
    # YOLOv1 training script. VOC2012, YOLOv1_resnet, Loss_yolov1, DataLoader,
    # np and visdom are expected to be provided by the surrounding file.
    import os  # local import: only needed to create the checkpoint directory

    epoch = 50
    batchsize = 5
    lr = 0.01

    # Build the training set once and hand the same object to the DataLoader,
    # so the step counts below describe the data that is actually iterated.
    train_data = VOC2012(is_train=True)
    train_dataloader = DataLoader(train_data, batch_size=batchsize, shuffle=True)
    steps_per_epoch = len(train_dataloader)

    model = YOLOv1_resnet().cuda()

    # model.children() yields the top-level submodules (e.g. Sequential
    # containers), not individual layers; see the PyTorch documentation.
    # Freeze the first child so it takes no part in parameter updates
    # (per the original author's note, this is the resnet34 feature extractor).
    # requires_grad must be set on the parameters: assigning it on the module
    # only creates a plain attribute and freezes nothing.
    for layer in model.children():
        for param in layer.parameters():
            param.requires_grad = False
        break

    criterion = Loss_yolov1()
    optimizer = torch.optim.SGD(model.parameters(), lr=lr, momentum=0.9, weight_decay=0.0005)

    is_vis = False  # Whether to plot with visdom; leave False if visdom is unavailable
    if is_vis:
        vis = visdom.Visdom()
        viswin1 = vis.line(np.array([0.]), np.array([0.]),
                           opts=dict(title="Loss/Step", xlabel="100*step", ylabel="Loss"))

    for e in range(epoch):
        model.train()
        # Running sum of batch losses in this epoch, kept as a Python float so
        # no autograd state is retained across iterations.
        running_loss = 0.0
        for i, (inputs, labels) in enumerate(train_dataloader):
            inputs = inputs.cuda()
            labels = labels.float().cuda()
            pred = model(inputs)
            loss = criterion(pred, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            loss_value = loss.item()
            print("Epoch %d/%d| Step %d/%d| Loss: %.2f"%(e, epoch, i, steps_per_epoch, loss_value))
            running_loss += loss_value
            # Every 100 steps, plot the mean loss so far in this epoch against
            # the global step index.
            if is_vis and (i + 1) % 100 == 0:
                vis.line(np.array([running_loss / (i + 1)]),
                         np.array([i + e * steps_per_epoch]),
                         win=viswin1, update='append')
        # Save the whole model every 10 epochs
        if (e + 1) % 10 == 0:
            # torch.save does not create missing directories
            os.makedirs("./models_pkl", exist_ok=True)
            torch.save(model, "./models_pkl/YOLOv1_epoch"+str(e+1)+".pkl")
            # Optional validation step, disabled in the original: compute_val_map(model)
测试过程一般要做的事情如下:
# Generic PyTorch inference template. path and test_dataloader are
# placeholders the reader must supply; they are not defined in this snippet.
# WARNING: torch.load unpickles the file, so only load checkpoints from a
# trusted source.
# NOTE(review): recent PyTorch releases default torch.load to
# weights_only=True, which rejects a fully pickled module; confirm against the
# installed version and pass weights_only=False if required.
model = torch.load(path)
# Switch layers such as dropout and batch norm to evaluation behaviour
model.eval()
# Gradients are not needed for inference; disabling them saves memory and time
with torch.no_grad():
    for i, (inputs, labels) in enumerate(test_dataloader):
        pred = model(inputs)