Source code for kwcoco.metrics.confusion_measures

"""
Classes that store accumulated confusion measures (usually derived from
confusion vectors).


For each chosen threshold value:
    * thresholds[i] - the i-th threshold value


The primary data we manipulate are arrays of "confusion" counts, i.e.

    * tp_count[i] - true positives at the i-th threshold
    * fp_count[i] - false positives at the i-th threshold
    * fn_count[i] - false negatives at the i-th threshold
    * tn_count[i] - true negatives at the i-th threshold

"""
import kwarray
import numpy as np
import ubelt as ub
import warnings
from kwcoco.util.dict_like import DictProxy


try:
    from xdev import profile
except Exception:
    profile = ub.identity



class Measures(ub.NiceRepr, DictProxy):
    """
    Holds accumulated confusion counts, and derived measures

    Example:
        >>> from kwcoco.metrics.confusion_vectors import BinaryConfusionVectors  # NOQA
        >>> binvecs = BinaryConfusionVectors.demo(n=100, p_error=0.5)
        >>> self = binvecs.measures()
        >>> print('self = {!r}'.format(self))
        >>> # xdoctest: +REQUIRES(--show)
        >>> import kwplot
        >>> kwplot.autompl()
        >>> self.draw(doclf=True)
        >>> self.draw(key='pr', pnum=(1, 2, 1))
        >>> self.draw(key='roc', pnum=(1, 2, 2))
        >>> kwplot.show_if_requested()
    """

    def __init__(self, info):
        self.proxy = info

    @property
    def catname(self):
        return self.get('node', None)

    def __nice__(self):
        info = self.summary()
        return ub.urepr(info, nl=0, precision=3, strvals=True, align=':')
    def reconstruct(self):
        populate_info(info=self)
        return self
    @classmethod
    def from_json(cls, state):
        populate_info(state)
        return cls(state)
    def __json__(self):
        """
        Example:
            >>> from kwcoco.metrics.confusion_vectors import BinaryConfusionVectors  # NOQA
            >>> from kwcoco.metrics.confusion_measures import *  # NOQA
            >>> binvecs = BinaryConfusionVectors.demo(n=10, p_error=0.5)
            >>> self = binvecs.measures()
            >>> info = self.__json__()
            >>> print('info = {}'.format(ub.urepr(info, nl=1)))
            >>> populate_info(info)
            >>> print('info = {}'.format(ub.urepr(info, nl=1)))
            >>> recon = Measures.from_json(info)
        """
        from kwcoco.util.util_json import ensure_json_serializable
        state = {}
        minimal = {
            'fp_count', 'tp_count', 'tn_count', 'fn_count', 'thresholds',
            'realpos_total', 'realneg_total', 'trunc_idx', 'nsupport',
            'fp_cutoff', 'stabalize_thresh', 'cx', 'node', 'monotonic_ppv',
        }
        nice_to_have = {
            # recomputable
            # 'mcc', 'g1', 'f1', 'ppv', 'tpr', 'fpr', 'acc', 'bm', 'mk',
            # 'max_mcc', '_max_mcc', 'max_g1', '_max_g1', 'max_f1',
            # '_max_f1', 'max_acc', '_max_acc',
            # 'trunc_tpr', 'trunc_fpr', 'trunc_auc', 'auc', 'ap',
            # 'sklish_ap', 'pycocotools_ap',
            # 'outlier_ap', 'sklearn',
        }
        state = ub.dict_isect(self.proxy, minimal | nice_to_have)
        state = ensure_json_serializable(state)
        return state
    def summary(self):
        return {
            'ap': self.get('ap', None),
            'auc': self.get('auc', None),
            # 'max_mcc': self['max_mcc'],
            'max_f1': self.get('max_f1', None),
            # 'max_g1': self['max_g1'],
            'nsupport': self.get('nsupport', None),
            'realpos_total': self.get('realpos_total', None),
            # 'realneg_total': self['realneg_total'],
            'catname': self.get('node', None),
        }
    def maximized_thresholds(self):
        """
        Returns thresholds that maximize metrics.
        """
        # TODO: clean up the underlying storage of this info
        maximized_info = {}
        for k in self.keys():
            if k.startswith('_max_'):
                v = self[k]
                metric_value, thresh_value = v
                metric_name = k.split('_max_', 1)[1]
                maximized_info[metric_name] = {
                    'thresh': thresh_value,
                    'metric_value': metric_value,
                    'metric_name': metric_name,
                }
        return maximized_info
    def counts(self):
        counts_df = ub.dict_isect(self, [
            'fp_count', 'tp_count', 'tn_count', 'fn_count', 'thresholds'])
        counts_df = kwarray.DataFrameArray(counts_df)
        return counts_df
    def draw(self, key=None, prefix='', **kw):
        """
        Example:
            >>> # xdoctest: +REQUIRES(module:kwplot)
            >>> # xdoctest: +REQUIRES(module:pandas)
            >>> from kwcoco.metrics.confusion_vectors import ConfusionVectors  # NOQA
            >>> cfsn_vecs = ConfusionVectors.demo()
            >>> ovr_cfsn = cfsn_vecs.binarize_ovr(keyby='name')
            >>> self = ovr_cfsn.measures()['perclass']
            >>> self.draw('mcc', doclf=True, fnum=1)
            >>> self.draw('pr', doclf=1, fnum=2)
            >>> self.draw('roc', doclf=1, fnum=3)
        """
        from kwcoco.metrics import drawing
        if key is None or key == 'thresh':
            return drawing.draw_threshold_curves(self, prefix=prefix, **kw)
        elif key == 'pr':
            return drawing.draw_prcurve(self, prefix=prefix, **kw)
        elif key == 'roc':
            return drawing.draw_roc(self, prefix=prefix, **kw)
    def summary_plot(self, fnum=1, title='', subplots='auto'):
        """
        Example:
            >>> from kwcoco.metrics.confusion_measures import *  # NOQA
            >>> from kwcoco.metrics.confusion_vectors import ConfusionVectors  # NOQA
            >>> cfsn_vecs = ConfusionVectors.demo(n=3, p_error=0.5)
            >>> binvecs = cfsn_vecs.binarize_classless()
            >>> self = binvecs.measures()
            >>> # xdoctest: +REQUIRES(--show)
            >>> import kwplot
            >>> kwplot.autompl()
            >>> self.summary_plot()
            >>> kwplot.show_if_requested()
        """
        if subplots == 'auto':
            subplots = ['pr', 'roc', 'thresh']
        import kwplot
        kwplot.figure(fnum=fnum, figtitle=title)
        pnum_ = kwplot.PlotNums(nSubplots=len(subplots))
        for key in subplots:
            kwplot.figure(fnum=fnum, pnum=pnum_())
            self.draw(key, fnum=fnum)
        # kwplot.figure(fnum=fnum, pnum=(1, 3, 2))
        # self.draw('roc')
        # kwplot.figure(fnum=fnum, pnum=(1, 3, 3))
        # self.draw('thresh', keys=['mcc', 'f1', 'acc'])
    @classmethod
    def demo(cls, **kwargs):
        """
        Create a demo Measures object for testing / demos

        Args:
            **kwargs: passed to :func:`BinaryConfusionVectors.demo`.
                some valid keys are: n, rng, p_true, p_error, p_miss.
        """
        from kwcoco.metrics.confusion_vectors import BinaryConfusionVectors
        bin_cfsn = BinaryConfusionVectors.demo(**kwargs)
        measures = bin_cfsn.measures()
        return measures
    @classmethod
    @profile
    def combine(cls, tocombine, precision=None, growth=None,
                thresh_bins=None):
        """
        Combine binary confusion metrics

        Args:
            tocombine (List[Measures]):
                a list of measures to combine into one

            precision (int | None):
                If specified, rounds thresholds to this precision, which can
                prevent a RAM explosion when combining a large number of
                measures. However, this is a lossy operation and will impact
                the underlying scores. NOTE: use ``growth`` instead.

            growth (int | None):
                if specified this limits how much the resulting measures are
                allowed to grow by. If None, growth is unlimited. Otherwise,
                if growth is 'max', the growth is limited to the maximum
                length of an input. We might make this more numerical in the
                future.

            thresh_bins (int | None):
                Force this many threshold bins.

        Returns:
            kwcoco.metrics.confusion_measures.Measures

        Example:
            >>> from kwcoco.metrics.confusion_measures import *  # NOQA
            >>> measures1 = Measures.demo(n=15)
            >>> measures2 = measures1
            >>> tocombine = [measures1, measures2]
            >>> new_measures = Measures.combine(tocombine)
            >>> new_measures.reconstruct()
            >>> print('new_measures = {!r}'.format(new_measures))
            >>> print('measures1 = {!r}'.format(measures1))
            >>> print('measures2 = {!r}'.format(measures2))
            >>> print(ub.urepr(measures1.__json__(), nl=1, sort=0))
            >>> print(ub.urepr(measures2.__json__(), nl=1, sort=0))
            >>> print(ub.urepr(new_measures.__json__(), nl=1, sort=0))
            >>> # xdoctest: +REQUIRES(--show)
            >>> import kwplot
            >>> kwplot.autompl()
            >>> kwplot.figure(fnum=1)
            >>> new_measures.summary_plot()
            >>> measures1.summary_plot()
            >>> measures1.draw('roc')
            >>> measures2.draw('roc')
            >>> new_measures.draw('roc')

        Ignore:
            >>> from kwcoco.metrics.confusion_measures import *  # NOQA
            >>> rng = kwarray.ensure_rng(0)
            >>> tocombine = [
            >>>     Measures.demo(n=rng.randint(40, 50), rng=rng, p_true=0.2, p_error=0.4, p_miss=0.6)
            >>>     for _ in range(80)
            >>> ]
            >>> # xdoctest: +REQUIRES(--show)
            >>> import kwplot
            >>> for idx, growth in enumerate([None, 'max', 'log', 'root', 'half']):
            >>>     combo = Measures.combine(tocombine, growth=growth).reconstruct()
            >>>     print('growth = {!r}'.format(growth))
            >>>     print('combo = {}'.format(ub.urepr(combo, nl=1)))
            >>>     print('num_thresholds = {}'.format(len(combo['thresholds'])))
            >>>     combo.summary_plot(fnum=idx + 1, title=str(growth))

        Example:
            >>> # Demonstrate issues that can arise from choosing a precision
            >>> # that is too low when combining metrics. Breakpoints
            >>> # between different metrics can get muddled, but choosing a
            >>> # precision that is too high can overwhelm memory.
            >>> from kwcoco.metrics.confusion_measures import *  # NOQA
            >>> base = ub.map_vals(np.asarray, {
            >>>     'tp_count':   [ 1,  1,  2,  2,  2,  2,  3],
            >>>     'fp_count':   [ 0,  1,  1,  2,  3,  4,  5],
            >>>     'fn_count':   [ 1,  1,  0,  0,  0,  0,  0],
            >>>     'tn_count':   [ 5,  4,  4,  3,  2,  1,  0],
            >>>     'thresholds': [.0, .0, .0, .0, .0, .0, .0],
            >>> })
            >>> # Make tiny offsets to thresholds
            >>> rng = kwarray.ensure_rng(0)
            >>> n = len(base['thresholds'])
            >>> offsets = [
            >>>     sorted(rng.rand(n) * 10 ** -rng.randint(4, 7))[::-1]
            >>>     for _ in range(20)
            >>> ]
            >>> tocombine = []
            >>> for offset in offsets:
            >>>     base_n = base.copy()
            >>>     base_n['thresholds'] += offset
            >>>     measures_n = Measures(base_n).reconstruct()
            >>>     tocombine.append(measures_n)
            >>> for precision in [6, 5, 2]:
            >>>     combo = Measures.combine(tocombine, precision=precision).reconstruct()
            >>>     print('precision = {!r}'.format(precision))
            >>>     print('combo = {}'.format(ub.urepr(combo, nl=1)))
            >>>     print('num_thresholds = {}'.format(len(combo['thresholds'])))
            >>> for growth in [None, 'max', 'log', 'root', 'half']:
            >>>     combo = Measures.combine(tocombine, growth=growth).reconstruct()
            >>>     print('growth = {!r}'.format(growth))
            >>>     print('combo = {}'.format(ub.urepr(combo, nl=1)))
            >>>     print('num_thresholds = {}'.format(len(combo['thresholds'])))
            >>>     #print(combo.counts().pandas())

        Example:
            >>> # Test case: combining a single measures should leave it unchanged
            >>> from kwcoco.metrics.confusion_measures import *  # NOQA
            >>> measures = Measures.demo(n=40, p_true=0.2, p_error=0.4, p_miss=0.6)
            >>> df1 = measures.counts().pandas().fillna(0)
            >>> print(df1)
            >>> tocombine = [measures]
            >>> combo = Measures.combine(tocombine)
            >>> df2 = combo.counts().pandas().fillna(0)
            >>> print(df2)
            >>> assert np.allclose(df1, df2)
            >>> combo = Measures.combine(tocombine, thresh_bins=2)
            >>> df3 = combo.counts().pandas().fillna(0)
            >>> print(df3)
            >>> # I am NOT sure if this is correct or not
            >>> thresh_bins = 20
            >>> combo = Measures.combine(tocombine, thresh_bins=thresh_bins)
            >>> df4 = combo.counts().pandas().fillna(0)
            >>> print(df4)
            >>> combo = Measures.combine(tocombine, thresh_bins=np.linspace(0, 1, 20))
            >>> df4 = combo.counts().pandas().fillna(0)
            >>> print(df4)

            assert np.allclose(combo['thresholds'], measures['thresholds'])
            assert np.allclose(combo['fp_count'], measures['fp_count'])
            assert np.allclose(combo['tp_count'], measures['tp_count'])

            globals().update(xdev.get_func_kwargs(Measures.combine))

        Example:
            >>> # Test degenerate case
            >>> from kwcoco.metrics.confusion_measures import *  # NOQA
            >>> tocombine = [
            >>>     {'fn_count': [0.0], 'fp_count': [359980.0], 'thresholds': [0.0], 'tn_count': [0.0], 'tp_count': [7747.0]},
            >>>     {'fn_count': [0.0], 'fp_count': [360849.0], 'thresholds': [0.0], 'tn_count': [0.0], 'tp_count': [424.0]},
            >>>     {'fn_count': [0.0], 'fp_count': [367003.0], 'thresholds': [0.0], 'tn_count': [0.0], 'tp_count': [991.0]},
            >>>     {'fn_count': [0.0], 'fp_count': [367976.0], 'thresholds': [0.0], 'tn_count': [0.0], 'tp_count': [1017.0]},
            >>>     {'fn_count': [0.0], 'fp_count': [676338.0], 'thresholds': [0.0], 'tn_count': [0.0], 'tp_count': [7067.0]},
            >>>     {'fn_count': [0.0], 'fp_count': [676348.0], 'thresholds': [0.0], 'tn_count': [0.0], 'tp_count': [7406.0]},
            >>>     {'fn_count': [0.0], 'fp_count': [676626.0], 'thresholds': [0.0], 'tn_count': [0.0], 'tp_count': [7858.0]},
            >>>     {'fn_count': [0.0], 'fp_count': [676693.0], 'thresholds': [0.0], 'tn_count': [0.0], 'tp_count': [10969.0]},
            >>>     {'fn_count': [0.0], 'fp_count': [677269.0], 'thresholds': [0.0], 'tn_count': [0.0], 'tp_count': [11188.0]},
            >>>     {'fn_count': [0.0], 'fp_count': [677331.0], 'thresholds': [0.0], 'tn_count': [0.0], 'tp_count': [11734.0]},
            >>>     {'fn_count': [0.0], 'fp_count': [677395.0], 'thresholds': [0.0], 'tn_count': [0.0], 'tp_count': [11556.0]},
            >>>     {'fn_count': [0.0], 'fp_count': [677418.0], 'thresholds': [0.0], 'tn_count': [0.0], 'tp_count': [11621.0]},
            >>>     {'fn_count': [0.0], 'fp_count': [677422.0], 'thresholds': [0.0], 'tn_count': [0.0], 'tp_count': [11424.0]},
            >>>     {'fn_count': [0.0], 'fp_count': [677648.0], 'thresholds': [0.0], 'tn_count': [0.0], 'tp_count': [9804.0]},
            >>>     {'fn_count': [0.0], 'fp_count': [677826.0], 'thresholds': [0.0], 'tn_count': [0.0], 'tp_count': [2470.0]},
            >>>     {'fn_count': [0.0], 'fp_count': [677834.0], 'thresholds': [0.0], 'tn_count': [0.0], 'tp_count': [2470.0]},
            >>>     {'fn_count': [0.0], 'fp_count': [677835.0], 'thresholds': [0.0], 'tn_count': [0.0], 'tp_count': [2470.0]},
            >>>     {'fn_count': [11123.0, 0.0], 'fp_count': [0.0, 676754.0], 'thresholds': [0.0002442002442002442, 0.0], 'tn_count': [676754.0, 0.0], 'tp_count': [2.0, 11125.0]},
            >>>     {'fn_count': [7738.0, 0.0], 'fp_count': [0.0, 676466.0], 'thresholds': [0.0002442002442002442, 0.0], 'tn_count': [676466.0, 0.0], 'tp_count': [0.0, 7738.0]},
            >>>     {'fn_count': [8653.0, 0.0], 'fp_count': [0.0, 676341.0], 'thresholds': [0.0002442002442002442, 0.0], 'tn_count': [676341.0, 0.0], 'tp_count': [0.0, 8653.0]},
            >>> ]
            >>> thresh_bins = np.linspace(0, 1, 4)
            >>> combo = Measures.combine(tocombine, thresh_bins=thresh_bins).reconstruct()
            >>> print('tocombine = {}'.format(ub.urepr(tocombine, nl=2)))
            >>> print('thresh_bins = {!r}'.format(thresh_bins))
            >>> print(ub.urepr(combo.__json__(), nl=1))
            >>> for thresh_bins in [4096, 1]:
            >>>     combo = Measures.combine(tocombine, thresh_bins=thresh_bins).reconstruct()
            >>>     print('thresh_bins = {!r}'.format(thresh_bins))
            >>>     print('combo = {}'.format(ub.urepr(combo, nl=1)))
            >>>     print('num_thresholds = {}'.format(len(combo['thresholds'])))
            >>> for precision in [6, 5, 2]:
            >>>     combo = Measures.combine(tocombine, precision=precision).reconstruct()
            >>>     print('precision = {!r}'.format(precision))
            >>>     print('combo = {}'.format(ub.urepr(combo, nl=1)))
            >>>     print('num_thresholds = {}'.format(len(combo['thresholds'])))
            >>> for growth in [None, 'max', 'log', 'root', 'half']:
            >>>     combo = Measures.combine(tocombine, growth=growth).reconstruct()
            >>>     print('growth = {!r}'.format(growth))
            >>>     print('combo = {}'.format(ub.urepr(combo, nl=1)))
            >>>     print('num_thresholds = {}'.format(len(combo['thresholds'])))
        """
        if precision is not None and np.isinf(precision):
            precision = None

        # For each item to combine, we merge all unique considered thresholds
        # into a single array, and then reconstruct the confusion counts with
        # respect to this new threshold array.
        tocombine_thresh = [m['thresholds'] for m in tocombine]

        if ub.iterable(thresh_bins):
            combo_thresh_asc = thresh_bins
            nonfinite_vals = []
        else:
            combo_thresh_asc, nonfinite_vals = _combine_threshold(
                tocombine_thresh, thresh_bins, growth, precision)

        if nonfinite_vals:
            # Force inclusion of non-finite thresholds
            combo_thresh_asc = np.hstack([combo_thresh_asc, nonfinite_vals])
            combo_thresh_asc.sort()

        new_thresh = combo_thresh_asc[::-1]

        # From this point on, the rest of the logic should (if the
        # implementation is correct) work with respect to an arbitrary choice
        # of threshold bins, thus the critical step is choosing what these
        # bins should be in order to minimize lossyness of the combined
        # accumulated measures while also keeping a reasonable memory
        # footprint. Regardless, it might be good to double check this logic.

        summable = {
            'nsupport': 0,
            'realpos_total': 0,
            'realneg_total': 0,
        }

        # Initialize new counts for each entry in the new threshold array
        # XX_accum[idx] represents the number of *new* XX cases at index idx
        fp_accum = np.zeros(len(new_thresh))
        tp_accum = np.zeros(len(new_thresh))
        tn_accum = np.zeros(len(new_thresh))
        fn_accum = np.zeros(len(new_thresh))

        # For each item to combine, find where its counts should go in the
        # new combined confusion array.
        for measures in tocombine:
            # this thresh is descending
            thresholds = measures['thresholds']

            # XX_pos[idx] is the number of *new* XX cases at index idx at
            # thresholds[idx] for *these* measures.
            fp_pos, _, _ = reversable_diff(measures['fp_count'])
            tp_pos, _, _ = reversable_diff(measures['tp_count'])
            tn_pos, tn_p, tn_s = reversable_diff(measures['tn_count'], reverse=1)
            fn_pos, _, _ = reversable_diff(measures['fn_count'], reverse=1)

            # Find the locations where the thresholds from "these" measures
            # should be inserted into the combined measures.
            right_idxs = np.searchsorted(combo_thresh_asc, thresholds, 'right')
            left_idxs = len(combo_thresh_asc) - right_idxs
            left_idxs = left_idxs.clip(0, len(combo_thresh_asc) - 1)

            # Accumulate these values from these measures into the
            # appropriate new threshold position.
            # note: np.add.at is unbuffered
            np.add.at(fp_accum, left_idxs, fp_pos)
            np.add.at(tp_accum, left_idxs, tp_pos)
            np.add.at(fn_accum, left_idxs, fn_pos)
            np.add.at(tn_accum, left_idxs, tn_pos)

            for k in summable:
                if k in measures:
                    summable[k] += measures[k]

        new_fp = np.cumsum(fp_accum)  # will increase with index
        new_tp = np.cumsum(tp_accum)  # will increase with index
        new_tn = np.cumsum(tn_accum[::-1])[::-1]  # will decrease with index
        new_fn = np.cumsum(fn_accum[::-1])[::-1]  # will decrease with index

        if -np.inf in nonfinite_vals:
            # Not sure if this is right...
            new_fp[-1] = np.inf
            new_tn[-1] = -np.inf

        new_info = {
            'fp_count': new_fp,
            'tp_count': new_tp,
            'tn_count': new_tn,
            'fn_count': new_fn,
            'thresholds': new_thresh,
        }
        new_info.update(summable)

        other = {
            'fp_cutoff', 'monotonic_ppv', 'node', 'cx',
            'stabalize_thresh', 'trunc_idx'
        }
        rest = ub.dict_isect(tocombine[0], other)
        new_info.update(rest)

        new_measures = Measures(new_info)
        return new_measures
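
# A minimal sketch (illustrative only; ``_sketch_combine_two_curves`` is not
# part of the kwcoco API) of the merge step used by ``Measures.combine``:
# thresholds are descending, so per-threshold count deltas are scattered into
# the shared bins with ``np.searchsorted`` / ``np.add.at`` and then
# re-accumulated with ``np.cumsum``.


def _sketch_combine_two_curves():
    """
    Illustrative only: merge two tiny tp_count curves onto shared bins.
    """
    combo_thresh_asc = np.array([0.25, 0.5, 0.75])  # ascending shared bins
    tp_accum = np.zeros(3)
    for thresholds, tp_count in [
            (np.array([0.8, 0.3]), np.array([1, 3])),
            (np.array([0.6, 0.2]), np.array([2, 4])),
    ]:
        # per-row deltas, i.e. the number of *new* TPs at each threshold
        tp_pos = np.r_[tp_count[0], np.diff(tp_count)]
        right_idxs = np.searchsorted(combo_thresh_asc, thresholds, 'right')
        left_idxs = (len(combo_thresh_asc) - right_idxs).clip(0, 2)
        np.add.at(tp_accum, left_idxs, tp_pos)  # unbuffered accumulation
    new_tp = np.cumsum(tp_accum)  # combined tp_count over descending bins
    return new_tp  # -> array([1., 3., 7.])
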

def _combine_threshold(tocombine_thresh, thresh_bins, growth, precision):
    """
    Logic to take care of combining thresholds in the case bins are not given

    This can be fairly slow and lead to unnecessary memory usage
    """
    orig_thresholds = np.unique(np.hstack(tocombine_thresh))
    finite_flags = np.isfinite(orig_thresholds)
    nonfinite_flags = ~finite_flags
    if np.any(nonfinite_flags):
        orig_finite_thresholds = orig_thresholds[finite_flags]
        # If we have any non-finite threshold values we are stuck with them
        nonfinite_vals = orig_thresholds[nonfinite_flags]
    else:
        orig_finite_thresholds = orig_thresholds
        nonfinite_vals = []

    if thresh_bins is None:
        if growth is not None:
            if precision is not None:
                raise ValueError('dont use precision with growth')
            # Keep a minimum number of thresholds and allow the resulting
            # metrics to grow by some factor
            num_per_item = list(map(len, tocombine_thresh))
            if growth == 'max':
                max_num_thresholds = max(num_per_item)
            elif growth == 'min':
                max_num_thresholds = min(num_per_item)
            elif growth == 'log':
                # Each item after the first is only allowed to contribute
                # some of its thresholds defined by the base-e-log.
                modulated_sizes = sorted([np.log(n) for n in num_per_item])
                max_num_thresholds = int(round(
                    sum(modulated_sizes[:-1]) + max(num_per_item)))
            elif growth == 'root':
                # Each item after the first is only allowed to contribute
                # some of its thresholds defined by the square root.
                modulated_sizes = sorted([np.sqrt(n) for n in num_per_item])
                max_num_thresholds = int(round(
                    sum(modulated_sizes[:-1]) + max(num_per_item)))
            elif growth == 'half':
                # Each item after the first is only allowed to contribute
                # some of its thresholds, half of them.
                modulated_sizes = sorted([n / 2 for n in num_per_item])
                max_num_thresholds = int(round(
                    sum(modulated_sizes[:-1]) + max(num_per_item)))
            else:
                raise KeyError(growth)
            chosen_idxs = np.linspace(0, len(orig_finite_thresholds) - 1,
                                      max_num_thresholds).round().astype(int)
            chosen_idxs = np.unique(chosen_idxs)
            combo_thresh_asc = orig_finite_thresholds[chosen_idxs]
        elif precision is not None:
            round_thresholds = orig_finite_thresholds.round(precision)
            combo_thresh_asc = np.unique(round_thresholds)
        else:
            combo_thresh_asc = orig_finite_thresholds
    else:
        assert growth is None
        assert precision is None
        if isinstance(thresh_bins, int):
            # Fixme, this might not be the correct way to do this.
            threshold_pool = orig_finite_thresholds
            # Subdivide threshold pool until we get enough values
            while thresh_bins > len(threshold_pool):
                # x = [1, 2, 3, 4, 5]
                # x = [1, 2, 3, 4, 5, 6]
                x = threshold_pool
                even_slice = x[:(len(x) // 2) * 2]
                odd_slice = x[1:len(x) - (len(x) % 2 == 0)]
                mean_vals = np.r_[
                    [0, threshold_pool[0]], even_slice, odd_slice,
                    [1, threshold_pool[-1]]].reshape(-1, 2).mean(axis=1)
                threshold_pool = np.hstack([mean_vals, threshold_pool])
                threshold_pool = np.unique(threshold_pool)
            chosen_idxs = np.linspace(0, len(threshold_pool) - 1,
                                      thresh_bins).round().astype(int)
            chosen_idxs = np.unique(chosen_idxs)
            combo_thresh_asc = threshold_pool[chosen_idxs]
        else:
            raise TypeError
    return combo_thresh_asc, nonfinite_vals
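
# A small sketch (illustrative only; ``_sketch_growth_budget`` is not part of
# the kwcoco API) of the growth budget arithmetic above: every modulated mode
# keeps the largest input's thresholds in full and lets each remaining input
# contribute only a modulated share of its own.


def _sketch_growth_budget(num_per_item, growth='log'):
    """
    Illustrative only: reproduces the ``max_num_thresholds`` arithmetic from
    :func:`_combine_threshold` for the modulated growth modes.

    >>> _sketch_growth_budget([100, 100, 100], 'log')
    109
    >>> _sketch_growth_budget([100, 100, 100], 'half')
    200
    """
    modulate = {'log': np.log, 'root': np.sqrt, 'half': lambda n: n / 2}[growth]
    modulated_sizes = sorted([modulate(n) for n in num_per_item])
    # all but the largest contribution are modulated
    return int(round(sum(modulated_sizes[:-1]) + max(num_per_item)))
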

def reversable_diff(arr, assume_sorted=1, reverse=False):
    """
    Does a reversible array difference operation.

    This will be used to find positions where accumulation happened in
    confusion count arrays.

    Ignore:
        >>> from kwcoco.metrics.confusion_measures import *  # NOQA
        >>> funcs = {
        >>>     'at_zero': [0, 3, 9, 43, 333],
        >>>     'at_nonzero': [2, 3, 9, 43, 333],
        >>>     'at_negative': [-2, 3, 9, 43, 333],
        >>>     'at_neginf': [-np.inf, 3, 9, 43, 333],
        >>>     'at_dblinf': [-np.inf, -np.inf, 3, 9, 43, 333, np.inf, np.inf],
        >>> }
        >>> for k, arr in funcs.items():
        >>>     arr = np.asarray(arr) + 30
        >>>     diff_arr, prefix, suffix = reversable_diff(arr)
        >>>     recon_arr = np.cumsum(diff_arr)
        >>>     recon_arr[0:len(prefix)] += prefix
        >>>     recon_arr[len(recon_arr) - len(suffix):] += suffix
        >>>     print('k = {!r}'.format(k))
        >>>     print('diff_arr = {!r}'.format(diff_arr))
        >>>     print('arr = {!r}'.format(arr))
        >>>     print('recon_arr = {!r}'.format(recon_arr))
        >>> for k, arr in funcs.items():
        >>>     arr = np.asarray(arr)[::-1] + 30
        >>>     diff_arr, prefix, suffix = reversable_diff(arr, reverse=True)
        >>>     recon_arr = np.cumsum(diff_arr[::-1])[::-1]
        >>>     recon_arr[0:len(prefix)] += prefix
        >>>     recon_arr[len(recon_arr) - len(suffix):] += suffix
        >>>     print('k = {!r}'.format(k))
        >>>     print('diff_arr = {!r}'.format(diff_arr))
        >>>     print('arr = {!r}'.format(arr))
        >>>     print('recon_arr = {!r}'.format(recon_arr))
    """
    import math
    assert assume_sorted
    if len(arr) == 0:
        raise ValueError('todo: default value for empty')
    if reverse:
        arr = arr[::-1]
    n = len(arr)
    last_idx = n - 1
    first_finite_idx = 0
    for first_finite_idx, v in zip(range(n), arr):
        if math.isfinite(v):
            break
    last_finite_idx = n
    for last_finite_idx, v in zip(range(last_idx, -1, -1), arr[::-1]):
        if math.isfinite(v):
            break
    suffix_len = last_idx - last_finite_idx
    prefix_len = first_finite_idx
    offset = arr[first_finite_idx]  # This is + C
    prefix = arr[:first_finite_idx]
    suffix = arr[last_finite_idx + 1:]
    finite_body = arr[first_finite_idx:last_finite_idx + 1]
    diff_arr = np.r_[[0] * prefix_len, [offset], np.diff(finite_body),
                     [0] * suffix_len]

    # The goal is to be able to recon perfectly
    def invert(diff_arr):
        recon_arr = np.cumsum(diff_arr)
        recon_arr[0:len(prefix)] += prefix
        recon_arr[len(recon_arr) - len(suffix):] += suffix
        return recon_arr

    if reverse:
        diff_arr = diff_arr[::-1]
        prefix, suffix = suffix[::-1], prefix[::-1]
    return diff_arr, prefix, suffix
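
# Compact round-trip sketch (illustrative only; this helper is not part of
# the kwcoco API): ``reversable_diff`` paired with ``np.cumsum`` reconstructs
# the original monotonic count array, which is exactly the property that
# ``Measures.combine`` relies on when scattering deltas into new bins.


def _sketch_reversable_diff_roundtrip():
    """
    Illustrative only.

    >>> _sketch_reversable_diff_roundtrip()
    True
    """
    arr = np.array([0., 3., 9., 43., 333.])
    diff_arr, prefix, suffix = reversable_diff(arr)
    recon = np.cumsum(diff_arr)
    recon[0:len(prefix)] += prefix
    recon[len(recon) - len(suffix):] += suffix
    return bool(np.allclose(recon, arr))
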

class PerClass_Measures(ub.NiceRepr, DictProxy):
    """
    """

    def __init__(self, cx_to_info):
        self.proxy = cx_to_info

    def __nice__(self):
        return ub.urepr(self.proxy, nl=2, strvals=True, align=':')
    def summary(self):
        return {k: v.summary() for k, v in self.items()}
    @classmethod
    def from_json(cls, state):
        state = ub.map_vals(Measures.from_json, state)
        self = cls(state)
        return self
    def __json__(self):
        return {k: v.__json__() for k, v in self.items()}
    def draw(self, key='mcc', prefix='', **kw):
        """
        Example:
            >>> # xdoctest: +REQUIRES(module:kwplot)
            >>> from kwcoco.metrics.confusion_vectors import ConfusionVectors  # NOQA
            >>> cfsn_vecs = ConfusionVectors.demo()
            >>> ovr_cfsn = cfsn_vecs.binarize_ovr(keyby='name')
            >>> self = ovr_cfsn.measures()['perclass']
            >>> self.draw('mcc', doclf=True, fnum=1)
            >>> self.draw('pr', doclf=1, fnum=2)
            >>> self.draw('roc', doclf=1, fnum=3)
        """
        from kwcoco.metrics import drawing
        if key == 'pr':
            return drawing.draw_perclass_prcurve(self, prefix=prefix, **kw)
        elif key == 'roc':
            return drawing.draw_perclass_roc(self, prefix=prefix, **kw)
        else:
            return drawing.draw_perclass_thresholds(
                self, key=key, prefix=prefix, **kw)
    def draw_roc(self, prefix='', **kw):
        from kwcoco.metrics import drawing
        return drawing.draw_perclass_roc(self, prefix=prefix, **kw)
    def draw_pr(self, prefix='', **kw):
        from kwcoco.metrics import drawing
        return drawing.draw_perclass_prcurve(self, prefix=prefix, **kw)
    def summary_plot(self, fnum=1, title='', subplots='auto'):
        """
        CommandLine:
            python ~/code/kwcoco/kwcoco/metrics/confusion_measures.py PerClass_Measures.summary_plot --show

        Example:
            >>> from kwcoco.metrics.confusion_measures import *  # NOQA
            >>> from kwcoco.metrics.detect_metrics import DetectionMetrics
            >>> dmet = DetectionMetrics.demo(
            >>>     n_fp=(0, 1), n_fn=(0, 3), nimgs=32, nboxes=(0, 32),
            >>>     classes=3, rng=0, newstyle=1, box_noise=0.7,
            >>>     cls_noise=0.2, score_noise=0.3, with_probs=False)
            >>> cfsn_vecs = dmet.confusion_vectors()
            >>> ovr_cfsn = cfsn_vecs.binarize_ovr(keyby='name', ignore_classes=['vector', 'raster'])
            >>> self = ovr_cfsn.measures()['perclass']
            >>> # xdoctest: +REQUIRES(--show)
            >>> import kwplot
            >>> kwplot.autompl()
            >>> import seaborn as sns
            >>> sns.set()
            >>> self.summary_plot(title='demo summary_plot ovr', subplots=['pr', 'roc'])
            >>> kwplot.show_if_requested()
            >>> self.summary_plot(title='demo summary_plot ovr', subplots=['mcc', 'acc'], fnum=2)
        """
        if subplots == 'auto':
            subplots = ['pr', 'roc', 'mcc', 'f1', 'acc']
        import kwplot
        pnum_ = kwplot.PlotNums(nSubplots=len(subplots))
        kwplot.figure(fnum=fnum, doclf=True, figtitle=title)
        for key in subplots:
            self.draw(key, fnum=fnum, pnum=pnum_())

class MeasureCombiner:
    """
    Helper to iteratively combine binary measures generated by some process

    Example:
        >>> from kwcoco.metrics.confusion_measures import *  # NOQA
        >>> from kwcoco.metrics.confusion_vectors import BinaryConfusionVectors
        >>> rng = kwarray.ensure_rng(0)
        >>> bin_combiner = MeasureCombiner(growth='max')
        >>> for _ in range(80):
        >>>     bin_cfsn_vecs = BinaryConfusionVectors.demo(
        >>>         n=rng.randint(40, 50), rng=rng, p_true=0.2,
        >>>         p_error=0.4, p_miss=0.6)
        >>>     bin_measures = bin_cfsn_vecs.measures()
        >>>     bin_combiner.submit(bin_measures)
        >>> combined = bin_combiner.finalize()
        >>> print('combined = {!r}'.format(combined))
    """

    def __init__(self, precision=None, growth=None, thresh_bins=None):
        self.measures = None
        self.growth = growth
        self.thresh_bins = thresh_bins
        self.precision = precision
        self.queue = []

    @property
    def queue_size(self):
        return len(self.queue)
    def submit(self, other):
        self.queue.append(other)
    def combine(self):
        # Reduce measures over the chunk
        if self.measures is None:
            to_combine = self.queue
        else:
            to_combine = [self.measures] + self.queue

        if len(to_combine) == 0:
            pass
        elif len(to_combine) == 1:
            self.measures = to_combine[0]
        else:
            self.measures = Measures.combine(
                to_combine, precision=self.precision, growth=self.growth,
                thresh_bins=self.thresh_bins)
        self.queue = []
    def finalize(self):
        if self.queue:
            self.combine()
        if self.measures is None:
            # Note: might want to return None instead?
            return False
        else:
            self.measures.reconstruct()
            return self.measures
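
# Usage sketch (illustrative only; ``_sketch_streaming_combine`` is not part
# of the kwcoco API): folding the queue into the running total periodically
# keeps the queue, and therefore peak memory, bounded when combining a long
# stream of measures.


def _sketch_streaming_combine(measures_iter, chunk_size=10):
    """
    Illustrative only: reduce an iterable of Measures with bounded memory.
    """
    combiner = MeasureCombiner(growth='max')
    for measures in measures_iter:
        combiner.submit(measures)
        if combiner.queue_size >= chunk_size:
            combiner.combine()  # fold the queue into the running total
    return combiner.finalize()
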

class OneVersusRestMeasureCombiner:
    """
    Helper to iteratively combine ovr measures generated by some process

    Example:
        >>> from kwcoco.metrics.confusion_measures import *  # NOQA
        >>> from kwcoco.metrics.confusion_vectors import OneVsRestConfusionVectors
        >>> rng = kwarray.ensure_rng(0)
        >>> ovr_combiner = OneVersusRestMeasureCombiner(growth='max')
        >>> for _ in range(80):
        >>>     ovr_cfsn_vecs = OneVsRestConfusionVectors.demo()
        >>>     ovr_measures = ovr_cfsn_vecs.measures()
        >>>     ovr_combiner.submit(ovr_measures)
        >>> combined = ovr_combiner.finalize()
        >>> print('combined = {!r}'.format(combined))
    """

    def __init__(self, precision=None, growth=None, thresh_bins=None):
        self.catname_to_combiner = {}
        self.precision = precision
        self.growth = growth
        self.thresh_bins = thresh_bins
        self.queue_size = 0
    def submit(self, other):
        self.queue_size += 1
        for catname, other_m in other['perclass'].items():
            if catname not in self.catname_to_combiner:
                combiner = MeasureCombiner(
                    precision=self.precision, growth=self.growth,
                    thresh_bins=self.thresh_bins)
                self.catname_to_combiner[catname] = combiner
            self.catname_to_combiner[catname].submit(other_m)
    def _summary(self):
        for catname, combiner in self.catname_to_combiner.items():
            # combiner summary
            # combiner.measures
            if combiner.measures is not None:
                combiner.measures.reconstruct()
            for qx, measure in enumerate(combiner.queue):
                measure.reconstruct()
                print(' * queue[{}] = {}'.format(qx, ub.urepr(measure, nl=1)))
    def combine(self):
        for combiner in self.catname_to_combiner.values():
            combiner.combine()
        self.queue_size = 0
    def finalize(self):
        catname_to_measures = {}
        for catname, combiner in self.catname_to_combiner.items():
            catname_to_measures[catname] = combiner.finalize()
        perclass = PerClass_Measures(catname_to_measures)
        # TODO: consolidate in kwcoco
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', message='Mean of empty slice')
            mAUC = np.nanmean([item['trunc_auc'] for item in perclass.values()])
            mAP = np.nanmean([item['ap'] for item in perclass.values()])
        compatible_format = {
            'perclass': perclass,
            'mAUC': mAUC,
            'mAP': mAP,
        }
        return compatible_format
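
# Usage sketch (illustrative only; ``_sketch_consume_ovr_result`` is not part
# of the kwcoco API): the dictionary returned by ``finalize`` above is meant
# to be consumed like this; the keys are grounded in the code above.


def _sketch_consume_ovr_result(ovr_combiner):
    """
    Illustrative only: read out the combined per-class and summary metrics.
    """
    result = ovr_combiner.finalize()
    perclass = result['perclass']  # a PerClass_Measures instance
    for catname, measures in perclass.items():
        print('{}: ap={!r}'.format(catname, measures['ap']))
    print('mAP={:.4f}, mAUC={:.4f}'.format(result['mAP'], result['mAUC']))
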

@profile
def populate_info(info):
    """
    Given raw accumulated confusion counts, populate secondary measures like
    AP, AUC, F1, MCC, etc.
    """
    info['tp_count'] = tp = np.array(info['tp_count'])
    info['fp_count'] = fp = np.array(info['fp_count'])
    has_tn = 'tn_count' in info
    if has_tn:
        info['tn_count'] = tn = np.array(info['tn_count'])
    info['fn_count'] = fn = np.array(info['fn_count'])
    info['thresholds'] = thresh = np.array(info['thresholds'])

    realpos_total = info.get('realpos_total', None)
    if realpos_total is None:
        realpos_total = info['tp_count'][-1] + info['fn_count'][-1]
        info['realpos_total'] = realpos_total

    realneg_total = info.get('realneg_total', None)
    if realneg_total is None:
        if has_tn:
            realneg_total = info['tn_count'][-1] + info['fp_count'][-1]
        info['realneg_total'] = realneg_total

    nsupport = info.get('nsupport', None)
    if nsupport is None:
        info['nsupport'] = nsupport = realneg_total + realpos_total

    monotonic_ppv = info.get('monotonic_ppv', True)
    info['monotonic_ppv'] = monotonic_ppv

    finite_flags = np.isfinite(thresh)

    trunc_fp = fp
    # Cutoff the curves at a comparable point
    fp_cutoff = info.get('fp_cutoff', None)
    if fp_cutoff is None:
        fp_cutoff = np.inf
    elif isinstance(fp_cutoff, str):
        if fp_cutoff == 'num_true':
            fp_cutoff = int(np.ceil(realpos_total))
        else:
            raise KeyError(fp_cutoff)

    if np.isfinite(fp_cutoff):
        idxs = np.where(trunc_fp > fp_cutoff)[0]
        if len(idxs) == 0:
            trunc_idx = len(trunc_fp)
        else:
            trunc_idx = idxs[0]
    else:
        trunc_idx = None

    if trunc_idx is None and np.any(np.isinf(fp)):
        idxs = np.where(np.isinf(fp))[0]
        if len(idxs):
            trunc_idx = idxs[0]

    if trunc_idx is None:
        trunc_fp = fp
        trunc_tp = tp
        trunc_thresholds = thresh
    else:
        trunc_fp = fp[:trunc_idx]
        trunc_tp = tp[:trunc_idx]
        trunc_thresholds = thresh[:trunc_idx]

    # if the cutoff was not reached, horizontally extend the curve
    # This will hurt the scores (aka we may be biased against small
    # scenes), but this will ensure that big scenes are comparable
    from kwcoco.metrics import assignment
    if len(trunc_fp) == 0:
        trunc_fp = np.array([fp_cutoff])
        trunc_tp = np.array([0])
        if assignment.USE_NEG_INF:
            trunc_thresholds = np.array([-np.inf])
        else:
            trunc_thresholds = np.array([0])
        # THIS WILL CAUSE AUC TO RAISE AN ERROR IF IT GETS HIT
    elif trunc_fp[-1] <= fp_cutoff and np.isfinite(fp_cutoff):
        trunc_fp = np.hstack([trunc_fp, [fp_cutoff]])
        trunc_tp = np.hstack([trunc_tp, [trunc_tp[-1]]])
        if assignment.USE_NEG_INF:
            trunc_thresholds = np.hstack([trunc_thresholds, [-np.inf]])
        else:
            trunc_thresholds = np.hstack([trunc_thresholds, [0]])

    info['trunc_idx'] = trunc_idx
    info['trunc_fp_count'] = trunc_fp
    info['trunc_tp_count'] = trunc_tp
    info['trunc_thresholds'] = trunc_thresholds

    with warnings.catch_warnings():
        # It is very possible that we will divide by zero in this func
        warnings.filterwarnings('ignore', message='invalid .* true_divide')
        warnings.filterwarnings('ignore', message='invalid value')
        warnings.filterwarnings('ignore', message='divide by zero',
                                category=RuntimeWarning)

        pred_pos = (tp + fp)  # number of predicted positives
        ppv = tp / pred_pos  # precision
        ppv[np.isnan(ppv)] = 0
        if monotonic_ppv:
            # trick to make precision monotonic (which is probably not
            # entirely correct)
            ppv = np.maximum.accumulate(ppv[::-1])[::-1]

        # note: the tpr denominator could be clamped to one where it is zero
        tpr_denom = (tp + fn)
        # tpr_denom[~(tpr_denom > 0)] = 1
        tpr = tp / tpr_denom  # recall

        debug = 0
        if debug:
            assert ub.allsame(tpr_denom), 'tpr denom should be constant'
            # tpr_denom should be equal to info['realpos_total']
            if np.any(tpr_denom != info['realpos_total']):
                warnings.warn('realpos_total is inconsistent')

        if has_tn:
            tnr_denom = (tn + fp)
            tnr_denom[tnr_denom == 0] = 1
            tnr = tn / tnr_denom
            pnv_denom = (tn + fn)
            pnv_denom[pnv_denom == 0] = 1
            npv = tn / pnv_denom

        info['ppv'] = ppv
        info['tpr'] = tpr
        ppv_mul_tpr = ppv * tpr

        # fpr_denom is a proxy for fp + tn, as tn is generally unknown. In
        # the case where all negatives are specified in the confusion
        # vectors, fpr_denom will be exactly (fp + tn).
        # fpr = fp / (fp + tn)
        finite_fp = fp[np.isfinite(fp)]
        fpr_denom = finite_fp[-1] if len(finite_fp) else 0
        if fpr_denom == 0:
            fpr_denom = 1
        fpr = info['fp_count'] / fpr_denom
        info['fpr'] = fpr

        if has_tn:
            info['bm'] = tpr + tnr - 1  # informedness
            info['mk'] = ppv + npv - 1  # markedness
            tp_add_tn = tp + tn
            # info['acc'] = (tp + tn) / (tp + tn + fp + fn)
            info['acc'] = (tp_add_tn) / (tp_add_tn + fp + fn)
            # https://en.wikipedia.org/wiki/Matthews_correlation_coefficient
            # mcc_numer = (tp * tn) - (fp * fn)
            # mcc_denom = np.sqrt((tp + fp) * (tp + fn) * (tn + fp) * (tn + fn))
            # mcc_denom[np.isnan(mcc_denom) | (mcc_denom == 0)] = 1
            # info['mcc'] = mcc_numer / mcc_denom

            real_pos = fn + tp  # number of real positives
            p_denom = real_pos.copy()
            p_denom[p_denom == 0] = 1
            fnr = fn / p_denom  # miss rate
            fdr = 1 - ppv  # false discovery rate
            fmr = 1 - npv  # false omission rate (FOR)

            info['tnr'] = tnr
            info['npv'] = npv
            # info['mcc'] = np.sqrt(ppv * tpr * tnr * npv) - np.sqrt(fdr * fnr * fpr * fmr)
            info['mcc'] = (np.sqrt(ppv_mul_tpr * tnr * npv) -
                           np.sqrt(fdr * fnr * fpr * fmr))

        # f1_numer = (2 * ppv * tpr)
        f1_numer = (2 * ppv_mul_tpr)
        f1_denom = (ppv + tpr)
        f1_denom[f1_denom == 0] = 1
        info['f1'] = f1_numer / f1_denom

        # https://erotemic.wordpress.com/2019/10/23/closed-form-of-the-mcc-when-tn-inf/
        # info['g1'] = np.sqrt(ppv * tpr)
        info['g1'] = np.sqrt(ppv_mul_tpr)

        keys = ['mcc', 'g1', 'f1', 'acc']
        finite_thresh = thresh[finite_flags]
        for key in keys:
            if key in info:
                measure = info[key][finite_flags]
                try:
                    max_idx = np.nanargmax(measure)
                except ValueError:
                    best_thresh = np.nan
                    best_measure = np.nan
                else:
                    best_thresh = float(finite_thresh[max_idx])
                    best_measure = float(measure[max_idx])
                best_label = '{}={:0.2f}@{:0.2f}'.format(
                    key, best_measure, best_thresh)
                # if np.isinf(best_thresh) or np.isnan(best_measure):
                #     print('key = {!r}'.format(key))
                #     print('finite_flags = {!r}'.format(finite_flags))
                #     print('measure = {!r}'.format(measure))
                #     print('best_label = {!r}'.format(best_label))
                #     import xdev
                #     xdev.embed()
                info['max_{}'.format(key)] = best_label
                info['_max_{}'.format(key)] = (best_measure, best_thresh)

        import sklearn.metrics  # NOQA
        finite_trunc_fp = info['trunc_fp_count']
        finite_trunc_fp = finite_trunc_fp[np.isfinite(finite_trunc_fp)]
        trunc_fpr_denom = finite_trunc_fp[-1] if len(finite_trunc_fp) else 0
        if trunc_fpr_denom == 0:
            trunc_fpr_denom = 1
        info['trunc_tpr'] = info['trunc_tp_count'] / info['realpos_total']
        info['trunc_fpr'] = info['trunc_fp_count'] / trunc_fpr_denom
        try:
            info['trunc_auc'] = sklearn.metrics.auc(
                info['trunc_fpr'], info['trunc_tpr'])
        except ValueError:
            # At least 2 points are needed to compute area under curve,
            # but x.shape = 1
            info['trunc_auc'] = np.nan
        if len(info['trunc_fpr']) == 1 and len(info['trunc_tpr']) == 1:
            if info['trunc_fpr'][0] == 0 and info['trunc_tpr'][0] == 1:
                # Hard code AUC in the perfect detection case
                info['trunc_auc'] = 1.0
        info['auc'] = info['trunc_auc']

        """
        Note:
            Apparently, consistent scoring is really hard to get right.

            For detection problems, scoring via confusion_vectors+sklearn
            produces noticeably different results than the VOC method. There
            are a few reasons for this. The VOC method stops counting true
            positives after all assigned predicted boxes have been counted.
            It simply remembers the amount of original true positives to
            normalize the true positive rate. On the other hand, confusion
            vectors maintains a list of these unassigned true boxes and
            gives them a predicted index of -1 and a score of zero.

            This means that this function sees them as having a y_true of 1
            and a y_score of 0, which allows the scikit-learn fp and tp
            counts to effectively get up to 100% recall when the threshold
            is zero. The VOC method simply ignores these and handles them
            implicitly. The problem is that if you remove these from the
            scikit-learn inputs, it won't see the correct number of
            positives and it will incorrectly normalize the recall.

            In summary:

                VOC:
                    * remembers realpos_total
                    * doesn't count unassigned truths as TP when the
                      threshold is zero.

                CV+SKL:
                    * counts unassigned truths as TP with score=0.
                    * NEW: now counts unassigned truth as TP with
                      score=-np.inf
                    * Always ensure tpr=1, ppv=0 and ppv=1, tpr=0 cases
                      exist.
        """

        # ---
        # sklearn definition
        # AP = sum((R[n] - R[n - 1]) * P[n] for n in range(len(thresholds)))
        # stop when full recall attained
        SKLISH_AP = 1
        if SKLISH_AP:
            last_ind = tpr.searchsorted(tpr[-1])
            rec = np.r_[0, tpr[:last_ind + 1]]
            prec = np.r_[1, ppv[:last_ind + 1]]
            # scores = np.r_[0, thresh[:last_ind + 1]]

            # Precisions are weighted by the change in recall
            diff_items = np.diff(rec)
            prec_items = prec[1:]
            # baseline way
            info['sklish_ap'] = float(np.sum(diff_items * prec_items))

        PYCOCOTOOLS_AP = True
        if PYCOCOTOOLS_AP:
            # similar to pycocotools style "AP"
            R = 101
            feasible_idxs = np.where(thresh > -np.inf)[0]
            if len(feasible_idxs) == 0:
                info['pycocotools_ap'] = np.nan
            else:
                feasible_tpr = tpr[feasible_idxs[-1]]
                last_ind = tpr.searchsorted(feasible_tpr)
                rc = tpr[:last_ind + 1]
                pr = ppv[:last_ind + 1]
                recThrs = np.linspace(0, 1.0, R)
                inds = np.searchsorted(rc, recThrs, side='left')
                q = np.zeros((R,))
                # ss = np.zeros((R,))
                try:
                    for ri, pi in enumerate(inds):
                        q[ri] = pr[pi]
                        # ss[ri] = scores[pi]
                except Exception:
                    pass
                pycocotools_ap = q.mean()
                info['pycocotools_ap'] = float(pycocotools_ap)

        OUTLIER_AP = 1
        if OUTLIER_AP:
            # Remove extreme outliers from the ap calculation.
            # Only do this on the first or last 2 items.
            # Heuristically chosen.
            flags = diff_items > 0.1
            idxs = np.where(flags)[0]
            max_idx = len(flags) - 1
            idx_thresh = 2
            try:
                idx_dist = np.minimum(idxs, max_idx - idxs)
                outlier_idxs = idxs[idx_dist < idx_thresh]
                outlier_flags = kwarray.boolmask(outlier_idxs, len(diff_items))
                inlier_flags = ~outlier_flags
                score = prec_items.copy()
                score[outlier_flags] = score[inlier_flags].min()
                score[outlier_flags] = 0
                ap = outlier_ap = np.sum(score * diff_items)
                info['outlier_ap'] = float(outlier_ap)
            except Exception:
                pass

        INF_THRESH_AP = 1
        if INF_THRESH_AP:
            # This may become the de-facto scikit-learn implementation in
            # the future.
            from scipy import integrate
            # MODIFIED SKLEARN AVERAGE PRECISION FOR -INF THRESH
            #
            # Better way of marking unassigned truth as never-recallable.
            # We need to prevent these unassigned truths from incorrectly
            # bringing up our true positive rate at low thresholds.
            #
            # Simply bump last_ind to ensure it is feasible
            feasible_idxs = np.where(thresh > -np.inf)[0]
            if len(feasible_idxs) == 0:
                info['sklearn_ap'] = np.nan
            else:
                feasible_tpr = tpr[feasible_idxs[-1]]
                last_ind = tpr.searchsorted(feasible_tpr)
                rec = np.r_[0, tpr[:last_ind + 1]]
                prec = np.r_[1, ppv[:last_ind + 1]]
                diff_items = np.diff(rec)
                prec_items = prec[1:]
                # Not sure which is best here, we no longer have the
                # assumption of max-tpr = 1
                # ap = float(np.sum(diff_items * prec_items))
                info['sklearn_ap'] = integrate.trapz(y=prec, x=rec)

        info['ap_method'] = info.get('ap_method', 'pycocotools')
        if info['ap_method'] == 'pycocotools':
            ap = info['pycocotools_ap']
        elif info['ap_method'] == 'outlier':
            ap = info['outlier_ap']
        elif info['ap_method'] == 'sklearn':
            ap = info['sklearn_ap']
        elif info['ap_method'] == 'sklish':
            ap = info['sklish_ap']
        else:
            raise KeyError(info['ap_method'])
        # print('ap = {!r}'.format(ap))
        # ap = np.sum(np.diff(rec) * prec[1:])
        info['ap'] = float(ap)
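
# Worked example (illustrative only; ``_sketch_ap_variants`` is not part of
# the kwcoco API) contrasting the two simplest AP rules used above on a toy
# PR curve: the "sklish" step sum versus trapezoidal integration of
# precision over recall.


def _sketch_ap_variants():
    """
    Illustrative only.

    >>> sklish_ap, trapz_ap = _sketch_ap_variants()
    """
    from scipy import integrate
    rec = np.array([0.0, 0.5, 1.0])
    prec = np.array([1.0, 1.0, 0.5])
    # step-style AP: precision weighted by the change in recall
    sklish_ap = float(np.sum(np.diff(rec) * prec[1:]))  # -> 0.75
    # trapezoidal AP, as in the INF_THRESH_AP branch above
    # (older scipy name; newer scipy calls this ``integrate.trapezoid``)
    trapz_ap = float(integrate.trapz(y=prec, x=rec))  # -> 0.875
    return sklish_ap, trapz_ap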