Source code for kwcoco.metrics.confusion_measures

"""
Classes that store accumulated confusion measures (usually derived from
confusion vectors).


For each chosen threshold value:
    * thresholds[i] - the i-th threshold value


The primary data we manipulate are arrays of "confusion" counts, i.e.

    * tp_count[i] - true positives at the i-th threshold
    * fp_count[i] - false positives at the i-th threshold
    * fn_count[i] - false negatives at the i-th threshold
    * tn_count[i] - true negatives at the i-th threshold

"""
import kwarray
import numpy as np
import ubelt as ub
import warnings
from kwcoco.util.dict_like import DictProxy


[docs] class Measures(ub.NiceRepr, DictProxy): """ Dict-like container for accumulated confusion counts and derived metrics. Holds accumulated confusion counts, and derived measures. At minimum this class needs to be given an array of thresholds and corresponding arrays of FP, FP, TN, FN counts at each threshold. These are generally computed by :class:`kwcoco.metrics.confusion_vectors.BinaryConfusionVectors`. From there, other higher level metrics such as AP, AUC, max-F1, max-MCC etc can be computed. Example: >>> from kwcoco.metrics.confusion_vectors import BinaryConfusionVectors # NOQA >>> binvecs = BinaryConfusionVectors.demo(n=100, p_error=0.5) >>> self = binvecs.measures() >>> summary = self.summary() >>> print(f'summary = {ub.urepr(summary, nl=1)}') >>> print('self = {!r}'.format(self)) >>> # xdoctest: +REQUIRES(--show) >>> import kwplot >>> kwplot.autompl() >>> self.draw(doclf=True) >>> self.draw(key='pr', pnum=(1, 2, 1)) >>> self.draw(key='roc', pnum=(1, 2, 2)) >>> kwplot.show_if_requested() """ def __init__(self, info): self.proxy = info @property def catname(self): """ Category name associated with these measures, if any (``node`` key). """ return self.get('node', None) def __nice__(self): info = self.summary() return ub.urepr(info, nl=0, precision=3, strvals=True, align=':')
[docs] def reconstruct(self): """ Recomputes derivable measures (e.g. AP, F1) from raw confusion counts. Returns: Self """ populate_info(info=self) return self
[docs] @classmethod def from_json(cls, state): """ Construct from a minimally serialized state. """ populate_info(state) return cls(state)
def __json__(self): """ Return a minimal, JSON-serializable snapshot of the measures. The snapshot includes the raw counts, thresholds, totals, and selected derived scalars (e.g., ``auc``, ``ap``) when present. Omitted fields can be recomputed with :func:`Measures.reconstruct`. Example: >>> from kwcoco.metrics.confusion_vectors import BinaryConfusionVectors # NOQA >>> from kwcoco.metrics.confusion_measures import * # NOQA >>> binvecs = BinaryConfusionVectors.demo(n=10, p_error=0.5) >>> self = binvecs.measures() >>> info = self.__json__() >>> print('info = {}'.format(ub.urepr(info, nl=1))) >>> populate_info(info) >>> print('info = {}'.format(ub.urepr(info, nl=1))) >>> recon = Measures.from_json(info) """ from kwcoco.util.util_json import ensure_json_serializable state = {} minimal = { 'fp_count', 'tp_count', 'tn_count', 'fn_count', 'thresholds', 'realpos_total', 'realneg_total', 'trunc_idx', 'nsupport', 'fp_cutoff', 'stabalize_thresh', 'cx', 'node', 'monotonic_ppv', } nice_to_have = { # recomputable # 'mcc', 'g1', 'f1', 'ppv', 'tpr', 'fpr', 'acc', 'bm', 'mk', # 'max_mcc', '_max_mcc', 'max_g1', '_max_g1', 'max_f1', # '_max_f1', 'max_acc', '_max_acc', # 'trunc_tpr', 'trunc_fpr', 'trunc_auc', 'auc', 'ap', # 'sklish_ap', 'pycocotools_ap', # 'outlier_ap', 'sklearn', } state = ub.dict_isect(self.proxy, minimal | nice_to_have) state = ensure_json_serializable(state) return state
[docs] def summary(self): """ A concise dictionary with summary level information about the measures Returns: dict """ return { 'ap': self.get('ap', None), 'auc': self.get('auc', None), # 'max_mcc': self['max_mcc'], 'max_f1': self.get('max_f1', None), # 'max_g1': self['max_g1'], 'nsupport': self.get('nsupport', None), 'realpos_total': self.get('realpos_total', None), # 'realneg_total': self['realneg_total'], 'catname': self.get('node', None), }
[docs] def maximized_thresholds(self): """ Returns thresholds that maximize metrics. Returns: Dict[str, Dict[str, Any]]: For each metric (e.g., ``"f1"``, ``"mcc"``), a dict with: ``{"thresh": float, "metric_value": float, "metric_name": str}``. Example: >>> from kwcoco.metrics.confusion_measures import * # NOQA >>> self = Measures.demo() >>> info = self.maximized_thresholds() >>> print(f'info = {ub.urepr(info, nl=1, precision=2)}') """ # TODO: clean up the underlying storage of this info maximized_info = {} for k in self.keys(): if k.startswith('_max_'): v = self[k] metric_value, thresh_value = v metric_name = k.split('_max_', 1)[1] maximized_info[metric_name] = { 'thresh': thresh_value, 'metric_value': metric_value, 'metric_name': metric_name, } return maximized_info
[docs] def scalars(self): """ Return the computed metrics without the full curve content Returns: Dict: The underlying map with large arrays removed. Example: >>> from kwcoco.metrics.confusion_vectors import BinaryConfusionVectors # NOQA >>> binvecs = BinaryConfusionVectors.demo(n=100, p_error=0.5) >>> self = binvecs.measures() >>> scalars = self.scalars() >>> print(f'scalars = {ub.urepr(scalars, nl=1)}') """ scalar_data = ub.dict_diff(self, [ 'fp_count', 'tp_count', 'tn_count', 'fn_count', 'thresholds', 'trunc_fpr', 'monotonic_ppv', 'trunc_fp_count', 'trunc_tp_count', 'trunc_tpr', 'trunc_thresholds', 'tpr', 'ppv', 'tpr', 'fpr', 'bm', 'mk', 'acc', 'tnr', 'npv', 'mcc', 'f1', 'g1', ]) return scalar_data
[docs] def counts(self): """ Just return the curves, from which most other data is computed (subject to metadata, see __json__ for actual minimal metadata) Returns: kwarray.DataFrameArray """ counts_df = ub.dict_isect(self, [ 'fp_count', 'tp_count', 'tn_count', 'fn_count', 'thresholds']) counts_df = kwarray.DataFrameArray(counts_df) return counts_df
[docs] def draw(self, key=None, prefix='', **kw): """ Draw a specified metric curve using matplotlib. Args: key (str | None): - ``None`` or ``"thresh"``: threshold vs metric curves - ``"pr"``: precision–recall curve - ``"roc"``: ROC curve prefix (str): Label prefix for legends/titles. **kw Forwarded to the underlying drawing helpers. TODO: - [ ] Modernize these plots with seaborn Example: >>> # xdoctest: +REQUIRES(module:kwplot) >>> # xdoctest: +REQUIRES(module:pandas) >>> from kwcoco.metrics.confusion_vectors import ConfusionVectors # NOQA >>> cfsn_vecs = ConfusionVectors.demo() >>> ovr_cfsn = cfsn_vecs.binarize_ovr(keyby='name') >>> self = ovr_cfsn.measures()['perclass'] >>> self.draw('mcc', doclf=True, fnum=1) >>> self.draw('pr', doclf=1, fnum=2) >>> self.draw('roc', doclf=1, fnum=3) """ from kwcoco.metrics import drawing if key is None or key == 'thresh': return drawing.draw_threshold_curves(self, prefix=prefix, **kw) elif key == 'pr': return drawing.draw_prcurve(self, prefix=prefix, **kw) elif key == 'roc': return drawing.draw_roc(self, prefix=prefix, **kw)
[docs] def summary_plot(self, fnum=1, title='', subplots='auto'): """ Draws a figure with multiple metric curves using :func:`Measures.draw`. Example: >>> from kwcoco.metrics.confusion_measures import * # NOQA >>> from kwcoco.metrics.confusion_vectors import ConfusionVectors # NOQA >>> cfsn_vecs = ConfusionVectors.demo(n=3, p_error=0.5) >>> binvecs = cfsn_vecs.binarize_classless() >>> self = binvecs.measures() >>> # xdoctest: +REQUIRES(--show) >>> import kwplot >>> kwplot.autompl() >>> self.summary_plot() >>> kwplot.show_if_requested() """ if subplots == 'auto' or (isinstance(subplots, int) and subplots): subplots = ['pr', 'roc', 'thresh'] import kwplot kwplot.figure(fnum=fnum, figtitle=title) pnum_ = kwplot.PlotNums(nSubplots=len(subplots)) for key in subplots: kwplot.figure(fnum=fnum, pnum=pnum_()) self.draw(key, fnum=fnum)
# kwplot.figure(fnum=fnum, pnum=(1, 3, 2)) # self.draw('roc') # kwplot.figure(fnum=fnum, pnum=(1, 3, 3)) # self.draw('thresh', keys=['mcc', 'f1', 'acc'])
[docs] @classmethod def demo(cls, **kwargs): """ Create a demo Measures object for testing / demos Args: **kwargs: passed to :func:`BinaryConfusionVectors.demo`. some valid keys are: n, rng, p_rue, p_error, p_miss. """ from kwcoco.metrics.confusion_vectors import BinaryConfusionVectors bin_cfsn = BinaryConfusionVectors.demo(**kwargs) measures = bin_cfsn.measures() return measures
[docs] @classmethod def combine(cls, tocombine, precision=None, growth=None, thresh_bins=None): """ Combine binary confusion metrics Args: tocombine (List[Measures]): a list of measures to combine into one precision (int | None): If specified rounds thresholds to this precision which can prevent a RAM explosion when combining a large number of measures. However, this is a lossy operation and will impact the underlying scores. NOTE: use ``growth`` instead. growth (str | None): if specified this limits how much the resulting measures are allowed to grow by. If None, growth is unlimited. Otherwise, if growth is 'max', the growth is limited to the maximum length of an input. We might make this more numerical in the future. thresh_bins (int | None): Force this many threshold bins. Returns: kwcoco.metrics.confusion_measures.Measures Example: >>> from kwcoco.metrics.confusion_measures import * # NOQA >>> measures1 = Measures.demo(n=15) >>> measures2 = measures1 >>> tocombine = [measures1, measures2] >>> new_measures = Measures.combine(tocombine) >>> new_measures.reconstruct() >>> print('new_measures = {!r}'.format(new_measures)) >>> print('measures1 = {!r}'.format(measures1)) >>> print('measures2 = {!r}'.format(measures2)) >>> print(ub.urepr(measures1.__json__(), nl=1, sort=0)) >>> print(ub.urepr(measures2.__json__(), nl=1, sort=0)) >>> print(ub.urepr(new_measures.__json__(), nl=1, sort=0)) >>> # xdoctest: +REQUIRES(--show) >>> import kwplot >>> kwplot.autompl() >>> kwplot.figure(fnum=1) >>> new_measures.summary_plot() >>> measures1.summary_plot() >>> measures1.draw('roc') >>> measures2.draw('roc') >>> new_measures.draw('roc') Ignore: >>> from kwcoco.metrics.confusion_measures import * # NOQA >>> rng = kwarray.ensure_rng(0) >>> tocombine = [ >>> Measures.demo(n=rng.randint(40, 50), rng=rng, p_true=0.2, p_error=0.4, p_miss=0.6) >>> for _ in range(80) >>> ] >>> # xdoctest: +REQUIRES(--show) >>> import kwplot >>> for idx, growth in enumerate([None, 'max', 'log', 'root', 'half']): >>> combo = Measures.combine(tocombine, growth=growth).reconstruct() >>> print('growth = {!r}'.format(growth)) >>> print('combo = {}'.format(ub.urepr(combo, nl=1))) >>> print('num_thresholds = {}'.format(len(combo['thresholds']))) >>> combo.summary_plot(fnum=idx + 1, title=str(growth)) Example: >>> # Demonstrate issues that can arrise from choosing a precision >>> # that is too low when combining metrics. Breakpoints >>> # between different metrics can get muddled, but choosing a >>> # precision that is too high can overwhelm memory. >>> from kwcoco.metrics.confusion_measures import * # NOQA >>> base = ub.map_vals(np.asarray, { >>> 'tp_count': [ 1, 1, 2, 2, 2, 2, 3], >>> 'fp_count': [ 0, 1, 1, 2, 3, 4, 5], >>> 'fn_count': [ 1, 1, 0, 0, 0, 0, 0], >>> 'tn_count': [ 5, 4, 4, 3, 2, 1, 0], >>> 'thresholds': [.0, .0, .0, .0, .0, .0, .0], >>> }) >>> # Make tiny offsets to thresholds >>> rng = kwarray.ensure_rng(0) >>> n = len(base['thresholds']) >>> offsets = [ >>> sorted(rng.rand(n) * 10 ** -rng.randint(4, 7))[::-1] >>> for _ in range(20) >>> ] >>> tocombine = [] >>> for offset in offsets: >>> base_n = base.copy() >>> base_n['thresholds'] += offset >>> measures_n = Measures(base_n).reconstruct() >>> tocombine.append(measures_n) >>> for precision in [6, 5, 2]: >>> combo = Measures.combine(tocombine, precision=precision).reconstruct() >>> print('precision = {!r}'.format(precision)) >>> print('combo = {}'.format(ub.urepr(combo, nl=1))) >>> print('num_thresholds = {}'.format(len(combo['thresholds']))) >>> for growth in [None, 'max', 'log', 'root', 'half']: >>> combo = Measures.combine(tocombine, growth=growth).reconstruct() >>> print('growth = {!r}'.format(growth)) >>> print('combo = {}'.format(ub.urepr(combo, nl=1))) >>> print('num_thresholds = {}'.format(len(combo['thresholds']))) >>> #print(combo.counts().pandas()) Example: >>> # Test case: combining a single measures should leave it unchanged >>> from kwcoco.metrics.confusion_measures import * # NOQA >>> measures = Measures.demo(n=40, p_true=0.2, p_error=0.4, p_miss=0.6) >>> df1 = measures.counts().pandas().fillna(0) >>> print(df1) >>> tocombine = [measures] >>> combo = Measures.combine(tocombine) >>> df2 = combo.counts().pandas().fillna(0) >>> print(df2) >>> assert np.allclose(df1, df2) >>> combo = Measures.combine(tocombine, thresh_bins=2) >>> df3 = combo.counts().pandas().fillna(0) >>> print(df3) >>> # I am NOT sure if this is correct or not >>> thresh_bins = 20 >>> combo = Measures.combine(tocombine, thresh_bins=thresh_bins) >>> df4 = combo.counts().pandas().fillna(0) >>> print(df4) >>> combo = Measures.combine(tocombine, thresh_bins=np.linspace(0, 1, 20)) >>> df4 = combo.counts().pandas().fillna(0) >>> print(df4) assert np.allclose(combo['thresholds'], measures['thresholds']) assert np.allclose(combo['fp_count'], measures['fp_count']) assert np.allclose(combo['tp_count'], measures['tp_count']) assert np.allclose(combo['tp_count'], measures['tp_count']) globals().update(xdev.get_func_kwargs(Measures.combine)) Example: >>> # Test degenerate case >>> from kwcoco.metrics.confusion_measures import * # NOQA >>> tocombine = [ >>> {'fn_count': [0.0], 'fp_count': [359980.0], 'thresholds': [0.0], 'tn_count': [0.0], 'tp_count': [7747.0]}, >>> {'fn_count': [0.0], 'fp_count': [360849.0], 'thresholds': [0.0], 'tn_count': [0.0], 'tp_count': [424.0]}, >>> {'fn_count': [0.0], 'fp_count': [367003.0], 'thresholds': [0.0], 'tn_count': [0.0], 'tp_count': [991.0]}, >>> {'fn_count': [0.0], 'fp_count': [367976.0], 'thresholds': [0.0], 'tn_count': [0.0], 'tp_count': [1017.0]}, >>> {'fn_count': [0.0], 'fp_count': [676338.0], 'thresholds': [0.0], 'tn_count': [0.0], 'tp_count': [7067.0]}, >>> {'fn_count': [0.0], 'fp_count': [676348.0], 'thresholds': [0.0], 'tn_count': [0.0], 'tp_count': [7406.0]}, >>> {'fn_count': [0.0], 'fp_count': [676626.0], 'thresholds': [0.0], 'tn_count': [0.0], 'tp_count': [7858.0]}, >>> {'fn_count': [0.0], 'fp_count': [676693.0], 'thresholds': [0.0], 'tn_count': [0.0], 'tp_count': [10969.0]}, >>> {'fn_count': [0.0], 'fp_count': [677269.0], 'thresholds': [0.0], 'tn_count': [0.0], 'tp_count': [11188.0]}, >>> {'fn_count': [0.0], 'fp_count': [677331.0], 'thresholds': [0.0], 'tn_count': [0.0], 'tp_count': [11734.0]}, >>> {'fn_count': [0.0], 'fp_count': [677395.0], 'thresholds': [0.0], 'tn_count': [0.0], 'tp_count': [11556.0]}, >>> {'fn_count': [0.0], 'fp_count': [677418.0], 'thresholds': [0.0], 'tn_count': [0.0], 'tp_count': [11621.0]}, >>> {'fn_count': [0.0], 'fp_count': [677422.0], 'thresholds': [0.0], 'tn_count': [0.0], 'tp_count': [11424.0]}, >>> {'fn_count': [0.0], 'fp_count': [677648.0], 'thresholds': [0.0], 'tn_count': [0.0], 'tp_count': [9804.0]}, >>> {'fn_count': [0.0], 'fp_count': [677826.0], 'thresholds': [0.0], 'tn_count': [0.0], 'tp_count': [2470.0]}, >>> {'fn_count': [0.0], 'fp_count': [677834.0], 'thresholds': [0.0], 'tn_count': [0.0], 'tp_count': [2470.0]}, >>> {'fn_count': [0.0], 'fp_count': [677835.0], 'thresholds': [0.0], 'tn_count': [0.0], 'tp_count': [2470.0]}, >>> {'fn_count': [11123.0, 0.0], 'fp_count': [0.0, 676754.0], 'thresholds': [0.0002442002442002442, 0.0], 'tn_count': [676754.0, 0.0], 'tp_count': [2.0, 11125.0]}, >>> {'fn_count': [7738.0, 0.0], 'fp_count': [0.0, 676466.0], 'thresholds': [0.0002442002442002442, 0.0], 'tn_count': [676466.0, 0.0], 'tp_count': [0.0, 7738.0]}, >>> {'fn_count': [8653.0, 0.0], 'fp_count': [0.0, 676341.0], 'thresholds': [0.0002442002442002442, 0.0], 'tn_count': [676341.0, 0.0], 'tp_count': [0.0, 8653.0]}, >>> ] >>> thresh_bins = np.linspace(0, 1, 4) >>> combo = Measures.combine(tocombine, thresh_bins=thresh_bins).reconstruct() >>> print('tocombine = {}'.format(ub.urepr(tocombine, nl=2))) >>> print('thresh_bins = {!r}'.format(thresh_bins)) >>> print(ub.urepr(combo.__json__(), nl=1)) >>> for thresh_bins in [4096, 1]: >>> combo = Measures.combine(tocombine, thresh_bins=thresh_bins).reconstruct() >>> print('thresh_bins = {!r}'.format(thresh_bins)) >>> print('combo = {}'.format(ub.urepr(combo, nl=1))) >>> print('num_thresholds = {}'.format(len(combo['thresholds']))) >>> for precision in [6, 5, 2]: >>> combo = Measures.combine(tocombine, precision=precision).reconstruct() >>> print('precision = {!r}'.format(precision)) >>> print('combo = {}'.format(ub.urepr(combo, nl=1))) >>> print('num_thresholds = {}'.format(len(combo['thresholds']))) >>> for growth in [None, 'max', 'log', 'root', 'half']: >>> combo = Measures.combine(tocombine, growth=growth).reconstruct() >>> print('growth = {!r}'.format(growth)) >>> print('combo = {}'.format(ub.urepr(combo, nl=1))) >>> print('num_thresholds = {}'.format(len(combo['thresholds']))) """ if precision is not None and np.isinf(precision): precision = None # For each item to combine, we merge all unique considered thresholds # into a single array, and then reconstruct the confusion counts with # respect to this new threshold array. tocombine_thresh = [m['thresholds'] for m in tocombine] if ub.iterable(thresh_bins): combo_thresh_asc = thresh_bins nonfinite_vals = [] else: combo_thresh_asc, nonfinite_vals = _combine_threshold( tocombine_thresh, thresh_bins, growth, precision) if nonfinite_vals: # Force inclusion of non-finite thresholds combo_thresh_asc = np.hstack([combo_thresh_asc, nonfinite_vals]) combo_thresh_asc.sort() new_thresh = combo_thresh_asc[::-1] # From this point on, the rest of the logic should (if the # implementation is correct) work with respect to an arbitrary choice # of threshold bins, thus the critical step is choosing what these bins # should be in order to minimize lossyness of the combined accumulated # measures while also keeping a reasonable memory footprint. # Regardless, it might be good to double check this logic. summable = { 'nsupport': 0, 'realpos_total': 0, 'realneg_total': 0, } # Initialize new counts for each entry in the new threshold array # XX_accum[idx] represents the number of *new* XX cases at index idx fp_accum = np.zeros(len(new_thresh)) tp_accum = np.zeros(len(new_thresh)) tn_accum = np.zeros(len(new_thresh)) fn_accum = np.zeros(len(new_thresh)) # For each item to combine, find where its counts should go in the new # combined confusion array. for measures in tocombine: # this thresh is descending thresholds = measures['thresholds'] # XX_pos[idx] is the number of *new* XX cases at index idx at # thresholds[idx] for *these* measures. fp_pos, _, _ = reversable_diff(measures['fp_count']) tp_pos, _, _ = reversable_diff(measures['tp_count']) tn_pos, tn_p, tn_s = reversable_diff(measures['tn_count'], reverse=1) fn_pos, _, _ = reversable_diff(measures['fn_count'], reverse=1) # Find the locations where the thresholds from "these" measures # should be inserted into the combined measures. right_idxs = np.searchsorted(combo_thresh_asc, thresholds, 'right') left_idxs = len(combo_thresh_asc) - right_idxs left_idxs = left_idxs.clip(0, len(combo_thresh_asc) - 1) # Accumulate these values from these measures into the appropriate # new threshold position. note: np.add.at is unbuffered np.add.at(fp_accum, left_idxs, fp_pos) np.add.at(tp_accum, left_idxs, tp_pos) np.add.at(fn_accum, left_idxs, fn_pos) np.add.at(tn_accum, left_idxs, tn_pos) for k in summable: if k in measures: summable[k] += measures[k] new_fp = np.cumsum(fp_accum) # will increase with index new_tp = np.cumsum(tp_accum) # will increase with index new_tn = np.cumsum(tn_accum[::-1])[::-1] # will decrease with index new_fn = np.cumsum(fn_accum[::-1])[::-1] # will decrease with index if -np.inf in nonfinite_vals: # Not sure if this is right... new_fp[-1] = np.inf new_tn[-1] = -np.inf new_info = { 'fp_count': new_fp, 'tp_count': new_tp, 'tn_count': new_tn, 'fn_count': new_fn, 'thresholds': new_thresh, } new_info.update(summable) other = { 'fp_cutoff', 'monotonic_ppv', 'node', 'cx', 'stabalize_thresh', 'trunc_idx' } rest = ub.dict_isect(tocombine[0], other) new_info.update(rest) new_measures = Measures(new_info) return new_measures
[docs] def _combine_threshold(tocombine_thresh, thresh_bins, growth, precision): """ Logic to take care of combining thresholds in the case bins are not given This can be fairly slow and lead to unnecessary memory usage """ orig_thresholds = np.unique(np.hstack(tocombine_thresh)) finite_flags = np.isfinite(orig_thresholds) nonfinite_flags = ~finite_flags if np.any(nonfinite_flags): orig_finite_thresholds = orig_thresholds[finite_flags] # If we have any non-finite threshold values we are stuck with them nonfinite_vals = orig_thresholds[nonfinite_flags] else: orig_finite_thresholds = orig_thresholds nonfinite_vals = [] if thresh_bins is None: if growth is not None: if precision is not None: raise ValueError('dont use precision with growth') # Keep a minimum number of threshold and allow # the resulting metrics to grow by some factor num_per_item = list(map(len, tocombine_thresh)) if growth == 'max': max_num_thresholds = max(num_per_item) elif growth == 'min': max_num_thresholds = min(num_per_item) elif growth == 'log': # Each item after the first is only allowed to contribute # some of its thresholds defined by the base-e-log. modulated_sizes = sorted([np.log(n) for n in num_per_item]) max_num_thresholds = int(round(sum(modulated_sizes[:-1]) + max(num_per_item))) elif growth == 'root': # Each item after the first is only allowed to contribute # some of its thresholds defined by the square root. modulated_sizes = sorted([np.sqrt(n) for n in num_per_item]) max_num_thresholds = int(round(sum(modulated_sizes[:-1]) + max(num_per_item))) elif growth == 'half': # Each item after the first is only allowed to contribute # some of its thresholds, half of them. modulated_sizes = sorted([n / 2 for n in num_per_item]) max_num_thresholds = int(round(sum(modulated_sizes[:-1]) + max(num_per_item))) else: raise KeyError(growth) chosen_idxs = np.linspace(0, len(orig_finite_thresholds) - 1, max_num_thresholds).round().astype(int) chosen_idxs = np.unique(chosen_idxs) combo_thresh_asc = orig_finite_thresholds[chosen_idxs] elif precision is not None: round_thresholds = orig_finite_thresholds.round(precision) combo_thresh_asc = np.unique(round_thresholds) else: combo_thresh_asc = orig_finite_thresholds else: assert growth is None assert precision is None if isinstance(thresh_bins, int): # Fixme, this might not be the correct way to do this. threshold_pool = orig_finite_thresholds # Subdivide threshold pool until we get enough values while thresh_bins > len(threshold_pool): # x = [1, 2, 3, 4, 5] # x = [1, 2, 3, 4, 5, 6] # even_slice = threshold_pool[:(len(threshold_pool) // 2) * 2] # odd_slice = threshold_pool[1:(len(threshold_pool) // 2) * 2] x = threshold_pool even_slice = x[:(len(x) // 2) * 2] odd_slice = x[1:len(x) - (len(x) % 2 == 0)] mean_vals = np.r_[[0, threshold_pool[0]], even_slice, odd_slice, [1, threshold_pool[-1]]].reshape(-1, 2).mean(axis=1) threshold_pool = np.hstack([mean_vals, threshold_pool]) threshold_pool = np.unique(threshold_pool) chosen_idxs = np.linspace(0, len(threshold_pool) - 1, thresh_bins).round().astype(int) chosen_idxs = np.unique(chosen_idxs) combo_thresh_asc = threshold_pool[chosen_idxs] else: raise TypeError return combo_thresh_asc, nonfinite_vals
[docs] def reversable_diff(arr, assume_sorted=1, reverse=False): """ Does a reversible array difference operation. This will be used to find positions where accumulation happened in confusion count array. Args: arr (ndarray): Input sequence (finite interior; may have +/-inf at ends). assume_sorted (int): Reserved; asserts the monotone assumption. Default is True. reverse (bool): If True, treat ``arr`` as reversed; outputs are adjusted accordingly. Defaults to False. Returns: Tuple[np.ndarray, np.ndarray, np.ndarray] ``(diff_arr, prefix, suffix)``. To invert: - if ``reverse=False``: ``recon = np.cumsum(diff_arr); recon[:len(prefix)] += prefix; recon[-len(suffix):] += suffix`` - if ``reverse=True``: apply the same idea with reversed arrays. Ignore: >>> from kwcoco.metrics.confusion_measures import * # NOQA >>> funcs = { >>> 'at_zero': [0, 3, 9, 43, 333], >>> 'at_nonzero': [2, 3, 9, 43, 333], >>> 'at_negative': [-2, 3, 9, 43, 333], >>> 'at_neginf': [-np.inf, 3, 9, 43, 333], >>> 'at_dblinf': [-np.inf, -np.inf, 3, 9, 43, 333, np.inf, np.inf], >>> } >>> for k, arr in funcs.items(): >>> arr = np.asarray(arr) + 30 >>> diff_arr, prefix, suffix = reversable_diff(arr) >>> recon_arr = np.cumsum(diff_arr) >>> recon_arr[0:len(prefix)] += prefix >>> recon_arr[len(recon_arr) - len(suffix):] += suffix >>> print('k = {!r}'.format(k)) >>> print('diff_arr = {!r}'.format(diff_arr)) >>> print('arr = {!r}'.format(arr)) >>> print('recon_arr = {!r}'.format(recon_arr)) >>> for k, arr in funcs.items(): >>> arr = np.asarray(arr)[::-1] + 30 >>> diff_arr, prefix, suffix = reversable_diff(arr, reverse=True) >>> recon_arr = np.cumsum(diff_arr[::-1])[::-1] >>> recon_arr[0:len(prefix)] += prefix >>> recon_arr[len(recon_arr) - len(suffix):] += suffix >>> print('k = {!r}'.format(k)) >>> print('diff_arr = {!r}'.format(diff_arr)) >>> print('arr = {!r}'.format(arr)) >>> print('recon_arr = {!r}'.format(recon_arr)) """ import math assert assume_sorted if len(arr) == 0: raise ValueError('todo: default value for empty') if reverse: arr = arr[::-1] n = len(arr) last_idx = n - 1 first_finite_idx = 0 for first_finite_idx, v in zip(range(n), arr): if math.isfinite(v): break last_finite_idx = n for last_finite_idx, v in zip(range(last_idx, -1, -1), arr[::-1]): if math.isfinite(v): break suffix_len = last_idx - last_finite_idx prefix_len = first_finite_idx offset = arr[first_finite_idx] # This is + C prefix = arr[:first_finite_idx] suffix = arr[last_finite_idx + 1:] finite_body = arr[first_finite_idx:last_finite_idx + 1] diff_arr = np.r_[[0] * prefix_len, [offset], np.diff(finite_body), [0] * suffix_len] # The goal is to be able to recon perfectly def invert(diff_arr): recon_arr = np.cumsum(diff_arr) recon_arr[0:len(prefix)] += prefix recon_arr[len(recon_arr) - len(suffix):] += suffix return recon_arr if reverse: diff_arr = diff_arr[::-1] prefix, suffix = suffix[::-1], prefix[::-1] return diff_arr, prefix, suffix
[docs] class PerClass_Measures(ub.NiceRepr, DictProxy): """ A container class mapping categories to :class:`Measures`. """ def __init__(self, cx_to_info): self.proxy = cx_to_info def __nice__(self): return ub.urepr(self.proxy, nl=2, strvals=True, align=':')
[docs] def summary(self): return {k: v.summary() for k, v in self.items()}
[docs] @classmethod def from_json(cls, state): state = ub.map_vals(Measures.from_json, state) self = cls(state) return self
def __json__(self): return {k: v.__json__() for k, v in self.items()}
[docs] def draw(self, key='mcc', prefix='', **kw): """ Example: >>> # xdoctest: +REQUIRES(module:kwplot) >>> from kwcoco.metrics.confusion_vectors import ConfusionVectors # NOQA >>> cfsn_vecs = ConfusionVectors.demo() >>> ovr_cfsn = cfsn_vecs.binarize_ovr(keyby='name') >>> self = ovr_cfsn.measures()['perclass'] >>> self.draw('mcc', doclf=True, fnum=1) >>> self.draw('pr', doclf=1, fnum=2) >>> self.draw('roc', doclf=1, fnum=3) """ from kwcoco.metrics import drawing if key == 'pr': return drawing.draw_perclass_prcurve(self, prefix=prefix, **kw) elif key == 'roc': return drawing.draw_perclass_roc(self, prefix=prefix, **kw) else: return drawing.draw_perclass_thresholds( self, key=key, prefix=prefix, **kw)
[docs] def draw_roc(self, prefix='', **kw): from kwcoco.metrics import drawing return drawing.draw_perclass_roc(self, prefix=prefix, **kw)
[docs] def draw_pr(self, prefix='', **kw): from kwcoco.metrics import drawing return drawing.draw_perclass_prcurve(self, prefix=prefix, **kw)
[docs] def summary_plot(self, fnum=1, title='', subplots='auto'): """ CommandLine: python ~/code/kwcoco/kwcoco/metrics/confusion_measures.py PerClass_Measures.summary_plot --show Example: >>> from kwcoco.metrics.confusion_measures import * # NOQA >>> from kwcoco.metrics.detect_metrics import DetectionMetrics >>> dmet = DetectionMetrics.demo( >>> n_fp=(0, 1), n_fn=(0, 3), nimgs=32, nboxes=(0, 32), >>> classes=3, rng=0, newstyle=1, box_noise=0.7, cls_noise=0.2, score_noise=0.3, with_probs=False) >>> cfsn_vecs = dmet.confusion_vectors() >>> ovr_cfsn = cfsn_vecs.binarize_ovr(keyby='name', ignore_classes=['vector', 'raster']) >>> self = ovr_cfsn.measures()['perclass'] >>> # xdoctest: +REQUIRES(--show) >>> import kwplot >>> kwplot.autompl() >>> import seaborn as sns >>> sns.set() >>> self.summary_plot(title='demo summary_plot ovr', subplots=['pr', 'roc']) >>> kwplot.show_if_requested() >>> self.summary_plot(title='demo summary_plot ovr', subplots=['mcc', 'acc'], fnum=2) """ if subplots == 'auto' or (isinstance(subplots, int) and subplots): subplots = ['pr', 'roc', 'mcc', 'f1', 'acc'] import kwplot pnum_ = kwplot.PlotNums(nSubplots=len(subplots)) kwplot.figure(fnum=fnum, doclf=True, figtitle=title) for key in subplots: self.draw(key, fnum=fnum, pnum=pnum_())
[docs] class MeasureCombiner: """ Helper to iteravely combine binary measures generated by some process Example: >>> from kwcoco.metrics.confusion_measures import * # NOQA >>> from kwcoco.metrics.confusion_vectors import BinaryConfusionVectors >>> rng = kwarray.ensure_rng(0) >>> bin_combiner = MeasureCombiner(growth='max') >>> for _ in range(80): >>> bin_cfsn_vecs = BinaryConfusionVectors.demo(n=rng.randint(40, 50), rng=rng, p_true=0.2, p_error=0.4, p_miss=0.6) >>> bin_measures = bin_cfsn_vecs.measures() >>> bin_combiner.submit(bin_measures) >>> combined = bin_combiner.finalize() >>> print('combined = {!r}'.format(combined)) """ def __init__(self, precision=None, growth=None, thresh_bins=None): """ Args: precision (None | int): deprecated use growth or thresh_bins instead. growth (str | None): if specified this limits how much the resulting measures are allowed to grow by. If None, growth is unlimited. Otherwise, if growth is 'max', the growth is limited to the maximum length of an input. We might make this more numerical in the future. thresh_bins (int | List[float] | None): If an integer force this many threshold bins, or if a list then use these threshold bins. """ self.measures = None self.growth = growth self.thresh_bins = thresh_bins self.precision = precision self.queue = [] @property def queue_size(self): return len(self.queue)
[docs] def submit(self, other): """ Queue another set of measures for combination. Args: other (BinaryConfusionVectors): confusion vectors for an item """ self.queue.append(other)
[docs] def combine(self): """ Reduce the current queue into a consolidated ``self.measures``. """ # Reduce measures over the chunk if self.measures is None: to_combine = self.queue else: to_combine = [self.measures] + self.queue if len(to_combine) == 0: pass elif len(to_combine) == 1: self.measures = to_combine[0] else: self.measures = Measures.combine( to_combine, precision=self.precision, growth=self.growth, thresh_bins=self.thresh_bins) self.queue = []
[docs] def finalize(self): """ Combine any remaining items, reconstruct metrics, and return the result. Returns: Measures: the combined measures """ if self.queue: self.combine() if self.measures is None: # Note: might want to return None instead? return False else: self.measures.reconstruct() return self.measures
[docs] class OneVersusRestMeasureCombiner: """ Helper to iteravely combine ovr measures generated by some process Example: >>> from kwcoco.metrics.confusion_measures import * # NOQA >>> from kwcoco.metrics.confusion_vectors import OneVsRestConfusionVectors >>> rng = kwarray.ensure_rng(0) >>> ovr_combiner = OneVersusRestMeasureCombiner(growth='max') >>> for _ in range(80): >>> ovr_cfsn_vecs = OneVsRestConfusionVectors.demo() >>> ovr_measures = ovr_cfsn_vecs.measures() >>> ovr_combiner.submit(ovr_measures) >>> combined = ovr_combiner.finalize() >>> print('combined = {!r}'.format(combined)) """ def __init__(self, precision=None, growth=None, thresh_bins=None): self.catname_to_combiner = {} self.precision = precision self.growth = precision self.thresh_bins = thresh_bins self.queue_size = 0
[docs] def submit(self, other): self.queue_size += 1 for catname, other_m in other['perclass'].items(): if catname not in self.catname_to_combiner: combiner = MeasureCombiner( precision=self.precision, growth=self.growth, thresh_bins=self.thresh_bins) self.catname_to_combiner[catname] = combiner self.catname_to_combiner[catname].submit(other_m)
[docs] def _summary(self): for catname, combiner in self.catname_to_combiner.items(): # combiner summary # combiner.measures if combiner.measures is not None: combiner.measures.reconstruct() for qx, measure in enumerate(combiner.queue): measure.reconstruct() print(' * queue[{}] = {}'.format(qx, ub.urepr(measure, nl=1)))
[docs] def combine(self): for combiner in self.catname_to_combiner.values(): combiner.combine() self.queue_size = 0
[docs] def finalize(self): catname_to_measures = {} for catname, combiner in self.catname_to_combiner.items(): catname_to_measures[catname] = combiner.finalize() perclass = PerClass_Measures(catname_to_measures) # TODO: consolidate in kwcoco with warnings.catch_warnings(): warnings.filterwarnings('ignore', message='Mean of empty slice') mAUC = np.nanmean([item['trunc_auc'] for item in perclass.values()]) mAP = np.nanmean([item['ap'] for item in perclass.values()]) compatible_format = { 'perclass': perclass, 'mAUC': mAUC, 'mAP': mAP, } return compatible_format
[docs] def populate_info(info): """ Given raw accumulated confusion counts, populated secondary measures like AP, AUC, F1, MCC, etc.. """ info['tp_count'] = tp = np.array(info['tp_count']) info['fp_count'] = fp = np.array(info['fp_count']) has_tn = 'tn_count' in info if has_tn: info['tn_count'] = tn = np.array(info['tn_count']) info['fn_count'] = fn = np.array(info['fn_count']) info['thresholds'] = thresh = np.array(info['thresholds']) realpos_total = info.get('realpos_total', None) if realpos_total is None: realpos_total = info['tp_count'][-1] + info['fn_count'][-1] info['realpos_total'] = realpos_total realneg_total = info.get('realneg_total', None) if realneg_total is None: if has_tn: realneg_total = info['tn_count'][-1] + info['fp_count'][-1] info['realneg_total'] = realneg_total nsupport = info.get('nsupport', None) if nsupport is None: info['nsupport'] = nsupport = realneg_total + realpos_total monotonic_ppv = info.get('monotonic_ppv', True) info['monotonic_ppv'] = monotonic_ppv finite_flags = np.isfinite(thresh) trunc_fp = fp # Cutoff the curves at a comparable point fp_cutoff = info.get('fp_cutoff', None) if fp_cutoff is None: fp_cutoff = np.inf elif isinstance(fp_cutoff, str): if fp_cutoff == 'num_true': fp_cutoff = int(np.ceil(realpos_total)) else: raise KeyError(fp_cutoff) if np.isfinite(fp_cutoff): idxs = np.where(trunc_fp > fp_cutoff)[0] if len(idxs) == 0: trunc_idx = len(trunc_fp) else: trunc_idx = idxs[0] else: trunc_idx = None if trunc_idx is None and np.any(np.isinf(fp)): idxs = np.where(np.isinf(fp))[0] if len(idxs): trunc_idx = idxs[0] if trunc_idx is None: trunc_fp = fp trunc_tp = tp trunc_thresholds = thresh else: trunc_fp = fp[:trunc_idx] trunc_tp = tp[:trunc_idx] trunc_thresholds = thresh[:trunc_idx] # if the cuttoff was not reached, horizontally extend the curve # This will hurt the scores (aka we may be bias against small # scenes), but this will ensure that big scenes are comparable from kwcoco.metrics import assignment if len(trunc_fp) == 0: trunc_fp = np.array([fp_cutoff]) trunc_tp = np.array([0]) if assignment.USE_NEG_INF: trunc_thresholds = np.array([-np.inf]) else: trunc_thresholds = np.array([0]) # THIS WILL CAUSE AUC TO RAISE AN ERROR IF IT GETS HIT elif trunc_fp[-1] <= fp_cutoff and np.isfinite(fp_cutoff): trunc_fp = np.hstack([trunc_fp, [fp_cutoff]]) trunc_tp = np.hstack([trunc_tp, [trunc_tp[-1]]]) if assignment.USE_NEG_INF: trunc_thresholds = np.hstack([trunc_thresholds, [-np.inf]]) else: trunc_thresholds = np.hstack([trunc_thresholds, [0]]) info['trunc_idx'] = trunc_idx info['trunc_fp_count'] = trunc_fp info['trunc_tp_count'] = trunc_tp info['trunc_thresholds'] = trunc_thresholds with warnings.catch_warnings(): # It is very possible that we will divide by zero in this func warnings.filterwarnings('ignore', message='invalid .* true_divide') warnings.filterwarnings('ignore', message='invalid value') warnings.filterwarnings('ignore', message='divide by zero', category=RuntimeWarning) pred_pos = (tp + fp) # number of predicted positives ppv = tp / pred_pos # precision ppv[np.isnan(ppv)] = 0 if monotonic_ppv: # trick to make precision monotonic (which is probably not entirely # correct) ppv = np.maximum.accumulate(ppv[::-1])[::-1] # can set tpr_denom denominator to one tpr_denom = (tp + fn) # tpr_denom[~(tpr_denom > 0)] = 1 tpr = tp / tpr_denom # recall debug = 0 if debug: assert ub.allsame(tpr_denom), 'tpr denom should be constant' # tpr_denom should be equal to info['realpos_total'] if np.any(tpr_denom != info['realpos_total']): warnings.warn('realpos_total is inconsistent') if has_tn: tnr_denom = (tn + fp) tnr_denom[tnr_denom == 0] = 1 tnr = tn / tnr_denom pnv_denom = (tn + fn) pnv_denom[pnv_denom == 0] = 1 npv = tn / pnv_denom info['ppv'] = ppv info['tpr'] = tpr ppv_mul_tpr = ppv * tpr # fpr_denom is a proxy for fp + tn as tn is generally unknown in # the case where all negatives are specified in the confusion # vectors fpr_denom will be exactly (fp + tn) # fpr = fp / (fp + tn) finite_fp = fp[np.isfinite(fp)] fpr_denom = finite_fp[-1] if len(finite_fp) else 0 if fpr_denom == 0: fpr_denom = 1 fpr = info['fp_count'] / fpr_denom info['fpr'] = fpr if has_tn: info['bm'] = tpr + tnr - 1 # informedness info['mk'] = ppv + npv - 1 # markedness tp_add_tn = tp + tn # info['acc'] = (tp + tn) / (tp + tn + fp + fn) info['acc'] = (tp_add_tn) / (tp_add_tn + fp + fn) # https://en.wikipedia.org/wiki/Matthews_correlation_coefficient # mcc_numer = (tp * tn) - (fp * fn) # mcc_denom = np.sqrt((tp + fp) * (tp + fn) * (tn + fp) * (tn + fn)) # mcc_denom[np.isnan(mcc_denom) | (mcc_denom == 0)] = 1 # info['mcc'] = mcc_numer / mcc_denom real_pos = fn + tp # number of real positives p_denom = real_pos.copy() p_denom[p_denom == 0] = 1 fnr = fn / p_denom # miss-rate fdr = 1 - ppv # false discovery rate fmr = 1 - npv # false omission rate (for) info['tnr'] = tnr info['npv'] = npv # info['mcc'] = np.sqrt(ppv * tpr * tnr * npv) - np.sqrt(fdr * fnr * fpr * fmr) info['mcc'] = np.sqrt(ppv_mul_tpr * tnr * npv) - np.sqrt(fdr * fnr * fpr * fmr) # f1_numer = (2 * ppv * tpr) # NOTE: F1 might be impacted by the monotonic flag # might need to refine that metric, probably in a backwards compatible # way f1_numer = (2 * ppv_mul_tpr) f1_denom = (ppv + tpr) f1_denom[f1_denom == 0] = 1 info['f1'] = f1_numer / f1_denom if 0: # The IoU (or Jaccard) is directly related to F1 if we want it. f1 = info['f1'] info['jaccard'] = f1 / (2 - f1) # https://erotemic.wordpress.com/2019/10/23/closed-form-of-the-mcc-when-tn-inf/ # https://arxiv.org/abs/2305.00594 # info['g1'] = np.sqrt(ppv * tpr) info['g1'] = np.sqrt(ppv_mul_tpr) # Report the threshold that maximizes wholistic quality metrics as well # as the contextural metrics at that threshold. keys = ['mcc', 'g1', 'f1', 'acc'] sub_keys = ['ppv', 'tpr', 'fpr', 'tnr', 'bm', 'mk', 'mcc', 'g1', 'f1', 'acc'] finite_thresh = thresh[finite_flags] for key in keys: if key in info: measure = info[key][finite_flags] try: max_idx = np.nanargmax(measure) except ValueError: best_thresh = np.nan best_measure = np.nan best_submeasures = {k: np.nan for k in sub_keys} best_submeasures['thresh'] = best_thresh else: best_thresh = float(finite_thresh[max_idx]) best_measure = float(measure[max_idx]) best_submeasures = {k: info[k][finite_flags][max_idx] for k in sub_keys} best_submeasures['thresh'] = best_thresh best_label = '{}={:0.2f}@{:0.2f}'.format(key, best_measure, best_thresh) info['max_{}'.format(key)] = best_label info['max_{}_submeasures'.format(key)] = best_submeasures info['_max_{}'.format(key)] = (best_measure, best_thresh) import sklearn.metrics # NOQA finite_trunc_fp = info['trunc_fp_count'] finite_trunc_fp = finite_trunc_fp[np.isfinite(finite_trunc_fp)] trunc_fpr_denom = finite_trunc_fp[-1] if len(finite_trunc_fp) else 0 if trunc_fpr_denom == 0: trunc_fpr_denom = 1 info['trunc_tpr'] = info['trunc_tp_count'] / info['realpos_total'] info['trunc_fpr'] = info['trunc_fp_count'] / trunc_fpr_denom try: info['trunc_auc'] = sklearn.metrics.auc(info['trunc_fpr'], info['trunc_tpr']) except ValueError: # At least 2 points are needed to compute area under curve, but x.shape = 1 info['trunc_auc'] = np.nan if len(info['trunc_fpr']) == 1 and len(info['trunc_tpr']) == 1: if info['trunc_fpr'][0] == 0 and info['trunc_tpr'][0] == 1: # Hard code AUC in the perfect detection case info['trunc_auc'] = 1.0 info['auc'] = info['trunc_auc'] """ Note: Apparently, consistent scoring is really hard to get right. For detection problems scoring via confusion_vectors+sklearn produces noticeably different results than the VOC method. There are a few reasons for this. The VOC method stops counting true positives after all assigned predicted boxes have been counted. It simply remembers the amount of original true positives to normalize the true positive rate. On the other hand, confusion vectors maintains a list of these unassigned true boxes and gives them a predicted index of -1 and a score of zero. This means that this function sees them as having a y_true of 1 and a y_score of 0, which allows the scikit-learn fp and tp counts to effectively get up to 100% recall when the threshold is zero. The VOC method simply ignores these and handles them implicitly. The problem is that if you remove these from the scikit-learn inputs, it wont see the correct number of positives and it will incorrectly normalize the recall. In summary: VOC: * remembers realpos_total * doesn't count unassigned truths as TP when the threshold is zero. CV+SKL: * counts unassigned truths as TP with score=0. * NEW: now counts unassigned truth as TP with score=-np.inf * Always ensure tpr=1, ppv=0 and ppv=1, tpr=0 cases exist. """ # --- # sklearn definition # AP = sum((R[n] - R[n - 1]) * P[n] for n in range(len(thresholds))) # stop when full recall attained SKLISH_AP = 1 if SKLISH_AP: last_ind = tpr.searchsorted(tpr[-1]) rec = np.r_[0, tpr[:last_ind + 1]] prec = np.r_[1, ppv[:last_ind + 1]] # scores = np.r_[0, thresh[:last_ind + 1]] # Precisions are weighted by the change in recall diff_items = np.diff(rec) prec_items = prec[1:] # baseline way info['sklish_ap'] = float(np.sum(diff_items * prec_items)) PYCOCOTOOLS_AP = True if PYCOCOTOOLS_AP: # similar to pycocotools style "AP" R = 101 feasible_idxs = np.where(thresh > -np.inf)[0] if len(feasible_idxs) == 0: info['pycocotools_ap'] = np.nan else: feasible_tpr = tpr[feasible_idxs[-1]] last_ind = tpr.searchsorted(feasible_tpr) rc = tpr[:last_ind + 1] pr = ppv[:last_ind + 1] recThrs = np.linspace(0, 1.0, R) inds = np.searchsorted(rc, recThrs, side='left') q = np.zeros((R,)) # ss = np.zeros((R,)) try: for ri, pi in enumerate(inds): q[ri] = pr[pi] # ss[ri] = scores[pi] except Exception: pass pycocotools_ap = q.mean() info['pycocotools_ap'] = float(pycocotools_ap) OUTLIER_AP = 1 if OUTLIER_AP: # Remove extreme outliers from ap calculation # only do this on the first or last 2 items. # Hueristically chosen. flags = diff_items > 0.1 idxs = np.where(flags)[0] max_idx = len(flags) - 1 idx_thresh = 2 try: idx_dist = np.minimum(idxs, max_idx - idxs) outlier_idxs = idxs[idx_dist < idx_thresh] outlier_flags = kwarray.boolmask(outlier_idxs, len(diff_items)) inlier_flags = ~outlier_flags score = prec_items.copy() score[outlier_flags] = score[inlier_flags].min() score[outlier_flags] = 0 ap = outlier_ap = np.sum(score * diff_items) info['outlier_ap'] = float(outlier_ap) except Exception: pass INF_THRESH_AP = 1 if INF_THRESH_AP: # This may become the de-facto scikit-learn implementation in the # future. from scipy import integrate # MODIFIED SKLEARN AVERAGE PRECISION FOR -INF THRESH # # Better way of marked unassigned truth as never-recallable. # We need to prevent these unassigned truths from incorrectly # bringing up our true positive rate at low thresholds. # # Simply bump last_ind to ensure it is feasible_idxs = np.where(thresh > -np.inf)[0] if len(feasible_idxs) == 0: info['sklearn_ap'] = np.nan else: feasible_tpr = tpr[feasible_idxs[-1]] last_ind = tpr.searchsorted(feasible_tpr) rec = np.r_[0, tpr[:last_ind + 1]] prec = np.r_[1, ppv[:last_ind + 1]] diff_items = np.diff(rec) prec_items = prec[1:] # Not sure which is beset here, we no longer have # assumption of max-tpr = 1 # ap = float(np.sum(diff_items * prec_items)) try: info['sklearn_ap'] = integrate.trapezoid(y=prec, x=rec) except AttributeError: info['sklearn_ap'] = integrate.trapz(y=prec, x=rec) info['ap_method'] = info.get('ap_method', 'pycocotools') if info['ap_method'] == 'pycocotools': ap = info['pycocotools_ap'] elif info['ap_method'] == 'outlier': ap = info['outlier_ap'] elif info['ap_method'] == 'sklearn': ap = info['sklearn_ap'] elif info['ap_method'] == 'sklish': ap = info['sklish_ap'] else: raise KeyError(info['ap_method']) # print('ap = {!r}'.format(ap)) # print('ap = {!r}'.format(ap)) # ap = np.sum(np.diff(rec) * prec[1:]) info['ap'] = float(ap)