
"""
TODO:
    - [ ] _fast_pdist_priority: Look at absolute difference in sibling entropy
        when deciding whether to go up or down in the tree.

    - [ ] applicant-proposing (med-school application style) true-pred
        matching fast algorithm. (A sketch of the idea appears after the
        imports below.)

    - [ ] Maybe looping over truth rather than pred is faster? but it makes you
        have to combine pred score / ious, which is weird.

    - [x] preallocate ndarray and use hstack to build confusion vectors?
        - doesn't help

    - [ ] relevant classes / classes-of-interest we care about need to be a
        first-class member of detection metrics.

    - [ ] Add parameter that allows one prediction to "match" to more than one
        truth object. (example: we have a duck detector problem and all the
        ducks in a row are annotated as separate objects, and we only care
        about getting the group)
"""
import warnings
import networkx as nx
import numpy as np
import ubelt as ub
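
# The module TODO above mentions an "applicant proposing" matching between
# truth and predictions. A minimal sketch of that idea, in the spirit of
# Gale-Shapley stable matching, is given below. This is an illustrative
# assumption about what that TODO means; the helper name
# `_stable_matching_sketch` is hypothetical and nothing in kwcoco calls it.
def _stable_matching_sketch(iou_matrix, iou_thresh=0.5):
    """
    Assign predictions (rows) to truths (columns) by letting each prediction
    "apply" to truths in descending iou order; a truth tentatively accepts
    the best applicant seen so far and may later bump it for a better one.

    Example:
        >>> ious = np.array([[0.9, 0.6], [0.8, 0.1]])
        >>> assert _stable_matching_sketch(ious) == {0: 0}
    """
    n_pred, n_true = iou_matrix.shape
    # Each prediction prefers truths with a higher iou
    prefs = {px: [int(tx) for tx in np.argsort(-iou_matrix[px])]
             for px in range(n_pred)}
    tx_to_px = {}  # tentative truth -> prediction engagements
    free = list(range(n_pred))
    while free:
        px = free.pop()
        while prefs[px]:
            tx = prefs[px].pop(0)
            if iou_matrix[px, tx] <= iou_thresh:
                break  # all remaining options are below the threshold
            incumbent = tx_to_px.get(tx)
            if incumbent is None:
                tx_to_px[tx] = px  # truth was free; tentatively accept
                break
            elif iou_matrix[px, tx] > iou_matrix[incumbent, tx]:
                tx_to_px[tx] = px  # truth prefers the new applicant
                free.append(incumbent)  # the incumbent re-enters the pool
                break
    return {px: tx for tx, px in tx_to_px.items()}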


USE_NEG_INF = True


def _assign_confusion_vectors(true_dets, pred_dets, bg_weight=1.0,
                              iou_thresh=0.5, bg_cidx=-1, bias=0.0,
                              classes=None, compat='all', prioritize='iou',
                              ignore_classes='ignore', max_dets=None):
    """
    Create confusion vectors for detections by assigning to ground truth boxes

    Given predictions and truth for an image, return (y_pred, y_true,
    y_score), which is suitable for sklearn classification metrics

    Args:
        true_dets (Detections):
            groundtruth with boxes, classes, and weights

        pred_dets (Detections):
            predictions with boxes, classes, and scores

        bg_weight (float, default=1.0):
            weight assigned to background (i.e. unmatched) predictions

        iou_thresh (float, default=0.5):
            bounding box overlap iou threshold required for assignment

        bias (float, default=0.0):
            for computing bounding box overlap, either 1 or 0

        compat (str, default='all'):
            can be ('ancestors' | 'mutex' | 'all'). determines which pred
            boxes are allowed to match which true boxes. If 'mutex', then
            pred boxes can only match true boxes of the same class. If
            'ancestors', then pred boxes can match true boxes that match or
            have a coarser label. If 'all', then any pred can match any true,
            regardless of its category label.

        prioritize (str, default='iou'):
            can be ('iou' | 'class' | 'correct'). determines which box to
            assign to if multiple true boxes overlap a predicted box. If
            prioritize is iou, then the true box with maximum iou (above
            iou_thresh) will be chosen. If prioritize is class, then it will
            prefer matching a compatible class above a higher iou. If
            prioritize is correct, then ancestors of the true class are
            preferred over descendants of the true class, over unrelated
            classes.

        bg_cidx (int, default=-1):
            The index of the background class. The index used in the truth
            column when a predicted bounding box does not match any true
            bounding box.

        classes (List[str] | kwcoco.CategoryTree):
            mapping from class indices to class names. Can also contain class
            hierarchy information.

        ignore_classes (str | List[str]):
            class name(s) indicating ignore regions

        max_dets (int):
            maximum number of detections to consider

    TODO:
        - [ ] This is a bottleneck function. An implementation in C / C++ /
              Cython would likely improve the overall system.

        - [ ] Implement crowd truth. Allow multiple predictions to match any
              truth object marked as "iscrowd".

    Returns:
        dict: with relevant confusion vectors. The keys of this dict can be
            interpreted as columns of a data frame. The `txs` / `pxs` columns
            represent the indexes of the true / predicted annotations that
            were assigned as matching. Additionally each row also contains
            the true and predicted class index, the predicted score, the true
            weight and the iou of the true and predicted boxes. A `txs` value
            of -1 means that the predicted box was not assigned to a true
            annotation and a `pxs` value of -1 means that the true annotation
            was not assigned to any predicted annotation.

    Example:
        >>> # xdoctest: +REQUIRES(module:pandas)
        >>> import pandas as pd
        >>> import kwimage
        >>> # Given a raw numpy representation construct Detection wrappers
        >>> true_dets = kwimage.Detections(
        >>>     boxes=kwimage.Boxes(np.array([
        >>>         [ 0,  0, 10, 10], [10,  0, 20, 10],
        >>>         [10,  0, 20, 10], [20,  0, 30, 10]]), 'tlbr'),
        >>>     weights=np.array([1, 0, .9, 1]),
        >>>     class_idxs=np.array([0, 0, 1, 2]))
        >>> pred_dets = kwimage.Detections(
        >>>     boxes=kwimage.Boxes(np.array([
        >>>         [6, 2, 20, 10], [3,  2, 9,  7],
        >>>         [3,  9, 9,  7], [3,  2, 9,  7],
        >>>         [2,  6, 7,  7], [20,  0, 30, 10]]), 'tlbr'),
        >>>     scores=np.array([.5, .5, .5, .5, .5, .5]),
        >>>     class_idxs=np.array([0, 0, 1, 2, 0, 1]))
        >>> bg_weight = 1.0
        >>> compat = 'all'
        >>> iou_thresh = 0.5
        >>> bias = 0.0
        >>> import kwcoco
        >>> classes = kwcoco.CategoryTree.from_mutex(list(range(3)))
        >>> bg_cidx = -1
        >>> y = _assign_confusion_vectors(true_dets, pred_dets, bias=bias,
        >>>                               bg_weight=bg_weight,
        >>>                               iou_thresh=iou_thresh,
        >>>                               compat=compat)
        >>> y = pd.DataFrame(y)
        >>> print(y)  # xdoc: +IGNORE_WANT
           pred  true  score  weight     iou  txs  pxs
        0     1     2 0.5000  1.0000  1.0000    3    5
        1     0    -1 0.5000  1.0000 -1.0000   -1    4
        2     2    -1 0.5000  1.0000 -1.0000   -1    3
        3     1    -1 0.5000  1.0000 -1.0000   -1    2
        4     0    -1 0.5000  1.0000 -1.0000   -1    1
        5     0     0 0.5000  0.0000  0.6061    1    0
        6    -1     0 0.0000  1.0000 -1.0000    0   -1
        7    -1     1 0.0000  0.9000 -1.0000    2   -1

    Example:
        >>> # xdoctest: +REQUIRES(module:pandas)
        >>> import pandas as pd
        >>> from kwcoco.metrics import DetectionMetrics
        >>> dmet = DetectionMetrics.demo(nimgs=1, nclasses=8,
        >>>                              nboxes=(0, 20), n_fp=20,
        >>>                              box_noise=.2, cls_noise=.3)
        >>> classes = dmet.classes
        >>> gid = 0
        >>> true_dets = dmet.true_detections(gid)
        >>> pred_dets = dmet.pred_detections(gid)
        >>> y = _assign_confusion_vectors(true_dets, pred_dets,
        >>>                               classes=dmet.classes,
        >>>                               compat='all', prioritize='class')
        >>> y = pd.DataFrame(y)
        >>> print(y)  # xdoc: +IGNORE_WANT
        >>> y = _assign_confusion_vectors(true_dets, pred_dets,
        >>>                               classes=dmet.classes,
        >>>                               compat='ancestors', iou_thresh=.5)
        >>> y = pd.DataFrame(y)
        >>> print(y)  # xdoc: +IGNORE_WANT

    Ignore:
        from xinspect.dynamic_kwargs import get_func_kwargs
        globals().update(get_func_kwargs(_assign_confusion_vectors))
    """
    import kwarray
    valid_compat_keys = {'ancestors', 'mutex', 'all'}
    if compat not in valid_compat_keys:
        raise KeyError(compat)
    if classes is None and compat == 'ancestors':
        compat = 'mutex'

    if compat == 'mutex':
        prioritize = 'iou'

    # Group true boxes by class
    # Keep track of which true boxes are unused / not assigned
    unique_tcxs, tgroupxs = kwarray.group_indices(true_dets.class_idxs)
    cx_to_txs = dict(zip(unique_tcxs, tgroupxs))

    unique_pcxs = np.array(sorted(set(pred_dets.class_idxs)))

    if classes is None:
        import kwcoco
        # Build mutually exclusive category tree
        all_cxs = sorted(set(map(int, unique_pcxs)) | set(map(int, unique_tcxs)))
        all_cxs = list(range(max(all_cxs) + 1))
        classes = kwcoco.CategoryTree.from_mutex(all_cxs)

    cx_to_ancestors = classes.idx_to_ancestor_idxs()

    if prioritize == 'iou':
        pdist_priority = None  # TODO: cleanup
    else:
        pdist_priority = _fast_pdist_priority(classes, prioritize)

    if compat == 'mutex':
        # assume classes are mutually exclusive if hierarchy is not given
        cx_to_matchable_cxs = {cx: [cx] for cx in unique_pcxs}
    elif compat == 'ancestors':
        cx_to_matchable_cxs = {
            cx: sorted([cx] + sorted(ub.take(
                classes.node_to_idx,
                nx.ancestors(classes.graph, classes.idx_to_node[cx]))))
            for cx in unique_pcxs
        }
    elif compat == 'all':
        cx_to_matchable_cxs = {cx: unique_tcxs for cx in unique_pcxs}
    else:
        raise KeyError(compat)

    if compat == 'all':
        # In this case simply run the full pairwise iou
        common_true_idxs = np.arange(len(true_dets))
        cx_to_matchable_txs = {cx: common_true_idxs for cx in unique_pcxs}
        common_ious = pred_dets.boxes.ious(true_dets.boxes, bias=bias)
        # common_ious = pred_dets.boxes.ious(true_dets.boxes, impl='c', bias=bias)
        iou_lookup = dict(enumerate(common_ious))
    else:
        # For each pred-category find matchable true-indices
        cx_to_matchable_txs = {}
        for cx, matchable_cxs in cx_to_matchable_cxs.items():
            compat_txs = ub.take(cx_to_txs, matchable_cxs, default=[])
            compat_txs = np.array(sorted(ub.flatten(compat_txs)), dtype=int)
            cx_to_matchable_txs[cx] = compat_txs

        # Batch up the IOU pre-computation between compatible truths / preds
        iou_lookup = {}
        unique_pred_cxs, pgroupxs = kwarray.group_indices(pred_dets.class_idxs)
        for cx, pred_idxs in zip(unique_pred_cxs, pgroupxs):
            true_idxs = cx_to_matchable_txs[cx]
            ious = pred_dets.boxes[pred_idxs].ious(
                true_dets.boxes[true_idxs], bias=bias)
            _px_to_iou = dict(zip(pred_idxs, ious))
            iou_lookup.update(_px_to_iou)

    iou_thresh_list = (
        [iou_thresh] if not ub.iterable(iou_thresh) else iou_thresh)

    iou_thresh_to_y = {}

    for iou_thresh_ in iou_thresh_list:
        isvalid_lookup = {px: ious > iou_thresh_
                          for px, ious in iou_lookup.items()}

        y = _critical_loop(true_dets, pred_dets, iou_lookup, isvalid_lookup,
                           cx_to_matchable_txs, bg_weight, prioritize,
                           iou_thresh_, pdist_priority, cx_to_ancestors,
                           bg_cidx, ignore_classes=ignore_classes,
                           max_dets=max_dets)
        iou_thresh_to_y[iou_thresh_] = y

    if ub.iterable(iou_thresh):
        return iou_thresh_to_y
    else:
        return y
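

# The docstring above says the output is "suitable for sklearn classification
# metrics". A minimal sketch of that hand-off (a hypothetical helper, not part
# of kwcoco): background rows use the sentinel -1, so they are remapped to a
# dedicated non-negative label before calling sklearn.
def _sklearn_usage_sketch(y):
    from sklearn.metrics import accuracy_score  # assumes sklearn is available
    y_true = np.asarray(y['true'])
    y_pred = np.asarray(y['pred'])
    weight = np.asarray(y['weight'])
    # Remap the -1 background sentinel to one-past the largest class index
    bg_label = max(int(y_true.max()), int(y_pred.max())) + 1
    y_true = np.where(y_true == -1, bg_label, y_true)
    y_pred = np.where(y_pred == -1, bg_label, y_pred)
    # Truth weights (e.g. 0 for don't-care truths) become sample weights
    return accuracy_score(y_true, y_pred, sample_weight=weight)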


def _critical_loop(true_dets, pred_dets, iou_lookup, isvalid_lookup,
                   cx_to_matchable_txs, bg_weight, prioritize, iou_thresh_,
                   pdist_priority, cx_to_ancestors, bg_cidx, ignore_classes,
                   max_dets):
    # Note:
    # * Preallocating numpy arrays does not help
    # * It might be useful to code this critical loop up in C / Cython
    # * Could numba help? (I'm having an issue with cmath)
    import kwarray

    # Keep track of which true items have been used
    true_unused = np.ones(len(true_dets), dtype=bool)

    # Sort predictions by descending score
    if 'scores' in pred_dets.data:
        _scores = pred_dets.scores
    else:
        _scores = np.ones(len(pred_dets))

    _pred_sortx = _scores.argsort()[::-1]
    _pred_cxs = pred_dets.class_idxs.take(_pred_sortx, axis=0)
    _pred_scores = _scores.take(_pred_sortx, axis=0)

    if max_dets is not None and np.isfinite(max_dets):
        # For pycocotools compat, probably not the most efficient way of
        # handling this
        _pred_sortx = _pred_sortx[0:max_dets]
        _pred_cxs = _pred_cxs[0:max_dets]
        _pred_scores = _pred_scores[0:max_dets]

    if ignore_classes is not None:
        # FIXME: does this use the iou threshold correctly?
        # iou_thresh is being used as iooa not iou to determine which
        # pred regions are ignored.
        true_ignore_flags, pred_ignore_flags = _filter_ignore_regions(
            true_dets, pred_dets, ioaa_thresh=iou_thresh_,
            ignore_classes=ignore_classes)

        # Remove ignored predicted regions from assignment consideration
        _pred_keep_flags = ~pred_ignore_flags[_pred_sortx]
        _pred_sortx = _pred_sortx[_pred_keep_flags]
        _pred_cxs = _pred_cxs[_pred_keep_flags]
        _pred_scores = _pred_scores[_pred_keep_flags]

        # Remove ignored truth regions from assignment consideration
        true_unused[true_ignore_flags] = False

    y_pred = []
    y_true = []
    y_score = []
    y_weight = []
    y_iou = []
    y_pxs = []
    y_txs = []

    # NOTE: I don't think this actually does anything anymore
    if prioritize == 'correct' or prioritize == 'class':
        used_truth_policy = 'next_best'
    else:
        used_truth_policy = 'mark_false'

    # Greedy assignment. For each predicted detection box,
    # allow it to match the truth of compatible classes.
    for px, pred_cx, score in zip(_pred_sortx, _pred_cxs, _pred_scores):
        # Find compatible truth indices
        true_idxs = cx_to_matchable_txs[pred_cx]
        # Filter out any truth that has already been used
        unused = true_unused[true_idxs]
        unused_true_idxs = true_idxs[unused]

        ovmax = -np.inf
        ovidx = None
        weight = bg_weight
        tx = -1  # we will set this to the index of the assigned gt

        if len(unused_true_idxs):
            # First grab all candidate unused true boxes and lookup
            # precomputed ious between this pred and true_idxs
            cand_true_idxs = unused_true_idxs

            if prioritize == 'iou':
                # Simply match the true box with the highest iou (that is
                # also considered matchable)
                if used_truth_policy == 'next_best':
                    # TODO: VERIFY THIS IS NO DIFFERENT THAN "MARK_FALSE"
                    # AND REMOVE.
                    # Don't even consider matches to previously used
                    # groundtruth (note this means it will be marked as a
                    # false positive)
                    cand_ious = iou_lookup[px].compress(unused)
                    ovidx = cand_ious.argmax()
                    ovmax = cand_ious[ovidx]
                    if ovmax > iou_thresh_:
                        tx = cand_true_idxs[ovidx]
                elif used_truth_policy == 'mark_false':
                    # Consider a match to a previously used truth a false
                    # (note this means it will be marked as a false positive
                    # more aggressively than the next_best option, because
                    # there it may match a different truth)
                    cand_ious = iou_lookup[px]
                    ovidx = cand_ious.argmax()
                    ovmax = cand_ious[ovidx]
                    if ovmax > iou_thresh_:
                        tx = true_idxs[ovidx]
                        if not unused[ovidx]:
                            tx = -1
                else:
                    raise KeyError(used_truth_policy)
            elif prioritize == 'correct' or prioritize == 'class':
                if used_truth_policy != 'next_best':
                    raise NotImplementedError(used_truth_policy)
                # Choose which (if any) of the overlapping true boxes to
                # match. If there are any correct matches above the overlap
                # threshold, choose to match that.

                # Flag any unused true box that overlaps
                overlap_flags = isvalid_lookup[px][unused]

                if overlap_flags.any():
                    cand_ious = iou_lookup[px][unused]
                    cand_true_cxs = true_dets.class_idxs[cand_true_idxs]
                    cand_true_idxs = cand_true_idxs[overlap_flags]
                    cand_true_cxs = cand_true_cxs[overlap_flags]
                    cand_ious = cand_ious[overlap_flags]

                    # Choose candidate with highest priority
                    # (prefer finer-grained correct classes over higher
                    # overlap, but choose highest overlap in a tie).
                    cand_class_priority = pdist_priority[pred_cx][cand_true_cxs]

                    # ovidx = ub.argmax(zip(cand_class_priority, cand_ious))
                    ovidx = kwarray.arglexmax([cand_ious, cand_class_priority])
                    ovmax = cand_ious[ovidx]
                    tx = cand_true_idxs[ovidx]
            else:
                raise KeyError(prioritize)

        if tx > -1:
            # If the prediction matched a true object, mark the assignment
            # as either a true or false positive
            # tx = unused_true_idxs[ovidx]
            true_unused[tx] = False  # mark this true box as used
            if 'weights' in true_dets.data:
                weight = true_dets.weights[tx]
            else:
                weight = 1.0
            true_cx = true_dets.class_idxs[tx]

            # If the prediction is a finer-grained category than the truth
            # change the prediction to match the truth (because it is
            # compatible). This is the key to hierarchical scoring.
            if pred_cx is not None and true_cx in cx_to_ancestors[pred_cx]:
                pred_cx = true_cx

            y_pred.append(pred_cx)
            y_true.append(true_cx)
            y_score.append(score)
            y_weight.append(weight)
            y_iou.append(ovmax)
            y_pxs.append(px)
            y_txs.append(tx)
        else:
            # Assign this prediction to the background.
            # Mark this prediction as a false positive.
            y_pred.append(pred_cx)
            y_true.append(bg_cidx)
            y_score.append(score)
            y_weight.append(bg_weight)
            y_iou.append(-1)
            y_pxs.append(px)
            y_txs.append(tx)

    # All pred boxes have been assigned to a truth box or the background.
    # Mark unused true boxes we failed to predict as false negatives
    bg_px = -1
    unused_txs = np.where(true_unused)[0]
    n = len(unused_txs)

    unused_y_true = true_dets.class_idxs[unused_txs].tolist()
    if 'weights' in true_dets.data:
        unused_y_weight = true_dets.weights[unused_txs].tolist()
    else:
        unused_y_weight = [1.0] * n

    y_pred.extend([-1] * n)
    y_true.extend(unused_y_true)
    if USE_NEG_INF:
        y_score.extend([-np.inf] * n)
    else:
        y_score.extend([0] * n)
    y_iou.extend([-1] * n)
    y_weight.extend(unused_y_weight)
    y_pxs.extend([bg_px] * n)
    y_txs.extend(unused_txs.tolist())

    y = {
        'pred': y_pred,
        'true': y_true,
        'score': y_score,
        'weight': y_weight,
        'iou': y_iou,
        'txs': y_txs,  # index into the original true box for this row
        'pxs': y_pxs,  # index into the original pred box for this row
    }
    # val_lens = ub.map_vals(len, y)
    # print('val_lens = {!r}'.format(val_lens))
    # assert ub.allsame(val_lens.values())
    return y
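

# For orientation, the core greedy policy of `_critical_loop` (with the
# default prioritize='iou' / 'mark_false' behavior, a single class, and no
# ignore regions) distills to the sketch below. This is an illustrative
# reduction, not a substitute for the real loop, which also handles class
# compatibility, hierarchical relabeling, and weights.
def _greedy_assignment_sketch(ious, scores, iou_thresh=0.5):
    """
    Args:
        ious (ndarray): n_pred x n_true iou matrix
        scores (ndarray): length n_pred confidence scores

    Returns:
        List[Tuple[int, int]]: (pred_idx, true_idx) pairs, where
            true_idx = -1 marks a false positive prediction.
    """
    n_pred, n_true = ious.shape
    if n_true == 0:
        return [(int(px), -1) for px in np.argsort(-scores)]
    unused = np.ones(n_true, dtype=bool)
    assignment = []
    for px in np.argsort(-scores):  # visit predictions by descending score
        tx = int(ious[px].argmax())  # best truth overall (used or not)
        if ious[px, tx] > iou_thresh and unused[tx]:
            unused[tx] = False  # claim this truth
            assignment.append((int(px), tx))
        else:
            # below threshold, or the best truth was already claimed
            assignment.append((int(px), -1))
    return assignment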


def _fast_pdist_priority(classes, prioritize, _cache={}):
    """
    Custom priority computation. Needs some vetting.

    This is the priority used when deciding which prediction to assign to
    which truth.

    TODO:
        - [ ] Look at absolute difference in sibling entropy when deciding
              whether to go up or down in the tree.
    """
    # Note: distances to ancestors will be negative and distances
    # to descendants will be positive. Prefer matching ancestors
    # over descendants.
    key = ub.hash_data('\n'.join(list(map(str, classes))), hasher='sha1')
    # key = ub.repr2(classes.__json__())
    if key not in _cache:
        # classes = kwcoco.CategoryTree.from_json(classes)
        with warnings.catch_warnings():
            warnings.filterwarnings('ignore', message='invalid .* less')
            warnings.filterwarnings('ignore', message='invalid .* greater_equal')
            # Get basic distance between nodes
            pdist = classes.idx_pairwise_distance()
            pdist_priority = np.array(pdist, dtype=np.float32, copy=True)
            if prioritize == 'correct':
                # Prioritizes all ancestors first, and then descendants
                # afterwards; nodes off the direct lineage are ignored.
                valid_vals = pdist_priority[np.isfinite(pdist_priority)]
                maxval = (valid_vals.max() - valid_vals.min()) + 1
                is_ancestor = (pdist_priority >= 0)
                is_descend = (pdist_priority < 0)
                # Prioritize ALL ancestors first
                pdist_priority[is_ancestor] = (
                    2 * maxval - pdist_priority[is_ancestor])
                # Prioritize ALL descendants next
                pdist_priority[is_descend] = (
                    maxval + pdist_priority[is_descend])
                pdist_priority[np.isnan(pdist_priority)] = -np.inf
            elif prioritize == 'class':
                # Prioritizes the exact match first, and then it alternates
                # between ancestors and descendants based on distance to self
                pdist_priority[pdist_priority < -1] += .5
                pdist_priority = np.abs(pdist_priority)
                pdist_priority[np.isnan(pdist_priority)] = np.inf
                pdist_priority = 1 / (pdist_priority + 1)
            else:
                raise KeyError(prioritize)
        _cache[key] = pdist_priority
    pdist_priority = _cache[key]
    return pdist_priority
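

# The priorities computed above are consumed in `_critical_loop` via
# kwarray.arglexmax([cand_ious, cand_class_priority]). Assuming arglexmax
# follows the np.lexsort convention (the last key is the primary sort key),
# class priority dominates and iou only breaks ties within a priority level.
# A small self-contained illustration with made-up values:
def _priority_tiebreak_sketch():
    import kwarray
    cand_ious = np.array([0.9, 0.7, 0.6])
    cand_priority = np.array([0.5, 1.0, 1.0])  # e.g. from _fast_pdist_priority
    # Candidate 0 has the best iou, but candidates 1 and 2 have a better
    # class priority; iou then breaks the tie between 1 and 2.
    ovidx = kwarray.arglexmax([cand_ious, cand_priority])
    assert ovidx == 1
    return ovidx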


def _filter_ignore_regions(true_dets, pred_dets, ioaa_thresh=0.5,
                           ignore_classes='ignore'):
    """
    Determine which true and predicted detections should be ignored.

    Args:
        true_dets (Detections)

        pred_dets (Detections)

        ioaa_thresh (float):
            intersection over other area thresh for ignoring a region.

        ignore_classes (str | List[str]):
            class name(s) indicating ignore regions

    Returns:
        Tuple[ndarray, ndarray]: flags indicating which true and predicted
            detections should be ignored.

    Example:
        >>> from kwcoco.metrics.assignment import *  # NOQA
        >>> from kwcoco.metrics.assignment import _filter_ignore_regions
        >>> import kwimage
        >>> pred_dets = kwimage.Detections.random(classes=['a', 'b', 'c'])
        >>> true_dets = kwimage.Detections.random(
        >>>     segmentations=True, classes=['a', 'b', 'c', 'ignore'])
        >>> ignore_classes = {'ignore', 'b'}
        >>> ioaa_thresh = 0.5
        >>> print('true_dets = {!r}'.format(true_dets))
        >>> print('pred_dets = {!r}'.format(pred_dets))
        >>> flags1, flags2 = _filter_ignore_regions(
        >>>     true_dets, pred_dets, ioaa_thresh=ioaa_thresh,
        >>>     ignore_classes=ignore_classes)
        >>> print('flags1 = {!r}'.format(flags1))
        >>> print('flags2 = {!r}'.format(flags2))
        >>> flags3, flags4 = _filter_ignore_regions(
        >>>     true_dets, pred_dets, ioaa_thresh=ioaa_thresh,
        >>>     ignore_classes={c.upper() for c in ignore_classes})
        >>> assert np.all(flags1 == flags3)
        >>> assert np.all(flags2 == flags4)
    """
    true_ignore_flags = np.zeros(len(true_dets), dtype=bool)
    pred_ignore_flags = np.zeros(len(pred_dets), dtype=bool)

    if not ub.iterable(ignore_classes):
        ignore_classes = {ignore_classes}

    def _normalize_catname(name, classes):
        if classes is None:
            return name
        if name in classes:
            return name
        for cname in classes:
            if cname.lower() == name.lower():
                return cname
        return name

    ignore_classes = {_normalize_catname(c, true_dets.classes)
                      for c in ignore_classes}

    if true_dets.classes is not None:
        ignore_classes = ignore_classes & set(true_dets.classes)

    # Filter out true detections labeled as "ignore"
    if true_dets.classes is not None and ignore_classes:
        import kwarray
        ignore_cidxs = [true_dets.classes.index(c) for c in ignore_classes]
        true_ignore_flags = kwarray.isect_flags(
            true_dets.class_idxs, ignore_cidxs)

        if np.any(true_ignore_flags) and len(pred_dets):
            ignore_dets = true_dets.compress(true_ignore_flags)

            pred_boxes = pred_dets.data['boxes']
            ignore_boxes = ignore_dets.data['boxes']
            ignore_sseg = ignore_dets.data.get('segmentations', None)

            # Determine which predicted boxes are inside the ignore regions.
            # Note: using sum over max is deliberate here.
            with warnings.catch_warnings():
                warnings.filterwarnings('ignore', message='invalid .* less')
                warnings.filterwarnings('ignore', message='invalid .* greater_equal')
                warnings.filterwarnings('ignore', message='invalid .* true_divide')
                ignore_overlap = (pred_boxes.isect_area(ignore_boxes) /
                                  pred_boxes.area).clip(0, 1).sum(axis=1)
                ignore_overlap = np.nan_to_num(ignore_overlap)
                ignore_idxs = np.where(ignore_overlap > ioaa_thresh)[0]

            if ignore_sseg is not None:
                from shapely.ops import unary_union
                # If the ignore region has segmentations further refine our
                # estimate of which predictions should be ignored.
                ignore_sseg = ignore_sseg.to_polygon_list()
                box_polys = ignore_boxes.to_polygons()
                ignore_polys = [
                    bp if p is None else p
                    for bp, p in zip(box_polys, ignore_sseg.data)
                ]
                # FIXME: the to_shapely method can break, not sure if this is
                # the right way to fix this
                ignore_regions = []
                for p in ignore_polys:
                    try:
                        ignore_regions.append(p.to_shapely())
                    except Exception:
                        pass
                # ignore_regions = [p.to_shapely() for p in ignore_polys]
                ignore_region = unary_union(ignore_regions).buffer(0)
                cand_pred = pred_boxes.take(ignore_idxs)

                # Refine overlap estimates
                cand_regions = cand_pred.to_shapely()
                for idx, pred_region in zip(ignore_idxs, cand_regions):
                    try:
                        isect = ignore_region.intersection(pred_region)
                        overlap = (isect.area / pred_region.area)
                        ignore_overlap[idx] = overlap
                    except Exception as ex:
                        warnings.warn('ex = {!r}'.format(ex))

            pred_ignore_flags = ignore_overlap > ioaa_thresh
    return true_ignore_flags, pred_ignore_flags
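

# `_filter_ignore_regions` measures "intersection over other area" (ioaa):
# the fraction of each *predicted* box covered by ignore boxes, summed over
# all ignore regions (sum, not max, matching the comment in the function).
# Below is a minimal numpy-only re-derivation for axis-aligned tlbr boxes;
# an illustrative sketch, not the kwimage implementation used above.
def _ioaa_sketch(pred_tlbr, ignore_tlbr):
    pred = np.asarray(pred_tlbr, dtype=float)[:, None, :]   # n_pred x 1 x 4
    ign = np.asarray(ignore_tlbr, dtype=float)[None, :, :]  # 1 x n_ign x 4
    # Pairwise intersection widths and heights, clipped at zero
    iw = (np.minimum(pred[..., 2], ign[..., 2]) -
          np.maximum(pred[..., 0], ign[..., 0])).clip(0, None)
    ih = (np.minimum(pred[..., 3], ign[..., 3]) -
          np.maximum(pred[..., 1], ign[..., 1])).clip(0, None)
    isect = iw * ih                                         # n_pred x n_ign
    pred_area = ((pred[..., 2] - pred[..., 0]) *
                 (pred[..., 3] - pred[..., 1]))             # n_pred x 1
    # A prediction fully inside one ignore box gets an ioaa of 1.0
    return (isect / pred_area).clip(0, 1).sum(axis=1)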


if __name__ == '__main__':
    """
    CommandLine:
        python ~/code/kwcoco/kwcoco/metrics/assignment.py all
    """
    import xdoctest
    xdoctest.doctest_module(__file__)