Source code for vissl.hooks.log_hooks

# Copyright (c) Facebook, Inc. and its affiliates. All Rights Reserved

"""
All the hooks involved in human-readable logging
"""

import datetime
import logging
import time
from typing import Optional

import torch
from classy_vision import tasks
from classy_vision.generic.distributed_util import get_rank, is_primary
from classy_vision.generic.util import save_checkpoint
from classy_vision.hooks.classy_hook import ClassyHook
from fvcore.common.file_io import PathManager
from vissl.utils.checkpoint import is_checkpoint_phase
from vissl.utils.env import get_machine_local_and_dist_rank
from vissl.utils.io import create_file_symlink, save_file
from vissl.utils.logger import log_gpu_stats
from vissl.utils.perf_stats import PerfStats


class LogGpuStatsHook(ClassyHook):
    """
    Hook that writes nvidia-smi GPU stats to the logger streams.

    Only the primary process logs, and only when the task runs on a CUDA
    device. Stats are logged once when training starts, and once more when
    the local iteration counter reaches 50: a second snapshot taken a few
    iterations into training gives a more accurate picture for monitoring
    memory usage.
    """

    on_forward = ClassyHook._noop
    on_backward = ClassyHook._noop
    on_phase_start = ClassyHook._noop
    on_phase_end = ClassyHook._noop
    on_end = ClassyHook._noop
    on_loss_and_meter = ClassyHook._noop
    on_update = ClassyHook._noop

    @staticmethod
    def _is_primary_on_gpu(task: "tasks.ClassyTask") -> bool:
        """Return True when this is the primary process and the task uses CUDA."""
        return is_primary() and task.device.type == "cuda"

    def on_start(self, task: "tasks.ClassyTask") -> None:
        """
        Log the nvidia-smi stats once at the start of training.
        """
        if self._is_primary_on_gpu(task):
            log_gpu_stats()

    def on_step(self, task: "tasks.ClassyTask") -> None:
        """
        Log the nvidia-smi stats a second time, at local iteration 50.
        """
        if not self._is_primary_on_gpu(task):
            return
        if task.local_iteration_num == 50:
            log_gpu_stats()
class LogLossLrEtaHook(ClassyHook):
    """
    Hook executed after every parameter update step. Logs training stats to
    the logger streams: LR, iteration, loss, batch time, ETA and peak GPU
    memory.
    """

    on_start = ClassyHook._noop
    on_phase_start = ClassyHook._noop
    on_forward = ClassyHook._noop
    on_backward = ClassyHook._noop
    on_phase_end = ClassyHook._noop
    on_end = ClassyHook._noop
    on_step = ClassyHook._noop
    on_loss_and_meter = ClassyHook._noop

    def __init__(self, btime_freq: Optional[int] = None) -> None:
        """
        Args:
            btime_freq: if specified, additionally logs the rolling average
                batch time over the last btime_freq batches, and the ETA
                derived from it.
        """
        super().__init__()
        self.btime_freq: Optional[int] = btime_freq

    def on_update(self, task: "tasks.ClassyTask") -> None:
        """
        Executed after every parameter update. On the primary process, during
        a training phase, computes and logs several helpful training stats on:
        the first iteration, every LOG_FREQUENCY iterations, and every 5
        iterations during the first 100 iterations.

        To monitor the batch time (average training iteration time) more
        closely, the average over the last N iterations and the ETA it implies
        can optionally be logged as well. Set the btime_freq input using
        cfg.PERF_STAT_FREQUENCY=N ensuring that cfg.MONITOR_PERF_STATS = True.
        """
        # Only the primary process logs, and only while training.
        if not (task.train and is_primary()):
            return

        train_phase_idx = task.train_phase_idx
        log_freq = task.config["LOG_FREQUENCY"]
        iteration = task.iteration
        # Log densely (every 5 iters) early in training so problems surface fast.
        is_log_iteration = (
            iteration == 1
            or iteration % log_freq == 0
            or (iteration <= 100 and iteration % 5 == 0)
        )
        if not is_log_iteration:
            return

        # Peak memory allocated by torch on the current CUDA device, in MiB;
        # -1 when CUDA is unavailable. Only queried when we actually log.
        if torch.cuda.is_available():
            peak_mem_used = int(torch.cuda.max_memory_allocated() / 1024.0 / 1024.0)
        else:
            peak_mem_used = -1

        loss_val = round(task.last_batch.loss.data.cpu().item(), 5)
        # Fall back to a single 0 so the average below never divides by zero.
        batch_times = task.batch_time if len(task.batch_time) > 0 else [0]
        avg_time = sum(batch_times) / len(batch_times)
        remaining_iters = task.max_iteration - iteration
        eta_string = str(datetime.timedelta(seconds=int(avg_time * remaining_iters)))

        # With several param groups the LR is reported as a set of values.
        lr = task.optimizer.options_view.lr
        lr_val = list(lr) if isinstance(lr, set) else round(lr, 5)

        batch_time = int(1000.0 * avg_time)
        rank = get_rank()
        log_str = (
            f"Rank: {rank}; "
            f"[ep: {train_phase_idx}] "
            f"iter: {iteration}; "
            f"lr: {lr_val}; "
            f"loss: {loss_val}; "
            f"btime(ms): {batch_time}; "
            f"eta: {eta_string}; "
            f"peak_mem: {peak_mem_used}M"
        )

        if self.btime_freq and len(batch_times) >= self.btime_freq:
            rolling_avg_time = sum(batch_times[-self.btime_freq :]) / self.btime_freq
            rolling_eta_str = str(
                datetime.timedelta(seconds=int(rolling_avg_time * remaining_iters))
            )
            rolling_btime = int(1000.0 * rolling_avg_time)
            log_str = (
                f"{log_str}; "
                f"btime({self.btime_freq}iters): {rolling_btime} ms; "
                f"rolling_eta: {rolling_eta_str}"
            )
        logging.info(log_str)
class LogLossMetricsCheckpointHook(ClassyHook):
    """
    Hook called after every forward pass (to check the model output is not
    NaN/Inf), after every step and at the end of every phase (to check whether
    the model should be checkpointed). Also logs and saves the meter values at
    the end of every phase.
    """

    on_start = ClassyHook._noop
    on_phase_start = ClassyHook._noop
    on_loss_and_meter = ClassyHook._noop
    on_backward = ClassyHook._noop
    on_end = ClassyHook._noop
    on_update = ClassyHook._noop

    # TODO: make this a standalone hook and make it optional to save runtime
    # although the overhead is minimal when the model is training fine (no nans)
    def on_forward(self, task: "tasks.ClassyTask") -> None:
        """
        Called each time a model forward is done to make sure the model output
        contains only finite values. If a NaN/Inf is found, the model is
        checkpointed (in "iteration" mode) to enable debugging, and this
        rank's model output and input sample are saved to the checkpoint
        folder.
        """
        model_output = task.last_batch.model_output
        if not self._has_non_finite(model_output):
            return

        _, dist_rank = get_machine_local_and_dist_rank()
        logging.info(f"Infinite Model output or NaN at iteration={task.iteration}.")
        self._checkpoint_model(
            task,
            task.train_phase_idx,
            mode_frequency=1,
            mode_num=task.iteration,
            mode="iteration",
        )
        model_output_file = f"{task.checkpoint_folder}/rank{dist_rank}_model_output.pth"
        input_sample_file = f"{task.checkpoint_folder}/rank{dist_rank}_input_sample.pth"
        with PathManager.open(model_output_file, "wb") as fwrite:
            torch.save(model_output, fwrite)
        with PathManager.open(input_sample_file, "wb") as fwrite:
            torch.save(task.last_batch.sample, fwrite)
        logging.info(f"Saved model output: {model_output_file}")
        logging.info(f"Saved model input: {input_sample_file}")

    @staticmethod
    def _has_non_finite(model_output) -> bool:
        """
        Return True if model_output (a tensor, or a list of tensors) contains
        any NaN or infinite value.
        """
        if isinstance(model_output, list):
            return not all(bool(torch.isfinite(x).all()) for x in model_output)
        return not bool(torch.isfinite(model_output).all())

    def on_step(self, task: "tasks.ClassyTask") -> None:
        """
        Optionally checkpoint in the middle of a phase: when
        CHECKPOINT.CHECKPOINT_ITER_FREQUENCY is positive, checkpoint every that
        many iterations.
        """
        checkpoint_frequency = task.config["CHECKPOINT"]["CHECKPOINT_ITER_FREQUENCY"]
        if checkpoint_frequency > 0:
            self._checkpoint_model(
                task,
                task.train_phase_idx,
                mode_frequency=checkpoint_frequency,
                mode_num=task.iteration,
                mode="iteration",
            )

    def on_phase_end(self, task: "tasks.ClassyTask") -> None:
        """
        Called at the end of each phase. The primary process logs and saves the
        meter values, then the model is checkpointed in "phase" mode using
        CHECKPOINT.CHECKPOINT_FREQUENCY.
        """
        if is_primary():
            self._print_and_save_meters(task, task.train_phase_idx)
        checkpoint_frequency = task.config["CHECKPOINT"]["CHECKPOINT_FREQUENCY"]
        self._checkpoint_model(
            task,
            task.train_phase_idx,
            mode_frequency=checkpoint_frequency,
            mode_num=task.phase_idx,
            mode="phase",
        )

    def _checkpoint_model(
        self, task, train_phase_idx, mode_frequency, mode_num, mode="phase"
    ):
        """
        Checkpoint the model. Can be called in 3 possible scenarios:
        1. If training becomes NaN, then we checkpoint the model to facilitate
           debugging.
        2. After every N epochs (CHECKPOINT_FREQUENCY), model state is
           checkpointed.
        3. If the user wants to checkpoint during the epoch (i.e. after every
           few training iterations), the model state is checkpointed.

        A checkpoint is written only on the primary process, only during a
        training phase, and only if is_checkpoint_phase(...) says so or this
        is the final training phase (in "phase" mode). After saving, the
        "checkpoint.torch" symlink in the checkpoint folder is pointed at the
        new file.

        Args:
            task: Self-supervision task that holds information about training
                iteration, epoch number etc.
            train_phase_idx (int): current training phase number. Starts from 0.
            mode_frequency (int): frequency of checkpointing for the given mode.
            mode_num (int): for the checkpointing mode (phase or iteration), the
                number of the phase or iteration at which checkpointing is
                being considered.
            mode (str): "phase" or "iteration".
        """
        phase_idx = task.phase_idx
        num_epochs = task.num_epochs

        # check if we need to checkpoint this phase / iteration
        is_checkpointing_phase = is_checkpoint_phase(
            mode_num, mode_frequency, train_phase_idx, num_epochs, mode
        )
        # the last training phase is always checkpointed (phase mode only)
        is_final_train_phase = (
            (train_phase_idx == (num_epochs - 1)) and task.train and mode == "phase"
        )
        if not (
            is_primary()
            and task.train
            and (is_final_train_phase or is_checkpointing_phase)
        ):
            return

        checkpoint_folder = task.checkpoint_folder
        logging.info(f"[{mode}: {mode_num}] Saving checkpoint to {checkpoint_folder}")
        classy_state_dict = task.get_classy_state()
        # phase_idx is already incremented at the beginning of phase but if we
        # are checkpointing at an iteration in the middle of phase, we should not
        # save the incremented phase_idx as it will incorrectly assume that model
        # trained for that phase already.
        if mode == "iteration":
            phase_idx = phase_idx - 1
            classy_state_dict["phase_idx"] = classy_state_dict["phase_idx"] - 1
            # task.train is guaranteed True here (checked above), so the
            # training phase index is rolled back as well.
            train_phase_idx = train_phase_idx - 1
            classy_state_dict["train_phase_idx"] = train_phase_idx
        checkpoint_task = {
            "phase_idx": phase_idx,
            "iteration": task.iteration,
            "loss": task.loss.state_dict(),
            "iteration_num": task.local_iteration_num,
            "train_phase_idx": train_phase_idx,
            "classy_state_dict": classy_state_dict,
        }
        if is_final_train_phase:
            ckpt_name = f"model_final_checkpoint_{mode}{mode_num}.torch"
        else:
            ckpt_name = f"model_{mode}{mode_num}.torch"
        backend = task.config["CHECKPOINT"]["BACKEND"]
        assert backend == "disk", "Only disk BACKEND supported"
        save_checkpoint(checkpoint_folder, checkpoint_task, checkpoint_file=ckpt_name)
        logging.info(f"Saved checkpoint: {checkpoint_folder}/{ckpt_name}")
        # we create the checkpoint symlink and use this symlink to load
        # checkpoints. This helps ensure that the checkpoint we load from
        # are valid. It's a particularly useful feature for resuming trainings.
        logging.info("Creating symlink...")
        symlink_dest_file = f"{checkpoint_folder}/checkpoint.torch"
        source_file = f"{checkpoint_folder}/{ckpt_name}"
        create_file_symlink(source_file, symlink_dest_file)
        logging.info(f"Created symlink: {symlink_dest_file}")

    def _print_and_save_meters(self, task, train_phase_idx):
        """
        Executed only on the primary process at the end of each phase. Reads
        the meter values, appends them to task.metrics, logs them to the
        logger streams (stdout, file) and saves them to
        <checkpoint_folder>/metrics.json.

        Training-phase meters are only reported when
        METERS.enable_training_meter is set; test-phase meters always are.
        """
        phase_type = "train" if task.train else "test"
        rank, _ = get_machine_local_and_dist_rank()
        checkpoint_folder = task.checkpoint_folder
        save_metrics = {
            "iteration": task.iteration,
            "phase_idx": task.phase_idx,
            "train_phase_idx": train_phase_idx,
        }
        # This decision does not depend on the individual meter: make it once.
        report_meters = bool(task.meters) and (
            (not task.train) or task.config["METERS"]["enable_training_meter"]
        )
        if report_meters:
            for meter in task.meters:
                meter_value = meter.value
                metric_key = f"{phase_type}_{meter.name}"
                if metric_key not in task.metrics:
                    task.metrics[metric_key] = []
                task.metrics[metric_key].append(meter_value)
                save_metrics[metric_key] = meter_value
                logging.info(f"Rank: {rank}, name: {metric_key}, value: {meter_value}")
        meter_file = f"{checkpoint_folder}/metrics.json"
        save_file(save_metrics, meter_file)
class LogPerfTimeMetricsHook(ClassyHook):
    """
    Computes and prints performance metrics. Logs at the end of every phase
    (if at least one batch was processed) and, if log_freq is specified by
    the user, also every log_freq batches.
    """

    on_start = ClassyHook._noop
    on_forward = ClassyHook._noop
    on_backward = ClassyHook._noop
    on_end = ClassyHook._noop
    on_update = ClassyHook._noop
    on_step = ClassyHook._noop

    def __init__(self, log_freq: Optional[int] = None) -> None:
        """
        Args:
            log_freq: if specified, logs every log_freq batches also.
        """
        super().__init__()
        self.log_freq: Optional[int] = log_freq
        self.start_time: Optional[float] = None

    def on_phase_start(self, task: "tasks.ClassyTask") -> None:
        """
        Initialize start time and reset perf stats
        """
        self.start_time = time.time()
        task.perf_stats = PerfStats()

    def on_loss_and_meter(self, task: "tasks.ClassyTask") -> None:
        """
        Log performance metrics every log_freq batches, if log_freq is not None.
        """
        if self.log_freq is None:
            return
        batches = len(task.losses)
        if batches and batches % self.log_freq == 0:
            self._log_performance_metrics(task)

    def on_phase_end(self, task: "tasks.ClassyTask") -> None:
        """
        Log performance metrics at the end of every phase, regardless of
        log_freq, provided at least one batch was processed.
        """
        batches = len(task.losses)
        if batches:
            self._log_performance_metrics(task)

    def _log_performance_metrics(self, task: "tasks.ClassyTask") -> None:
        """
        Compute and log performance metrics: the average wall-clock batch time
        since the phase started and, for training phases, the per-step time
        breakdown from task.perf_stats. Callers ensure at least one batch has
        been processed.
        """
        phase_type = task.phase_type
        batches = len(task.losses)
        if self.start_time is None:
            logging.warning("start_time not initialized")
        else:
            # Average batch time: wall time since phase start / batches seen.
            total_batch_time = time.time() - self.start_time
            average_batch_time = total_batch_time / batches
            logging.info(
                "Average %s batch time (ms) for %d batches: %d",
                phase_type,
                batches,
                1000.0 * average_batch_time,
            )

        # Train step time breakdown. perf_stats is set in on_phase_start; read
        # it defensively so a task that never had it set warns instead of
        # raising AttributeError.
        perf_stats = getattr(task, "perf_stats", None)
        if perf_stats is None:
            logging.warning('"perf_stats" not set in local_variables')
        elif task.train:
            logging.info(
                "Train step time breakdown (rank %s):\n%s",
                get_rank(),
                perf_stats.report_str(),
            )