# Source code for oml.functional.metrics

from collections import defaultdict
from typing import Any, Dict, List, Optional, Sequence, Tuple, Union

import numpy as np
import torch
from torch import BoolTensor, FloatTensor, LongTensor, Tensor, isin
from tqdm.auto import tqdm

from oml.const import OVERALL_CATEGORIES_KEY
from oml.utils.misc import check_if_nonempty_positive_integers
from oml.utils.misc_torch import PCA

# Alias used to annotate the (possibly nested) metrics dictionaries built by the functions of this module.
TMetricsDict = Dict[str, Any]
# Alias for category labels: they are accepted either as a torch ``LongTensor`` or as a NumPy array.
TCategories = Union[LongTensor, np.ndarray]


def calc_retrieval_metrics(
    retrieved_ids: Sequence[LongTensor],
    gt_ids: Sequence[LongTensor],
    query_categories: Optional[TCategories] = None,
    cmc_top_k: Tuple[int, ...] = (5,),
    precision_top_k: Tuple[int, ...] = (5,),
    map_top_k: Tuple[int, ...] = (5,),
    reduce: bool = True,
    verbose: bool = True,
) -> TMetricsDict:
    """
    Function to compute different retrieval metrics.

    Args:
        retrieved_ids: First gallery indices retrieved for every query with the size of ``n_query``.
                       Every index is within the range ``(0, n_gallery - 1)``.
        gt_ids: Gallery indices relevant to every query with the size of ``n_query``.
                Every element is within the range ``(0, n_gallery - 1)``
        query_categories: Categories of queries with the size of ``n_query`` to compute
                          metrics for each category.
        cmc_top_k: Values of ``k`` to calculate ``cmc@k`` (`Cumulative Matching Characteristic`)
        precision_top_k: Values of ``k`` to calculate ``precision@k``
        map_top_k: Values of ``k`` to calculate ``map@k`` (`Mean Average Precision`)
        reduce: If ``False`` return metrics for each query without averaging
        verbose: Set ``True`` to make the function verbose.

    Returns:
        Metrics dictionary of the form ``{metric_name: {k: value}}``. If ``query_categories`` is given,
        this dictionary is stored under ``OVERALL_CATEGORIES_KEY`` and one more dictionary of the same
        structure is added for every distinct category.

    """
    assert len(retrieved_ids) == len(gt_ids)
    assert (query_categories is None) or (len(query_categories) == len(retrieved_ids))

    # let's mark every correctly retrieved item as True and vice versa
    gt_tops = tuple(isin(r, g).bool() for r, g in zip(retrieved_ids, gt_ids))
    n_gts = [len(ids) for ids in gt_ids]

    metrics: TMetricsDict = defaultdict(dict)

    if cmc_top_k:
        cmc = calc_cmc(gt_tops, n_gts, cmc_top_k, verbose=verbose)
        metrics["cmc"] = dict(zip(cmc_top_k, cmc))

    if precision_top_k:
        precision = calc_precision(gt_tops, n_gts, precision_top_k, verbose=verbose)
        metrics["precision"] = dict(zip(precision_top_k, precision))

    if map_top_k:
        map_ = calc_map(gt_tops, n_gts, map_top_k, verbose=verbose)
        metrics["map"] = dict(zip(map_top_k, map_))

    if query_categories is not None:
        # Build one entry per *distinct* category. Iterating over the raw labels would redo the same
        # masking for every duplicate, and for tensors (whose elements are hashed by identity) it would
        # even produce a separate dictionary entry for every single query.
        if isinstance(query_categories, Tensor):
            unique_categories = list(torch.unique(query_categories))
        else:
            # ``dict.fromkeys`` removes duplicates and keeps the order of the first appearance
            unique_categories = list(dict.fromkeys(query_categories))

        metrics_cat = {
            c: take_unreduced_metrics_by_mask(metrics, query_categories == c) for c in unique_categories
        }
        metrics = {OVERALL_CATEGORIES_KEY: metrics, **metrics_cat}

    if reduce:
        metrics = reduce_metrics(metrics)

    return metrics
def calc_topological_metrics(
    embeddings: Tensor, pcf_variance: Tuple[float, ...], categories: Optional[TCategories] = None, verbose: bool = False
) -> TMetricsDict:
    """
    Function to evaluate different topological metrics.

    Args:
        embeddings: Embeddings matrix with the shape of ``[n_embeddings, embeddings_dim]``.
        pcf_variance: Values in range [0, 1]. Find the number of components such that the amount
                      of variance that needs to be explained is greater than the percentage specified
                      by ``pcf_variance``.
        categories: Categories of embeddings to compute category wise metrics.
        verbose: Set ``True`` to see a progress bar.

    Returns:
        Metrics dictionary. When both ``pcf_variance`` and ``categories`` are provided, the overall
        metrics are stored under ``OVERALL_CATEGORIES_KEY`` next to one entry per unique category.

    """
    assert (categories is None) or (len(categories) == len(embeddings))

    overall: TMetricsDict = defaultdict(dict)

    # nothing is requested, so there is nothing to compute
    if not pcf_variance:
        return overall

    overall["pcf"] = dict(zip(pcf_variance, calc_pcf(embeddings, pcf_variance)))

    if categories is None:
        return overall

    unique_categories = np.unique(categories)
    iterator = unique_categories
    if verbose:
        iterator = tqdm(unique_categories, desc="Topologic metrics on different categories")

    # the same metrics are evaluated on the subset of embeddings belonging to each category
    per_category = {}
    for category in iterator:
        per_category[category] = calc_topological_metrics(
            embeddings[categories == category], pcf_variance, categories=None
        )

    return {OVERALL_CATEGORIES_KEY: overall, **per_category}
def reduce_metrics(metrics_to_reduce: TMetricsDict) -> TMetricsDict:
    """
    Recursively average the values stored in a (possibly nested) metrics dictionary.

    Args:
        metrics_to_reduce: Dictionary whose values are tensors, NumPy arrays, numbers or dictionaries
                           of the same kind.

    Returns:
        Dictionary with the same keys where every tensor / array is replaced by its ``mean()``,
        numbers are kept as they are and nested dictionaries are reduced recursively.

    """
    output: TMetricsDict = {}

    for k, v in metrics_to_reduce.items():
        if isinstance(v, (Tensor, np.ndarray)):
            output[k] = v.mean()
        elif isinstance(v, (float, int)):
            output[k] = v
        else:
            # anything else is treated as a nested dictionary
            output[k] = reduce_metrics(v)  # type: ignore

    return output


def take_unreduced_metrics_by_mask(metrics: TMetricsDict, mask: BoolTensor) -> TMetricsDict:
    """
    Recursively select the entries of per-query metrics picked by ``mask``.

    Args:
        metrics: Dictionary whose values are tensors, NumPy arrays, numbers or dictionaries
                 of the same kind.
        mask: Boolean mask applied to every tensor / array that has more than one element.

    Returns:
        Dictionary with the same keys where tensors / arrays with more than one element are indexed by
        ``mask``, single element tensors / arrays and numbers are kept as they are and nested
        dictionaries are processed recursively.

    """
    output: TMetricsDict = {}

    for k, v in metrics.items():
        if isinstance(v, Tensor):
            output[k] = v[mask] if v.numel() > 1 else v
        elif isinstance(v, np.ndarray):
            # ``reduce_metrics`` accepts NumPy arrays, so they have to be supported here as well;
            # the mask is converted so that NumPy indexing does not depend on the mask type
            np_mask = mask.cpu().numpy() if isinstance(mask, Tensor) else np.asarray(mask)
            output[k] = v[np_mask] if v.size > 1 else v
        elif isinstance(v, (float, int)):
            output[k] = v
        else:
            # anything else is treated as a nested dictionary
            output[k] = take_unreduced_metrics_by_mask(v, mask)  # type: ignore

    return output
def calc_cmc(
    gt_tops: Sequence[BoolTensor], n_gts: List[int], top_k: Tuple[int, ...], verbose: bool = False
) -> List[FloatTensor]:
    """
    Function to compute Cumulative Matching Characteristics (CMC) at cutoffs ``top_k``.

    ``cmc@k`` for a given query equals to 1 if there is at least 1 instance related to this query in top ``k``
    gallery instances sorted by distances to the query, and 0 otherwise.
    The final ``cmc@k`` could be obtained by averaging the results calculated for each query.

    Args:
        gt_tops: Indicators that show if retrieved items are correct or not:
                 ``gt_tops[i][j]`` is ``True`` if ``j``-th gallery item is related to the ``i``-th query item.
        n_gts: Number of existing ground truths for every query.
        top_k: Values of ``k`` to calculate ``cmc@k``.
        verbose: Set ``True`` to see progress bar.

    Returns:
        List of ``cmc@k`` tensors computed for every query.

    .. math::
        \\textrm{cmc}@k = \\begin{cases}
        1, & \\textrm{if top-}k \\textrm{ ranked gallery samples include an output relevant to the query}, \\\\
        0, & \\textrm{otherwise}.
        \\end{cases}

    Example:
        >>> gt_tops = [
        ...     BoolTensor([1, 0]),
        ...     BoolTensor([0, 1, 1]),
        ...     BoolTensor([0, 0]),
        ...     BoolTensor([])
        ... ]
        >>> n_gts = [2, 2, 1, 0]
        >>> calc_cmc(gt_tops, n_gts, top_k=(1, 2))
        [tensor([1., 0., 0., 1.]), tensor([1., 1., 0., 1.])]

    """
    check_if_nonempty_positive_integers(top_k, "top_k")

    def hit_at_cutoff(found: BoolTensor, n_relevant: int, cutoff: int) -> float:
        if len(found) > 0:
            return float(found[:cutoff].any())
        # nothing was retrieved: it is a success only if there was nothing to retrieve
        return 1.0 if n_relevant == 0 else 0.0

    results = []
    for cutoff in top_k:
        pairs = zip(gt_tops, n_gts)
        if verbose:
            pairs = tqdm(pairs, desc=f"CMC@{cutoff}", total=len(gt_tops))
        per_query = [hit_at_cutoff(found, n_relevant, cutoff) for found, n_relevant in pairs]
        results.append(FloatTensor(per_query))

    return results
def calc_precision(
    gt_tops: Sequence[BoolTensor], n_gts: List[int], top_k: Tuple[int, ...], verbose: bool = False
) -> List[FloatTensor]:
    """
    Function to compute Precision at cutoffs ``top_k``.

    ``precision@k`` for a given query is a fraction of the relevant gallery instances among the top ``k``
    instances sorted by distances from the query to the gallery.
    The final ``precision@k`` could be obtained by averaging the results calculated for each query.

    Args:
        gt_tops: Indicators that show if retrieved items are correct or not:
                 ``gt_tops[i][j]`` is ``True`` if ``j``-th gallery item is related to the ``i``-th query item.
        n_gts: Number of existing ground truth for every query.
        top_k: Values of ``k`` to calculate ``precision@k``.
        verbose: Set ``True`` to see progress bar.

    Returns:
        List of ``precision@k`` tensors computed for every query.

    Given a list :math:`g=[g_1, \\ldots, g_k]` of ground truth top :math:`k` closest elements from the gallery
    to a given query (:math:`g_i` is 1 if :math:`i`-th element from the gallery is relevant to the query and 0
    otherwise), and the total number of relevant elements from the gallery :math:`n`,
    the :math:`\\textrm{precision}@k` for the query is defined as

    .. math::
        \\textrm{precision}@k = \\frac{1}{\\min{\\left(k, n\\right)}}\\sum\\limits_{i = 1}^k g_i

    It's worth mentioning that OML version of :math:`\\textrm{precision}@k` differs from the commonly used by
    the denominator of the fraction. The OML version takes into account the total amount of relevant elements
    in the gallery, so it will not penalize the ideal model if :math:`n < k`.

    For instance, let :math:`n = 3` and :math:`g = [1, 1, 1, 0, 0]`. Then by using the common definition of
    :math:`\\textrm{precision}@k` we get

    .. math::
        \\begin{align}
        \\textrm{precision}@1 &= \\frac{1}{1}, \\textrm{precision}@2 = \\frac{2}{2},
        \\textrm{precision}@3 = \\frac{3}{3}, \\\\
        \\textrm{precision}@4 &= \\frac{3}{4}, \\textrm{precision}@5 = \\frac{3}{5},
        \\textrm{precision}@6 = \\frac{3}{6} \\\\
        \\end{align}

    But with OML definition of :math:`\\textrm{precision}@k` we get

    .. math::
        \\begin{align}
        \\textrm{precision}@1 &= \\frac{1}{1}, \\textrm{precision}@2 = \\frac{2}{2},
        \\textrm{precision}@3 = \\frac{3}{3} \\\\
        \\textrm{precision}@4 &= \\frac{3}{3}, \\textrm{precision}@5 = \\frac{3}{3},
        \\textrm{precision}@6 = \\frac{3}{3} \\\\
        \\end{align}

    Edge cases: if nothing was retrieved for a query, the value is 1 when there are no ground truths
    for it and 0 otherwise. If something was retrieved for a query without ground truths, the value is 0,
    which agrees with the values produced by ``cmc@k`` and ``map@k`` for such a query.

    See:

    `Evaluation measures (information retrieval). Precision@k`_

    .. _`Evaluation measures (information retrieval). Precision@k`:
        https://en.wikipedia.org/wiki/Evaluation_measures_(information_retrieval)#Precision_at_k

    Example:
        >>> gt_tops = [
        ...     BoolTensor([1, 0]),
        ...     BoolTensor([0, 1, 1]),
        ...     BoolTensor([0, 0]),
        ...     BoolTensor([])
        ... ]
        >>> n_gts = [2, 3, 5, 2]
        >>> calc_precision(gt_tops, n_gts, top_k=(1, 2))
        [tensor([1., 0., 0., 0.]), tensor([0.5000, 0.5000, 0.0000, 0.0000])]

    """
    check_if_nonempty_positive_integers(top_k, "top_k")

    def precision_single(is_correct: BoolTensor, n_gt: int, k_: int) -> float:
        if len(is_correct) == 0:
            # nothing was retrieved: it is a perfect answer only if there was nothing to retrieve
            return 1.0 if n_gt == 0 else 0.0

        k_ = min(k_, len(is_correct))
        denominator = min(n_gt, k_)

        if denominator <= 0:
            # There are no relevant items, so every retrieved item is a false positive.
            # Without this guard the division below would produce NaN / inf and spoil the averaged metric.
            return 0.0

        return float(is_correct[:k_].sum() / denominator)

    precision = []
    for k in top_k:
        items = zip(gt_tops, n_gts)
        if verbose:
            items = tqdm(items, desc=f"Precision@{k}", total=len(n_gts))
        precision.append(FloatTensor([precision_single(gts, n_gt, k) for gts, n_gt in items]))

    return precision
def calc_map(
    gt_tops: Sequence[BoolTensor], n_gts: List[int], top_k: Tuple[int, ...], verbose: bool = False
) -> List[FloatTensor]:
    """
    Function to compute Mean Average Precision (MAP) at cutoffs ``top_k``.

    ``map@k`` for a given query is the average value of the ``precision`` considered as a function of the
    ``recall``. The final ``map@k`` could be obtained by averaging the results calculated for each query.

    Args:
        gt_tops: Indicators that show if retrieved items are correct or not:
                 ``gt_tops[i][j]`` is ``True`` if ``j``-th gallery item is related to the ``i``-th query item.
        n_gts: Number of existing ground truth for every query.
        top_k: Values of ``k`` to calculate ``map@k``.
        verbose: Set ``True`` to see progress bar.

    Returns:
        List of ``map@k`` tensors computed for every query.

    Given a list :math:`g=[g_1, \\ldots, g_k]` of ground truth top :math:`k` closest elements from the gallery
    to a given query (:math:`g_i` is 1 if :math:`i`-th element from the gallery is relevant to the query and 0
    otherwise), and the total number of relevant elements from the gallery :math:`n`,
    the :math:`\\textrm{map}@k` for the query is defined as

    .. math::
        \\begin{split}
        \\textrm{map}@k &= \\frac{1}{n_k}\\sum\\limits_{i = 1}^k
        \\frac{n_i}{i} \\times \\textrm{rel}(i)
        \\end{split}

    where :math:`\\textrm{rel}(i)` is 1 if :math:`i`-th element from the top :math:`i` closest
    elements from the gallery to the query is relevant to the query, and 0 otherwise;
    and :math:`n_i = \\sum\\limits_{j = 1}^{i}g_j`, which is the number of the relevant predictions
    among the first :math:`i` outputs.

    See:

    `Evaluation measures (information retrieval). Mean Average Precision`_

    `Mean Average Precision (MAP) For Recommender Systems`_

    .. _`Evaluation measures (information retrieval). Mean Average Precision`:
        https://en.wikipedia.org/wiki/Evaluation_measures_(information_retrieval)#Mean_average_precision

    .. _`Mean Average Precision (MAP) For Recommender Systems`:
        https://sdsawtelle.github.io/blog/output/mean-average-precision-MAP-for-recommender-systems.html

    Example:
        >>> gt_tops = [
        ...     BoolTensor([1, 0]),
        ...     BoolTensor([0, 1]),
        ...     BoolTensor([0, 0, 0, 0]),
        ...     BoolTensor([])
        ... ]
        >>> n_gts = [1, 1, 2, 0]
        >>> calc_map(gt_tops, n_gts, top_k=(1, 2))
        [tensor([1., 0., 0., 1.]), tensor([1.0000, 0.5000, 0.0000, 1.0000])]

    """
    check_if_nonempty_positive_integers(top_k, "top_k")

    def average_precision(hits: BoolTensor, n_relevant: int, cutoff: int) -> float:
        nothing_retrieved = len(hits) == 0
        if nothing_retrieved and n_relevant == 0:
            return 1.0
        if nothing_retrieved and n_relevant > 0:
            return 0.0

        depth = min(cutoff, len(hits))
        running_hits = torch.cumsum(hits, dim=0).float()
        n_found = running_hits[depth - 1]

        # no relevant items among the considered ones: the sum below is zero, so we avoid zero division
        if n_found < 1:
            return 0.0

        ranks = torch.arange(1, depth + 1).to(running_hits.device)
        precision_at_hits = (running_hits[:depth] / ranks) * hits[:depth]
        return float(torch.sum(precision_at_hits, dim=0) / n_found)

    results = []
    for cutoff in top_k:
        pairs = zip(gt_tops, n_gts)
        if verbose:
            pairs = tqdm(pairs, total=len(gt_tops), desc=f"MAP@{cutoff}")
        per_query = [average_precision(hits, n_relevant, cutoff) for hits, n_relevant in pairs]
        results.append(FloatTensor(per_query))

    return results
def calc_fnmr_at_fmr(pos_dist: np.ndarray, neg_dist: np.ndarray, fmr_vals: Tuple[float, ...] = (0.1,)) -> FloatTensor:
    """
    Function to compute False Non Match Rate (FNMR) value when False Match Rate (FMR) value
    is equal to ``fmr_vals``.

    The metric calculates the fraction of positive distances that are not smaller than
    a given :math:`q`-th quantile of negative distances.

    Args:
        pos_dist: Distances between relevant samples.
        neg_dist: Distances between non-relevant samples.
        fmr_vals: Values of ``fmr`` (measured in quantiles) to compute the corresponding ``fnmr``.
                  For example, if ``fmr_vals`` is (0.2, 0.4) we will calculate ``fnmr@fmr=0.2``
                  and ``fnmr@fmr=0.4``

    Returns:
        Tensor of ``fnmr@fmr`` values.

    Given a vector of :math:`N` distances between relevant samples, :math:`u`,
    the false non-match rate (:math:`\\textrm{FNMR}`) is computed as the proportion of :math:`u`
    at or above some threshold, :math:`T`:

    .. math::

        \\textrm{FNMR}(T) = \\frac{1}{N}\\sum\\limits_{i = 1}^{N}H\\left(u_i - T\\right) =
        1 - \\frac{1}{N}\\sum\\limits_{i = 1}^{N}H\\left(T - u_i\\right)

    where :math:`H(x)` is the unit step function, and :math:`H(0)` taken to be :math:`1`.

    Similarly, given a vector of :math:`N` distances between non-relevant samples, :math:`v`,
    the false match rate (:math:`\\textrm{FMR}`) is computed as the proportion of :math:`v`
    below some threshold, :math:`T`:

    .. math::

        \\textrm{FMR}(T) = 1 - \\frac{1}{N}\\sum\\limits_{i = 1}^{N}H\\left(v_i - T\\right) =
        \\frac{1}{N}\\sum\\limits_{i = 1}^{N}H\\left(T - v_i\\right)

    Given some interesting false match rate values :math:`\\textrm{FMR}_k` one can find thresholds :math:`T_k`
    corresponding to :math:`\\textrm{FMR}` measurements

    .. math::

        T_k = Q_v\\left(\\textrm{FMR}_k\\right)

    where :math:`Q` is the quantile function, and evaluate the corresponding values of
    :math:`\\textrm{FNMR}@\\textrm{FMR}\\left(T_k\\right) \\stackrel{\\text{def}}{=} \\textrm{FNMR}\\left(T_k\\right)`.

    See:

    `Biometrics Performance`_

    `BIOMETRIC RECOGNITION: A MODERN ERA FOR SECURITY`_

    .. _`Biometrics Performance`:
        https://en.wikipedia.org/wiki/Biometrics#Performance

    .. _`BIOMETRIC RECOGNITION: A MODERN ERA FOR SECURITY`:
        https://www.researchgate.net/publication/50315614_BIOMETRIC_RECOGNITION_A_MODERN_ERA_FOR_SECURITY

    Example:
        >>> pos_dist = np.array([0, 0, 1, 1, 2, 2, 5, 5, 9, 9])
        >>> neg_dist = np.array([3, 3, 4, 4, 6, 6, 7, 7, 8, 8])
        >>> calc_fnmr_at_fmr(pos_dist, neg_dist, fmr_vals=(0.1, 0.5))
        tensor([0.4000, 0.2000])

    """
    _check_if_in_range(fmr_vals, 0, 1, "fmr_vals")

    # we use numpy because it can take bigger input
    thresholds = np.quantile(neg_dist, fmr_vals)

    # row ``i`` marks the positive distances which are not smaller than the ``i``-th threshold
    is_non_match = pos_dist[np.newaxis, :] >= thresholds[:, np.newaxis]
    n_non_matches = is_non_match.sum(axis=1)

    return FloatTensor(n_non_matches / len(pos_dist))
def calc_fnmr_at_fmr_by_distances(
    pos_dist: np.ndarray, neg_dist: np.ndarray, fmr_vals: Tuple[float, ...]
) -> TMetricsDict:
    """
    Wrap the values of ``fnmr@fmr`` into a metrics dictionary.

    Args:
        pos_dist: Distances between relevant samples.
        neg_dist: Distances between non-relevant samples.
        fmr_vals: Values of ``fmr`` to compute the corresponding ``fnmr``.

    Returns:
        Empty dictionary if ``fmr_vals`` is empty, otherwise a dictionary with the only key ``fnmr@fmr``
        which stores the mapping from every requested ``fmr`` value to the computed ``fnmr``.

    """
    if not fmr_vals:
        return dict()

    fnmr_values = calc_fnmr_at_fmr(pos_dist, neg_dist, fmr_vals)
    return {"fnmr@fmr": dict(zip(fmr_vals, fnmr_values))}
def calc_pcf(embeddings: Tensor, pcf_variance: Tuple[float, ...]) -> List[Tensor]:
    """
    Function estimates the Principal Components Fraction (PCF) of embeddings using Principal Component Analysis.
    The metric is defined as a fraction of components needed to explain the required variance in data.

    Args:
        embeddings: Embeddings matrix with the shape of ``[n_embeddings, embeddings_dim]``.
        pcf_variance: Values in range [0, 1]. Find the number of components such that the amount
                      of variance that needs to be explained is greater than the fraction specified
                      by ``pcf_variance``.

    Returns:
        Linear dimensions as fractions of the embeddings dimension, one value per element of
        ``pcf_variance``. If the decomposition fails, a list of ``nan`` tensors of the same length
        is returned instead.

    Let :math:`X` be a set of :math:`d` dimensional embeddings.
    Let :math:`\\lambda_1, \\ldots, \\lambda_d\\in\\mathbb{R}` be a set of eigenvalues
    of the covariance matrix of :math:`X` sorted in descending order.
    Then for a given value of desired explained variance :math:`r`,
    the number of principal components that explains :math:`r\\cdot 100\\%` variance is the largest integer
    :math:`n` such that

    .. math::
        \\frac{\\sum\\limits_{i = 1}^{n - 1}\\lambda_i}{\\sum\\limits_{i = 1}^{d}\\lambda_i} \\leq r

    The function returns

    .. math::
        \\frac{n}{d}

    See:

    `Principal Components Analysis`_

    .. _`Principal Components Analysis`:
        https://en.wikipedia.org/wiki/Principal_component_analysis

    Example:
        In the example below there are 4 vectors of length 10, and only first 4 dimensions have
        non-zero values. Its covariance matrix will have only 4 eigenvalues that are greater than 0, i.e.
        there are only 4 principal axes. So, in order to keep at least 50% of the information
        from the set, we need to keep 2 principal axes, and in order to keep all the information we need
        to keep 5 principal axes (one additional axis appears because the number of principal axes
        is superior to the desired explained variance threshold).

        >>> embeddings = torch.eye(4, 10, dtype=torch.float)
        >>> calc_pcf(embeddings, pcf_variance=(0.5, 1))
        tensor([0.2000, 0.5000])

    """
    # The code below mirrors code from scikit-learn repository:
    # https://github.com/scikit-learn/scikit-learn/blob/f3f51f9b6/sklearn/decomposition/_pca.py#L491
    _check_if_in_range(pcf_variance, 0, 1, "pcf_variance")

    try:
        axes_number = PCA(embeddings).calc_principal_axes_number(pcf_variance)
        return axes_number.to(embeddings) / embeddings.shape[1]
    except Exception:
        # Mostly we handle the following error here:
        # >>> The algorithm failed to converge because the input matrix is ill-conditioned
        # >>> or has too many repeated singular values
        return [torch.tensor(float("nan"))] * len(pcf_variance)
def _check_if_in_range(vals: Sequence[float], min_: float, max_: float, name: str) -> None:
    """
    Validate that ``vals`` is not empty and that each of its elements lies in ``[min_, max_]``.

    Args:
        vals: Sequence to check.
        min_: Minimal value of the range.
        max_: Maximal value of the range.
        name: Name of the variable to mention in the error message.

    Raises:
        ValueError: If ``vals`` is empty or contains a value outside of the range.

    """
    if len(vals) == 0:
        raise ValueError(f"{name} is expected to be not empty, but got {vals}")

    for x in vals:
        if not (min_ <= x <= max_):
            raise ValueError(f"{name} is expected to contain numbers in range [{min_}, {max_}], but got {vals}")


__all__ = [
    "TMetricsDict",
    "calc_retrieval_metrics",
    "calc_topological_metrics",
    "reduce_metrics",
    "take_unreduced_metrics_by_mask",
    "calc_fnmr_at_fmr",
]