Commit 1cf5eeda authored by Federico Betti

sol and util added

parent 4339fe3d
import time
import torch
import argparse
import torch.backends.cudnn as cudnn
import torch.multiprocessing as mp
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data.distributed
from torch.utils.tensorboard import SummaryWriter
from torchvision import datasets, transforms, models
import horovod.torch as hvd
import os
import math
from tqdm import tqdm
from util.metric_saving import Metric

# Training settings
parser = argparse.ArgumentParser(description='PyTorch ImageNet Example',
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--train-dir', default=os.path.expanduser('~/imagenet/train'),
help='path to training data')
parser.add_argument('--val-dir', default=os.path.expanduser('~/imagenet/validation'),
help='path to validation data')
parser.add_argument('--log-dir', default='./logs',
help='tensorboard log directory')
parser.add_argument('--checkpoint-format', default='./checkpoint-{epoch}.pth.tar',
help='checkpoint file format')
parser.add_argument('--fp16-allreduce', action='store_true', default=False,
help='use fp16 compression during allreduce')
parser.add_argument('--batches-per-allreduce', type=int, default=1,
help='number of batches processed locally before '
'executing allreduce across workers; it multiplies '
'total batch size.')
parser.add_argument('--use-adasum', action='store_true', default=False,
help='use adasum algorithm to do reduction')
parser.add_argument('--gradient-predivide-factor', type=float, default=1.0,
help='apply gradient predivide factor in optimizer (default: 1.0)')
# Default settings from https://arxiv.org/abs/1706.02677.
parser.add_argument('--batch-size', type=int, default=64,
help='input batch size for training')
parser.add_argument('--val-batch-size', type=int, default=32,
help='input batch size for validation')
parser.add_argument('--epochs', type=int, default=90,
help='number of epochs to train')
parser.add_argument('--base-lr', type=float, default=0.0125,
help='learning rate for a single GPU')
parser.add_argument('--warmup-epochs', type=float, default=5,
help='number of warmup epochs')
parser.add_argument('--momentum', type=float, default=0.9,
help='SGD momentum')
parser.add_argument('--wd', type=float, default=0.00005,
help='weight decay')
parser.add_argument('--no-cuda', action='store_true', default=False,
help='disables CUDA training')
parser.add_argument('--seed', type=int, default=42,
help='random seed')
parser.add_argument('--run-name', default='', help='tensorboard log directory name')
def train(epoch):
model.train()
train_sampler.set_epoch(epoch)
train_loss = Metric('train_loss')
train_accuracy = Metric('train_accuracy')
iteration = epoch * len(train_loader)
with tqdm(total=len(train_loader),
desc='Train Epoch #{}'.format(epoch + 1),
disable=not verbose) as t:
for batch_idx, (data, target) in enumerate(train_loader):
adjust_learning_rate(epoch, batch_idx)
if args.cuda:
data, target = data.cuda(), target.cuda()
optimizer.zero_grad()
# Split data into sub-batches of size batch_size
for i in range(0, len(data), args.batch_size):
data_batch = data[i:i + args.batch_size]
target_batch = target[i:i + args.batch_size]
output = model(data_batch)
train_accuracy.update(accuracy(output, target_batch))
loss = F.cross_entropy(output, target_batch)
train_loss.update(loss)
# Average gradients among sub-batches
loss.div_(math.ceil(float(len(data)) / args.batch_size))
loss.backward()
# Gradient is applied across all ranks
optimizer.step()
t.set_postfix({'loss': train_loss.avg.item(),
'accuracy': 100. * train_accuracy.avg.item()})
t.update(1)
if log_writer:
log_writer.add_scalar('train/loss', train_loss.avg, iteration)
log_writer.add_scalar('train/accuracy', train_accuracy.avg, iteration)
iteration += 1
def validate(epoch):
model.eval()
val_loss = Metric('val_loss')
val_accuracy = Metric('val_accuracy')
with tqdm(total=len(val_loader),
desc='Validate Epoch #{}'.format(epoch + 1),
disable=not verbose) as t:
with torch.no_grad():
for data, target in val_loader:
if args.cuda:
data, target = data.cuda(), target.cuda()
output = model(data)
val_loss.update(F.cross_entropy(output, target))
val_accuracy.update(accuracy(output, target))
t.set_postfix({'loss': val_loss.avg.item(),
'accuracy': 100. * val_accuracy.avg.item()})
t.update(1)
if log_writer:
log_writer.add_scalar('val/loss', val_loss.avg, epoch)
log_writer.add_scalar('val/accuracy', val_accuracy.avg, epoch)
# Horovod: using `lr = base_lr * hvd.size()` from the very beginning leads to worse final
# accuracy. Scale the learning rate `lr = base_lr` ---> `lr = base_lr * hvd.size()` during
# the first five epochs. See https://arxiv.org/abs/1706.02677 for details.
# After the warmup reduce learning rate by 10 on the 30th, 60th and 80th epochs.
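# For example, with hvd.size() == 8, base_lr == 0.0125 and batches_per_allreduce == 1
# (illustrative numbers), the learning rate ramps linearly from 0.0125 at the start of
# epoch 0 to 0.1 (= 8 * 0.0125) by the end of the warmup, and is then cut by 10x at
# epochs 30, 60 and 80.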
def adjust_learning_rate(epoch, batch_idx):
if epoch < args.warmup_epochs:
epoch += float(batch_idx + 1) / len(train_loader)
lr_adj = 1. / hvd.size() * (epoch * (hvd.size() - 1) / args.warmup_epochs + 1)
elif epoch < 30:
lr_adj = 1.
elif epoch < 60:
lr_adj = 1e-1
elif epoch < 80:
lr_adj = 1e-2
else:
lr_adj = 1e-3
for param_group in optimizer.param_groups:
param_group['lr'] = args.base_lr * hvd.size() * args.batches_per_allreduce * lr_adj
def accuracy(output, target):
# get the index of the max log-probability
pred = output.max(1, keepdim=True)[1]
return pred.eq(target.view_as(pred)).cpu().float().mean()
def save_checkpoint(epoch):
if hvd.rank() == 0:
filepath = args.checkpoint_format.format(epoch=epoch + 1)
state = {
'model': model.state_dict(),
'optimizer': optimizer.state_dict(),
}
torch.save(state, filepath)
if __name__ == '__main__':
args = parser.parse_args()
args.cuda = not args.no_cuda and torch.cuda.is_available()
allreduce_batch_size = args.batch_size * args.batches_per_allreduce
hvd.init()
torch.manual_seed(args.seed)
if args.run_name == "":
        run_name = str(time.time())
else:
run_name = args.run_name
summary_writer_dir = os.path.join(args.log_dir, run_name)
os.makedirs(summary_writer_dir, exist_ok=True)
if args.cuda:
# Horovod: pin GPU to local rank.
torch.cuda.set_device(hvd.local_rank())
torch.cuda.manual_seed(args.seed)
cudnn.benchmark = True
# If set > 0, will resume training from a given checkpoint.
resume_from_epoch = 0
for try_epoch in range(args.epochs, 0, -1):
if os.path.exists(args.checkpoint_format.format(epoch=try_epoch)):
resume_from_epoch = try_epoch
break
# Horovod: broadcast resume_from_epoch from rank 0 (which will have
# checkpoints) to other ranks.
resume_from_epoch = hvd.broadcast(torch.tensor(resume_from_epoch), root_rank=0,
name='resume_from_epoch').item()
# Horovod: print logs on the first worker.
verbose = 1 if hvd.rank() == 0 else 0
# Horovod: write TensorBoard logs on first worker.
    log_writer = SummaryWriter(summary_writer_dir) if hvd.rank() == 0 else None
# Horovod: limit # of CPU threads to be used per worker.
torch.set_num_threads(4)
kwargs = {'num_workers': 4, 'pin_memory': True} if args.cuda else {}
    # When supported, use 'forkserver' to spawn dataloader workers instead of 'fork' to
    # prevent issues with InfiniBand implementations that are not fork-safe.
if (kwargs.get('num_workers', 0) > 0 and hasattr(mp, '_supports_context') and
mp._supports_context and 'forkserver' in mp.get_all_start_methods()):
kwargs['multiprocessing_context'] = 'forkserver'
train_dataset = \
datasets.ImageFolder(args.train_dir,
transform=transforms.Compose([
transforms.RandomResizedCrop(224),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
]))
# Horovod: use DistributedSampler to partition data among workers. Manually specify
# `num_replicas=hvd.size()` and `rank=hvd.rank()`.
train_sampler = torch.utils.data.distributed.DistributedSampler(
train_dataset, num_replicas=hvd.size(), rank=hvd.rank())
train_loader = torch.utils.data.DataLoader(
train_dataset, batch_size=allreduce_batch_size,
sampler=train_sampler, **kwargs)
val_dataset = \
datasets.ImageFolder(args.val_dir,
transform=transforms.Compose([
transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225])
]))
val_sampler = torch.utils.data.distributed.DistributedSampler(
val_dataset, num_replicas=hvd.size(), rank=hvd.rank())
val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=args.val_batch_size,
sampler=val_sampler, **kwargs)
    # Set up a standard ResNet-18 model (the ResNet-50 variant is left commented out).
    # model = models.resnet50()
    model = models.resnet18()
# By default, Adasum doesn't need scaling up learning rate.
    # For sum/average with gradient accumulation: scale learning rate by batches_per_allreduce.
lr_scaler = args.batches_per_allreduce * hvd.size() if not args.use_adasum else 1
if args.cuda:
# Move model to GPU.
model.cuda()
# If using GPU Adasum allreduce, scale learning rate by local_size.
if args.use_adasum and hvd.nccl_built():
lr_scaler = args.batches_per_allreduce * hvd.local_size()
# Horovod: scale learning rate by the number of GPUs.
optimizer = optim.SGD(model.parameters(),
lr=(args.base_lr *
lr_scaler),
momentum=args.momentum, weight_decay=args.wd)
# Horovod: (optional) compression algorithm.
compression = hvd.Compression.fp16 if args.fp16_allreduce else hvd.Compression.none
# Horovod: wrap optimizer with DistributedOptimizer.
optimizer = hvd.DistributedOptimizer(
optimizer, named_parameters=model.named_parameters(),
compression=compression,
backward_passes_per_step=args.batches_per_allreduce,
op=hvd.Adasum if args.use_adasum else hvd.Average,
gradient_predivide_factor=args.gradient_predivide_factor)
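    # Note: backward_passes_per_step matches the number of sub-batch backward() calls made
    # per full-size batch in train(), so gradients are only averaged across workers once per
    # allreduce_batch_size samples instead of after every sub-batch.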
# Restore from a previous checkpoint, if initial_epoch is specified.
# Horovod: restore on the first worker which will broadcast weights to other workers.
if resume_from_epoch > 0 and hvd.rank() == 0:
filepath = args.checkpoint_format.format(epoch=resume_from_epoch)
checkpoint = torch.load(filepath)
model.load_state_dict(checkpoint['model'])
optimizer.load_state_dict(checkpoint['optimizer'])
# Horovod: broadcast parameters & optimizer state.
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)
for epoch in range(resume_from_epoch, args.epochs):
train(epoch)
validate(epoch)
save_checkpoint(epoch)
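# Example launch (illustrative; adjust the process count, script name and paths to your setup):
#
#   horovodrun -np 4 python train_imagenet.py \
#       --train-dir /data/imagenet/train --val-dir /data/imagenet/validation \
#       --run-name resnet18_baseline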
import os
import random
import torch
from sentence_transformers import evaluation
def batch_to_device(batch, target_device):
"""
    Send a PyTorch batch to a device (CPU/GPU).
"""
for key in batch:
if isinstance(batch[key], torch.Tensor):
batch[key] = batch[key].to(target_device)
return batch
class SmartBatchingClass:
def __init__(self, model, device):
self.model = model
self.device = device
def __call__(self, batch):
"""
Transforms a batch from a SmartBatchingDataset to a batch of tensors for the model
        Here, batch is a list of InputExample-like objects, each with .texts and .label attributes
:param batch:
a batch from a SmartBatchingDataset
:return:
a batch of tensors for the model
"""
num_texts = len(batch[0].texts)
texts = [[] for _ in range(num_texts)]
labels = []
for example in batch:
for idx, text in enumerate(example.texts):
texts[idx].append(text)
labels.append(example.label)
labels = torch.tensor(labels).to(self.device)
sentence_features = []
for idx in range(num_texts):
tokenized = self.model.tokenize(texts[idx])
            batch_to_device(tokenized, self.device)
sentence_features.append(tokenized)
return sentence_features, labels
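# Usage sketch (illustrative, assumes a sentence_transformers SentenceTransformer `model`
# and a list of InputExample objects `train_examples`): the class is intended to be used
# as a DataLoader collate_fn.
#
#   from torch.utils.data import DataLoader
#   collate_fn = SmartBatchingClass(model, device)
#   train_loader = DataLoader(train_examples, batch_size=16, collate_fn=collate_fn)
#   for sentence_features, labels in train_loader:
#       ...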
def get_evaluator(dataset_path, max_corpus_size):
ir_queries = {} # Our queries (qid => question)
ir_needed_qids = set() # QIDs we need in the corpus
ir_corpus = {} # Our corpus (qid => question)
    ir_relevant_docs = {} # Mapping of relevant documents for a given query (qid => set([relevant_question_ids]))
with open(os.path.join(dataset_path, 'information-retrieval/dev-queries.tsv'), encoding='utf8') as fIn:
next(fIn) # Skip header
for line in fIn:
qid, query, duplicate_ids = line.strip().split('\t')
duplicate_ids = duplicate_ids.split(',')
ir_queries[qid] = query
ir_relevant_docs[qid] = set(duplicate_ids)
for qid in duplicate_ids:
ir_needed_qids.add(qid)
    # First get all needed relevant documents (i.e., we must ensure that the relevant questions are actually in the corpus).
distraction_questions = {}
with open(os.path.join(dataset_path, 'information-retrieval/corpus.tsv'), encoding='utf8') as fIn:
next(fIn) # Skip header
for line in fIn:
qid, question = line.strip().split('\t')
if qid in ir_needed_qids:
ir_corpus[qid] = question
else:
distraction_questions[qid] = question
# Now, also add some irrelevant questions to fill our corpus
other_qid_list = list(distraction_questions.keys())
random.shuffle(other_qid_list)
for qid in other_qid_list[0:max(0, max_corpus_size - len(ir_corpus))]:
ir_corpus[qid] = distraction_questions[qid]
# Given queries, a corpus and a mapping with relevant documents, the InformationRetrievalEvaluator computes different IR
    # metrics. For our use case, MRR@k and Accuracy@k are relevant.
ir_evaluator = evaluation.InformationRetrievalEvaluator(ir_queries, ir_corpus, ir_relevant_docs,
main_score_function='cos_sim', map_at_k=[10])
evaluators = []
evaluators.append(ir_evaluator)
    # Create a SequentialEvaluator. It runs all evaluators in the list in sequential order.
# We optimize the model with respect to the score from the last evaluator (scores[-1])
seq_evaluator = evaluation.SequentialEvaluator(evaluators, main_score_function=lambda scores: scores[-1])
return seq_evaluator
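# Usage sketch (illustrative): the returned SequentialEvaluator can be passed to
# SentenceTransformer.fit(..., evaluator=get_evaluator(dataset_path, max_corpus_size), ...)
# so that it runs periodically during training, or called directly as evaluator(model)
# to compute the IR metrics once.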
import torch
class Metric(object):
def __init__(self, name):
self.name = name
self.sum = torch.tensor(0.)
self.n = torch.tensor(0.)
self.last = torch.tensor(0.)
def update(self, val):
self.sum += val.detach().cpu()
self.last = val.detach().cpu()
self.n += 1
@property
def avg(self):
return self.sum / self.n
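if __name__ == '__main__':
    # Minimal self-check (illustrative): accumulate two values and read the running mean.
    m = Metric('demo')
    m.update(torch.tensor(0.9))
    m.update(torch.tensor(0.7))
    assert torch.isclose(m.avg, torch.tensor(0.8)), m.avg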