Source code for deeprank.learn.NeuralNet

#!/usr/bin/env python
import os
import sys
import time

import h5py
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.ticker as mtick
import numpy as np
import warnings

import torch
import torch.cuda
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data as data_utils
from torchsummary import summary

from deeprank.config import logger
from deeprank.learn import DataSet, classMetrics, rankingMetrics
from torch.autograd import Variable

matplotlib.use('agg')


[docs]class NeuralNet(): def __init__(self, data_set, model, model_type='3d', proj2d=0, task='reg', class_weights = None, pretrained_model=None, chain1='A', chain2='B', cuda=False, ngpu=0, plot=False, save_hitrate=False, save_classmetrics=False, outdir='./'): """Train a Convolutional Neural Network for DeepRank. Args: data_set (deeprank.DataSet or list(str)): Data set used for training or testing. - deeprank.DataSet for training; - str or list(str), e.g. 'x.hdf5', ['x1.hdf5', 'x2.hdf5'], for testing when pretrained model is loaded. model (nn.Module): Definition of the NN to use. Must subclass nn.Module. See examples in model2d.py and model3d.py model_type (str): Type of model we want to use. Must be '2d' or '3d'. If we specify a 2d model, the data set is automatically converted to the correct format. proj2d (int): Defines how to slice the 3D volumetric data to generate 2D data. Allowed values are 0, 1 and 2, which are to slice along the YZ, XZ or XY plane, respectively. task (str 'reg' or 'class'): Task to perform. - 'reg' for regression - 'class' for classification. The loss function, the target datatype and plot functions will be autmatically adjusted depending on the task. class_weights (Tensor): a manual rescaling weight given to each class. If given, has to be a Tensor of size #classes. Only applicable on 'class' task. pretrained_model (str): Saved model to be used for further training or testing. When using pretrained model, remember to set the following 'chain1' and 'chain2' for the new data. chain1 (str): first chain ID of new data when using pretrained model chain2 (str): second chain ID of new data when using pretrained model cuda (bool): Use CUDA. ngpu (int): number of GPU to be used. plot (bool): Plot the prediction results. save_hitrate (bool): Save and plot hit rate. save_classmetrics (bool): Save and plot classification metrics. Classification metrics include: - accuracy(ACC) - sensitivity(TPR) - specificity(TNR) outdir (str): output directory Raises: ValueError: if dataset format is not recognized ValueError: if task is not recognized Examples: Train models: >>> data_set = Dataset(...) >>> model = NeuralNet(data_set, cnn, ... model_type='3d', task='reg', ... plot=True, save_hitrate=True, ... outdir='./out/') >>> model.train(nepoch = 50, divide_trainset=0.8, ... train_batch_size = 5, num_workers=0) Test a model on new data: >>> data_set = ['test01.hdf5', 'test02.hdf5'] >>> model = NeuralNet(data_set, cnn, ... pretrained_model = './model.pth.tar', ... outdir='./out/') >>> model.test() """ # ------------------------------------------ # Dataset # ------------------------------------------ # data set and model self.data_set = data_set # pretrained model self.pretrained_model = pretrained_model self.class_weights = class_weights if isinstance(self.data_set, (str, list)) and pretrained_model is None: raise ValueError( 'Argument data_set must be a DeepRankDataSet object\ when no pretrained model is loaded') # load the model if self.pretrained_model is not None: if not cuda: self.state = torch.load(self.pretrained_model, map_location='cpu') else: self.state = torch.load(self.pretrained_model) # create the dataset if required # but don't process it yet if isinstance(self.data_set, (str, list)): self.data_set = DataSet(self.data_set, chain1=chain1, chain2=chain2, process=False) # load the model and # change dataset parameters self.load_data_params() # process it self.data_set.process_dataset() # convert the data to 2d if necessary if model_type == '2d': self.data_set.transform = True self.data_set.proj2D = proj2d self.data_set.get_input_shape() # ------------------------------------------ # CUDA # ------------------------------------------ # CUDA required self.cuda = cuda self.ngpu = ngpu # handles GPU/CUDA if self.ngpu > 0: self.cuda = True if self.ngpu == 0 and self.cuda: self.ngpu = 1 # ------------------------------------------ # Regression or classification # ------------------------------------------ # task to accomplish self.task = task # Set the loss functiom if self.task == 'reg': self.criterion = nn.MSELoss(reduction='sum') self._plot = self._plot_scatter_reg elif self.task == 'class': self.criterion = nn.CrossEntropyLoss(weight = self.class_weights, reduction='mean') self._plot = self._plot_boxplot_class self.data_set.normalize_targets = False else: raise ValueError( f"Task {self.task} not recognized. Options are:\n\t " f"reg': regression \n\t 'class': classifiation\n") # ------------------------------------------ # Output # ------------------------------------------ # plot or not plot self.plot = plot # plot and save hitrate or not self.save_hitrate = save_hitrate # plot and save classification metrics or not self.save_classmetrics = save_classmetrics if self.save_classmetrics: self.metricnames = ['acc', 'tpr', 'tnr'] # output directory self.outdir = outdir if not os.path.isdir(self.outdir): os.mkdir(outdir) # ------------------------------------------ # Network # ------------------------------------------ # load the model self.net = model(self.data_set.input_shape) # print model summary sys.stdout.flush() if cuda is True: device = torch.device("cuda") # PyTorch v0.4.0 else: device = torch.device("cpu") summary(self.net.to(device), self.data_set.input_shape, device=device.type) sys.stdout.flush() # load parameters of pretrained model if provided if self.pretrained_model: # a prefix 'module.' is added to parameter names if # torch.nn.DataParallel was used # https://pytorch.org/docs/stable/nn.html#torch.nn.DataParallel if self.state['cuda']: for paramname in list(self.state['state_dict'].keys()): paramname_new = paramname.lstrip('module.') if paramname != paramname_new: self.state['state_dict'][paramname_new] = \ self.state['state_dict'][paramname] del self.state['state_dict'][paramname] self.load_model_params() # multi-gpu if self.ngpu > 1: ids = [i for i in range(self.ngpu)] self.net = nn.DataParallel(self.net, device_ids=ids).cuda() # cuda compatible elif self.cuda: self.net = self.net.cuda() # set the optimizer self.optimizer = optim.SGD(self.net.parameters(), lr=0.005, momentum=0.9, weight_decay=0.001) if self.pretrained_model: self.load_optimizer_params() # ------------------------------------------ # print # ------------------------------------------ logger.info('\n') logger.info('=' * 40) logger.info('=\t Convolution Neural Network') logger.info(f'=\t model : {model_type}') logger.info(f'=\t CNN : {model.__name__}') for feat_type, feat_names in self.data_set.select_feature.items(): logger.info(f'=\t features : {feat_type}') for name in feat_names: logger.info(f'=\t\t {name}') if self.data_set.pair_chain_feature is not None: logger.info(f'=\t Pair : ' f'{self.data_set.pair_chain_feature.__name__}') logger.info(f'=\t targets : {self.data_set.select_target}') logger.info(f'=\t CUDA : {str(self.cuda)}') if self.cuda: logger.info(f'=\t nGPU : {self.ngpu}') logger.info('=' * 40 + '\n') # check if CUDA works if self.cuda and not torch.cuda.is_available(): logger.error( f' --> CUDA not deteceted: Make sure that CUDA is installed ' f'and that you are running on GPUs.\n' f' --> To turn CUDA of set cuda=False in NeuralNet.\n' f' --> Aborting the experiment \n\n') sys.exit()
[docs] def train(self, nepoch=50, divide_trainset=None, hdf5='epoch_data.hdf5', train_batch_size=10, preshuffle=True, preshuffle_seed=None, export_intermediate=True, num_workers=1, save_model='best', save_epoch='intermediate', hit_cutoff=None): """Perform a simple training of the model. Args: nepoch (int, optional): number of iterations divide_trainset (list, optional): the percentage assign to the training, validation and test set. Examples: [0.7, 0.2, 0.1], [0.8, 0.2], None hdf5 (str, optional): file to store the training results train_batch_size (int, optional): size of the batch preshuffle (bool, optional): preshuffle the dataset before dividing it. preshuffle_seed (int, optional): set random seed for preshuffle export_intermediate (bool, optional): export data at intermediate epochs. num_workers (int, optional): number of workers to be used to prepare the batch data save_model (str, optional): 'best' or 'all', save only the best model or all models. save_epoch (str, optional): 'intermediate' or 'all', save the epochs data to HDF5. hit_cutoff (float, optional): the cutoff used to define hit by comparing with docking models' target value, e.g. IRMSD value """ self.hit_cutoff = hit_cutoff logger.info(f'\n: Batch Size: {train_batch_size}') if self.cuda: logger.info(f': NGPU : {self.ngpu}') # hdf5 support fname = os.path.join(self.outdir, hdf5) self.f5 = h5py.File(fname, 'w') # divide the set in train+ valid and test if divide_trainset is not None: # if divide_trainset is not None index_train, index_valid, index_test = self._divide_dataset( divide_trainset, preshuffle, preshuffle_seed) else: index_train = self.data_set.index_train index_valid = self.data_set.index_valid index_test = self.data_set.index_test logger.info(f': {len(index_train)} confs. for training') logger.info(f': {len(index_valid)} confs. for validation') logger.info(f': {len(index_test)} confs. for testing') # train the model t0 = time.time() self._train(index_train, index_valid, index_test, nepoch=nepoch, train_batch_size=train_batch_size, export_intermediate=export_intermediate, num_workers=num_workers, save_epoch=save_epoch, save_model=save_model) self.f5.close() logger.info( f' --> Training done in {self.convertSeconds2Days(time.time()-t0)}') # save the model self.save_model(filename='last_model.pth.tar')
[docs] @staticmethod def convertSeconds2Days(time): # input time in seconds time = int(time) day = time // (24 * 3600) time = time % (24 * 3600) hour = time // 3600 time %= 3600 minutes = time // 60 time %= 60 seconds = time return '%02d-%02d:%02d:%02d' % (day, hour, minutes, seconds)
[docs] def test(self, hdf5='test_data.hdf5', hit_cutoff=None, has_target=False): """Test a predefined model on a new dataset. Args: hdf5 (str, optional): hdf5 file to store the test results hit_cutoff (float, optional): the cutoff used to define hit by comparing with docking models' target value, e.g. IRMSD value has_target(bool, optional): specify the presence (True) or absence (False) of target values in the test set. No metrics can be computed if False. Examples: >>> # adress of the database >>> database = '1ak4.hdf5' >>> # Load the model in a new network instance >>> model = NeuralNet(database, cnn, ... pretrained_model='./model/model.pth.tar', ... outdir='./test/') >>> # test the model >>> model.test() """ # output fname = os.path.join(self.outdir, hdf5) self.f5 = h5py.File(fname, 'w') # load pretrained model to get task and criterion self.load_nn_params() # load data index = list(range(self.data_set.__len__())) sampler = data_utils.sampler.SubsetRandomSampler(index) loader = data_utils.DataLoader(self.data_set, sampler=sampler) # define the target value threshold to compute the hits if save_hitrate is True if self.save_hitrate and hit_cutoff is not None: self.hit_cutoff = hit_cutoff logger.info(f'Use hit cutoff {self.hit_cutoff}') # do test self.data = {} _, self.data['test'] = self._epoch(loader, train_model=False, has_target=has_target) # plot results if self.plot is True : self._plot(os.path.join(self.outdir, 'prediction.png')) if self.save_hitrate: self.plot_hit_rate(os.path.join(self.outdir + 'hitrate.png')) self._export_epoch_hdf5(0, self.data) self.f5.close()
[docs] def save_model(self, filename='model.pth.tar'): """save the model to disk. Args: filename (str, optional): name of the file """ filename = os.path.join(self.outdir, filename) state = {'state_dict': self.net.state_dict(), 'optimizer': self.optimizer.state_dict(), 'normalize_targets': self.data_set.normalize_targets, 'normalize_features': self.data_set.normalize_features, 'select_feature': self.data_set.select_feature, 'select_target': self.data_set.select_target, 'target_ordering': self.data_set.target_ordering, 'pair_chain_feature': self.data_set.pair_chain_feature, 'dict_filter': self.data_set.dict_filter, 'transform': self.data_set.transform, 'proj2D': self.data_set.proj2D, 'clip_features': self.data_set.clip_features, 'clip_factor': self.data_set.clip_factor, 'hit_cutoff': self.hit_cutoff, 'grid_info': self.data_set.grid_info, 'mapfly': self.data_set.mapfly, 'task': self.task, 'criterion': self.criterion, 'cuda': self.cuda } if self.data_set.normalize_features: state['feature_mean'] = self.data_set.feature_mean state['feature_std'] = self.data_set.feature_std if self.data_set.normalize_targets: state['target_min'] = self.data_set.target_min state['target_max'] = self.data_set.target_max torch.save(state, filename)
[docs] def load_model_params(self): """Get model parameters from a saved model.""" self.net.load_state_dict(self.state['state_dict'])
[docs] def load_optimizer_params(self): """Get optimizer parameters from a saved model.""" self.optimizer.load_state_dict(self.state['optimizer'])
[docs] def load_nn_params(self): """Get NeuralNet parameters from a saved model.""" self.task = self.state['task'] self.criterion = self.state['criterion'] try: self.hit_cutoff = self.state['hit_cutoff'] except Exception: logger.warning(f'No "hit_cutoff" found in {self.pretrained_model}. Please set it in function "test()" when doing benchmark"')
[docs] def load_data_params(self): """Get dataset parameters from a saved model.""" self.data_set.select_feature = self.state['select_feature'] self.data_set.select_target = self.state['select_target'] self.data_set.pair_chain_feature = self.state['pair_chain_feature'] self.data_set.dict_filter = self.state['dict_filter'] self.data_set.normalize_targets = self.state['normalize_targets'] if self.data_set.normalize_targets: self.data_set.target_min = self.state['target_min'] self.data_set.target_max = self.state['target_max'] self.data_set.normalize_features = self.state['normalize_features'] if self.data_set.normalize_features: self.data_set.feature_mean = self.state['feature_mean'] self.data_set.feature_std = self.state['feature_std'] self.data_set.transform = self.state['transform'] self.data_set.proj2D = self.state['proj2D'] self.data_set.target_ordering = self.state['target_ordering'] self.data_set.clip_features = self.state['clip_features'] self.data_set.clip_factor = self.state['clip_factor'] self.data_set.mapfly = self.state['mapfly'] self.data_set.grid_info = self.state['grid_info']
[docs] def _divide_dataset(self, divide_set, preshuffle, preshuffle_seed): """Divide the data set into training, validation and test according to the percentage in divide_set. Args: divide_set (list(float)): percentage used for training/validation/test. Example: [0.8, 0.1, 0.1], [0.8, 0.2] preshuffle (bool): shuffle the dataset before dividing it preshuffle_seed (int, optional): set random seed Returns: list(int),list(int),list(int): Indices of the training/validation/test set. """ # if user only provided one number # we assume it's the training percentage if not isinstance(divide_set, list): divide_set = [divide_set, 1. - divide_set] # if user provided 3 number and testset if len(divide_set) == 3 and self.data_set.test_database is not None: divide_set = [divide_set[0], 1. - divide_set[0]] logger.info(f' : test data set AND test in training set detected\n' f' : Divide training set as ' f'{divide_set[0]} train {divide_set[1]} valid.\n' f' : Keep test set for testing') # preshuffle if preshuffle: if preshuffle_seed is not None and not isinstance( preshuffle_seed, int): preshuffle_seed = int(preshuffle_seed) np.random.seed(preshuffle_seed) np.random.shuffle(self.data_set.index_train) # size of the subset for training ntrain = int(np.ceil(float(self.data_set.ntrain) * divide_set[0])) nvalid = int(np.floor(float(self.data_set.ntrain) * divide_set[1])) # indexes train and valid index_train = self.data_set.index_train[:ntrain] index_valid = self.data_set.index_train[ntrain:ntrain + nvalid] # index of test depending of the situation if len(divide_set) == 3: index_test = self.data_set.index_train[ntrain + nvalid:] else: index_test = self.data_set.index_test return index_train, index_valid, index_test
[docs] def _train(self, index_train, index_valid, index_test, nepoch=50, train_batch_size=5, export_intermediate=False, num_workers=1, save_epoch='intermediate', save_model='best'): """Train the model. Args: index_train (list(int)): Indices of the training set index_valid (list(int)): Indices of the validation set index_test (list(int)): Indices of the testing set nepoch (int, optional): numbr of epoch train_batch_size (int, optional): size of the batch export_intermediate (bool, optional):export itnermediate data num_workers (int, optional): number of workers pytorch uses to create the batch size save_epoch (str,optional): 'intermediate' or 'all' save_model (str, optional): 'all' or 'best' Returns: torch.tensor: Parameters of the network after training """ # printing options nprint = np.max([1, int(nepoch / 10)]) # pin memory for cuda pin = False if self.cuda: pin = True # create the sampler train_sampler = data_utils.sampler.SubsetRandomSampler(index_train) valid_sampler = data_utils.sampler.SubsetRandomSampler(index_valid) test_sampler = data_utils.sampler.SubsetRandomSampler(index_test) # get if we test as well _valid_ = len(valid_sampler.indices) > 0 _test_ = len(test_sampler.indices) > 0 # containers for the losses self.losses = {'train': []} if _valid_: self.losses['valid'] = [] if _test_: self.losses['test'] = [] # containers for the class metrics if self.save_classmetrics: self.classmetrics = {} for i in self.metricnames: self.classmetrics[i] = {'train': []} if _valid_: self.classmetrics[i]['valid'] = [] if _test_: self.classmetrics[i]['test'] = [] # create the loaders train_loader = data_utils.DataLoader( self.data_set, batch_size=train_batch_size, sampler=train_sampler, pin_memory=pin, num_workers=num_workers, shuffle=False, drop_last=True) if _valid_: valid_loader = data_utils.DataLoader( self.data_set, batch_size=train_batch_size, sampler=valid_sampler, pin_memory=pin, num_workers=num_workers, shuffle=False, drop_last=True) if _test_: test_loader = data_utils.DataLoader( self.data_set, batch_size=train_batch_size, sampler=test_sampler, pin_memory=pin, num_workers=num_workers, shuffle=False, drop_last=True) # min error to kee ptrack of the best model. min_error = {'train': float('Inf'), 'valid': float('Inf'), 'test': float('Inf')} # training loop av_time = 0.0 self.data = {} for epoch in range(nepoch): logger.info(f'\n: epoch {epoch:03d} / {nepoch:03d} {"-"*45}') t0 = time.time() # train the model logger.info(f"\n\t=> train the model\n") train_loss, self.data['train'] = self._epoch( train_loader, train_model=True) self.losses['train'].append(train_loss) if self.save_classmetrics: for i in self.metricnames: self.classmetrics[i]['train'].append(self.data['train'][i]) # validate the model if _valid_: logger.info(f"\n\t=> validate the model\n") valid_loss, self.data['valid'] = self._epoch( valid_loader, train_model=False) self.losses['valid'].append(valid_loss) if self.save_classmetrics: for i in self.metricnames: self.classmetrics[i]['valid'].append( self.data['valid'][i]) # test the model if _test_: logger.info(f"\n\t=> test the model\n") test_loss, self.data['test'] = self._epoch( test_loader, train_model=False) self.losses['test'].append(test_loss) if self.save_classmetrics: for i in self.metricnames: self.classmetrics[i]['test'].append( self.data['test'][i]) # talk a bit about losse logger.info(f'\n train loss : {train_loss:1.3e}') if _valid_: logger.info(f' valid loss : {valid_loss:1.3e}') if _test_: logger.info(f' test loss : {test_loss:1.3e}') # timer elapsed = time.time() - t0 logger.info( f' epoch done in : {self.convertSeconds2Days(elapsed)}') # remaining time av_time += elapsed nremain = nepoch - (epoch + 1) remaining_time = av_time / (epoch + 1) * nremain logger.info(f" remaining time : " f"{time.strftime('%H:%M:%S', time.gmtime(remaining_time))}") # save the best model for mode in ['train', 'valid', 'test']: if mode not in self.losses: continue if self.losses[mode][-1] < min_error[mode]: self.save_model( filename="best_{}_model.pth.tar".format(mode)) min_error[mode] = self.losses[mode][-1] # save all the model if required if save_model == 'all': self.save_model(filename="model_epoch_%04d.pth.tar" % epoch) # plot and save epoch if (export_intermediate and epoch % nprint == nprint - 1) or \ epoch == 0 or epoch == nepoch - 1: if self.plot: figname = os.path.join(self.outdir, f"prediction_{epoch:04d}.png") self._plot(figname) if self.save_hitrate: figname = os.path.join(self.outdir, f"hitrate_{epoch:04d}.png") self.plot_hit_rate(figname) self._export_epoch_hdf5(epoch, self.data) elif save_epoch == 'all': self._export_epoch_hdf5(epoch, self.data) sys.stdout.flush() # plot the losses self._export_losses(os.path.join(self.outdir, 'losses.png')) # plot classification metrics if self.save_classmetrics: for i in self.metricnames: self._export_metrics(i) return torch.cat([param.data.view(-1) for param in self.net.parameters()], 0)
[docs] def _epoch(self, data_loader, train_model, has_target=True): """Perform one single epoch iteration over a data loader. Args: data_loader (torch.DataLoader): DataLoader for the epoch train_model (bool): train the model if True or not if False Returns: float: loss of the model dict: data of the epoch """ # variables of the epoch running_loss = 0 data = {'outputs': [], 'targets': [], 'mol': []} if self.save_hitrate: data['hit'] = None if self.save_classmetrics: for i in self.metricnames: data[i] = None n = 0 debug_time = False time_learn = 0 # set train/eval mode self.net.train(mode=train_model) torch.set_grad_enabled(train_model) mini_batch = 0 for d in data_loader: mini_batch = mini_batch + 1 logger.info(f"\t\t-> mini-batch: {mini_batch} ") # get the data inputs = d['feature'] targets = d['target'] mol = d['mol'] # transform the data inputs, targets = self._get_variables(inputs, targets) # starting time tlearn0 = time.time() # forward outputs = self.net(inputs) # class complains about the shape ... if self.task == 'class': targets = targets.view(-1) if has_target: # evaluate loss loss = self.criterion(outputs, targets) running_loss += loss.data.item() # pytorch1 compatible n += len(inputs) # zero + backward + step if train_model: self.optimizer.zero_grad() loss.backward() self.optimizer.step() time_learn += time.time() - tlearn0 # get the outputs for export if self.cuda: data['outputs'] += outputs.data.cpu().numpy().tolist() data['targets'] += targets.data.cpu().numpy().tolist() else: data['outputs'] += outputs.data.numpy().tolist() data['targets'] += targets.data.numpy().tolist() fname, molname = mol[0], mol[1] data['mol'] += [(f, m) for f, m in zip(fname, molname)] # transform the output back if self.data_set.normalize_targets: data['outputs'] = self.data_set.backtransform_target( np.array(data['outputs'])) # .flatten()) data['targets'] = self.data_set.backtransform_target( np.array(data['targets'])) # .flatten()) else: data['outputs'] = np.array(data['outputs']) # .flatten() data['targets'] = np.array(data['targets']) # .flatten() # make np for export data['mol'] = np.array(data['mol'], dtype=object) # get the relevance of the ranking if self.save_hitrate and has_target: logger.info(f'Use hit cutoff {self.hit_cutoff}') data['hit'] = self._get_relevance(data, self.hit_cutoff) # get classification metrics if self.save_classmetrics and has_target: for i in self.metricnames: data[i] = self._get_classmetrics(data, i) # normalize the loss if n != 0: running_loss /= n else: warnings.warn(f'Empty input in data_loader {data_loader}.') return running_loss, data
[docs] def _get_variables(self, inputs, targets): # xue: why not put this step to DataSet.py? """Convert the feature/target in torch.Variables. The format is different for regression where the targets are float and classification where they are int. Args: inputs (np.array): raw features targets (np.array): raw target values Returns: torch.Variable: features torch.Variable: target values """ # if cuda is available if self.cuda: inputs = inputs.cuda(non_blocking=True) targets = targets.cuda(non_blocking=True) # get the variable as float by default inputs, targets = Variable(inputs).float(), Variable(targets).float() # change the targets to long for classification if self.task == 'class': targets = targets.long() return inputs, targets
[docs] def _export_losses(self, figname): """Plot the losses vs the epoch. Args: figname (str): name of the file where to export the figure """ logger.info('\n --> Loss Plot') color_plot = ['red', 'blue', 'green'] labels = ['Train', 'Valid', 'Test'] fig, ax = plt.subplots() for ik, name in enumerate(self.losses): plt.plot(np.array(self.losses[name]), c = color_plot[ik], label = labels[ik]) legend = ax.legend(loc='upper left') ax.set_xlabel('Epoch') ax.set_ylabel('Losses') fig.savefig(figname) plt.close() grp = self.f5.create_group('/losses/') grp.attrs['type'] = 'losses' for k, v in self.losses.items(): grp.create_dataset(k, data=v)
[docs] def _export_metrics(self, metricname): logger.info(f'\n --> {metricname.upper()} Plot') color_plot = ['red', 'blue', 'green'] labels = ['Train', 'Valid', 'Test'] data = self.classmetrics[metricname] fig, ax = plt.subplots() for ik, name in enumerate(data): plt.plot(np.array(data[name]), c=color_plot[ik], label=labels[ik]) legend = ax.legend(loc='upper left') ax.set_xlabel('Epoch') ax.set_ylabel(metricname.upper()) figname = os.path.join(self.outdir, metricname + '.png') fig.savefig(figname) plt.close() grp = self.f5.create_group(metricname) grp.attrs['type'] = metricname for k, v in data.items(): grp.create_dataset(k, data=v)
[docs] def _plot_scatter_reg(self, figname): """Plot a scatter plots of predictions VS targets. Useful to visualize the performance of the training algorithm Args: figname (str): filename """ # abort if we don't want to plot if self.plot is False: return logger.info(f'\n --> Scatter Plot: {figname}') color_plot = {'train': 'red', 'valid': 'blue', 'test': 'green'} labels = ['train', 'valid', 'test'] fig, ax = plt.subplots() xvalues = np.array([]) yvalues = np.array([]) for l in labels: if l in self.data: try: targ = self.data[l]['targets'].flatten() except Exception: logger.exception( f'No target values are provided for the {l} set, ', f'skip {l} in the scatter plot') continue out = self.data[l]['outputs'].flatten() xvalues = np.append(xvalues, targ) yvalues = np.append(yvalues, out) ax.scatter(targ, out, c=color_plot[l], label=l) legend = ax.legend(loc='upper left') ax.set_xlabel('Targets') ax.set_ylabel('Predictions') values = np.append(xvalues, yvalues) border = 0.1 * (values.max() - values.min()) ax.plot([values.min() - border, values.max() + border], [values.min() - border, values.max() + border]) fig.savefig(figname) plt.close()
[docs] def _plot_boxplot_class(self, figname): """Plot a boxplot of predictions VS targets. It is only usefull in classification tasks. Args: figname (str): filename """ # abort if we don't want to plot if not self.plot: return logger.info(f'\n --> Box Plot: {figname}') color_plot = {'train': 'red', 'valid': 'blue', 'test': 'green'} labels = ['train', 'valid', 'test'] nwin = len(self.data) fig, ax = plt.subplots(1, nwin, sharey=True, squeeze=False) iwin = 0 for l in labels: if l in self.data: try: tar = self.data[l]['targets'] except Exception: logger.exception( f'No target values are provided for the {l} set, ', f'skip {l} in the boxplot') continue out = self.data[l]['outputs'] data = [[], []] confusion = [[0, 0], [0, 0]] for pts, t in zip(out, tar): r = F.softmax(torch.FloatTensor(pts), dim=0).data.numpy() data[t].append(r[1]) confusion[t][bool(r[1] > 0.5)] += 1 #print(" {:5s}: {:s}".format(l,str(confusion))) ax[0, iwin].boxplot(data) ax[0, iwin].set_xlabel(l) ax[0, iwin].set_xticklabels(['0', '1']) iwin += 1 fig.savefig(figname, bbox_inches='tight') plt.close()
[docs] def plot_hit_rate(self, figname): """Plot the hit rate of the different training/valid/test sets. The hit rate is defined as: The percentage of positive(near-native) decoys that are included among the top m decoys. Args: figname (str): filename for the plot """ if self.plot is False: return logger.info(f'\n --> Hitrate plot: {figname}\n') color_plot = {'train': 'red', 'valid': 'blue', 'test': 'green'} labels = ['train', 'valid', 'test'] fig, ax = plt.subplots() for l in labels: if l in self.data: try: hits = self.data[l]['hit'] except Exception: logger.exception(f'No hitrate computed for the {l} set.') continue if 'hit' in self.data[l]: hitrate = rankingMetrics.hitrate(hits) m = len(hitrate) x = np.linspace(0, 100, m) plt.plot(x, hitrate, c=color_plot[l], label=f"{l} M={m}") legend = ax.legend(loc='upper left') ax.set_xlabel('Top M (%)') ax.set_ylabel('Hit Rate') fmt = '%.0f%%' xticks = mtick.FormatStrFormatter(fmt) ax.xaxis.set_major_formatter(xticks) fig.savefig(figname) plt.close()
[docs] def _compute_hitrate(self, hit_cutoff=None): # define the target value threshold to compute the hits if save_hitrate is True if hit_cutoff is None: hit_cutoff = self.hit_cutoff logger.info(f'Use hit cutoff {self.hit_cutoff}') labels = ['train', 'valid', 'test'] self.hitrate = {} # get the target ordering inverse = self.data_set.target_ordering == 'lower' if self.task == 'class': inverse = False for l in labels: if l in self.data: # get the target values out = self.data[l]['outputs'] # get the target vaues targets = [] try: for fname, mol in self.data[l]['mol']: f5 = h5py.File(fname, 'r') targets.append(f5[mol + f'/targets/{self.data_set.select_target}'][()]) f5.close() except Exception: logger.exception( f'No target value {self.data_set.select_target} ', f'provided for the {l} set. Skip Hitrate computation.') continue # sort the data if self.task == 'class': out = F.softmax(torch.FloatTensor(out), dim=1).data.numpy()[:, 1] ind_sort = np.argsort(out) if not inverse: ind_sort = ind_sort[::-1] # get the targets of the recommendation targets = np.array(targets)[ind_sort] # make a binary list out of that binary_recomendation = (targets <= hit_cutoff).astype('int') # number of recommended hit npos = np.sum(binary_recomendation) if npos == 0: npos = len(targets) warnings.warn( f'Non positive decoys found in {l} for hitrate plot')
[docs] def _get_relevance(self, data, hit_cutoff=None): # define the target value threshold to compute the hits if save_hitrate is True if hit_cutoff is None: hit_cutoff = self.hit_cutoff logger.info(f'Use hit cutoff {self.hit_cutoff}') if hit_cutoff is not None: # get the target ordering inverse = self.data_set.target_ordering == 'lower' if self.task == 'class': inverse = False # get the target values out = data['outputs'] # get the targets targets = [] for fname, mol in data['mol']: f5 = h5py.File(fname, 'r') targets.append(f5[mol + f'/targets/{self.data_set.select_target}'][()]) f5.close() # sort the data if self.task == 'class': out = F.softmax(torch.FloatTensor(out), dim=1).data.numpy()[:, 1] ind_sort = np.argsort(out) if not inverse: ind_sort = ind_sort[::-1] # get the targets of the recommendation targets = np.array(targets)[ind_sort] # make a binary list out of that return (targets <= hit_cutoff).astype('int') else: return (targets == None).astype('int')
[docs] def _get_classmetrics(self, data, metricname): # get predctions pred = self._get_binclass_prediction(data) # get real targets targets = data['targets'] # get metric values if metricname == 'acc': return classMetrics.accuracy(pred, targets) elif metricname == 'tpr': return classMetrics.sensitivity(pred, targets) elif metricname == 'tnr': return classMetrics.specificity(pred, targets) elif metricname == 'ppv': return classMetrics.precision(pred, targets) elif metricname == 'f1': return classMetrics.F1(pred, targets) else: return None
[docs] @staticmethod def _get_binclass_prediction(data): out = data['outputs'] probility = F.softmax(torch.FloatTensor(out), dim=1).data.numpy() pred = probility[:, 0] <= probility[:, 1] return pred.astype(int)
[docs] def _export_epoch_hdf5(self, epoch, data): """Export the epoch data to the hdf5 file. Export the data of a given epoch in train/valid/test group. In each group are stored the predcited values (outputs), ground truth (targets) and molecule name (mol). Args: epoch (int): index of the epoch data (dict): data of the epoch """ # create a group grp_name = 'epoch_%04d' % epoch grp = self.f5.create_group(grp_name) # create attribute for DeepXplroer grp.attrs['type'] = 'epoch' grp.attrs['task'] = self.task # loop over the pass_type: train/valid/test for pass_type, pass_data in data.items(): # we don't want to breack the process in case of issue try: # create subgroup for the pass sg = grp.create_group(pass_type) # loop over the data: target/output/molname for data_name, data_value in pass_data.items(): # mol name is a bit different # since there are strings if data_name == 'mol': string_dt = h5py.special_dtype(vlen=str) sg.create_dataset( data_name, data=data_value, dtype=string_dt) # output/target values else: sg.create_dataset(data_name, data=data_value) except TypeError: logger.exception("Error in export epoch to hdf5")