pythonpytorchray

Pytorch and ray tune: why the error; raise TuneError("Trials did not complete", incomplete_trials)?


I want to embed hyperparameter optimisation with ray into my pytorch script.

I wrote this code (which is a reproducible example):

## Standard libraries
CHECKPOINT_PATH = "/home/ad1/new_dev_v1"
DATASET_PATH = "/home/ad1/"
import torch
device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")
from importlib import reload
from itertools import *
import matplotlib
from itertools import groupby
from libs_public.api import get_pantry_token
from matplotlib import pyplot as plt
from matplotlib.colors import to_rgb
from openbabel import pybel
from openbabel.pybel import readstring,descs
from operator import itemgetter
from pathlib import Path
from pytorch_lightning.callbacks import LearningRateMonitor, ModelCheckpoint
from pytorch_lightning.loggers import TensorBoardLogger
from ray import tune
from ray.tune import CLIReporter
from ray.tune.integration.pytorch_lightning import TuneReportCallback, TuneReportCheckpointCallback
from ray.tune.schedulers import ASHAScheduler, PopulationBasedTraining
from sklearn import preprocessing
from sklearn.metrics import f1_score, precision_score, recall_score,roc_auc_score
from socket import TIPC_DEST_DROPPABLE
from torch.nn import Linear
from torch.utils.data import TensorDataset
from torch_geometric.data import Data, Dataset,DataLoader,DenseDataLoader,InMemoryDataset
from torch_geometric.datasets import TUDataset
from torch_geometric.nn import GCNConv
from torch_geometric.nn import global_mean_pool
from torchmetrics.functional import precision_recall
from torchvision import transforms
from torchvision.datasets import CIFAR10
from tqdm.notebook import tqdm
import getpass, argparse
import joblib
import json
import logging
import math
import matplotlib.pyplot as plt
import networkx as nx
import numpy as np 
import openbabel
import os
import pandas as pd
import pytorch_lightning as pl
import random
import re
import requests
import seaborn as sns
import sklearn
import sys
import time
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data as data
import torch_geometric
import torch_geometric.data as geom_data
import torch_geometric.nn as geom_nn
import torchmetrics
import torchvision
import warnings
matplotlib.rcParams['lines.linewidth'] = 2.0
pl.seed_everything(42)
print(device)
sns.reset_orig()
sns.set()
sys.path.append('/home/ad1/git/')
torch.backends.cudnn.deterministic = True
warnings.filterwarnings('ignore')



# Setting the seed
pl.seed_everything(42)

# Ensure that all operations are deterministic on GPU (if used) for reproducibility
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")
print(device)


import torch
from torch_geometric.datasets import TUDataset
from torch.nn import Linear
from torch_geometric.nn import global_mean_pool
from torch_geometric.data import Data, Dataset,DataLoader


from torch.utils.data import TensorDataset
from ray import tune
from ray.tune import CLIReporter
from ray.tune.schedulers import ASHAScheduler, PopulationBasedTraining
from ray.tune.integration.pytorch_lightning import TuneReportCallback, TuneReportCheckpointCallback

dataset = TUDataset(root='/tmp/MUTAG', name='MUTAG', use_node_attr=True)
loader = DataLoader(dataset, batch_size=32, shuffle=True)

train_dataset = dataset
val_dataset = dataset
test_dataset = dataset

graph_train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
graph_val_loader = DataLoader(val_dataset, batch_size=64) # Additional loader if you want to change to a larger dataset
graph_test_loader = DataLoader(test_dataset, batch_size=64)


#will change this when it makes sense
#config = {
#    "dropout": tune.uniform(0.4,0.5)
#    } 

config = {'dropout':0.4}

gnn_layer_by_name = {
    "GCN": geom_nn.GCNConv,
    "GAT": geom_nn.GATConv,
    "GraphConv": geom_nn.GraphConv
}

class GCNLayer(nn.Module):
    def __init__(self, c_in, c_out):
        super().__init__()
        self.projection = nn.Linear(c_in, c_out)
        

    def forward(self, node_feats, adj_matrix):
        """
        Inputs:
            node_feats - Tensor with node features of shape [batch_size, num_nodes, c_in]
            adj_matrix - Batch of adjacency matrices of the graph. If there is an edge from i to j, adj_matrix[b,i,j]=1 else 0.
                         Supports directed edges by non-symmetric matrices. Assumes to already have added the identity connections. 
                         Shape: [batch_size, num_nodes, num_nodes]
        """
        # Num neighbours = number of incoming edges
        num_neighbours = adj_matrix.sum(dim=-1, keepdims=True)
        node_feats = self.projection(node_feats)
        node_feats = torch.bmm(adj_matrix, node_feats)
        node_feats = node_feats / num_neighbours
        return node_feats

class GNNModel(nn.Module):
    
    def __init__(self, c_in, c_hidden, c_out, num_layers=2, layer_name="GCN", dp_rate=config['dropout'], **kwargs):
        """
        Inputs:
            c_in - Dimension of input features
            c_hidden - Dimension of hidden features
            c_out - Dimension of the output features. Usually number of classes in classification
            num_layers - Number of "hidden" graph layers
            layer_name - String of the graph layer to use
            dp_rate - Dropout rate to apply throughout the network
            kwargs - Additional arguments for the graph layer (e.g. number of heads for GAT)
        """
        super().__init__()
        gnn_layer = gnn_layer_by_name[layer_name]
        
        layers = []
        in_channels, out_channels = c_in, c_hidden
        for l_idx in range(num_layers-1):
            layers += [
                gnn_layer(in_channels=in_channels, 
                          out_channels=out_channels,
                          **kwargs),
                nn.ReLU(inplace=True),
                nn.Dropout(config['dropout'])
            ]
            in_channels = c_hidden
        layers += [gnn_layer(in_channels=in_channels, 
                             out_channels=c_out,
                             **kwargs)]
        self.layers = nn.ModuleList(layers)
    
    def forward(self, x, edge_index):
        """
        Inputs:
            x - Input features per node
            edge_index - List of vertex index pairs representing the edges in the graph (PyTorch geometric notation)
        """
        for l in self.layers:
            # For graph layers, we need to add the "edge_index" tensor as additional input
            # All PyTorch Geometric graph layer inherit the class "MessagePassing", hence
            # we can simply check the class type.
            if isinstance(l, geom_nn.MessagePassing):
                x = l(x, edge_index)
            else:
                x = l(x)
        return x



class GraphGNNModel(nn.Module):
    
    def __init__(self, c_in, c_hidden, c_out, dp_rate_linear=0.5, **kwargs):
        """
        Inputs:
            c_in - Dimension of input features
            c_hidden - Dimension of hidden features
            c_out - Dimension of output features (usually number of classes)
            dp_rate_linear - Dropout rate before the linear layer (usually much higher than inside the GNN)
            kwargs - Additional arguments for the GNNModel object
        """
        super().__init__()
        self.GNN = GNNModel(c_in=c_in, 
                            c_hidden=c_hidden, 
                            c_out=c_hidden, # Not our prediction output yet!
                            **kwargs)
        self.head = nn.Sequential(
            nn.Dropout(config['dropout']),
            nn.Linear(c_hidden, c_out)
        )

    def forward(self, x, edge_index, batch_idx):
        """
        Inputs:
            x - Input features per node
            edge_index - List of vertex index pairs representing the edges in the graph (PyTorch geometric notation)
            batch_idx - Index of batch element for each node
        """
        x = self.GNN(x, edge_index)
        x = geom_nn.global_mean_pool(x, batch_idx) # Average pooling
        x = self.head(x)
        return x


#see https://pytorch-lightning.readthedocs.io/en/stable/common/lightning_module.html
class GraphLevelGNN(pl.LightningModule):
    
    def __init__(self, **model_kwargs):
        super().__init__()
        # Saving hyperparameters
        self.save_hyperparameters()
        
        self.model = GraphGNNModel(**model_kwargs)
        self.loss_module = nn.BCEWithLogitsLoss() #if self.hparams.c_out == 1 else nn.CrossEntropyLoss()

    def forward(self, data, mode="train"):
        x, edge_index, batch_idx = data.x, data.edge_index, data.batch
        x = self.model(x, edge_index, batch_idx)
        x = x.squeeze(dim=-1)
        
        if self.hparams.c_out == 1:
            preds = (x > 0).float()
            data.y = data.y.float()
        else:
            preds = x.argmax(dim=-1)

        loss = self.loss_module(x, data.y)
        acc = (preds == data.y).sum().float() / preds.shape[0]
        f1 = f1_score(preds,data.y)  ##change f1/precision and recall was just testing
        precision = precision_score(preds,data.y)
        recall = recall_score(preds,data.y)
        #roc_auc = roc_auc_score(preds,data.y)  ##ADD THIS BACK IN
        return loss, acc, f1,precision, recall

    def configure_optimizers(self):
        optimizer = optim.SGD(self.parameters(),lr=0.1) # High lr because of small dataset and small model
        return optimizer

    def training_step(self, batch, batch_idx):
        loss, acc, _,_, _ = self.forward(batch, mode="train")
        self.log('train_loss', loss,on_epoch=True,logger=True)
        self.log('train_acc', acc,on_epoch=True,logger=True)
        #self.log('train_precision',precision_and_recall)
        return loss

    def validation_step(self, batch, batch_idx):
        loss, acc, _,_, _ = self.forward(batch, mode="val")
        self.log('val_acc', acc,on_epoch=True,logger=True)
        self.log('val_loss', loss,on_epoch=True,logger=True)

    def test_step(self, batch, batch_idx):
        loss, acc, f1,precision, recall = self.forward(batch, mode="test")
        self.log('test_acc', acc,on_epoch=True,logger=True)
        self.log('test_f1', f1,on_epoch=True,logger=True)
        self.log('test_precision', precision,on_epoch=True,logger=True)       
        self.log('test_recall', recall,on_epoch=True,logger=True) 
        #self.log('roc_auc', roc_auc,on_epoch=True,logger=True) 


from pytorch_lightning import loggers as pl_loggers
def train_graph_classifier(model_name, **model_kwargs):
    pl.seed_everything(42)
    
    # Create a PyTorch Lightning trainer with the generation callback
    root_dir = os.path.join(CHECKPOINT_PATH, "GraphLevel" + model_name)
    os.makedirs(root_dir, exist_ok=True)
    csv_logger = pl_loggers.CSVLogger(save_dir="logs/")

    tune_report_callback = TuneReportCheckpointCallback(
    metrics={
    "val_loss": "val_loss",
    "val_acc": "val_acc",
    },
    filename="ray_ckpt",
    on="validation_end",
    )

    trainer = pl.Trainer(default_root_dir=root_dir,
                         callbacks=[ModelCheckpoint(save_weights_only=True, mode="max", monitor="val_acc"),tune_report_callback],
                                 #   TuneReportCallback(
                                #    {
                                #        "loss": "val_loss",
                                #        "mean_accuracy": "val_accuracy" 
                                #    },
                                #        on="test_end")] # need to change this to validation but error at the minute
                                 #   ,
                         gpus=1 if str(device).startswith("cuda") else 0,
                         max_epochs=3,
                         progress_bar_refresh_rate=1,
                         logger=csv_logger,                         
                         )

    trainer.logger._default_hp_metric = None # Optional logging argument that we don't need

    # Check whether pretrained model exists. If yes, load it and skip training
    pretrained_filename = os.path.join(CHECKPOINT_PATH, f"GraphLevel{model_name}.ckpt")

    if os.path.isfile(pretrained_filename):
        print("Found pretrained model, loading...")
        model = GraphLevelGNN.load_from_checkpoint(pretrained_filename)
    else:
        pl.seed_everything(42)
        model = GraphLevelGNN(c_in = dataset.num_node_features, 
                              c_out=1, #if tu_dataset.num_classes==2 else tu_dataset.num_classes, 
                              **model_kwargs)
        trainer.fit(model, graph_train_loader, graph_val_loader)
        model = GraphLevelGNN.load_from_checkpoint(trainer.checkpoint_callback.best_model_path)
        
    # Test best model on validation and test set
    #train_result = trainer.test(model, graph_train_loader, verbose=False)
    #test_result = trainer.test(model, graph_test_loader, verbose=False)
    #result = {"test": test_result[0]['test_acc'], "train": train_result[0]['test_acc']} 
    #return model, result
    return model

# Example of ASHA Scheduler
scheduler_asha = ASHAScheduler(
    max_t=100,
    grace_period=1,
    reduction_factor=2,
)

from ray.tune.integration.pytorch_lightning import TuneReportCallback, TuneReportCheckpointCallback

reporter = CLIReporter(
    parameter_columns=['dropout'],
    metric_columns=["val_loss", "val_acc", "training_iteration"]
)


model = train_graph_classifier(model_name="GraphConv", 
                                       c_hidden=128, 
                                       layer_name="GraphConv", 
                                       num_layers=3, 
                                       dp_rate_linear=0.5,
                                       dp_rate=0.0)


result = tune.run(
    tune.with_parameters(
        model,
        #feature_size=10,
        #target_size=2,
        epochs=50,
        gpus=0
        ),

    resources_per_trial={
        "cpu": 1,
        "gpu": 0,
    },
    

    local_dir='/home/ad1/ray_ckpt2',  # path for saving checkpoints
    metric="val_loss",
    mode="min",
    config=config,
    num_samples=16,
    scheduler=scheduler_asha,
    progress_reporter=reporter,
    name="test",
)

And the error returned is:

(tune_with_parameters pid=65319) 2022-08-17 16:28:47,053        ERROR function_runner.py:286 -- Runner Thread raised error.
(tune_with_parameters pid=65319) Traceback (most recent call last):
(tune_with_parameters pid=65319)   File "/root/miniconda3/lib/python3.7/site-packages/ray/tune/function_runner.py", line 277, in run
(tune_with_parameters pid=65319)     self._entrypoint()
(tune_with_parameters pid=65319)   File "/root/miniconda3/lib/python3.7/site-packages/ray/tune/function_runner.py", line 352, in entrypoint
(tune_with_parameters pid=65319)     self._status_reporter.get_checkpoint(),
(tune_with_parameters pid=65319)   File "/root/miniconda3/lib/python3.7/site-packages/ray/util/tracing/tracing_helper.py", line 462, in _resume_span
(tune_with_parameters pid=65319)     return method(self, *_args, **_kwargs)
(tune_with_parameters pid=65319)   File "/root/miniconda3/lib/python3.7/site-packages/ray/tune/function_runner.py", line 645, in _trainable_func
(tune_with_parameters pid=65319)     output = fn()
(tune_with_parameters pid=65319)   File "/root/miniconda3/lib/python3.7/site-packages/ray/tune/utils/trainable.py", line 410, in inner
(tune_with_parameters pid=65319)     trainable(config, **fn_kwargs)
(tune_with_parameters pid=65319)   File "/root/miniconda3/lib/python3.7/site-packages/torch/nn/modules/module.py", line 1130, in _call_impl
(tune_with_parameters pid=65319)     return forward_call(*input, **kwargs)
(tune_with_parameters pid=65319) TypeError: forward() got an unexpected keyword argument 'checkpoint_dir'

Traceback (most recent call last):
  File "test_pytorch.py", line 390, in <module>
    name="test",
  File "/root/miniconda3/lib/python3.7/site-packages/ray/tune/tune.py", line 741, in run
    raise TuneError("Trials did not complete", incomplete_trials)
ray.tune.error.TuneError: ('Trials did not complete', [tune_with_parameters_a90c2_00000, tune_with_parameters_a90c2_00001, 
...cut for space
tune_with_parameters_a90c2_00014, tune_with_parameters_a90c2_00015])

Could someone show me where I'm going wrong, how to I run HPO with tune in this network and then train the model with the best hyperparameters and then return the model for prediction?


Solution

  • Ray Tune expects a function trainable in the form of

    def train_fn(config):
        # ...
    

    In your case, it is probably best to wrap the train_graph_classifier function, e.g.

    def train_fn(config):
        train_graph_classifier(
            model_name="GraphConv", 
            layer_name="GraphConv",
            **config)
    
    
    analysis = tune.run(
        train_fn,
        config={
            # provide your hyperparameter search space here
            "c_hidden": tune.choice([64, 128]),
            "dp_rate_linear": tune.quniform(0.0, 1.0, 0.1),
            # ...
        },
        metric="val_loss",
        mode="min",
        # ...
    
    
    print(analysis.best_checkpoint)
    

    If you provide the TuneReportCheckpointCallback to the trainer, the analysis.best_checkpoint should contain the best model that can be then loaded for prediction, e.g.

    with analysis.best_checkpoint.as_directory() as tmpdir:
        trainer = GraphLevelGNN.load_from_checkpoint(tmpdir)