#!/usr/bin/python3.5
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torch.backends.cudnn as cudnn
import torchvision
import torchvision.transforms as transforms
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.autograd import Variable
from torch.utils.data.distributed import DistributedSampler
from torch.utils.data.sampler import RandomSampler
from torch.optim.lr_scheduler import MultiStepLR
from torch.optim.lr_scheduler import LambdaLR
from mpi4py import MPI
import numpy as np
import os
import sys
import argparse
import math
import copy
import random
import time

# One MPI process per GPU; rank and world size come from the MPI launcher.
comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

# Pin each rank to a single GPU on its machine.
NGPU_PER_MACHINE = 8
os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"
os.environ["CUDA_VISIBLE_DEVICES"] = "%d" % (rank % NGPU_PER_MACHINE)
print("Rank %d, Using GPU %s" % (rank, os.environ["CUDA_VISIBLE_DEVICES"]))
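
# Command-line hyperparameters. Illustrative launch (the script name and the
# process count below are assumptions, not part of the original paste):
#   mpirun -np 16 python3.5 train_cifar.py 0.1 0.9 CENTRALIZED 0.1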
LEARNING_RATE = float(sys.argv[1])
MOMENTUM = float(sys.argv[2])
COMMUNICATION = sys.argv[3]  # "CENTRALIZED", "DECENTRALIZED", or "PACKAGEDROP"
PACKAGE_DROP_RATE = float(sys.argv[4])
PARTITIONED_LOAD = False

if rank == 0:
    print("# LEARNING_RATE: %f" % LEARNING_RATE)
    print("# MOMENTUM: %f" % MOMENTUM)
    print("# COMMUNICATION: %s" % COMMUNICATION)
    print("# PACKAGE_DROP_RATE: %f" % PACKAGE_DROP_RATE)
transform_train = transforms.Compose([
    # transforms.RandomCrop(32, padding=4),
    # transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])

transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])
- """
- class Bottleneck(nn.Module):
- def __init__(self, in_planes, growth_rate):
- super(Bottleneck, self).__init__()
- self.bn1 = nn.BatchNorm2d(in_planes)
- self.conv1 = nn.Conv2d(in_planes, 4*growth_rate, kernel_size=1, bias=False)
- self.bn2 = nn.BatchNorm2d(4*growth_rate)
- self.conv2 = nn.Conv2d(4*growth_rate, growth_rate, kernel_size=3, padding=1, bias=False)
- def forward(self, x):
- out = self.conv1(F.relu(self.bn1(x)))
- out = self.conv2(F.relu(self.bn2(out)))
- out = torch.cat([out,x], 1)
- return out
- class Transition(nn.Module):
- def __init__(self, in_planes, out_planes):
- super(Transition, self).__init__()
- self.bn = nn.BatchNorm2d(in_planes)
- self.conv = nn.Conv2d(in_planes, out_planes, kernel_size=1, bias=False)
- def forward(self, x):
- out = self.conv(F.relu(self.bn(x)))
- out = F.avg_pool2d(out, 2)
- return out
- class DenseNet(nn.Module):
- def __init__(self, block, nblocks, growth_rate=12, reduction=0.5, num_classes=10):
- super(DenseNet, self).__init__()
- self.growth_rate = growth_rate
- num_planes = 2*growth_rate
- self.conv1 = nn.Conv2d(3, num_planes, kernel_size=3, padding=1, bias=False)
- self.dense1 = self._make_dense_layers(block, num_planes, nblocks[0])
- num_planes += nblocks[0]*growth_rate
- out_planes = int(math.floor(num_planes*reduction))
- self.trans1 = Transition(num_planes, out_planes)
- num_planes = out_planes
- self.dense2 = self._make_dense_layers(block, num_planes, nblocks[1])
- num_planes += nblocks[1]*growth_rate
- out_planes = int(math.floor(num_planes*reduction))
- self.trans2 = Transition(num_planes, out_planes)
- num_planes = out_planes
- self.dense3 = self._make_dense_layers(block, num_planes, nblocks[2])
- num_planes += nblocks[2]*growth_rate
- out_planes = int(math.floor(num_planes*reduction))
- self.trans3 = Transition(num_planes, out_planes)
- num_planes = out_planes
- self.dense4 = self._make_dense_layers(block, num_planes, nblocks[3])
- num_planes += nblocks[3]*growth_rate
- self.bn = nn.BatchNorm2d(num_planes)
- self.linear = nn.Linear(num_planes, num_classes)
- def _make_dense_layers(self, block, in_planes, nblock):
- layers = []
- for i in range(nblock):
- layers.append(block(in_planes, self.growth_rate))
- in_planes += self.growth_rate
- return nn.Sequential(*layers)
- def forward(self, x):
- out = self.conv1(x)
- out = self.trans1(self.dense1(out))
- out = self.trans2(self.dense2(out))
- out = self.trans3(self.dense3(out))
- out = self.dense4(out)
- out = F.avg_pool2d(F.relu(self.bn(out)), 4)
- out = out.view(out.size(0), -1)
- out = self.linear(out)
- return out
- net = DenseNet(Bottleneck, [6,12,24,16], growth_rate=12)
- net2 = DenseNet(Bottleneck, [6,12,24,16], growth_rate=12)
- """
class BasicBlock(nn.Module):
    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_planes != self.expansion*planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, self.expansion*planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(self.expansion*planes)
            )

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out


class Bottleneck(nn.Module):
    expansion = 4

    def __init__(self, in_planes, planes, stride=1):
        super(Bottleneck, self).__init__()
        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)
        self.conv3 = nn.Conv2d(planes, self.expansion*planes, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(self.expansion*planes)

        self.shortcut = nn.Sequential()
        if stride != 1 or in_planes != self.expansion*planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, self.expansion*planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(self.expansion*planes)
            )

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = F.relu(self.bn2(self.conv2(out)))
        out = self.bn3(self.conv3(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out
class ResNet(nn.Module):
    def __init__(self, block, num_blocks, num_classes=10):
        super(ResNet, self).__init__()
        self.in_planes = 64

        self.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(64)
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)
        self.linear = nn.Linear(512*block.expansion, num_classes)

    def _make_layer(self, block, planes, num_blocks, stride):
        strides = [stride] + [1]*(num_blocks-1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_planes, planes, stride))
            self.in_planes = planes * block.expansion
        return nn.Sequential(*layers)

    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = F.avg_pool2d(out, 4)
        out = out.view(out.size(0), -1)
        out = self.linear(out)
        return out


net = ResNet(BasicBlock, [2, 2, 2, 2])
net2 = ResNet(BasicBlock, [2, 2, 2, 2])
net.cuda()
net2.cuda()
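
# CIFAR-10 training data. With PARTITIONED_LOAD = False every rank draws its
# own random samples from the full training set; with PARTITIONED_LOAD = True
# a DistributedSampler gives each rank a disjoint shard.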
trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform_train)

if PARTITIONED_LOAD == False:
    dsampler = RandomSampler(trainset)
    trainloader = torch.utils.data.DataLoader(trainset, batch_size=8192 // size, shuffle=False, sampler=dsampler)
else:
    dsampler = DistributedSampler(trainset, size, rank)
    trainloader = torch.utils.data.DataLoader(trainset, batch_size=8192 // size, shuffle=False, sampler=dsampler)

dsampler2 = DistributedSampler(trainset, size, rank)
evalloader = torch.utils.data.DataLoader(trainset, batch_size=8192 // size, shuffle=False, sampler=dsampler2)

criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=LEARNING_RATE, momentum=MOMENTUM, weight_decay=0.0)
scheduler = MultiStepLR(optimizer, milestones=[50, ], gamma=0.1)
#scheduler = LambdaLR(optimizer, lr_lambda=lambda epoch: LEARNING_RATE / math.sqrt(epoch+1))

model = net.cuda()
model2 = net2.cuda()
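
# Note: .cuda() returns the module itself, so `model` and `net` (and `model2`
# and `net2`) are the same objects; the two names are used interchangeably below.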
random.seed(rank)

## Model averaging -- everyone starts from the same model at the beginning.
for param in model.parameters():
    view = param.cpu().data.numpy()
    view2 = 0.0 * view
    comm.Allreduce(view, view2, op=MPI.SUM)
    param.data = torch.from_numpy(view2).cuda()
    param.data /= size
i = 0
for iepoch in range(0, 100):

    if PARTITIONED_LOAD == True:
        scheduler.step()

    net.train()
    train_loss = 0
    correct = 0
    total = 0
    start = time.time()

    if (iepoch + 1) % 25 == 0:
        # This only changes the value printed in the log below; the optimizer's
        # learning rate is driven by the scheduler.
        LEARNING_RATE = LEARNING_RATE / 10

    for batch_idx, (inputs, targets) in enumerate(trainloader):

        i = i + 1
        if PARTITIONED_LOAD == False:
            # Step the scheduler once per "effective epoch":
            # 50000 samples / 8192 global batch ~= 6 iterations.
            if (i % int(50000 / 8192)) == 0:
                scheduler.step()

        optimizer.zero_grad()
        inputs, targets = Variable(inputs.cuda()), Variable(targets.cuda())
        outputs = net(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
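
        # Exchange gradients / models across ranks. Three schemes are supported:
        #   CENTRALIZED   - allreduce-average the gradients, then take one SGD step.
        #   DECENTRALIZED - take a local SGD step, then average the parameters with
        #                   the two ring neighbours (rank-1 and rank+1, modulo size).
        #   PACKAGEDROP   - allreduce-average the models, but each rank drops its
        #                   contribution and/or skips applying the average with
        #                   probability PACKAGE_DROP_RATE (simulated packet loss).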
        # Centralized Communications
        #
        if COMMUNICATION == "CENTRALIZED":
            for param in model.parameters():
                view = param.grad.cpu().data.numpy()
                view2 = 0.0 * view
                comm.Allreduce(view, view2, op=MPI.SUM)
                view2 = view2 / size
                #newmodel = param.cpu().data.numpy() - LEARNING_RATE * view2
                #param.data = torch.from_numpy(newmodel).cuda()
                param.grad.data = torch.from_numpy(view2).cuda()
            # TODO:
            # try impl SGD by itself
            optimizer.step()
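
        # Decentralized: after the local update, each rank averages its model with
        # its two ring neighbours (a simple gossip / decentralized-SGD step).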
        elif COMMUNICATION == "DECENTRALIZED":
            optimizer.step()
            for param in model.parameters():
                view = param.cpu().data.numpy()
                model_neighbor1 = 0.0 * view
                model_neighbor2 = 0.0 * view
                neighbor1 = (rank - 1) % size
                neighbor2 = (rank + 1) % size
                send_req1 = comm.isend(view, dest=neighbor1)
                send_req2 = comm.isend(view, dest=neighbor2)
                model_neighbor1 = comm.recv(source=neighbor1)
                model_neighbor2 = comm.recv(source=neighbor2)
                send_req1.wait()
                send_req2.wait()
                view2 = (model_neighbor1 + model_neighbor2 + view) / 3
                param.data = torch.from_numpy(view2).cuda()
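
        # Package drop: average the models with an allreduce, but each rank may
        # "drop" its outgoing copy (contributing zeros and being excluded from the
        # divisor) and, independently, may skip applying the incoming average,
        # each with probability PACKAGE_DROP_RATE.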
        elif COMMUNICATION == "PACKAGEDROP":
            optimizer.step()
            for param in model.parameters():
                view = param.cpu().data.numpy()
                drop1_local = np.array([0.0, ])
                if random.random() < PACKAGE_DROP_RATE:
                    view = 0.0 * view
                    drop1_local = drop1_local + 1
                    #print (" drop1 package rank", rank)
                drop1_global = np.array([0.0, ])
                comm.Allreduce(drop1_local, drop1_global, op=MPI.SUM)
                drop1_global = drop1_global[0]
                #print ("drop1_global = ", drop1_global, "/", size)
                view2 = 0.0 * view
                comm.Allreduce(view, view2, op=MPI.SUM)
                view2 = view2 / (size - drop1_global)
                if random.random() < PACKAGE_DROP_RATE:
                    pass
                    #print (" drop2 package rank", rank)
                else:
                    param.data = torch.from_numpy(view2).cuda()
        if (i % int(50000 / 8192)) == 0:
            if rank == 0:
                elapsed = time.time() - start

            # Evaluation
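            # Average the current model across all ranks into net2, then measure the
            # training loss of the averaged model on each rank's shard of the data;
            # per-rank losses are summed with an allreduce before printing.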
            net2.train()
            net2.load_state_dict(net.state_dict())
            for param in model2.parameters():
                view = param.cpu().data.numpy()
                view2 = 1.0 * view
                comm.Allreduce(view, view2, op=MPI.SUM)
                param.data = torch.from_numpy(view2).cuda()
                param.data /= size

            _train_loss = 0
            for batch_idx, (inputs, targets) in enumerate(evalloader):
                inputs, targets = Variable(inputs.cuda()), Variable(targets.cuda())
                outputs = net2(inputs)
                loss = criterion(outputs, targets)
                _train_loss += loss.data[0]  # pre-0.4 PyTorch idiom; use loss.item() on newer versions

            _train_loss = np.array([_train_loss, ])
            train_loss = 0.0 * _train_loss
            comm.Allreduce(_train_loss, train_loss, op=MPI.SUM)
            train_loss = train_loss[0]

            if PARTITIONED_LOAD == False:
                if rank == 0:
                    print('%d Loss: %.3f | %f seconds | LR: %f ~' % (i / int(50000 / 8192), train_loss / size / (batch_idx + 1), elapsed, LEARNING_RATE))
            else:
                if rank == 0:
                    print('%d Loss: %.3f | %f seconds | LR: %f' % (iepoch, train_loss / size / (batch_idx + 1), elapsed, LEARNING_RATE))