"""
Evaluates a predicted coco dataset against a truth coco dataset.
This currently computes detection-level metrics.
The components in this module work programatically or as a command line script.
TODO:
- [ ] does evaluate return one result or multiple results
based on different configurations?
- [ ] max_dets - TODO: in original pycocoutils but not here
- [ ] Flag that allows for polygon instead of bounding box overlap
- [ ] How do we note what iou_thresh and area-range were in
the result plots?
CommandLine:
xdoctest -m kwcoco.coco_evaluator __doc__:0 --vd --slow
Example:
>>> from kwcoco.coco_evaluator import * # NOQA
>>> from kwcoco.coco_evaluator import CocoEvaluator
>>> import kwcoco
>>> # note: increase the number of images for better looking metrics
>>> true_dset = kwcoco.CocoDataset.demo('shapes8')
>>> from kwcoco.demo.perterb import perterb_coco
>>> kwargs = {
>>> 'box_noise': 0.5,
>>> 'n_fp': (0, 10),
>>> 'n_fn': (0, 10),
>>> 'with_probs': True,
>>> }
>>> pred_dset = perterb_coco(true_dset, **kwargs)
>>> print('true_dset = {!r}'.format(true_dset))
>>> print('pred_dset = {!r}'.format(pred_dset))
>>> config = {
>>> 'true_dataset': true_dset,
>>> 'pred_dataset': pred_dset,
>>> 'area_range': ['all', 'small'],
>>> 'iou_thresh': [0.3, 0.95],
>>> }
>>> coco_eval = CocoEvaluator(config)
>>> results = coco_eval.evaluate()
>>> # Now we can draw / serialize the results as we please
>>> dpath = ub.Path.appdir('kwcoco/tests/test_out_dpath').ensuredir()
>>> results_fpath = dpath / 'metrics.json'
>>> print('results_fpath = {!r}'.format(results_fpath))
>>> results.dump(results_fpath, indent=' ')
>>> measures = results['area_range=all,iou_thresh=0.3'].nocls_measures
>>> import pandas as pd
>>> print(pd.DataFrame(ub.dict_isect(
>>> measures, ['f1', 'g1', 'mcc', 'thresholds',
>>> 'ppv', 'tpr', 'tnr', 'npv', 'fpr',
>>> 'tp_count', 'fp_count',
>>> 'tn_count', 'fn_count'])).iloc[::100])
>>> # xdoctest: +REQUIRES(module:kwplot)
>>> # xdoctest: +REQUIRES(--slow)
>>> results.dump_figures(dpath)
>>> print('dpath = {!r}'.format(dpath))
>>> # xdoctest: +REQUIRES(--vd)
>>> if ub.argflag('--vd') or 1:
>>> import xdev
>>> xdev.view_directory(dpath)
"""
import os
import scriptconfig as scfg
import ubelt as ub
import warnings
from os.path import exists
from os.path import isdir
from os.path import isfile
from kwcoco.util.dict_like import DictProxy
try:
from xdev import profile
except Exception:
profile = ub.identity
[docs]
class CocoEvalConfig(scfg.DataConfig):
"""
Evaluate and score predicted versus truth detections / classifications in a COCO dataset
"""
__default__ = {
'true_dataset': scfg.Value(None, type=str, help='coercable true detections', position=1),
'pred_dataset': scfg.Value(None, type=str, help='coercable predicted detections', position=2),
'ignore_classes': scfg.Value(
None, type=list, help='classes to ignore (give them zero weight)'),
'implicit_negative_classes': scfg.Value(['background']),
'implicit_ignore_classes': scfg.Value(['ignore']),
'fp_cutoff': scfg.Value(float('inf'), help='False positive cutoff for ROC'),
'iou_thresh': scfg.Value(
value=0.5,
help='One or more IoU overlap threshold for detection assignment',
alias=['ovthresh']
),
'compat': scfg.Value(
value='mutex',
choices=['all', 'mutex', 'ancestors'],
help=ub.paragraph(
'''
Matching strategy for which true annots are allowed to match
which predicted annots.
`mutex` means true boxes can only match predictions where the
true class has highest probability (pycocotools setting).
`all` means any class can match any other class.
Dont use `ancestors`, it is broken.
''')),
'monotonic_ppv': scfg.Value(True, help=ub.paragraph(
'''
if True forces precision to be monotonic. Defaults to True for
compatibility with pycocotools, but that might not be the best
option.
''')),
'ap_method': scfg.Value('pycocotools', help=ub.paragraph(
'''
Method for computing AP. Defaults to a setting comparable to
pycocotools. Can also be set to sklearn to use an alterative
method.
''')),
'use_area_attr': scfg.Value(
'try', help=ub.paragraph(
'''
if True (pycocotools setting) uses the area coco attribute to
filter area range instead of bbox area. Otherwise just filters
based on bbox area. If 'try' then it tries to use it but will
fallback if it does not exist.
''')),
'area_range': scfg.Value(
value=['all'],
help=ub.paragraph(
'''
Minimum and maximum object areas to consider. May specified as
a comma-separated code: '<min>-<max>'. These ranges are
inclusive. Also accepts scientific notation, inf, and special
keys all, small, medium, and large, which are equivalent to
small='0-1024', medium='1024-9216', large='9216-1e10', and
all='0-inf'. Note these are areas, e.g. 16x16 boxes are
considered small, but 17x16 boxes are medium. These can be
mixed an matched, e.g.: `--area_range=0-4e3,small,1024-inf`.
'''
)),
# TODO options:
'max_dets': scfg.Value(float('inf'), help=(
'maximum number of predictions to consider')),
'iou_bias': scfg.Value(1, help=(
'pycocotools setting is 1, but 0 may be better')),
# Extra options
'force_pycocoutils': scfg.Value(False, help=(
'ignore all other options and just use pycocoutils to score')),
# 'discard_classes': scfg.Value(None, type=list, help='classes to completely remove'), # TODO
'assign_workers': scfg.Value(8, help='number of background workers for assignment'),
'load_workers': scfg.Value(0, help='number of workers to load cached detections'),
# 'ovthresh': scfg.Value(None, help='deprecated, alias for iou_thresh'),
'classes_of_interest': scfg.Value(
None, type=list,
help='if specified only these classes are given weight'),
'use_image_names': scfg.Value(
False, help='if True use image file_name to associate images instead of ids'),
}
def __post_init__(self):
# if self['ovthresh'] is not None:
# warnings.warn('ovthresh is deprecated use iou_thresh')
# self['iou_thresh'] = self['ovthresh']
if self['area_range'] is not None:
parsed = []
code = self['area_range']
parts = []
if ub.iterable(code):
for p in code:
if isinstance(p, str) and ',' in p:
parts.extend(p.split(','))
else:
parts.append(p)
else:
if not isinstance(code, str):
raise TypeError('bad area code {}'.format(code))
parts = [code]
for p in parts:
minmax = p
if isinstance(p, str):
if '-' in p:
p = p.split('-')
minmax = tuple(map(float, p))
parsed.append(minmax)
self['area_range'] = parsed
[docs]
class CocoEvaluator(object):
"""
Abstracts the evaluation process to execute on two coco datasets.
This can be run as a standalone script where the user specifies the paths
to the true and predited dataset explicitly, or this can be used by a
higher level script that produces the predictions and then sends them to
this evaluator.
Example:
>>> from kwcoco.coco_evaluator import CocoEvaluator
>>> from kwcoco.demo.perterb import perterb_coco
>>> import kwcoco
>>> true_dset = kwcoco.CocoDataset.demo('shapes8')
>>> kwargs = {
>>> 'box_noise': 0.5,
>>> 'n_fp': (0, 10),
>>> 'n_fn': (0, 10),
>>> 'with_probs': True,
>>> }
>>> pred_dset = perterb_coco(true_dset, **kwargs)
>>> config = {
>>> 'true_dataset': true_dset,
>>> 'pred_dataset': pred_dset,
>>> 'classes_of_interest': [],
>>> }
>>> coco_eval = CocoEvaluator(config)
>>> results = coco_eval.evaluate()
"""
def __init__(coco_eval, config):
coco_eval.config = CocoEvalConfig(**config)
coco_eval._is_init = False
coco_eval._logs = []
coco_eval._verbose = 1
[docs]
def log(coco_eval, msg, level='INFO'):
if coco_eval._verbose:
print(msg)
coco_eval._logs.append((level, msg))
[docs]
def _init(coco_eval):
"""
Performs initial coercion from given inputs into dictionaries of
kwimage.Detection objects and attempts to ensure comparable category
and image ids.
"""
import numpy as np
# TODO: coerce into a cocodataset form if possible
# TODO: Optionally:
# * validate the input coco files,
# * ensure that the score attribute on the predictions exists.
# * ensure there is more than one category.
coco_eval.log('init truth dset')
# FIXME: What is the image names line up correctly, but the image ids
# do not? This will be common if an external detector is used.
load_workers = coco_eval.config['load_workers']
gid_to_true, true_extra = CocoEvaluator._coerce_dets(
coco_eval.config['true_dataset'], workers=load_workers)
coco_eval.log('init pred dset')
gid_to_pred, pred_extra = CocoEvaluator._coerce_dets(
coco_eval.config['pred_dataset'], workers=load_workers)
if coco_eval.config['use_image_names']:
# TODO: currently this is a hacky implementation that modifies the
# pred dset, we should not do that, just store a gid mapping.
# TODO: we will port the watch associate-images functionality soon,
# which should supercede this.
pred_to_true_gid = {}
true_coco = true_extra['coco_dset']
pred_coco = pred_extra['coco_dset']
for gid, true_img in true_coco.imgs.items():
fname = true_img['file_name']
if fname not in pred_coco.index.file_name_to_img:
continue
pred_img = pred_coco.index.file_name_to_img[fname]
pred_to_true_gid[pred_img['id']] = true_img['id']
if not pred_to_true_gid:
raise Exception('FAILED TO MAP IMAGE NAMES')
unused_pred_gids = set(pred_coco.imgs.keys()) - set(pred_to_true_gid.keys())
pred_coco.remove_images(unused_pred_gids)
new_gid_to_pred = {}
for pred_img in pred_coco.imgs.values():
old_gid = pred_img['id']
new_gid = pred_to_true_gid[old_gid]
pred = gid_to_pred[old_gid]
pred_img['id'] = new_gid
for ann in pred_coco.annots(gid=old_gid).objs:
ann['image_id'] = new_gid
new_gid_to_pred[new_gid] = pred
gid_to_pred = new_gid_to_pred
pred_coco._build_index()
pred_gids = sorted(gid_to_pred.keys())
true_gids = sorted(gid_to_true.keys())
gids = list(set(pred_gids) & set(true_gids))
true_classes = ub.peek(gid_to_true.values()).classes
pred_classes = ub.peek(gid_to_pred.values()).classes
if 0:
import xdev
xdev.set_overlaps(set(true_gids), set(pred_gids), 'true', 'pred')
classes, unified_cid_maps = CocoEvaluator._rectify_classes(
true_classes, pred_classes)
true_to_unified_cid = unified_cid_maps['true']
pred_to_unified_cid = unified_cid_maps['pred']
# Helper infor for mapping predicted probabilities
pred_new_idxs = []
pred_old_idxs = []
for old_idx, old_node in enumerate(pred_classes.idx_to_node):
old_cid = pred_classes.node_to_id[old_node]
new_cid = pred_to_unified_cid[old_cid]
new_idx = classes.id_to_idx[new_cid]
pred_old_idxs.append(old_idx)
pred_new_idxs.append(new_idx)
needs_prob_remap = (
(pred_new_idxs == pred_old_idxs) or
(len(classes) != len(pred_classes))
) or True
# Move truth to the unified class indices
for gid in ub.ProgIter(gids, desc='Rectify truth class idxs'):
det = gid_to_true[gid]
new_classes = classes
old_classes = det.meta['classes']
old_cidxs = det.data['class_idxs']
old_cids = [None if cx is None else old_classes.idx_to_id[cx] for cx in old_cidxs]
new_cids = [None if cid is None else true_to_unified_cid.get(cid, cid) for cid in old_cids]
new_cidxs = np.array([None if c is None else new_classes.id_to_idx[c] for c in new_cids])
det.meta['classes'] = new_classes
det.data['class_idxs'] = new_cidxs
# Move predictions to the unified class indices
for gid in ub.ProgIter(gids, desc='Rectify pred class idxs'):
det = gid_to_pred[gid]
new_classes = classes
old_classes = det.meta['classes']
old_cidxs = det.data['class_idxs']
old_cids = [None if cx is None else old_classes.idx_to_id[cx] for cx in old_cidxs]
new_cids = [None if cid is None else pred_to_unified_cid.get(cid, cid) for cid in old_cids]
new_cidxs = np.array([None if c is None else new_classes.id_to_idx[c] for c in new_cids])
det.meta['classes'] = new_classes
det.data['class_idxs'] = new_cidxs
if needs_prob_remap and 'probs' in det.data:
# Ensure predicted probabilities are in the unified class space
old_probs = det.data['probs']
new_probs = np.zeros_like(old_probs, shape=(len(old_probs), len(classes)))
if len(new_probs):
new_probs[:, pred_new_idxs] = old_probs[:, pred_old_idxs]
det.data['probs'] = new_probs
coco_eval.gids = gids
coco_eval.classes = classes
coco_eval.gid_to_true = gid_to_true
coco_eval.gid_to_pred = gid_to_pred
coco_eval.true_extra = true_extra
coco_eval.pred_extra = pred_extra
coco_eval._is_init = True
[docs]
def _ensure_init(coco_eval):
if not coco_eval._is_init:
coco_eval._init()
[docs]
@classmethod
def _rectify_classes(coco_eval, true_classes, pred_classes):
import kwcoco
# Determine if truth and model classes are compatible, attempt to remap
# if possible.
errors = []
for node1, id1 in true_classes.node_to_id.items():
if id1 in pred_classes.id_to_node:
node2 = pred_classes.id_to_node[id1]
if node1 != node2:
errors.append(
'id={} exists in pred and true but have '
'different names, {}, {}'.format(id1, node1, node2))
if node1 in pred_classes.node_to_id:
id2 = pred_classes.node_to_id[node1]
if id1 != id2:
errors.append(
'node={} exists in pred and true but have '
'different ids, {}, {}'.format(node1, id1, id2))
# TODO: determine if the class ids are the same, in which case we dont
# need to do unification.
# mappings to unified cids
unified_cid_maps = {
'true': {},
'pred': {},
}
def _normalize_name(name):
return name.lower().replace(' ', '_')
pred_norm = {_normalize_name(name): name for name in pred_classes}
true_norm = {_normalize_name(name): name for name in true_classes}
# TODO: remove background hack, use "implicit_negative_classes" or "negative_classes"
unified_names = list(ub.unique(['background'] + list(pred_norm) + list(true_norm)))
classes = kwcoco.CategoryTree.coerce(unified_names)
# raise Exception('\n'.join(errors))
for true_name, true_cid in true_classes.node_to_id.items():
true_norm_name = _normalize_name(true_name)
cid = classes.node_to_id[true_norm_name]
unified_cid_maps['true'][true_cid] = cid
for pred_name, pred_cid in pred_classes.node_to_id.items():
pred_norm_name = _normalize_name(pred_name)
cid = classes.node_to_id[pred_norm_name]
unified_cid_maps['pred'][pred_cid] = cid
# if errors:
# graph2 = pred_classes.graph.copy()
# for node1, id1 in true_classes.node_to_id.items():
# if node1 not in pred_classes.node_to_id:
# graph2.add_node(node1, id=id1)
# classes = kwcoco.CategoryTree(graph2)
return classes, unified_cid_maps
[docs]
@classmethod
def _coerce_dets(CocoEvaluator, dataset, verbose=0, workers=0):
"""
Coerce the input to a mapping from image-id to kwimage.Detection
Also capture a CocoDataset if possible.
Returns:
Tuple[Dict[int, Detections], Dict]:
gid_to_det: mapping from gid to dets
extra: any extra information we gathered via coercion
Example:
>>> from kwcoco.coco_evaluator import * # NOQA
>>> import kwcoco
>>> coco_dset = kwcoco.CocoDataset.demo('shapes8')
>>> gid_to_det, extras = CocoEvaluator._coerce_dets(coco_dset)
Example:
>>> # xdoctest: +REQUIRES(module:sqlalchemy)
>>> from kwcoco.coco_evaluator import * # NOQA
>>> import kwcoco
>>> coco_dset = kwcoco.CocoDataset.demo('shapes8').view_sql()
>>> gid_to_det, extras = CocoEvaluator._coerce_dets(coco_dset)
"""
# coerce the input into dictionary of detection objects.
import kwcoco
import kwimage
import numpy as np
import glob
try:
import ndsampler
COCO_SAMPLER_CLS = ndsampler.CocoSampler
except Exception:
COCO_SAMPLER_CLS = None
# We only need the box locations, but if we can coerce extra
# information we will maintain that as well.
extra = {}
if isinstance(dataset, dict):
if len(dataset):
first = ub.peek(dataset.values())
if isinstance(first, kwimage.Detections):
# We got what we wanted
gid_to_det = dataset
else:
raise NotImplementedError
else:
gid_to_det = {}
elif isinstance(dataset, kwcoco.AbstractCocoDataset):
extra['coco_dset'] = coco_dset = dataset
gid_to_det = {}
gids = sorted(coco_dset.imgs.keys())
classes = coco_dset.object_categories()
for gid in ub.ProgIter(gids, desc='convert coco to dets'):
aids = list(coco_dset.index.gid_to_aids[gid])
anns = [coco_dset.anns[aid] for aid in aids]
cids = [a['category_id'] for a in anns]
# remap truth cids to be consistent with "classes"
# cids = [cid_true_to_pred.get(cid, cid) for cid in cids]
cxs = np.array([None if c is None else classes.id_to_idx[c] for c in cids])
ssegs = [a.get('segmentation') for a in anns]
weights = [a.get('weight', 1) for a in anns]
# Is defaulting to NAN correct here?
default_score = 1.0
# default_score = np.nan
scores = [a.get('score', default_score) for a in anns]
kw = {}
if all('prob' in a for a in anns):
# TODO: can we ensure the probs are always in the proper
# order here? I think they are, but I'm not 100% sure.
kw['probs'] = [a['prob'] for a in anns]
dets = kwimage.Detections(
boxes=kwimage.Boxes([a['bbox'] for a in anns], 'xywh'),
segmentations=ssegs,
class_idxs=cxs,
classes=classes,
weights=np.array(weights),
scores=np.array(scores),
aids=np.array(aids),
datakeys=['aids'],
**kw,
).numpy()
gid_to_det[gid] = dets
elif COCO_SAMPLER_CLS and isinstance(dataset, COCO_SAMPLER_CLS):
# Input is an ndsampler.CocoSampler object
extra['sampler'] = sampler = dataset
coco_dset = sampler.dset
gid_to_det, _extra = CocoEvaluator._coerce_dets(
coco_dset, verbose, workers=workers)
extra.update(_extra)
elif isinstance(dataset, (str, os.PathLike)):
if exists(dataset):
# on-disk detections
if isdir(dataset):
if verbose:
print('Loading mscoco directory')
# directory of predictions
extra['coco_dpath'] = coco_dpath = dataset
pat = ub.Path(coco_dpath) / '**/*.json'
coco_fpaths = sorted(glob.glob(os.fspath(pat), recursive=True))
dets, coco_dset = _load_dets(coco_fpaths, workers=workers)
extra['coco_dset'] = coco_dset
# coco_dset = kwcoco.CocoDataset.from_coco_paths(
# coco_fpaths, max_workers=6, verbose=1, mode='process')
gid_to_det = {d.meta['gid']: d for d in dets}
elif isfile(dataset):
# mscoco file
if verbose:
print('Loading mscoco file')
extra['dataset_fpath'] = coco_fpath = dataset
coco_dset = kwcoco.CocoDataset(coco_fpath)
gid_to_det, _extra = CocoEvaluator._coerce_dets(
coco_dset, verbose, workers=workers)
extra.update(_extra)
else:
raise NotImplementedError
else:
raise Exception('{!r} does not exist'.format(dataset))
else:
raise TypeError('Unknown dataset type: {!r}'.format(type(dataset)))
return gid_to_det, extra
[docs]
def _build_dmet(coco_eval):
"""
Builds the detection metrics object
Returns:
DetectionMetrics - object that can perform assignment and
build confusion vectors.
Ignore:
dmet = coco_eval._build_dmet()
"""
from kwcoco.metrics import DetectionMetrics
coco_eval._ensure_init()
classes = coco_eval.classes
gid_to_true = coco_eval.gid_to_true
gid_to_pred = coco_eval.gid_to_pred
if 0:
true_names = []
for det in coco_eval.gid_to_true.values():
class_idxs = det.data['class_idxs']
cnames = list(ub.take(det.meta['classes'], class_idxs))
true_names += cnames
ub.dict_hist(true_names)
pred_names = []
for det in coco_eval.gid_to_pred.values():
class_idxs = det.data['class_idxs']
cnames = list(ub.take(det.meta['classes'], class_idxs))
pred_names += cnames
ub.dict_hist(pred_names)
dmet = DetectionMetrics(classes=classes)
for gid in ub.ProgIter(coco_eval.gids):
pred_dets = gid_to_pred[gid]
true_dets = gid_to_true[gid]
dmet.add_predictions(pred_dets, gid=gid)
dmet.add_truth(true_dets, gid=gid)
if 0:
voc_info = dmet.score_voc(ignore_classes='ignore')
print('voc_info = {!r}'.format(voc_info))
return dmet
[docs]
def evaluate(coco_eval):
"""
Executes the main evaluation logic. Performs assignments between
detections to make DetectionMetrics object, then creates per-item and
ovr confusion vectors, and performs various threshold-vs-confusion
analyses.
Returns:
CocoResults: container storing (and capable of drawing /
serializing) results
"""
import platform
import kwarray
import numpy as np
coco_eval.log('evaluating')
# print('coco_eval.config = {}'.format(ub.urepr(dict(coco_eval.config), nl=3)))
dmet = coco_eval._build_dmet()
# Ignore any categories with too few tests instances
classes = coco_eval.classes
negative_classes = coco_eval.config['implicit_negative_classes']
classes_of_interest = coco_eval.config['classes_of_interest']
ignore_classes = set(coco_eval.config['implicit_ignore_classes'])
if coco_eval.config['ignore_classes']:
ignore_classes.update(coco_eval.config['ignore_classes'])
if classes_of_interest:
ignore_classes.update(set(classes) - set(classes_of_interest))
coco_eval.log('negative_classes = {!r}'.format(negative_classes))
coco_eval.log('classes_of_interest = {!r}'.format(classes_of_interest))
coco_eval.log('ignore_classes = {!r}'.format(ignore_classes))
area_ranges = coco_eval.config['area_range']
iou_thresholds = coco_eval.config['iou_thresh']
if not ub.iterable(iou_thresholds):
iou_thresholds = [iou_thresholds]
if not area_ranges:
area_ranges = ['all']
print('Building confusion vectors')
if coco_eval.config['force_pycocoutils']:
# TODO: extract the PR curves from pycocotools
coco_scores = dmet.score_pycocotools(
verbose=3,
with_evaler=True,
with_confusion=True,
iou_thresholds=iou_thresholds,
# max_dets=coco_eval.config['max_dets'],
)
iou_to_cfsn_vecs = coco_scores['iou_to_cfsn_vecs']
else:
iou_to_cfsn_vecs = dmet.confusion_vectors(
ignore_classes=ignore_classes,
compat=coco_eval.config['compat'],
iou_thresh=iou_thresholds,
workers=coco_eval.config['assign_workers'],
bias=coco_eval.config['iou_bias'],
max_dets=coco_eval.config['max_dets'],
)
# Remove large datasets values in configs that are not file references
base_meta = dict(coco_eval.config)
if not isinstance(base_meta['true_dataset'], str):
base_meta['true_dataset'] = '<not-a-file-ref>'
if not isinstance(base_meta['pred_dataset'], str):
base_meta['pred_dataset'] = '<not-a-file-ref>'
# Add machine-specific metadata
base_meta['hostname'] = platform.node()
base_meta['timestamp'] = ub.timestamp()
resdata = {}
for iou_thresh in iou_thresholds:
cfsn_vecs = iou_to_cfsn_vecs[iou_thresh]
print('cfsn_vecs = {!r}'.format(cfsn_vecs))
print(cfsn_vecs.data.pandas())
# NOTE: translating to classless confusion vectors only works when
# compat='all', otherwise we would need to redo confusion vector
# computation.
measurekw = dict(
fp_cutoff=coco_eval.config['fp_cutoff'],
monotonic_ppv=coco_eval.config['monotonic_ppv'],
ap_method=coco_eval.config['ap_method'],
)
orig_weights = cfsn_vecs.data['weight'].copy()
weight_gen = dmet_area_weights(dmet, orig_weights, cfsn_vecs, area_ranges, coco_eval)
for minmax_key, minmax, new_weights in weight_gen:
cfsn_vecs.data['weight'] = new_weights
# Get classless and ovr binary detection measures
nocls_binvecs = cfsn_vecs.binarize_classless(negative_classes=negative_classes)
ovr_binvecs = cfsn_vecs.binarize_ovr(
ignore_classes=ignore_classes, approx=0, mode=1)
nocls_measures = nocls_binvecs.measures(**measurekw)
ovr_measures = ovr_binvecs.measures(**measurekw)['perclass']
# print('minmax = {!r}'.format(minmax))
# print('nocls_measures = {}'.format(ub.urepr(nocls_measures, nl=1, align=':')))
# print('ovr_measures = {!r}'.format(ovr_measures))
meta = base_meta.copy()
meta['iou_thresh'] = iou_thresh
meta['area_minmax'] = minmax
result = CocoSingleResult(
nocls_measures, ovr_measures, cfsn_vecs, meta,
)
# TODO: when making the ovr localization curves, it might be a good
# idea to include a second version where any COI prediction assigned
# to a non-COI truth is given a weight of zero, so we can focus on
# our TPR and FPR with respect to the COI itself and the background.
# This metric is useful when we assume we have a subsequent classifier.
if classes_of_interest:
ovr_binvecs2 = cfsn_vecs.binarize_ovr(ignore_classes=ignore_classes)
for key, vecs in ovr_binvecs2.cx_to_binvecs.items():
cx = cfsn_vecs.classes.index(key)
vecs.data['weight'] = vecs.data['weight'].copy()
assert not np.may_share_memory(
ovr_binvecs[key].data['weight'],
vecs.data['weight'])
# Find locations where the predictions or truth was COI
pred_coi = cfsn_vecs.data['pred'] == cx
# Find truth locations that are either background or this COI
true_coi_or_bg = kwarray.isect_flags(
cfsn_vecs.data['true'], {cx, -1})
# Find locations where we predicted this COI, but truth was a
# valid classes, but not this non-COI
ignore_flags = (pred_coi & (~true_coi_or_bg))
vecs.data['weight'][ignore_flags] = 0
ovr_measures2 = ovr_binvecs2.measures(**measurekw)['perclass']
# print('ovr_measures2 = {!r}'.format(ovr_measures2))
result.ovr_measures2 = ovr_measures2
reskey = ub.urepr(
dict(area_range=minmax_key, iou_thresh=iou_thresh),
nl=0, explicit=1, itemsep='', nobr=1, sv=1)
resdata[reskey] = result
if coco_eval.config['force_pycocoutils']:
resdata['pct_stats'] = coco_scores['evalar_stats']
print('reskey = {!r}'.format(reskey))
print('result = {!r}'.format(result))
results = CocoResults(resdata)
return results
[docs]
def dmet_area_weights(dmet, orig_weights, cfsn_vecs, area_ranges, coco_eval,
use_area_attr=False):
"""
Hacky function to compute confusion vector ignore weights for different
area thresholds. Needs to be slightly refactored.
"""
import kwarray
import numpy as np
if use_area_attr:
coco_true = coco_eval.true_extra['coco_dset']
try:
coco_pred = coco_eval.pred_extra['coco_dset']
except Exception:
pass
# Basic logic to handle area-range by weight modification.
for minmax_key in area_ranges:
if isinstance(minmax_key, str):
if minmax_key == 'small':
minmax = [0 ** 2, 32 ** 2]
elif minmax_key == 'medium':
minmax = [32 ** 2, 96 ** 2]
elif minmax_key == 'large':
minmax = [96 ** 2, 1e5 ** 2]
elif minmax_key == 'all':
minmax = [0, float('inf')]
else:
raise KeyError(minmax_key)
else:
minmax = minmax_key
area_min, area_max = minmax
gids, groupxs = kwarray.group_indices(cfsn_vecs.data['gid'])
new_ignore = np.zeros(len(cfsn_vecs.data), dtype=bool)
for gid, groupx in zip(gids, groupxs):
if use_area_attr:
# Use coco area attribute (if available)
try:
true_dets = dmet.gid_to_true_dets[gid]
true_annots = coco_true.annots(true_dets.data['aids'])
true_area = np.array(true_annots.lookup('area'))
except Exception:
if use_area_attr != 'try':
raise
# Yet another pycocotools inconsistency:
# We typically dont have segmentation area for predictions,
# so we have to use bbox area for predictions (which only
# matters if they are not assigned to a truth)
try:
pred_dets = dmet.gid_to_pred_dets[gid]
pred_annots = coco_pred.annots(pred_dets.data['aids'])
pred_area = np.array(pred_annots.lookup('area'))
except Exception:
if use_area_attr != 'try':
warnings.warn('Predictions do not have area attributes')
pred_area = dmet.gid_to_pred_dets[gid].boxes.area
else:
true_area = dmet.gid_to_true_dets[gid].boxes.area
pred_area = dmet.gid_to_pred_dets[gid].boxes.area
# pycocotools is inclusive (valid if min <= area <= max) on both
# ends of the area range so we are following that here as well.
# Ignore any truth outside the area bounds
txs = cfsn_vecs.data['txs'][groupx]
tx_flags = txs > -1
tarea = true_area[txs[tx_flags]].ravel()
is_toob = ((tarea < area_min) | (tarea > area_max))
toob_idxs = groupx[tx_flags][is_toob]
# Ignore any *unassigned* prediction outside the area bounds
pxs = cfsn_vecs.data['pxs'][groupx]
px_flags = (pxs > -1) & (txs < 0)
parea = pred_area[pxs[px_flags]].ravel()
is_poob = ((parea < area_min) | (parea > area_max))
poob_idxs = groupx[px_flags][is_poob]
new_ignore[poob_idxs] = True
new_ignore[toob_idxs] = True
new_weights = orig_weights.copy()
new_weights[new_ignore] = 0
yield minmax_key, minmax, new_weights
[docs]
class CocoResults(ub.NiceRepr, DictProxy):
"""
CommandLine:
xdoctest -m /home/joncrall/code/kwcoco/kwcoco/coco_evaluator.py CocoResults --profile
Example:
>>> from kwcoco.coco_evaluator import * # NOQA
>>> from kwcoco.coco_evaluator import CocoEvaluator
>>> import kwcoco
>>> true_dset = kwcoco.CocoDataset.demo('shapes2')
>>> from kwcoco.demo.perterb import perterb_coco
>>> kwargs = {
>>> 'box_noise': 0.5,
>>> 'n_fp': (0, 10),
>>> 'n_fn': (0, 10),
>>> }
>>> pred_dset = perterb_coco(true_dset, **kwargs)
>>> print('true_dset = {!r}'.format(true_dset))
>>> print('pred_dset = {!r}'.format(pred_dset))
>>> config = {
>>> 'true_dataset': true_dset,
>>> 'pred_dataset': pred_dset,
>>> 'area_range': ['small'],
>>> 'iou_thresh': [0.3],
>>> }
>>> coco_eval = CocoEvaluator(config)
>>> results = coco_eval.evaluate()
>>> # Now we can draw / serialize the results as we please
>>> dpath = ub.Path.appdir('kwcoco/tests/test_out_dpath').ensuredir()
>>> #
>>> # test deserialization works
>>> state = results.__json__()
>>> self2 = CocoResults.from_json(state)
>>> #
>>> # xdoctest: +REQUIRES(module:kwplot)
>>> results.dump_figures(dpath, figsize=(3, 2), tight=False) # make this go faster
>>> results.dump(dpath / 'metrics.json', indent=' ')
"""
def __init__(results, resdata=None):
results.proxy = resdata
def __json__(results):
from kwcoco.util.util_json import ensure_json_serializable
state = {
k: (ensure_json_serializable(v)
if not hasattr(v, '__json__') else v.__json__())
for k, v in results.items()
}
# ensure_json_serializable(state, normalize_containers=True, verbose=0)
return state
[docs]
@classmethod
def from_json(cls, state):
self = cls({})
for key, val in state.items():
result = CocoSingleResult.from_json(val)
self[key] = result
return self
[docs]
def dump(result, file, indent=' '):
"""
Serialize to json file
"""
import json
import pathlib
if isinstance(file, (str, pathlib.Path)):
with open(file, 'w') as fp:
return result.dump(fp, indent=indent)
else:
state = result.__json__()
json.dump(state, file, indent=indent)
[docs]
class CocoSingleResult(ub.NiceRepr):
"""
Container class to store, draw, summarize, and serialize results from
CocoEvaluator.
Example:
>>> # xdoctest: +REQUIRES(--slow)
>>> from kwcoco.coco_evaluator import * # NOQA
>>> from kwcoco.coco_evaluator import CocoEvaluator
>>> import kwcoco
>>> true_dset = kwcoco.CocoDataset.demo('shapes8')
>>> from kwcoco.demo.perterb import perterb_coco
>>> kwargs = {
>>> 'box_noise': 0.2,
>>> 'n_fp': (0, 3),
>>> 'n_fn': (0, 3),
>>> 'with_probs': False,
>>> }
>>> pred_dset = perterb_coco(true_dset, **kwargs)
>>> print('true_dset = {!r}'.format(true_dset))
>>> print('pred_dset = {!r}'.format(pred_dset))
>>> config = {
>>> 'true_dataset': true_dset,
>>> 'pred_dataset': pred_dset,
>>> 'area_range': [(0, 32 ** 2), (32 ** 2, 96 ** 2)],
>>> 'iou_thresh': [0.3, 0.5, 0.95],
>>> }
>>> coco_eval = CocoEvaluator(config)
>>> results = coco_eval.evaluate()
>>> result = ub.peek(results.values())
>>> state = result.__json__()
>>> print('state = {}'.format(ub.urepr(state, nl=-1)))
>>> recon = CocoSingleResult.from_json(state)
>>> state = recon.__json__()
>>> print('state = {}'.format(ub.urepr(state, nl=-1)))
"""
def __init__(result, nocls_measures, ovr_measures, cfsn_vecs, meta=None):
result.nocls_measures = nocls_measures
result.ovr_measures = ovr_measures
result.cfsn_vecs = cfsn_vecs
result.meta = meta
def __nice__(result):
text = ub.urepr({
'nocls_measures': result.nocls_measures,
'ovr_measures': result.ovr_measures,
}, sv=1)
return text
[docs]
@classmethod
def from_json(cls, state):
from kwcoco.metrics.confusion_vectors import Measures
from kwcoco.metrics.confusion_vectors import PerClass_Measures
state['nocls_measures'] = Measures.from_json(state['nocls_measures'])
state['ovr_measures'] = PerClass_Measures.from_json(state['ovr_measures'])
if state.get('cfsn_vecs', None):
from kwcoco.metrics.confusion_vectors import ConfusionVectors
state['cfsn_vecs'] = ConfusionVectors.from_json(state['cfsn_vecs'])
else:
state['cfsn_vecs'] = None
self = cls(**state)
return self
def __json__(result):
state = {
'nocls_measures': result.nocls_measures.__json__(),
'ovr_measures': result.ovr_measures.__json__(),
# 'cfsn_vecs': result.cfsn_vecs.__json__(),
'meta': result.meta,
}
from kwcoco.util.util_json import ensure_json_serializable
ensure_json_serializable(state, normalize_containers=True, verbose=0)
return state
[docs]
def dump(result, file, indent=' '):
"""
Serialize to json file
"""
import json
try:
fpath = os.fspath(file)
except TypeError:
state = result.__json__()
json.dump(state, file, indent=indent)
else:
import safer
with safer.open(fpath, 'w', temp_file=not ub.WIN32) as fp:
return result.dump(fp, indent=indent)
[docs]
@profile
def _writefig(fig, metrics_dpath, fname, figsize, verbose, tight):
fig_fpath = ub.Path(metrics_dpath) / fname
if verbose:
print('write fig_fpath = {!r}'.format(fig_fpath))
fig.set_size_inches(figsize)
if tight:
fig.tight_layout()
fig.savefig(fig_fpath)
[docs]
def _load_dets(pred_fpaths, workers=0):
"""
Example:
>>> from kwcoco.coco_evaluator import _load_dets, _load_dets_worker
>>> import ubelt as ub
>>> import kwcoco
>>> dpath = ub.Path.appdir('kwcoco/tests/load_dets').ensuredir()
>>> N = 4
>>> pred_fpaths = []
>>> for i in range(1, N + 1):
>>> dset = kwcoco.CocoDataset.demo('shapes{}'.format(i))
>>> dset.fpath = dpath / 'shapes_{}.mscoco.json'.format(i)
>>> dset.dump(dset.fpath)
>>> pred_fpaths.append(dset.fpath)
>>> dets, coco_dset = _load_dets(pred_fpaths)
>>> print('dets = {!r}'.format(dets))
>>> print('coco_dset = {!r}'.format(coco_dset))
"""
# Process mode is much faster than thread.
import kwcoco
jobs = ub.JobPool(mode='process', max_workers=workers)
for single_pred_fpath in ub.ProgIter(pred_fpaths, desc='submit load dets jobs'):
job = jobs.submit(_load_dets_worker, single_pred_fpath, with_coco=True)
results = []
for job in ub.ProgIter(jobs.jobs, total=len(jobs), desc='loading cached dets'):
results.append(job.result())
dets = [r[0] for r in results]
pred_cocos = [r[1] for r in results]
coco_dset = kwcoco.CocoDataset.union(*pred_cocos)
return dets, coco_dset
[docs]
def _load_dets_worker(single_pred_fpath, with_coco=True):
"""
Ignore:
>>> from kwcoco.coco_evaluator import _load_dets, _load_dets_worker
>>> import ubelt as ub
>>> import kwcoco
>>> dpath = ub.Path.appdir('kwcoco/tests/load_dets').ensuredir()
>>> dset = kwcoco.CocoDataset.demo('shapes8')
>>> dset.fpath = dpath / 'shapes8.mscoco.json'
>>> dset.dump(dset.fpath)
>>> single_pred_fpath = dset.fpath
>>> dets, coco = _load_dets_worker(single_pred_fpath)
>>> print('dets = {!r}'.format(dets))
>>> print('coco = {!r}'.format(coco))
"""
import kwcoco
import kwimage
single_img_coco = kwcoco.CocoDataset(single_pred_fpath, autobuild=False)
dets = kwimage.Detections.from_coco_annots(single_img_coco.dataset['annotations'],
dset=single_img_coco)
if len(single_img_coco.dataset['images']) == 1:
# raise Exception('Expected predictions for a single image only')
gid = single_img_coco.dataset['images'][0]['id']
dets.meta['gid'] = gid
else:
warnings.warn('Loading dets with muliple images, must track gids carefully')
if with_coco:
return dets, single_img_coco
else:
return dets
if __name__ == '__main__':
"""
CommandLine:
python ~/code/kwcoco/kwcoco/coco_evaluator.py
"""
from kwcoco.cli import coco_eval as coco_eval_cli
coco_eval_cli.main()