Source code for adelecv.api.optimize.segmentations.hp_optimizer

import optuna
import pandas as pd
import segmentation_models_pytorch as smp
import torch
from optuna import Study, Trial, samplers
from optuna.trial import TrialState
from torch import optim

from adelecv.api.data.segmentations import SegmentationDataset
from adelecv.api.logs import get_logger
from adelecv.api.models.segmentations import SegmentationModel

from .hyper_params import HyperParamsSegmentation


def _log_study(study: Study) -> None:
    pruned_trials = study.get_trials(
        deepcopy=False, states=[TrialState.PRUNED]
    )
    complete_trials = study.get_trials(
        deepcopy=False, states=[TrialState.COMPLETE]
    )

    logger = get_logger()
    logger.info("Study statistics:")
    logger.info("Number of finished trials: %s", len(study.trials))
    logger.info("Number of pruned trials: %s", len(pruned_trials))
    logger.info("Number of complete trials: %s", len(complete_trials))

    logger.info("Best trial:")
    trial = study.best_trial

    logger.info("Value: %s", trial.value)

    logger.info("Params: ")
    for key, value in trial.params.items():
        logger.info("%s: %s", key, value)


[docs]class HPOptimizer: """ Class for hyperparams search and model training :param hyper_params: Dataclass with hyperparams of models :param num_trials: Number of iterations algorithm (the number of models with pruned models) :param device: GPU or CPU :param dataset: Created dataset """ def __init__( self, hyper_params: HyperParamsSegmentation, num_trials: int, device: str, dataset: SegmentationDataset, ): self._hyper_params = hyper_params self._strategy = self._hyper_params.strategy self._num_trials = num_trials self._dataset = dataset self._num_classes = dataset.num_classes self._device = torch.device( 'cuda' if torch.cuda.is_available() and device == 'GPU' else 'cpu' ) get_logger().debug( "Create hp optimizer with params: " "strategy: %s num_trials: %s num_classes: %s " "device: %s hps: %s", self._strategy, self._num_trials, self._num_classes, self._device, self._hyper_params.dict() ) self._stats_models = pd.DataFrame()
[docs] def optimize(self) -> None: """ Run optimize. """ get_logger().info("Train models started") direction = 'minimize' if self._hyper_params.optimize_score == 'loss' \ else 'maximize' study = optuna.create_study( direction=direction, sampler=getattr(samplers, self._strategy)() ) study.optimize(self._objective, n_trials=self._num_trials, timeout=600) _log_study(study) get_logger().info("Train models is over")
def _create_model(self, trial: Trial) -> tuple[SegmentationModel, int]: optimizer_name = trial.suggest_categorical( "optimizer", self._hyper_params.optimizers ) optimizer = getattr(optim, optimizer_name) architecture_name = trial.suggest_categorical( "architecture", self._hyper_params.architectures ) architecture = getattr(smp, architecture_name) encoder = trial.suggest_categorical( "encoders", self._hyper_params.encoders ) pretrained_weight = trial.suggest_categorical( "pretrained_weight", self._hyper_params.pretrained_weights ) lr = trial.suggest_float( "lr", self._hyper_params.lr_range[0], self._hyper_params.lr_range[1] ) loss_name = trial.suggest_categorical( "loss", self._hyper_params.loss_fns ) loss_fn = getattr(smp.losses, loss_name)('binary') num_epoch = trial.suggest_int( "num_epoch", self._hyper_params.epoch_range[0], self._hyper_params.epoch_range[1] ) model = SegmentationModel( model=architecture, encoder_name=encoder, pretrained_weight=pretrained_weight if pretrained_weight != "None" else None, optimizer=optimizer, lr=lr, loss_fn=loss_fn, num_classes=self._num_classes, num_epoch=num_epoch, device=self._device, img_size=self._dataset.img_size, ) return model, num_epoch def _objective(self, trial: Trial) -> float: model, num_epoch = self._create_model(trial) self._dataset.update_datasets(model.transforms) score = 0 for epoch in range(num_epoch): score = self._train_model(model) trial.report(score, epoch) # Handle pruning based on the intermediate value. if trial.should_prune(): raise optuna.exceptions.TrialPruned() self._postprocessing_model(model) return score def _train_model(self, model: SegmentationModel) -> float: torch.cuda.empty_cache() model.train_step(self._dataset.train) val_loss = model.val_step(self._dataset.val) return val_loss[self._hyper_params.optimize_score] def _postprocessing_model(self, model: SegmentationModel) -> None: model.log_test(self._dataset.test) self._dataset.add_predictions(model) self._add_stats_model(model) model.save_weights() def _add_stats_model(self, model: SegmentationModel) -> None: self._stats_models = pd.concat( [self._stats_models, pd.DataFrame([model.stats_model])], ignore_index=True ) @property def stats_models(self) -> pd.DataFrame: """ Get stats models after training. :return: Info about trained models """ return self._stats_models