# Copyright (c) Facebook, Inc. and its affiliates.
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
import collections
import logging
import os
import random
import sys
import tempfile
import time
from functools import partial, wraps
import numpy as np
import pkg_resources
import torch
import torch.multiprocessing as mp
from iopath.common.file_io import g_pathmgr
from scipy.sparse import csr_matrix
from vissl.utils.extract_features_utils import ExtractedFeaturesLoader
def is_fairscale_sharded_available():
    """
    Report whether the installed fairscale provides ShardedGradScaler,
    which is what gets used with ZeRO + PyTorch AMP.

    Returns:
        bool: True if `fairscale.optim.grad_scaler.ShardedGradScaler` can be
        imported, False if the import raises ImportError.
    """
    try:
        from fairscale.optim.grad_scaler import ShardedGradScaler  # NOQA
    except ImportError:
        return False
    return True
def is_faiss_available():
    """
    Report whether faiss can be imported.

    To install faiss:
        with pip: `pip install faiss-gpu`
        with conda: `conda install faiss-gpu -c pytorch`

    Returns:
        bool: True if `import faiss` succeeds, False on ImportError.
    """
    try:
        import faiss  # NOQA
    except ImportError:
        return False
    return True
def is_opencv_available():
    """
    Report whether opencv (the `cv2` module) can be imported.

    To install opencv: `pip install opencv-python`, regardless of whether
    a conda or a pip environment is used.

    Returns:
        bool: True if `import cv2` succeeds, False on ImportError.
    """
    try:
        import cv2  # NOQA
    except ImportError:
        return False
    return True
def is_apex_available():
    """
    Report whether apex can be imported.

    Returns:
        bool: True if `import apex` succeeds, False on ImportError.
    """
    try:
        import apex  # NOQA
    except ImportError:
        return False
    return True
def is_augly_available():
    """
    Check if augly is available with simple python imports.

    Augly is reported as unavailable when the interpreter is older than
    Python 3.7, without attempting the import.

    Returns:
        bool: True if the Python version is at least 3.7 and
        `import augly.image` succeeds, False otherwise.
    """
    # Explicit comparison instead of `assert`: assertions are stripped when
    # Python runs with -O, which would silently disable the version gate.
    if sys.version_info < (3, 7, 0):
        return False
    try:
        import augly.image  # NOQA
    except ImportError:
        return False
    return True
def find_free_tcp_port():
    """
    Find a free port that can be used for Rendezvous on the local machine.
    We use this for 1 machine training where the port is automatically detected.

    Returns:
        int: a TCP port number that the OS reported as available at the
        time of the call.
    """
    import socket

    # The context manager guarantees the socket is closed even if bind()
    # or getsockname() raises.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        # Binding to port 0 will cause the OS to find an available port for us
        sock.bind(("", 0))
        port = sock.getsockname()[1]
    # NOTE: there is still a chance the port could be taken by other processes.
    return port
def get_dist_run_id(cfg, num_nodes):
    """
    For multi-gpu training with PyTorch, we have to specify
    how the gpus are going to rendezvous. This requires specifying
    the communication method: file, tcp and the unique rendezvous run_id that
    is specific to 1 run.

    We recommend:
        1) for 1-node: use init_method=tcp and run_id=auto
        2) for multi-node, use init_method=tcp and specify
           run_id={master_node}:{port}

    Args:
        cfg: config object; reads cfg.DISTRIBUTED.INIT_METHOD,
            cfg.DISTRIBUTED.RUN_ID and cfg.DISTRIBUTED.NUM_NODES.
        num_nodes: number of nodes taking part in the run.

    Returns:
        The run id to use: "localhost:{port}" for tcp + auto, the path of a
        freshly created temp file for file + empty/auto, otherwise
        cfg.DISTRIBUTED.RUN_ID unchanged.

    Raises:
        AssertionError: if RUN_ID=auto is used with tcp on more than one node,
            if RUN_ID is empty for multi-node tcp, or if the "env" init method
            is used with more than one node.
    """
    init_method = cfg.DISTRIBUTED.INIT_METHOD
    run_id = cfg.DISTRIBUTED.RUN_ID
    if init_method == "tcp" and cfg.DISTRIBUTED.RUN_ID == "auto":
        assert (
            num_nodes == 1
        ), "cfg.DISTRIBUTED.RUN_ID=auto is allowed for 1 machine only."
        port = find_free_tcp_port()
        run_id = f"localhost:{port}"
    elif init_method == "file":
        if num_nodes > 1:
            logging.warning(
                "file is not recommended to use for distributed training on > 1 node"
            )
        # Find a unique tempfile if needed.
        if not run_id or run_id == "auto":
            fd, run_id = tempfile.mkstemp()
            # mkstemp hands back an already-open descriptor; only the path is
            # needed here, so close it right away to avoid leaking the fd.
            os.close(fd)
    elif init_method == "tcp" and cfg.DISTRIBUTED.NUM_NODES > 1:
        # NOTE: this branch reads the node count from the config rather than
        # from the `num_nodes` argument.
        assert cfg.DISTRIBUTED.RUN_ID, "please specify RUN_ID for tcp"
    elif init_method == "env":
        assert num_nodes == 1, "can not use 'env' init method for multi-node. Use tcp"
    return run_id
def setup_multiprocessing_method(method_name: str):
    """
    PyTorch supports several multiprocessing options: forkserver | spawn | fork
    We recommend and use forkserver as the default method in VISSL.

    Setting the start method is best-effort: a RuntimeError raised while
    setting it is logged as a warning and does not propagate.

    Args:
        method_name: name of the start method passed to
            `torch.multiprocessing.set_start_method` (with force=True).
    """
    try:
        mp.set_start_method(method_name, force=True)
    except RuntimeError as err:
        # Keep the best-effort behaviour, but leave a trace instead of
        # swallowing the failure silently.
        logging.warning(
            f"Could not set start method of multiprocessing to {method_name}: {err}"
        )
    else:
        logging.info("Set start method of multiprocessing to {}".format(method_name))
def set_seeds(cfg, dist_rank):
    """
    Seed python `random`, numpy and torch for this process, and additionally
    all CUDA devices when the configured device is "gpu" and CUDA is
    available. This is what makes training deterministic.

    Args:
        cfg: config object; reads cfg.SEED_VALUE, cfg.OPTIMIZER.num_epochs
            and cfg["MACHINE"]["DEVICE"].
        dist_rank: distributed rank of the calling process, added to the
            base seed.
    """
    # Since in the pytorch sampler, we increment the seed by 1 for every epoch.
    rank_seed = cfg.SEED_VALUE + dist_rank
    machine_seed = rank_seed * cfg.OPTIMIZER.num_epochs
    logging.info(f"MACHINE SEED: {machine_seed}")

    for seed_fn in (random.seed, np.random.seed, torch.manual_seed):
        seed_fn(machine_seed)

    wants_gpu = cfg["MACHINE"]["DEVICE"] == "gpu"
    if wants_gpu and torch.cuda.is_available():
        torch.cuda.manual_seed_all(machine_seed)
def set_dataloader_seeds(_worker_id: int):
    """
    Seed python `random` and numpy inside a dataloader worker from the
    worker's torch seed.

    See: https://tanelp.github.io/posts/a-bug-that-plagues-thousands-of-open-source-ml-projects/
    With "fork" process spawning, dataloader workers inherit the parent's
    numpy seed. Torch seeds are handled correctly across dataloaders and
    epochs, but numpy seeds are not, so each worker explicitly reuses its
    torch seed to get a different yet deterministic numpy/random seed.
    Also see https://pytorch.org/docs/stable/data.html#randomness-in-multi-process-data-loading

    Args:
        _worker_id: worker index; not used by this function.
    """
    worker_info = torch.utils.data.get_worker_info()
    # numpy and random seed must be between 0 and 2 ** 32 - 1.
    bounded_seed = worker_info.seed % (2 ** 32)
    for seed_fn in (random.seed, np.random.seed):
        seed_fn(bounded_seed)
def get_indices_sparse(data):
    """
    Group the positions of `data` by the value stored at each position.
    Is faster than np.argwhere. Used in loss functions like swav loss, etc

    Args:
        data: numpy array whose values are used as row indices of a sparse
            matrix (so presumably non-negative integers -- confirm with
            callers).

    Returns:
        list with `data.max() + 1` entries; entry `v` is the
        `np.unravel_index` result (a tuple of index arrays, one per
        dimension of `data`) for the positions whose value is `v`.
    """
    flat_positions = np.arange(data.size)
    values = data.ravel()
    n_values = data.max() + 1
    # Row v of the sparse matrix stores the flat positions whose value is v.
    by_value = csr_matrix(
        (flat_positions, (values, flat_positions)), shape=(n_values, data.size)
    )
    grouped = []
    for row in by_value:
        grouped.append(np.unravel_index(row.data, data.shape))
    return grouped
def merge_features(input_dir: str, split: str, layer: str):
    """
    Thin wrapper that forwards its arguments to
    `ExtractedFeaturesLoader.load_features` and returns the result.

    Args:
        input_dir: forwarded as the first argument.
        split: forwarded as the second argument.
        layer: forwarded as the third argument.
    """
    loaded = ExtractedFeaturesLoader.load_features(input_dir, split, layer)
    return loaded
def get_json_catalog_path(default_dataset_catalog_path: str) -> str:
    """
    Resolve the path of the dataset catalog json file.

    The environment variable VISSL_DATASET_CATALOG_PATH, when set, overrides
    the default path. Useful for local development and/or remote server
    configuration.

    Args:
        default_dataset_catalog_path: path returned when the environment
            variable is not set.

    Returns:
        The value of VISSL_DATASET_CATALOG_PATH if the variable is set,
        otherwise `default_dataset_catalog_path`.

    Raises:
        AssertionError: if the environment variable is set to a non-empty
            path that `g_pathmgr` reports as non-existent.
    """
    env_path = os.environ.get("VISSL_DATASET_CATALOG_PATH")
    if env_path is None:
        # If catalog path is the default and we cannot find it, we want to
        # continue without failing, so the default is never checked.
        return default_dataset_catalog_path
    if env_path:
        assert g_pathmgr.exists(
            env_path
        ), f"Dataset catalog path: { env_path } not found."
    return env_path
def get_json_data_catalog_file():
    """
    Locate the dataset_catalog.json file that holds the dataset paths.

    The default is the "config/dataset_catalog.json" resource of the
    "configs" package; `get_json_catalog_path` then decides whether a
    user-provided path replaces it.

    Returns:
        The catalog path returned by `get_json_catalog_path`.
    """
    packaged_default = pkg_resources.resource_filename(
        "configs", "config/dataset_catalog.json"
    )
    return get_json_catalog_path(packaged_default)
@torch.no_grad()
def concat_all_gather(tensor):
    """
    Gather `tensor` from every process and concatenate the pieces along
    dimension 0.

    *** Warning ***: torch.distributed.all_gather has no gradient.

    Args:
        tensor: the local tensor to contribute to the gather.

    Returns:
        A tensor made of the `torch.distributed.get_world_size()` gathered
        tensors concatenated on dim 0.
    """
    world_size = torch.distributed.get_world_size()
    # One receive buffer per process, shaped like the local tensor.
    gathered = []
    for _ in range(world_size):
        gathered.append(torch.ones_like(tensor))
    torch.distributed.all_gather(gathered, tensor, async_op=False)
    return torch.cat(gathered, dim=0)
def get_rng_state():
    """
    Snapshot the torch random number generator state.

    Returns:
        dict with key "torch_rng_state" and, when CUDA is available,
        also "cuda_rng_state" (from `torch.cuda.get_rng_state()`).
    """
    snapshot = {}
    snapshot["torch_rng_state"] = torch.get_rng_state()
    if not torch.cuda.is_available():
        return snapshot
    snapshot["cuda_rng_state"] = torch.cuda.get_rng_state()
    return snapshot
def set_rng_state(state):
    """
    Restore the torch random number generator state from a snapshot.

    Args:
        state: mapping with key "torch_rng_state"; "cuda_rng_state" is
            also read when CUDA is available.
    """
    cpu_state = state["torch_rng_state"]
    torch.set_rng_state(cpu_state)
    if not torch.cuda.is_available():
        return
    torch.cuda.set_rng_state(state["cuda_rng_state"])
class set_torch_seed:
    """
    Context manager that seeds torch and restores the previous RNG state
    on exit.

    The current RNG state is captured and the new seed applied as soon as
    the object is constructed (not on `__enter__`); leaving the `with`
    block puts the captured state back.
    """

    def __init__(self, seed):
        """Save the current RNG state, then seed torch (and CUDA if available)."""
        assert isinstance(seed, int)
        self.rng_state = get_rng_state()
        torch.manual_seed(seed)
        if not torch.cuda.is_available():
            return
        torch.cuda.manual_seed(seed)

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        # Restore whatever RNG state was active before construction.
        set_rng_state(self.rng_state)
# Credit: https://stackoverflow.com/questions/42521549/retry-function-in-python
def retry(func=None, exception=Exception, n_tries=5, delay=5, backoff=1, logger=False):
    """Retry decorator with exponential backoff.

    Usable both bare (``@retry``) and with arguments
    (``@retry(n_tries=4)``).

    Parameters
    ----------
    func : typing.Callable, optional
        Callable on which the decorator is applied, by default None
    exception : Exception or tuple of Exceptions, optional
        Exception(s) that invoke retry, by default Exception
    n_tries : int, optional
        Number of tries before giving up, by default 5
    delay : int, optional
        Initial delay between retries in seconds, by default 5
    backoff : int, optional
        Backoff multiplier e.g. value of 2 will double the delay, by default 1
    logger : bool, optional
        If True report retries through ``logging.warning``, otherwise
        through ``print``; by default False

    Returns
    -------
    typing.Callable
        Decorated callable that calls itself again when exception(s) occur.
        The exception raised by the last attempt is not caught.

    Examples
    --------
    >>> import random
    >>> @retry(exception=Exception, n_tries=4)
    ... def test_random(text):
    ...     x = random.random()
    ...     if x < 0.5:
    ...         raise Exception("Fail")
    ...     else:
    ...         print("Success: ", text)
    >>> test_random("It works!")
    """
    if func is None:
        # Called with arguments only: return a decorator with them bound.
        options = {
            "exception": exception,
            "n_tries": n_tries,
            "delay": delay,
            "backoff": backoff,
            "logger": logger,
        }
        return partial(retry, **options)

    report = logging.warning if logger else print

    @wraps(func)
    def wrapper(*args, **kwargs):
        remaining = n_tries
        wait = delay
        # Every attempt except the last one is guarded.
        while remaining > 1:
            try:
                return func(*args, **kwargs)
            except exception as e:
                report(f"{str(e)}, Retrying in {wait} seconds...")
                time.sleep(wait)
                remaining -= 1
                wait *= backoff
        # Final attempt: let any exception propagate to the caller.
        return func(*args, **kwargs)

    return wrapper
# Credit: https://stackoverflow.com/questions/6027558/flatten-nested-dictionaries-compressing-keys # NOQA
def flatten_dict(d: dict, parent_key="", sep="_"):
    """
    Flattens a nested dict, joining the keys of each level with `sep`
    ('_' by default). For example the input:
        {
            'top_1': {
                'res_5': 100
            }
        }
    will return:
        {
            'top_1_res_5': 100
        }

    Args:
        d: the (possibly nested) mapping to flatten.
        parent_key: prefix prepended (followed by `sep`) to every key;
            used by the recursion, empty at the top level.
        sep: separator placed between the keys of successive levels.

    Returns:
        dict: a new single-level dict. Nested mappings that are empty
        contribute no entries.
    """
    # `collections.MutableMapping` was removed in Python 3.10; the ABC lives
    # in `collections.abc`. Imported locally so the block is self-contained.
    from collections.abc import MutableMapping

    flat = {}
    for k, v in d.items():
        new_key = parent_key + sep + k if parent_key else k
        if isinstance(v, MutableMapping):
            flat.update(flatten_dict(v, new_key, sep=sep))
        else:
            flat[new_key] = v
    return flat
# Credit: https://stackoverflow.com/questions/7204805/how-to-merge-dictionaries-of-dictionaries
def recursive_dict_merge(dict1, dict2):
    """
    Recursively merges dict2 into dict1.

    dict1 is modified in place. For keys present in both, the values are
    merged recursively; when either side is not a dict, the value from
    dict2 wins.

    Args:
        dict1: destination; updated in place when both arguments are dicts.
        dict2: source of the values to merge in.

    Returns:
        dict1 after merging when both arguments are dicts, otherwise dict2.
    """
    both_are_dicts = isinstance(dict1, dict) and isinstance(dict2, dict)
    if not both_are_dicts:
        return dict2
    for key, incoming in dict2.items():
        dict1[key] = (
            recursive_dict_merge(dict1[key], incoming) if key in dict1 else incoming
        )
    return dict1