#!/usr/bin/env python3
"""
Experimental support for kwcoco-only.
Compute semantic segmentation evaluation metrics
TODO::
- RRMSE (relative root mean squared error) RMSE normalized by root mean square value where each residual is scaled against the actual value
sqrt((1 / n) * sum((y - y_hat) ** 2) / sum(y ** 2))
"""
import json
import kwarray
import kwcoco
import kwimage
import numpy as np
import os
import pandas as pd
import sklearn.metrics as skm
import ubelt as ub
import warnings
from kwcoco.coco_evaluator import CocoSingleResult
from kwcoco.metrics.confusion_vectors import BinaryConfusionVectors
from kwcoco.metrics.confusion_measures import OneVersusRestMeasureCombiner
from kwcoco.metrics.confusion_vectors import OneVsRestConfusionVectors
from kwcoco.metrics.confusion_measures import MeasureCombiner
from kwcoco.metrics.confusion_measures import Measures
from typing import Dict
import scriptconfig as scfg
from shapely.ops import unary_union
from kwcoco.util.util_kwutil import _DelayedBlockingJobQueue, _MaxQueuePool
# The colors I traditionally use for truth and predictions
# TRUE_GREEN = 'limegreen'
# PRED_BLUE = 'dodgerblue'
# If we have a recent kwimage we can use kitware colors, which look pretty good
# in these roles too.
# Named colors used for truth and prediction overlays (kitware palette,
# which requires a recent kwimage).
TRUE_GREEN = 'kitware_green'
PRED_BLUE = 'kitware_blue'

# Old scheme
# CONFUSION_COLOR_SCHEME = {
#     'TN': 'black',
#     # 'TP': 'white',
#     # 'TP': 'snow',  # off white
#     'TP': 'whitesmoke',  # off white
#     'FN': 'teal',
#     'FP': 'red',
# }

# Agree with colors in cli/coco_eval
# 'pred_false_positive': kwimage.Color.coerce('kitware_red').ashex(),
# 'pred_true_positive': kwimage.Color.coerce('kitware_blue').ashex(),
# 'true_false_negative': kwimage.Color.coerce('purple').ashex(),
# 'true_true_positive': kwimage.Color.coerce('kitware_green').ashex(),

# Maps a confusion label (TP / FP / FN / TN) to the color it is drawn with.
CONFUSION_COLOR_SCHEME = {
    'FP': '#f42836',
    # 'pred_true_positive': '#0068c7',
    'FN': '#800080',
    'TP': '#3eae2b',
    'TN': 'black',
    # None: '#242a37',
}

# TODO: parameterize these class categories
# TODO: remove and generalize before porting to kwcoco
# Each constant is a set of category names that share a scoring behavior.
IGNORE_CLASSNAMES = {'ignore', 'Unknown'}
BACKGROUND_CLASSES = {'background'}
NEGATIVE_CLASSES = {'negative'}
UNDISTINGUISHED_CLASSES = {'positive'}
# NOTE: must be ``set()``; a bare ``{}`` is an empty *dict*, which cannot be
# combined with the other groups using set operators such as ``|``.
CONTEXT_CLASSES = set()
[docs]
class SegmentationEvalConfig(scfg.DataConfig):
    """
    Evaluation script for change/segmentation task

    Declares the command line / keyword options consumed by :func:`main`.
    The four path options (``true_dataset``, ``pred_dataset``,
    ``eval_dpath``, ``eval_fpath``) are split off by :func:`main`; every
    other option is forwarded to the evaluation as a config dictionary.
    """
    # Input / output locations
    true_dataset = scfg.Value(None, help='path to the groundtruth dataset', tags=['in_path'])
    pred_dataset = scfg.Value(None, help='path to the predicted dataset', tags=['in_path'])
    eval_dpath = scfg.Value(None, help='directory to dump results', tags=['out_path'])
    eval_fpath = scfg.Value(None, help='path to dump result summary', tags=['out_path', 'primary'])

    # Parameters that change the computed scores
    score_space = scfg.Value('auto', help=ub.paragraph(
        '''
        can score in image or video space. If auto, chooses video if there are
        any, otherwise image
        '''), tags=['algo_param'])
    resolution = scfg.Value(None, help=ub.paragraph(
        '''
        if specified, override the default resolution to score at
        '''), tags=['algo_param'])
    balance_area = scfg.Value(False, isflag=True, help=ub.paragraph(
        '''
        Upweight small instances, downweight large instances.
        Can be:
        * foreground_independent - this operates on each image
        independently. The weight is balanced across all foreground
        objects in an image. Background weights are not changed.
        '''), tags=['algo_param'])
    thresh_bins = scfg.Value(32 * 32, help='threshold resolution.', tags=['algo_param'])
    salient_channel = scfg.Value('salient', help='channel that is the positive class', tags=['algo_param'])

    # options
    draw_curves = scfg.Value('auto', help='flag to draw curves or not', tags=['perf_param'])
    draw_heatmaps = scfg.Value('auto', help='flag to draw heatmaps or not', tags=['perf_param'])
    draw_legend = scfg.Value(True, help='enable/disable the class legend', tags=['perf_param'])
    draw_weights = scfg.Value(False, help='enable/disable pixel weight visualization', tags=['perf_param'])
    draw_components = scfg.Value(False, help='if True, draw individual components of the heatmap plots', tags=['perf_param'])
    draw_burnin = scfg.Value(False, help='if True, burn in text on images for label purposes', tags=['perf_param'])
    viz_thresh = scfg.Value('auto', help='visualization threshold')
    workers = scfg.Value('auto', help='number of parallel scoring workers', tags=['perf_param'])
    draw_workers = scfg.Value('auto', help='number of parallel drawing workers', tags=['perf_param'])

    # Subset selection (jq queries)
    select_images = scfg.Value(
        None, type=str, help=ub.paragraph(
            '''
            A json query (via the jq spec) that specifies which images
            belong in the subset. Note, this is a passed as the body of
            the following jq query format string to filter valid ids
            '.images[] | select({select_images}) | .id'.
            Examples for this argument are as follows:
            '.id < 3' will select all image ids less than 3.
            '.file_name | test(".*png")' will select only images with
            file names that end with png.
            '.file_name | test(".*png") | not' will select only images
            with file names that do not end with png.
            '.myattr == "foo"' will select only image dictionaries
            where the value of myattr is "foo".
            '.id < 3 and (.file_name | test(".*png"))' will select only
            images with id less than 3 that are also pngs.
            .myattr | in({"val1": 1, "val4": 1}) will take images
            where myattr is either val1 or val4.
            Requires the "jq" python library is installed.
            '''))
    # The help template below names ``select_videos`` (the option being
    # documented); it previously repeated the ``select_images`` placeholder.
    select_videos = scfg.Value(
        None, help=ub.paragraph(
            '''
            A json query (via the jq spec) that specifies which videos
            belong in the subset. Note, this is a passed as the body of
            the following jq query format string to filter valid ids
            '.videos[] | select({select_videos}) | .id'.
            Examples for this argument are as follows:
            '.name | startswith("foo")' will select only videos
            where the name starts with foo.
            Only applicable for dataset that contain videos.
            Requires the "jq" python library is installed.
            '''))
[docs]
def main(cmdline=True, **kwargs):
    """
    Command line entry point for segmentation evaluation.

    Parses a :class:`SegmentationEvalConfig`, loads the true and predicted
    datasets, and hands everything to ``evaluate_segmentations``.

    Args:
        cmdline (bool): forwarded to ``SegmentationEvalConfig.cli``.
        **kwargs: forwarded to ``SegmentationEvalConfig.cli`` as ``data``.

    TODO:
        - doctest and CLI structure
        - ProcessContext to track resource usage
    """
    parsed = SegmentationEvalConfig.cli(
        cmdline=cmdline, data=kwargs, strict=True)
    import rich
    rich.print('full_config = {}'.format(ub.urepr(parsed, nl=1)))
    settings = ub.udict(parsed)

    true_coco = kwcoco.CocoDataset.coerce(settings['true_dataset'])
    pred_coco = kwcoco.CocoDataset.coerce(settings['pred_dataset'])

    # The path options are passed positionally; everything else travels in
    # the config dictionary.
    path_keys = {'true_dataset', 'pred_dataset', 'eval_dpath', 'eval_fpath'}
    out_fpath = settings['eval_fpath']
    out_dpath = settings['eval_dpath']
    remaining = settings - path_keys
    evaluate_segmentations(true_coco, pred_coco, out_dpath, out_fpath,
                           remaining)
[docs]
class SingleImageSegmentationMetrics:
    """
    Helper class which is a refactored version of an old function to compute
    segmentation metrics between a single predicted and true image.

    Args:
        pred_coco_img (kwcoco.CocoImage): detached predicted coco image

        true_coco_img (kwcoco.CocoImage): detached true coco image

        true_classes: the truth categories; intersected with the predicted
            channel names to decide which classes are scored. The example
            passes ``true_coco.object_categories()``.

        true_dets: truth detections for the true image. The example passes
            ``true_coco_img.annots().detections``.

        video1 (None | dict): video dictionary of the true image; its
            ``width`` / ``height`` / ``name`` are read when scoring in video
            space.

        thresh_bins (int | ArrayLike | None): if specified rounds scores
            into this many bins (or onto these explicit bin edges) to make
            calculating metrics more efficient

        config (None | dict): see usage

    CommandLine:
        xdoctest -m kwcoco.metrics.segmentation_metrics SingleImageSegmentationMetrics --show

    Example:
        >>> from kwcoco.metrics.segmentation_metrics import *  # NOQA
        >>> from kwcoco.coco_evaluator import CocoEvaluator
        >>> from kwcoco.demo.perterb import perterb_coco
        >>> import kwcoco
        >>> # TODO: kwcoco demodata with easy dummy heatmap channels
        >>> true_coco = kwcoco.CocoDataset.demo('vidshapes2', image_size=(512, 512))
        >>> # Score an image against itself
        >>> true_coco_img = true_coco.images()[0:1].coco_images[0]
        >>> pred_coco_img = true_coco.images()[0:1].coco_images[0]
        >>> config = {}
        >>> config['balance_area'] = True
        >>> config['balance_area'] = 'foreground_independent'
        >>> true_dets = true_coco_img.annots().detections
        >>> video1 = true_coco_img.video
        >>> true_classes = true_coco.object_categories()
        >>> config['salient_channel'] = 'r'  # pretend red is the salient channel
        >>> thresh_bins = np.linspace(0, 255, 1024)
        >>> self = SingleImageSegmentationMetrics(
        >>>    pred_coco_img, true_coco_img, true_classes, true_dets,
        >>>    thresh_bins=thresh_bins, config=config, video1=video1)
        >>> info = self.run()
        >>> # xdoctest: +REQUIRES(--show)
        >>> # xdoctest: +REQUIRES(module:kwplot)
        >>> full_classes = true_coco.object_categories()
        >>> chunk_info = [info]
        >>> true_gids = [info['row']['true_gid'] for info in chunk_info]
        >>> true_coco_imgs = true_coco.images(true_gids).coco_images
        >>> true_coco_imgs = [g.detach() for g in true_coco_imgs]
        >>> title = 'test'
        >>> heatmap_dpath = None
        >>> import kwplot
        >>> kwplot.autompl()
        >>> config['draw_weights'] = True
        >>> canvas, _ = draw_chunked_confusion(
        >>>     full_classes, true_coco_imgs, chunk_info, title=title,
        >>>     config=config)
        >>> kwplot.imshow(canvas)
        >>> kwplot.show_if_requested()
        >>> print(np.unique(info['saliency_weights']))
    """

    def __init__(self, pred_coco_img, true_coco_img, true_classes, true_dets,
                 video1=None, thresh_bins=None, config=None):
        if config is None:
            config = {}
        self.pred_coco_img = pred_coco_img
        self.true_coco_img = true_coco_img
        self.true_classes = true_classes
        self.true_dets = true_dets
        self.video1 = video1
        self.thresh_bins = thresh_bins
        self.config = config

        self.viz_thresh = config.get('viz_thresh', 'auto')
        self.balance_area = config.get('balance_area', False)
        if self.balance_area is True:
            # A plain boolean flag selects the only implemented strategy.
            self.balance_area = 'foreground_independent'

        # TODO: parameterize these class categories
        # TODO: remove and generalize before porting to kwcoco
        # NOTE: the attribute name (including its spelling) is kept as-is
        # because code outside this class may read it.
        self._behavor_to_classes = {
            'ignore': IGNORE_CLASSNAMES,
            'background': BACKGROUND_CLASSES,
            'undistinguished': UNDISTINGUISHED_CLASSES,
            'context': CONTEXT_CLASSES,
            'negative': NEGATIVE_CLASSES,
        }
        # HACK! FIXME: There needs to be a clear definition of what classes are
        # scored and which are not.
        self._behavor_to_classes['background_'] = (
            self._behavor_to_classes['background'] |
            self._behavor_to_classes['negative']
        )
        # The above heuristics should roughly be:
        #     * ignore - ignore, Unknown
        #     * background - background, negative
        #     * undistinguished - positive
        #     * context - unused

    def run(self):
        """
        Run like a function

        Resolves the configuration, builds the truth masks, and scores the
        saliency and / or class predictions that are present.

        Returns:
            dict: ``self.info``, which holds the per-image ``row`` of scalar
            metrics plus the intermediate arrays stored by the scoring steps.
        """
        self.resolve_config_variables()
        self.prepare_common_truth()

        if self.has_saliency:
            self.build_saliency_masks()

        if self.classes_of_interest:
            self.build_class_masks()

        if self.has_saliency:
            self.score_saliency_masks()

        if self.classes_of_interest:
            self.score_class_masks()

        # TODO: look at the category ranking at each pixel by score.
        # Is there a generalization of a confusion matrix to a ranking tensor?
        # TODO: Reintroduce hard-polygon segmentation scoring?
        return self.info

    def resolve_config_variables(self):
        """
        Messy helper to help transition from function to a class

        Derives the scoring space, canvas shape, bin edges, classes of
        interest, and saliency availability from the constructor arguments,
        and stores them as attributes used by the later steps.

        Raises:
            KeyError: if the score space is neither ``'image'`` nor
                ``'video'`` (after ``'auto'`` is resolved).
        """
        config = self.config
        thresh_bins = self.thresh_bins
        video1 = self.video1

        score_space = config.get('score_space', 'auto')
        resolution = config.get('resolution', None)

        if score_space == 'auto':
            pred_vidid = self.pred_coco_img.img.get('video_id', None)
            true_vidid = self.true_coco_img.img.get('video_id', None)
            if true_vidid is not None or pred_vidid is not None:
                score_space = 'video'
            else:
                score_space = 'image'

        if thresh_bins is not None:
            if isinstance(thresh_bins, (int, np.integer)):
                left_bin_edges = np.linspace(0, 1, thresh_bins)
            else:
                # Coerce so explicit edges given as a list can be fancy
                # indexed later on.
                left_bin_edges = np.asarray(thresh_bins)
        else:
            left_bin_edges = None
        self.left_bin_edges = left_bin_edges

        if score_space == 'image':
            img1 = self.true_coco_img.img
            dsize = np.array((img1['width'], img1['height']))
        elif score_space == 'video':
            dsize = np.array((video1['width'], video1['height']))
        else:
            raise KeyError(score_space)

        if resolution is None:
            scale = None
        else:
            try:
                scale = self.true_coco_img._scalefactor_for_resolution(
                    resolution=resolution, space=score_space)
            except Exception as ex:
                # Best effort: fall back to the unscaled canvas.
                print(f'warning: ex={ex}')
                scale = None

        if scale is not None:
            dsize = np.ceil(np.array(dsize) * np.array(scale)).astype(int)

        row = {
            'true_gid': self.true_coco_img.img['id'],
            'pred_gid': self.pred_coco_img.img['id'],
        }
        if video1 is not None:
            row['video'] = video1['name']

        # dsize is (width, height); array shapes are (height, width)
        shape = dsize[::-1]
        info = {
            'row': row,
            'shape': shape,
        }

        # Determine what true/predicted categories are in common
        predicted_classes = []
        for stream in self.pred_coco_img.channels.streams():
            have = stream.intersection(self.true_classes)
            predicted_classes.extend(have.parsed)

        self.classes_of_interest = ub.oset(predicted_classes) - (
            self._behavor_to_classes['negative'] |
            self._behavor_to_classes['background'] |
            self._behavor_to_classes['ignore'] |
            self._behavor_to_classes['undistinguished']
        )

        # Determine if saliency has been predicted
        salient_class = self.config.get('salient_channel', 'salient')
        # TODO: allow the use of polygons as saliency predictions.
        self.has_saliency = salient_class in self.pred_coco_img.channels

        self.score_space = score_space
        self.resolution = resolution
        self.scale = scale
        self.salient_class = salient_class
        self.shape = shape
        self.info = info
        self.row = row

    def prepare_common_truth(self):
        """
        Warp / scale the truth detections into the scoring space and cache
        their class indexes, segmentations, weights, and category names.
        """
        # Load ground truth annotations
        scaled_true_dets = self.true_dets
        if self.score_space == 'video':
            warp_img_to_vid = kwimage.Affine.coerce(
                self.true_coco_img.img.get('warp_img_to_vid', {'type': 'affine'}))
            scaled_true_dets = scaled_true_dets.warp(warp_img_to_vid)
        if self.scale is not None:
            scaled_true_dets = scaled_true_dets.scale(self.scale)
        self.scaled_true_dets = scaled_true_dets

        self.info['true_dets'] = scaled_true_dets
        self.true_cidxs = scaled_true_dets.data['class_idxs']
        self.true_ssegs = scaled_true_dets.data['segmentations']
        if 'weights' in scaled_true_dets.data:
            self.true_weights = np.array(
                scaled_true_dets.data['weights'], dtype=np.float32)
            # Unspecified weights should be 1.0 (kwimage will default to nan)
            self.true_weights[np.isnan(self.true_weights)] = 1.0
        else:
            # Same container type as the branch above so later code sees a
            # consistent float32 array either way.
            self.true_weights = np.ones(len(self.true_ssegs), dtype=np.float32)
        self.true_catnames = list(ub.take(scaled_true_dets.classes.idx_to_node, self.true_cidxs))

    def _foreground_unit_share(self, fg_group):
        """
        Compute the average area each foreground instance contributes.

        Args:
            fg_group (list): ``(segmentation, weight)`` tuples for the
                foreground instances.

        Returns:
            float | int | None: the union area of the foreground polygons
            divided by the number of instances (``1`` when there are no
            instances), or None when area balancing is disabled.

        Raises:
            KeyError: if ``self.balance_area`` is an unknown strategy.
        """
        if self.balance_area == 'foreground_independent':
            if len(fg_group):
                # Combine all foreground polygons into a single shape
                # (removes influence of overlapping polygons)
                # TODO: should we account for weights here
                fg_poly = unary_union([p.to_shapely() for p, w in fg_group])
                # The average weight each instance contributes
                return fg_poly.area / len(fg_group)
            return 1
        elif not self.balance_area:
            return None
        else:
            raise KeyError(self.balance_area)

    def _fill_instance_weight(self, weights, true_sseg, true_weight,
                              unit_sseg_share):
        """
        Burn the weight of a single foreground instance into ``weights``.

        Args:
            weights (ndarray): per-pixel weight canvas
            true_sseg: the instance segmentation
            true_weight (float): the annotation weight of the instance
            unit_sseg_share: result of :func:`_foreground_unit_share`

        Returns:
            ndarray: the (possibly updated) weight canvas
        """
        if self.balance_area:
            area = true_sseg.area
            # A zero-area polygon cannot be rebalanced (it would divide by
            # zero), so its pixels keep their current weight.
            if area > 0:
                # Fill in the weights to upweight smaller areas.
                instance_weight = float(true_weight * unit_sseg_share / area)
                weights = true_sseg.fill(weights, value=instance_weight)
        elif true_weight != 1:
            weights = true_sseg.fill(weights, value=float(true_weight))
        return weights

    def _round_scores_to_bins(self, pred_score):
        """
        Snap scores onto the configured bin edges.

        Each score is replaced by the first bin edge that is greater than or
        equal to it. Scores above the final edge are clamped onto the final
        edge; previously they indexed one past the end of the edge array.

        Args:
            pred_score (ndarray): flat array of scores

        Returns:
            ndarray: the binned scores, or ``pred_score`` itself when no
            bin edges are configured.
        """
        edges = self.left_bin_edges
        if edges is None or len(edges) == 0:
            return pred_score
        rounded_idx = np.searchsorted(edges, pred_score)
        # searchsorted returns len(edges) for values beyond the last edge
        rounded_idx = np.minimum(rounded_idx, len(edges) - 1)
        return edges[rounded_idx]

    def build_saliency_masks(self):
        """
        Create a truth "panoptic segmentation" style mask and weights

        Stores ``self.true_saliency`` (uint8, 1 on foreground) and
        ``self.saliency_weights`` (float32 per-pixel weights).
        """
        # Truth for saliency-task
        true_saliency = np.zeros(self.shape, dtype=np.uint8)
        saliency_weights = np.ones(self.shape, dtype=np.float32)

        sseg_groups = {
            'ignore': [],
            'context': [],
            'foreground': [],
            'background': [],
        }
        for true_sseg, true_catname, true_weight in zip(self.true_ssegs, self.true_catnames, self.true_weights):
            if true_catname in self._behavor_to_classes['background']:
                key = 'background'
            elif true_catname in self._behavor_to_classes['ignore']:
                key = 'ignore'
            elif true_catname in self._behavor_to_classes['context']:
                key = 'context'
            else:
                key = 'foreground'
            sseg_groups[key].append((true_sseg, true_weight))

        unit_sseg_share = self._foreground_unit_share(sseg_groups['foreground'])

        # Background polygons stay background and context polygons are left
        # untouched: neither changes the mask or the weights.

        # Ignore no-activity and post-construction, ignore, and Unknown
        for true_sseg, true_weight in sseg_groups['ignore']:
            saliency_weights = true_sseg.fill(saliency_weights, value=0)

        # Score positive, site prep, and active construction.
        for true_sseg, true_weight in sseg_groups['foreground']:
            true_saliency = true_sseg.fill(true_saliency, value=1)
            saliency_weights = self._fill_instance_weight(
                saliency_weights, true_sseg, true_weight, unit_sseg_share)

        self.true_saliency = true_saliency
        self.saliency_weights = saliency_weights

    def build_class_masks(self):
        """
        Creates the truth masks and weights for each class

        Stores ``self.catname_to_true`` (one float32 mask per class of
        interest) and ``self.class_weights`` (float32 per-pixel weights).
        """
        # Truth for class-task
        shape = self.shape
        catname_to_true: Dict[str, np.ndarray] = {
            catname: np.zeros(shape, dtype=np.float32)
            for catname in self.classes_of_interest
        }
        class_weights = np.ones(shape, dtype=np.float32)

        sseg_groups = {
            'background': [],
            'ignore': [],
            'undistinguished': [],
            'foreground': [],
        }
        for true_sseg, true_catname, true_weight in zip(self.true_ssegs, self.true_catnames, self.true_weights):
            if true_catname in self._behavor_to_classes['background']:
                key = 'background'
            elif true_catname in self._behavor_to_classes['ignore']:
                key = 'ignore'
            elif true_catname in self._behavor_to_classes['undistinguished']:
                key = 'undistinguished'
            else:
                key = 'foreground'
            # Remember the category so the foreground loop can route the
            # polygon to the right class mask.
            true_sseg.meta['true_catname'] = true_catname
            sseg_groups[key].append((true_sseg, true_weight))

        unit_sseg_share = self._foreground_unit_share(sseg_groups['foreground'])

        # Background polygons stay background; nothing to do for them.

        # Ignore no-activity and post-construction, ignore, and Unknown
        for true_sseg, true_weight in sseg_groups['ignore']:
            class_weights = true_sseg.fill(class_weights, value=0)

        for true_sseg, true_weight in sseg_groups['undistinguished']:
            class_weights = true_sseg.fill(class_weights, value=0)

        # Score positive, site prep, and active construction.
        for true_sseg, true_weight in sseg_groups['foreground']:
            true_catname = true_sseg.meta['true_catname']
            class_weights = self._fill_instance_weight(
                class_weights, true_sseg, true_weight, unit_sseg_share)
            catname_to_true[true_catname] = true_sseg.fill(catname_to_true[true_catname], value=1)

        # Hack:
        # normalize to 0-1, this downweights the background too much, but
        # I think fixes a upstream issue. Remove (or justify?) if possible.
        # class_weights = class_weights / class_weights.max()
        self.class_weights = class_weights
        self.catname_to_true = catname_to_true

    def score_saliency_masks(self):
        """
        Compute scores for the pred / truth

        Loads the predicted salient channel, zeroes the weight of NaN
        pixels, computes binary confusion measures, and records the summary
        metrics in ``self.row`` and the arrays in ``self.info``.
        """
        pred_coco_img = self.pred_coco_img

        # TODO: consolidate this with above class-specific code
        salient_delay = pred_coco_img.imdelay(
            self.salient_class, space=self.score_space,
            resolution=self.resolution,
            nodata_method='float')
        _got_salient_prob = salient_delay.finalize(nodata_method='float')
        # FIXME: Delayed image bug? Not sure why it returns 2 channels
        # sometimes instead of 3. That should not happen. Probably something to
        # do with saving / reading the asset as a png?
        if len(_got_salient_prob.shape) == 2:
            salient_prob = _got_salient_prob
        else:
            salient_prob = _got_salient_prob[..., 0]
        salient_prob_orig = salient_prob.copy()
        invalid_mask = np.isnan(salient_prob)

        salient_prob[invalid_mask] = 0
        try:
            self.saliency_weights[invalid_mask] = 0
        except Exception:
            # Report the shapes to make a truth / prediction size mismatch
            # easier to diagnose, then propagate the error.
            print(f'invalid_mask.shape={invalid_mask.shape}')
            print(f'saliency_weights.shape={self.saliency_weights.shape}')
            raise

        pred_score = self._round_scores_to_bins(salient_prob.ravel())

        data = kwarray.DataFrameArray({
            'is_true': self.true_saliency.ravel(),
            'pred_score': pred_score,
            'weight': self.saliency_weights.ravel().astype(np.float32),
        })
        bin_cfns = BinaryConfusionVectors(data)
        salient_measures: Measures = bin_cfns.measures()
        salient_summary = salient_measures.summary()

        salient_metrics = {
            'salient_' + k: v
            for k, v in ub.dict_isect(salient_summary, {
                'ap', 'auc', 'max_f1'}).items()
        }
        try:
            # Requires kwcoco 0.8.3
            salient_metrics['realpos_total'] = salient_measures['realpos_total']
            salient_metrics['realneg_total'] = salient_measures['realneg_total']
            submeasures = salient_measures['max_f1_submeasures']
            salient_metrics['salient_max_f1_thresh'] = submeasures['thresh']
            salient_metrics['salient_max_f1_ppv'] = submeasures['ppv']
            salient_metrics['salient_max_f1_tpr'] = submeasures['tpr']
            salient_metrics['salient_max_f1_fpr'] = submeasures['fpr']
            salient_metrics['salient_max_f1_tnr'] = submeasures['tnr']
        except Exception:
            # Deliberate best effort: these extra fields are optional and
            # are skipped when the measures do not provide them.
            ...
        self.row.update(salient_metrics)

        self.info.update({
            'salient_measures': salient_measures,
            'salient_prob': salient_prob_orig,
            'true_saliency': self.true_saliency,
        })

        maximized_info = salient_measures.maximized_thresholds()

        # This cherry-picks a threshold per image!
        if self.viz_thresh == 'auto':
            cherry_picked_thresh = maximized_info['f1']['thresh']
            saliency_thresh = cherry_picked_thresh
        else:
            saliency_thresh = self.viz_thresh
        # Thresholding uses the unbinned probabilities
        pred_saliency = salient_prob > saliency_thresh

        y_true = self.true_saliency.ravel()
        y_pred = pred_saliency.ravel()
        sample_weight = self.saliency_weights.ravel()
        mat = skm.confusion_matrix(y_true, y_pred, labels=np.array([0, 1]),
                                   sample_weight=sample_weight)
        self.info.update({
            'mat': mat,
            'pred_saliency': pred_saliency,
            'saliency_thresh': saliency_thresh,
            'saliency_weights': self.saliency_weights,
        })

    def score_class_masks(self):
        """
        Compute one-vs-rest scores for every class of interest.

        Records ``mAP`` / ``mAUC`` in ``self.row`` and the per-class truth,
        probabilities, weights and measures in ``self.info``.
        """
        # handle multiclass case
        pred_chan_of_interest = '|'.join(self.classes_of_interest)
        delayed_probs = self.pred_coco_img.imdelay(
            pred_chan_of_interest, space=self.score_space,
            resolution=self.resolution, nodata_method='float').as_xarray()

        # Do we need xarray anymore?
        class_probs = delayed_probs.finalize()
        # Pixels where every class is NaN carry no weight for any class
        invalid_mask = np.isnan(class_probs).all(axis=2)
        self.class_weights[invalid_mask] = 0

        catname_to_prob = {}
        cx_to_binvecs = {}
        for cx, cname in enumerate(self.classes_of_interest):
            is_true = self.catname_to_true[cname]
            score = class_probs.loc[:, :, cname].data.copy()
            # Per-class NaNs only zero the weight for this class
            invalid_mask = np.isnan(score)
            weights = self.class_weights.copy()
            weights[invalid_mask] = 0
            score[invalid_mask] = 0

            # Snap scores onto the bin edges (no-op when bins are disabled)
            pred_score = self._round_scores_to_bins(score.ravel())

            catname_to_prob[cname] = score
            bin_data = {
                # is_true denotes if the true class of the item is the
                # category of interest.
                'is_true': is_true.ravel(),
                'pred_score': pred_score,
                'weight': weights.ravel(),
            }
            bin_data = kwarray.DataFrameArray(bin_data)
            bin_cfsn = BinaryConfusionVectors(bin_data, cx, self.classes_of_interest)
            # TODO: use me?
            # bin_measures = bin_cfsn.measures()
            # bin_measures.summary()
            cx_to_binvecs[cname] = bin_cfsn
        ovr_cfns = OneVsRestConfusionVectors(cx_to_binvecs, self.classes_of_interest)
        class_measures = ovr_cfns.measures()
        self.row['mAP'] = class_measures['mAP']
        self.row['mAUC'] = class_measures['mAUC']
        self.info.update({
            'class_weights': self.class_weights,
            'class_measures': class_measures,
            'catname_to_true': self.catname_to_true,
            'catname_to_prob': catname_to_prob,
        })
[docs]
def single_image_segmentation_metrics(pred_coco_img, true_coco_img,
                                      true_classes, true_dets, video1=None,
                                      thresh_bins=None, config=None):
    """
    DEPRECATED, Use SingleImageSegmentationMetrics instead

    Thin functional wrapper that builds a
    :class:`SingleImageSegmentationMetrics` and immediately runs it.

    Args:
        true_coco_img (kwcoco.CocoImage): detached true coco image

        pred_coco_img (kwcoco.CocoImage): detached predicted coco image

        thresh_bins (int): if specified rounds scores into this many bins
            to make calculating metrics more efficient

        config (None | dict): see usage

    Returns:
        the result of ``SingleImageSegmentationMetrics.run``
    """
    scorer = SingleImageSegmentationMetrics(
        pred_coco_img, true_coco_img, true_classes, true_dets,
        video1=video1, thresh_bins=thresh_bins, config=config)
    return scorer.run()
[docs]
@ub.memoize
def _memo_legend(label_to_color):
    """
    Build (and memoize) the legend image for a label-to-color mapping.
    """
    import kwplot
    return kwplot.make_legend_img(label_to_color)
[docs]
def draw_confusion_image(pred, target):
    """
    Encode the per-pixel confusion between a binary prediction and truth.

    Args:
        pred (ArrayLike): binary predictions (0 / 1 or False / True)
        target (ArrayLike): binary truth with the same shape as ``pred``

    Returns:
        ndarray: array shaped like ``pred`` holding the codes
        0 = true-negative, 1 = true-positive, 2 = false-negative,
        3 = false-positive. The dtype matches ``pred`` unless ``pred`` is
        boolean, in which case it is uint8.
    """
    pred = np.asarray(pred)
    target = np.asarray(target)
    # A boolean canvas cannot represent the codes 2 and 3 (they would both
    # collapse to True), so promote boolean predictions to uint8.
    canvas_dtype = np.uint8 if pred.dtype == np.bool_ else pred.dtype
    canvas = np.zeros(pred.shape, dtype=canvas_dtype)
    # true-negatives keep the initial value of 0
    np.putmask(canvas, (target == 1) & (pred == 1), 1)  # true-pos
    np.putmask(canvas, (target == 1) & (pred == 0), 2)  # false-neg
    np.putmask(canvas, (target == 0) & (pred == 1), 3)  # false-pos
    return canvas
[docs]
def colorize_class_probs(probs, classes):
    """
    Blend per-class probability maps into a single color image.

    Every class is drawn as a solid color layer whose alpha channel is its
    probability map; the layers are composited over an opaque black
    background.

    Args:
        probs: per-class probability maps, indexed by class first
            (e.g. ``pred_cat_ohe``)
        classes: category tree providing ``classes[cidx]`` and
            ``classes.graph.nodes`` (e.g. ``pred_classes``)

    Returns:
        the composited image from ``kwimage.overlay_alpha_layers``
    """
    def _declared_color(cidx):
        # Color the category tree declares for this class, if any
        node = classes[cidx]
        declared = classes.graph.nodes[node].get('color', None)
        if declared is None:
            return None
        return kwimage.Color(declared).as01()

    cidx_to_color = [_declared_color(cidx) for cidx in range(len(probs))]

    import distinctipy
    known_colors = [c for c in cidx_to_color if c is not None]
    num_missing = len(cidx_to_color) - len(known_colors)
    if num_missing:
        # Generate distinct colors for the classes that declare none
        fresh_colors = iter(distinctipy.get_colors(
            num_missing, exclude_colors=known_colors, rng=569944))
        cidx_to_color = [
            next(fresh_colors) if c is None else c
            for c in cidx_to_color
        ]

    canvas_dtype = np.float32

    # Each class gets its own color, and modulates the alpha
    height, width = probs.shape[-2:]
    layer_shape = (height, width, 4)

    layers = []
    for class_color, chan in zip(cidx_to_color, probs):
        layer = np.empty(layer_shape, dtype=canvas_dtype)
        layer[..., 3] = chan
        layer[..., 0:3] = class_color
        layers.append(layer)

    # Fully opaque black goes at the bottom of the stack
    opaque_black = np.zeros(layer_shape, dtype=canvas_dtype)
    opaque_black[..., 3] = 1.0
    layers.append(opaque_black)

    return kwimage.overlay_alpha_layers(
        layers, keepalpha=False, dtype=canvas_dtype)
[docs]
def draw_truth_borders(true_dets, canvas, alpha=1.0, color=None):
    """
    Draw the outlines of the truth segmentations onto a canvas.

    Args:
        true_dets: detections whose ``data`` holds ``segmentations``,
            ``class_idxs`` and ``classes``
        canvas (ndarray): image to draw on; converted to float01
        alpha (float): opacity of the borders
        color: border color(s); when None the color of each instance is
            looked up from its category node

    Returns:
        ndarray: the canvas with borders drawn
    """
    segmentations = true_dets.data['segmentations']
    class_idxs = true_dets.data['class_idxs']
    category_tree = true_dets.data['classes']

    if color is None:
        # One color per instance, taken from its category node
        nodes = ub.take(category_tree.idx_to_node, class_idxs)
        color = [category_tree.graph.nodes[node]['color'] for node in nodes]

    canvas = kwimage.ensure_float01(canvas)

    if not (alpha < 1.0):
        return segmentations.draw_on(canvas, fill=False, border=True,
                                     color=color, alpha=alpha)

    # remove this condition when kwimage 0.8.3 is released always take else
    # Draw opaque borders on an empty RGBA layer, fade that layer, and then
    # composite it over the canvas.
    rgba_shape = canvas.shape[0:2] + (4,)
    overlay = np.zeros_like(canvas, shape=rgba_shape)
    overlay = segmentations.draw_on(overlay, fill=False,
                                    border=True, color=color, alpha=1.0)
    overlay[..., 3] *= alpha
    return kwimage.overlay_alpha_images(overlay, canvas)
[docs]
def draw_chunked_confusion(full_classes, true_coco_imgs, chunk_info,
                           title=None, config=None):
    """
    Draw a sequence of true/pred image predictions.

    Every entry of ``chunk_info`` is rendered as a vertical stack of panels
    (header, class truth / prediction, saliency confusion, optional weights,
    soft saliency heatmap and the input image). The stacks are placed side
    by side, an optional legend is appended on the right, and a title
    header is put on top.

    Args:
        full_classes: category tree whose ``graph.nodes`` carry a 'color'
            attribute for every class.
        true_coco_imgs (list): truth images aligned with ``chunk_info``
            (presumably detached kwcoco.CocoImage objects; they must
            provide ``img``, ``channels``, ``imdelay`` and
            ``primary_asset``).
        chunk_info (List[Dict]): per-image scoring results. Reads the keys
            'row' and 'shape', and optionally 'catname_to_prob',
            'catname_to_true', 'true_dets', 'class_weights',
            'pred_saliency', 'true_saliency', 'saliency_thresh',
            'saliency_weights' and 'salient_prob'.
        title (str | None): extra text for the top of the canvas header.
        config (Dict | None): options; reads 'score_space', 'resolution',
            'draw_legend', 'draw_components', 'draw_burnin' and
            'draw_weights'.

    Returns:
        Tuple[ndarray, Dict]: the canvas and a dictionary with
            'vidname_part', 'plot_fstem' and (when 'draw_components' is
            enabled) 'frame_parts', a mapping from truth image id to the
            individual panels.
    """
    # The config must be normalized before the first lookup, otherwise the
    # default ``config=None`` raises an AttributeError.
    if config is None:
        config = {}
    score_space = config.get('score_space', 'video')
    resolution = config.get('resolution', None)
    draw_legend = config.get('draw_legend', True)
    draw_components = config.get('draw_components', False)
    draw_burnin = config.get('draw_burnin', False)
    draw_weights = config.get('draw_weights', False)

    # Lookup table mapping a confusion index to its display color. The
    # order of the labels defines the meaning of each index.
    color_labels = ['TN', 'TP', 'FN', 'FP']
    colors = list(ub.take(CONFUSION_COLOR_SCHEME, color_labels))
    color_lut = np.array([kwimage.Color(c).as255() for c in colors])
    color01_lut = color_lut / 255.0

    # Make a legend (it is only ever used when draw_legend is enabled)
    legend_img = None
    if draw_legend:
        legend_images = []
        if 'catname_to_prob' in chunk_info[0]:
            # Class Legend
            label_to_color = {
                node: kwimage.Color(data['color']).as01()
                for node, data in full_classes.graph.nodes.items()}
            label_to_color = ub.sorted_keys(label_to_color)
            legend_images.append(_memo_legend(label_to_color))
        if 'pred_saliency' in chunk_info[0]:
            # Confusion Legend
            label_to_color = ub.dzip(color_labels, color01_lut)
            legend_img_saliency_cfsn = _memo_legend(label_to_color)
            legend_images.append(
                kwimage.ensure_uint255(legend_img_saliency_cfsn))
        if legend_images:
            legend_img = kwimage.stack_images(legend_images, axis=0, pad=5)

    # Draw predictions on each frame
    frame_parts = {}
    parts = []
    frame_nums = []
    true_gids = []
    unique_vidnames = set()
    for info, true_coco_img in zip(chunk_info, true_coco_imgs):
        row = info['row']
        if row.get('video', ''):
            unique_vidnames.add(row['video'])

        true_img = true_coco_img.img
        true_gid = true_img['id']
        frame_index = true_img.get('frame_index', None)
        if frame_index is not None:
            frame_nums.append(frame_index)
        true_gids.append(true_gid)

        header_lines = build_image_header_text(
            img=true_img,
            name=None,
            _header_extra=None,
        )
        image_header_text = '\n'.join(header_lines)
        imgw = info['shape'][1]
        header = kwimage.draw_header_text(
            {'width': imgw},
            text=image_header_text, color='red', stack=False)

        vert_parts = {}
        vert_parts['header'] = header

        if 'catname_to_prob' in info:
            true_dets = info['true_dets']
            true_dets.data['classes'] = full_classes

            pred_classes = kwcoco.CategoryTree.coerce(list(info['catname_to_prob'].keys()))
            true_classes = kwcoco.CategoryTree.coerce(list(info['catname_to_true'].keys()))
            # todo: ensure colors are robust and consistent
            for node in pred_classes.graph.nodes():
                pred_classes.graph.nodes[node]['color'] = full_classes.graph.nodes[node]['color']
            for node in true_classes.graph.nodes():
                true_classes.graph.nodes[node]['color'] = full_classes.graph.nodes[node]['color']

            pred_cat_ohe = np.stack(list(info['catname_to_prob'].values()))
            true_cat_ohe = np.stack(list(info['catname_to_true'].values()))

            true_overlay = colorize_class_probs(true_cat_ohe, true_classes)[..., 0:3]
            true_overlay = draw_truth_borders(true_dets, true_overlay, alpha=1.0)
            true_overlay = kwimage.ensure_uint255(true_overlay)
            if draw_burnin:
                true_overlay = kwimage.draw_text_on_image(
                    true_overlay, 'true class', org=(1, 1), valign='top',
                    color=TRUE_GREEN, border=1)
            vert_parts['class_truth'] = true_overlay

            if draw_weights:
                vert_parts['class_weights'] = _confusion_weight_panel(
                    info['class_weights'], draw_burnin)

            pred_overlay = colorize_class_probs(pred_cat_ohe, pred_classes)[..., 0:3]
            pred_overlay = draw_truth_borders(true_dets, pred_overlay, alpha=0.05, color='white')
            pred_overlay = kwimage.ensure_uint255(pred_overlay)
            if draw_burnin:
                pred_overlay = kwimage.draw_text_on_image(
                    pred_overlay, 'pred class', org=(1, 1), valign='top',
                    color=PRED_BLUE, border=1)
            vert_parts['class_heatmap'] = pred_overlay

        if 'pred_saliency' in info:
            pred_saliency = info['pred_saliency'].astype(np.uint8)
            true_saliency = info['true_saliency']
            saliency_thresh = info['saliency_thresh']
            confusion_idxs = draw_confusion_image(pred_saliency, true_saliency)
            confusion_image = color_lut[confusion_idxs]
            confusion_image = kwimage.ensure_uint255(confusion_image)
            if draw_burnin:
                confusion_image = kwimage.draw_text_on_image(
                    confusion_image,
                    f'confusion saliency: thresh={saliency_thresh:0.3f}',
                    org=(1, 1), valign='top',
                    color='white', border=1)
            vert_parts['saliency_confusion'] = confusion_image

            if draw_weights:
                vert_parts['saliency_weights'] = _confusion_weight_panel(
                    info['saliency_weights'], draw_burnin)
        elif 'true_saliency' in info:
            # No hard prediction is available; only show the truth.
            true_saliency = info['true_saliency'].astype(np.float32)
            heatmap = kwimage.make_heatmask(
                true_saliency, with_alpha=0.5, cmap='plasma')
            heatmap_int = kwimage.ensure_uint255(heatmap[..., 0:3])
            if draw_burnin:
                heatmap_int = kwimage.draw_text_on_image(
                    heatmap_int, 'true saliency', org=(1, 1), valign='top',
                    color=TRUE_GREEN, border=1)
            vert_parts['saliency_heatmap'] = heatmap_int

        # TODO:
        # Show the datetime on the top of the image (and the display band?)
        real_image_int = None
        chosen_viz_channs = _confusion_viz_channels(true_coco_img)
        try:
            real_image = true_coco_img.imdelay(chosen_viz_channs,
                                               space=score_space,
                                               nodata_method='float',
                                               resolution=resolution).finalize()[:]
            real_image_norm = kwimage.normalize_intensity(real_image)
            real_image_norm = kwimage.fill_nans_with_checkers(real_image_norm)
            real_image_int = kwimage.ensure_uint255(real_image_norm)
        except Exception as ex:
            # Showing the input image is best effort. A failed read must
            # not prevent the rest of the visualization from being drawn.
            print('ex = {!r}'.format(ex))

        salient_prob = info.get('salient_prob', None)
        if salient_prob is not None:
            invalid_mask = np.isnan(salient_prob)
            heatmap = kwimage.make_heatmask(
                salient_prob, with_alpha=0.5, cmap='plasma')
            # Mark pixels without a prediction with a checkerboard.
            heatmap[invalid_mask] = np.nan
            heatmap = kwimage.fill_nans_with_checkers(heatmap)
            heatmap_int = kwimage.ensure_uint255(heatmap[..., 0:3])
            if draw_burnin:
                heatmap_int = kwimage.draw_text_on_image(
                    heatmap_int, 'pred saliency', org=(1, 1), valign='top',
                    color=PRED_BLUE, border=1)
            vert_parts['salient_pred'] = heatmap_int

        if real_image_int is not None:
            vert_parts['input_image'] = real_image_int

        vert_parts = {k: kwimage.ensure_uint255(c) for k, c in vert_parts.items()}
        vert_stack = kwimage.stack_images(list(vert_parts.values()), axis=0)
        if draw_components:
            frame_parts[true_gid] = vert_parts
        parts.append(vert_stack)

    max_frame = max(frame_nums) if frame_nums else None
    min_frame = min(frame_nums) if frame_nums else None
    frame_part = _confusion_id_range(min_frame, max_frame)
    gid_part = _confusion_id_range(min(true_gids), max(true_gids))

    # Sort the names so the file stem does not depend on set ordering.
    vidname_part = '_'.join(sorted(unique_vidnames))
    if not vidname_part:
        vidname_part = '_loose_images'

    plot_fstem = f'{vidname_part}-{frame_part}-{gid_part}'

    canvas_title_parts = []
    if title:
        canvas_title_parts.append(title)
    canvas_title_parts.append(plot_fstem)
    canvas_title = '\n'.join(canvas_title_parts)

    plot_canvas = kwimage.stack_images(parts, axis=1, overlap=-10)
    if legend_img is not None:
        plot_canvas = kwimage.stack_images(
            [plot_canvas, legend_img], axis=1, overlap=-10)

    header = kwimage.draw_header_text(
        {'width': plot_canvas.shape[1]}, canvas_title)
    plot_canvas = kwimage.stack_images([header, plot_canvas], axis=0)

    plot_info = {}
    plot_info['vidname_part'] = vidname_part
    plot_info['plot_fstem'] = plot_fstem
    if draw_components:
        plot_info['frame_parts'] = frame_parts
    return plot_canvas, plot_info


def _confusion_weight_panel(weights, draw_burnin):
    """
    Render a weight map as a uint8 panel for draw_chunked_confusion.
    """
    if weights.max() > 1:
        weight_image = colorize_weights(weights)
        weight_title = 'weights (yellow means > 1)'
    else:
        weight_image = weights
        weight_title = 'weights'
    weight_image = kwimage.ensure_uint255(weight_image)
    if draw_burnin:
        weight_image = kwimage.draw_text_on_image(
            weight_image,
            weight_title,
            org=(1, 1), valign='top',
            color='pink', border=1)
    return weight_image


def _confusion_viz_channels(coco_img):
    """
    Choose which channels of an image to display in draw_chunked_confusion.
    """
    avail_chans = {
        chan
        for stream in coco_img.channels.spec.split(',')
        for chan in stream.split('|')
    }
    if {'red', 'green', 'blue'} <= avail_chans:
        return 'red|green|blue'
    if {'r', 'g', 'b'} <= avail_chans:
        return 'r|g|b'
    if 'pan' in avail_chans:
        return 'pan'
    return coco_img.primary_asset()['channels']


def _confusion_id_range(min_value, max_value):
    """
    Format a min/max pair as a zero padded "lo" or "lo-hi" filename part.
    """
    try:
        if max_value == min_value:
            return f'{min_value:04d}'
        return f'{min_value:04d}-{max_value:04d}'
    except TypeError:
        # Values that do not support integer formatting (e.g. None).
        return f'{min_value}'
[docs]
def dump_chunked_confusion(full_classes, true_coco_imgs, chunk_info,
                           heatmap_dpath, title=None, config=None):
    """
    Render a chunk of true/pred image predictions and save it as a jpg.

    The drawing is delegated to :func:`draw_chunked_confusion`. The canvas
    is written to ``<heatmap_dpath>/<vidname_part>/<plot_fstem>.jpg``. When
    the 'draw_components' option is enabled, each panel of each frame is
    also written to ``<heatmap_dpath>/_components/_img_<gid>/<key>.jpg``.
    """
    config = config or {}
    want_components = config.get('draw_components', False)

    canvas, info = draw_chunked_confusion(
        full_classes, true_coco_imgs, chunk_info, title=title, config=config)
    video_dname = info['vidname_part']
    canvas_stem = info['plot_fstem']

    out_root = ub.Path(str(heatmap_dpath))

    if want_components:
        components_root = out_root / '_components'
        components_root.ensuredir()
        for gid, panels in info['frame_parts'].items():
            image_dpath = components_root / f'_img_{gid:05d}'
            image_dpath.ensuredir()
            for panel_name, panel in panels.items():
                kwimage.imwrite(image_dpath / (panel_name + '.jpg'), panel)

    video_dpath = (out_root / video_dname).ensuredir()
    canvas_fpath = video_dpath / (canvas_stem + '.jpg')
    kwimage.imwrite(str(canvas_fpath), canvas)
[docs]
def evaluate_segmentations(true_coco, pred_coco, eval_dpath=None,
                           eval_fpath=None, config=None):
    """
    Score predicted segmentations against truth, one image pair at a time.

    Truth and predicted images are associated with
    :func:`associate_images`, each pair is scored in a worker by
    ``single_image_segmentation_metrics``, and the per-image measures are
    accumulated into saliency and per-class measures. Optionally the
    results, curves and confusion heatmaps are written to ``eval_dpath``.

    Args:
        true_coco (kwcoco.CocoDataset): dataset with the truth annotations.

        pred_coco (kwcoco.CocoDataset): dataset with the predictions.

        eval_dpath (str | PathLike | None): directory to write results to.
            If None nothing is written to disk.

        eval_fpath (str | PathLike | None): where to dump the measures.
            Defaults to ``<eval_dpath>/curves/measures2.json``. Only used
            when ``eval_dpath`` is given.

        config (Dict | None): options. Keys read here: 'draw_curves',
            'draw_heatmaps', 'score_space', 'draw_workers', 'workers',
            'resolution', 'balance_area', 'select_images',
            'select_videos' and 'thresh_bins'. The resolved 'score_space'
            is written back into this dictionary, which is also forwarded
            to the scoring and drawing jobs.

    Returns:
        pandas.DataFrame: one row of pixel measures per scored image.

    TODO:
        - [ ] Fold non-critical options into the config

    CommandLine:
        XDEV_PROFILE=1 xdoctest -m kwcoco.metrics.segmentation_metrics evaluate_segmentations

    Example:
        >>> # xdoctest: +REQUIRES(module:kwutil)
        >>> from kwcoco.coco_evaluator import CocoEvaluator
        >>> from kwcoco.demo.perterb import perterb_coco
        >>> import kwcoco
        >>> true_coco1 = kwcoco.CocoDataset.demo('vidshapes2', image_size=(64, 64))
        >>> true_coco2 = kwcoco.CocoDataset.demo('shapes2', image_size=(64, 64))
        >>> #true_coco1 = kwcoco.CocoDataset.demo('vidshapes9')
        >>> #true_coco2 = kwcoco.CocoDataset.demo('shapes128')
        >>> true_coco = kwcoco.CocoDataset.union(true_coco1, true_coco2)
        >>> kwargs = {
        >>>     'box_noise': 0.5,
        >>>     'n_fp': (0, 10),
        >>>     'n_fn': (0, 10),
        >>>     'with_probs': True,
        >>>     'with_heatmaps': True,
        >>>     'verbose': 1,
        >>> }
        >>> # TODO: it would be nice to demo the soft metrics
        >>> # functionality by adding "salient_prob" or "class_prob"
        >>> # auxiliary channels to this demodata.
        >>> print('perterbing')
        >>> pred_coco = perterb_coco(true_coco, **kwargs)
        >>> eval_dpath = ub.Path.appdir('kwcoco/tests/fusion_eval').ensuredir()
        >>> print('eval_dpath = {!r}'.format(eval_dpath))
        >>> config = {}
        >>> config['score_space'] = 'image'
        >>> draw_curves = 'auto'
        >>> draw_heatmaps = 'auto'
        >>> #draw_heatmaps = False
        >>> config['workers'] = 'min(avail-2,6)'
        >>> #workers = 0
        >>> evaluate_segmentations(true_coco, pred_coco, eval_dpath, config=config)

    Example:
        >>> # xdoctest: +REQUIRES(env:SLOW_DOCTEST)
        >>> # xdoctest: +REQUIRES(module:kwutil)
        >>> from kwcoco.metrics.segmentation_metrics import *  # NOQA
        >>> from kwcoco.coco_evaluator import CocoEvaluator
        >>> from kwcoco.demo.perterb import perterb_coco
        >>> import kwcoco
        >>> true_coco = kwcoco.CocoDataset.demo('vidshapes2', image_size=(512, 512))
        >>> kwargs = {
        >>>     'box_noise': 0.5,
        >>>     'n_fp': (0, 10),
        >>>     'n_fn': (0, 10),
        >>>     'with_probs': True,
        >>>     'with_heatmaps': True,
        >>>     'verbose': 1,
        >>> }
        >>> # TODO: it would be nice to demo the soft metrics
        >>> # functionality by adding "salient_prob" or "class_prob"
        >>> # auxiliary channels to this demodata.
        >>> print('perterbing')
        >>> pred_coco = perterb_coco(true_coco, **kwargs)
        >>> eval_dpath = ub.Path.appdir('kwcoco/tests/fusion_eval-video').ensuredir()
        >>> print('eval_dpath = {!r}'.format(eval_dpath))
        >>> config = {}
        >>> config['score_space'] = 'video'
        >>> config['draw_weights'] = True
        >>> config['balance_area'] = True
        >>> draw_curves = 'auto'
        >>> draw_heatmaps = 'auto'
        >>> #draw_heatmaps = False
        >>> config['workers'] = 'min(avail-2,6)'
        >>> config['workers'] = 1
        >>> #workers = 0
        >>> evaluate_segmentations(true_coco, pred_coco, eval_dpath, config=config)
    """
    import rich
    from kwutil import process_context
    from kwutil import util_progress
    from kwutil import util_parallel

    if config is None:
        config = {}
    draw_curves = config.get('draw_curves', 'auto')
    draw_heatmaps = config.get('draw_heatmaps', 'auto')
    score_space = config.get('score_space', 'auto')
    draw_workers = config.get('draw_workers', 'auto')
    if score_space == 'auto':
        score_space = 'video' if true_coco.n_videos else 'image'
    # The workers read the resolved value from the config.
    config['score_space'] = score_space

    # Ensure each class has colors.
    ensure_heuristic_coco_colors(true_coco)
    true_classes = list(true_coco.object_categories())
    full_classes = true_coco.object_categories()
    # Sometimes supercategories dont get colors, this fixes that.
    ensure_heuristic_category_tree_colors(full_classes)

    workers = util_parallel.coerce_num_workers(config.get('workers', 0))
    if draw_workers == 'auto':
        draw_workers = min(2, workers)
    else:
        draw_workers = util_parallel.coerce_num_workers(draw_workers)

    # Extract metadata about the predictions to persist
    meta = {}
    meta['info'] = meta_info = []
    if pred_coco.fpath is not None:
        pred_fpath = ub.Path(pred_coco.fpath)
        meta['pred_name'] = '_'.join((list(pred_fpath.parts[-2:-1]) + [pred_fpath.stem]))

    for item in pred_coco.dataset.get('info', []):
        item_type = item.get('type', None)
        if item_type == 'measure':
            meta_info.append(item)
        elif item_type == 'process':
            proc_name = item.get('properties', {}).get('name', None)
            if proc_name == 'geowatch.tasks.fusion.predict':
                package_fpath = item['properties']['config'].get('package_fpath')
                if package_fpath is not None:
                    # Only infer names when the package path was recorded
                    package_stem = ub.Path(package_fpath).stem
                    item.setdefault('title', package_stem)
                    item.setdefault('package_name', package_stem)
                # FIXME: title should also include pred-config info
                if 'title' in item:
                    meta['title'] = item['title']
                if 'package_name' in item:
                    meta['package_name'] = item['package_name']
            meta_info.append(item)

    # Title contains the model package name if we can infer it
    package_name = meta.get('package_name', '')
    pred_name = meta.get('pred_name', '')
    title_parts = [p for p in [package_name, pred_name] if p]
    resolution = config.get('resolution', None)
    balance_area = config.get('balance_area', False)
    if resolution is not None:
        title_parts.append(f'space={score_space} @ {resolution}, balance_area={balance_area}')
    else:
        title_parts.append(f'space={score_space} balance_area={balance_area}')
    meta['title_parts'] = title_parts
    title = meta['title'] = ' - '.join(title_parts)

    # In "auto" mode dont require marks if all images are unmarked,
    # otherwise assume that we should restrict to marked images
    required_marked = any(pred_coco.images().lookup('has_predictions', False))

    # Handle user-specified filter
    from kwcoco._helpers import _query_image_ids
    valid_image_ids = _query_image_ids(
        true_coco,
        config.get('select_images', None),
        config.get('select_videos', None)
    )

    # In every match "match_gids1" holds truth image ids and "match_gids2"
    # holds the corresponding predicted image ids.
    matches = associate_images(
        true_coco, pred_coco, key_fallback='id',
        valid_image_ids=valid_image_ids)
    video_matches = matches['video']
    image_matches = matches['image']

    n_vid_matches = len(video_matches)
    n_img_per_vid_matches = [len(d['match_gids1']) for d in video_matches]
    n_img_matches = len(image_matches['match_gids1'])
    print('n_img_per_vid_matches = {}'.format(ub.urepr(n_img_per_vid_matches, nl=1)))
    print('n_vid_matches = {}'.format(ub.urepr(n_vid_matches, nl=1)))
    print('n_img_matches = {!r}'.format(n_img_matches))
    rich.print(f'Eval Dpath: [link={eval_dpath}]{eval_dpath}[/link]')

    chunk_size = 5
    num_thresh_bins = config.get('thresh_bins', 32 * 32)
    thresh_bins = np.linspace(0, 1, num_thresh_bins)  # this is more stable using an ndarray

    if draw_curves == 'auto':
        draw_curves = bool(eval_dpath is not None)
    if draw_heatmaps == 'auto':
        draw_heatmaps = bool(eval_dpath is not None)
    if draw_heatmaps and eval_dpath is None:
        warnings.warn(
            'draw_heatmaps was requested but eval_dpath is None; '
            'heatmaps will not be drawn')
        draw_heatmaps = False

    pcontext = process_context.ProcessContext(
        name='geowatch.tasks.fusion.evaluate',
        config=config,
    )
    pcontext.start()

    heatmap_dpath = None
    if eval_dpath is not None:
        eval_dpath = ub.Path(eval_dpath)
        curve_dpath = (eval_dpath / 'curves').ensuredir()
        pcontext.write_invocation(curve_dpath / 'invocation.sh')
        if draw_heatmaps:
            heatmap_dpath = (eval_dpath / 'heatmaps').ensuredir()

    # Objects that will aggregate confusion across multiple images
    salient_measure_combiner = MeasureCombiner(thresh_bins=thresh_bins)
    class_measure_combiner = OneVersusRestMeasureCombiner(thresh_bins=thresh_bins)

    # Gather the true and predicted image pairs to be scored
    if required_marked:
        # Drop the pairs where the prediction is not marked as predicted.
        for match in list(video_matches) + [image_matches]:
            flags = pred_coco.images(match['match_gids2']).lookup('has_predictions', False)
            match['match_gids1'] = list(ub.compress(match['match_gids1'], flags))
            match['match_gids2'] = list(ub.compress(match['match_gids2'], flags))

    # Count only the pairs that will actually be submitted.
    total_images = sum(len(m['match_gids1']) for m in video_matches)
    if score_space == 'image':
        total_images += len(image_matches['match_gids1'])

    # Prepare job pools
    print('workers = {!r}'.format(workers))
    print('draw_workers = {!r}'.format(draw_workers))
    # We want to prevent too many evaluate jobs from piling up results to draw,
    # as it takes longer to draw than it does to score. For this reason, block
    # if the draw queue gets too big.
    metrics_executor = _DelayedBlockingJobQueue(max_unhandled_jobs=max(1, workers), mode='process', max_workers=workers)
    draw_executor = _MaxQueuePool(mode='process', max_workers=draw_workers, max_queue_size=draw_workers * 4)

    def _submit_scoring_job(true_gid, pred_gid, video):
        # Each dataset must be indexed with its own image id; the ids of
        # associated images are not guaranteed to agree between datasets.
        pred_coco_img = pred_coco.coco_image(pred_gid).detach()
        true_coco_img = true_coco.coco_image(true_gid).detach()
        true_dets = true_coco.annots(gid=true_gid).detections
        return metrics_executor.submit(
            single_image_segmentation_metrics, pred_coco_img,
            true_coco_img, true_classes, true_dets, video,
            thresh_bins=thresh_bins, config=config)

    prog = ub.ProgIter(total=total_images, desc='submit scoring jobs', adjust=False, freq=1)
    prog.begin()

    job_chunks = []
    draw_jobs = []

    # Submit scoring jobs over pairs of true-predicted images in videos
    for video_match in video_matches:
        prog.set_postfix_str('comparing ' + video_match['vidname'])
        current_chunk = []
        for gid1, gid2 in zip(video_match['match_gids1'], video_match['match_gids2']):
            vidid1 = true_coco.imgs[gid1]['video_id']
            video1 = true_coco.index.videos[vidid1]
            job = _submit_scoring_job(gid1, gid2, video1)
            if len(current_chunk) >= chunk_size:
                job_chunks.append(current_chunk)
                current_chunk = []
            current_chunk.append(job)
            prog.update()
        if current_chunk:
            job_chunks.append(current_chunk)

    # Submit scoring jobs over pairs of true-predicted images without videos
    if score_space == 'image':
        gid_pairs = zip(image_matches['match_gids1'], image_matches['match_gids2'])
        # Might want to vary the order (or shuffle) depending on user input
        gid_pairs = sorted(gid_pairs, key=lambda x: x[0])
        for gid1, gid2 in gid_pairs:
            job = _submit_scoring_job(gid1, gid2, None)
            prog.update()
            job_chunks.append([job])
    else:
        if len(image_matches['match_gids1']) > 0:
            warnings.warn(ub.paragraph(
                f'''
                Scoring was requested in video mode, but there are
                {len(image_matches['match_gids1'])} true/pred image pairs that
                are unassociated with a video. These pairs will not be included
                in video space scoring.
                '''))
    prog.end()

    num_jobs = sum(map(len, job_chunks))

    # Use rich outside of slurm
    use_rich = not os.environ.get('SLURM_JOBID', '')
    pman = util_progress.ProgressManager(backend='rich' if use_rich else 'progiter')

    rows = []
    with pman:
        score_prog = pman.progiter(desc="[cyan] Scoring...", total=num_jobs)
        score_prog.start()
        if draw_heatmaps:
            draw_prog = pman.progiter(desc="[green] Drawing...", total=len(job_chunks))
            draw_prog.start()

        for job_chunk in job_chunks:
            chunk_info = []
            for job in job_chunk:
                img_info = job.result()
                score_prog.update(1)
                rows.append(img_info['row'])

                class_measures = img_info.get('class_measures', None)
                salient_measures = img_info.get('salient_measures', None)
                if salient_measures is not None:
                    salient_measure_combiner.submit(salient_measures)
                if class_measures is not None:
                    class_measure_combiner.submit(class_measures)
                if draw_heatmaps:
                    chunk_info.append(img_info)

            # Once a job chunk is done, clear its memory
            job = None
            job_chunk.clear()

            # Reduce measures over the chunk
            if salient_measure_combiner.queue_size > chunk_size:
                salient_measure_combiner.combine()
            if class_measure_combiner.queue_size > chunk_size:
                class_measure_combiner.combine()

            if draw_heatmaps:
                # Let the draw executor release any memory it can
                remaining_draw_jobs = []
                for draw_job in draw_jobs:
                    if draw_job.done():
                        draw_job.result()
                        draw_prog.update(1)
                    else:
                        remaining_draw_jobs.append(draw_job)
                draw_job = None
                draw_jobs = remaining_draw_jobs

                # As chunks of evaluation jobs complete, submit background jobs to
                # draw results to disk if requested.
                true_gids = [r['row']['true_gid'] for r in chunk_info]
                true_coco_imgs = true_coco.images(true_gids).coco_images
                true_coco_imgs = [g.detach() for g in true_coco_imgs]
                draw_job = draw_executor.submit(
                    dump_chunked_confusion, full_classes, true_coco_imgs,
                    chunk_info, heatmap_dpath, title=title, config=config)
                draw_jobs.append(draw_job)

        metrics_executor.shutdown()

        if draw_heatmaps:
            # Allow all drawing jobs to finalize
            while draw_jobs:
                job = draw_jobs.pop()
                job.result()
                draw_prog.update(1)
        # Release the drawing pool even if nothing was drawn.
        draw_executor.shutdown()

    df = pd.DataFrame(rows)
    df_summary = df.describe().T
    print('Per Image Pixel Measures')
    rich.print(df)
    rich.print(df_summary.to_string())

    if eval_dpath is not None:
        perimage_table_fpath = eval_dpath / 'perimage_table.json'
        perimage_summary_fpath = eval_dpath / 'perimage_summary.json'
        perimage_table_fpath.write_text(df.to_json(orient='table', indent=4))
        perimage_summary_fpath.write_text(df_summary.to_json(orient='table', indent=4))

    # Finalize all of the aggregated measures
    print('Finalize salient measures')
    # Note: this will return False if there are no salient measures
    salient_combo_measures = salient_measure_combiner.finalize()
    if salient_combo_measures is False or salient_combo_measures is None:
        # Use nan measures from empty binary confusion vectors
        salient_combo_measures = BinaryConfusionVectors(None).measures()

    print('Finalize class measures')
    class_combo_measure_dict = class_measure_combiner.finalize()
    ovr_combo_measures = class_combo_measure_dict['perclass']

    # Combine class + salient measures using the "SingleResult" container
    # (TODO: better API)
    result = CocoSingleResult(
        salient_combo_measures, ovr_combo_measures, None, meta)
    rich.print('result = {}'.format(result))

    meta['info'].append(pcontext.stop())

    if eval_dpath is not None:
        if isinstance(salient_combo_measures, dict):
            salient_combo_measures['meta'] = meta

        figure_title = '\n'.join(meta.get('title_parts', [meta.get('title', '')]))

        if eval_fpath is None:
            eval_fpath = curve_dpath / 'measures2.json'
        print('Dump eval_fpath={}'.format(eval_fpath))
        result.dump(os.fspath(eval_fpath))

        if draw_curves:
            import kwplot
            with kwplot.BackendContext('agg'):
                fig = kwplot.figure(doclf=True)
                print('Dump salient figures')
                salient_combo_measures.summary_plot(fnum=1, title=figure_title)
                fig = kwplot.autoplt().gcf()
                fig.savefig(str(curve_dpath / 'salient_summary.png'))
                print('Dump class figures')
                result.dump_figures(curve_dpath, expt_title=figure_title)

    summary = {}
    if class_combo_measure_dict is not None:
        summary['class_mAP'] = class_combo_measure_dict['mAP']
        summary['class_mAUC'] = class_combo_measure_dict['mAUC']

    if salient_combo_measures is not None:
        summary['salient_ap'] = salient_combo_measures['ap']
        summary['salient_auc'] = salient_combo_measures['auc']
        summary['salient_max_f1'] = salient_combo_measures['max_f1']

    rich.print('summary = {}'.format(ub.urepr(
        summary, nl=1, precision=4, align=':', sort=0)))
    rich.print(f'Eval Dpath: [link={eval_dpath}]{eval_dpath}[/link]')
    print(f'eval_fpath={eval_fpath}')
    return df
[docs]
def _redraw_measures(eval_dpath):
    """
    Developer helper (not critical): reload ``curves/measures.json`` from
    an evaluation directory and redraw the salient summary plot to
    ``curves/summary_redo.png``.
    """
    curves_dir = ub.Path(eval_dpath) / 'curves'
    with open(curves_dir / 'measures.json', 'r') as fp:
        loaded_state = json.load(fp)

    measures = Measures.from_json(loaded_state)
    meta = measures.get('meta', [])

    if meta is None:
        plot_title = ''
    elif isinstance(meta, list):
        # Old format: a list of items, the last item with a title wins
        plot_title = ''
        for entry in meta:
            plot_title = entry.get('title', plot_title)
    else:
        fallback_parts = [meta.get('title', '')]
        plot_title = '\n'.join(meta.get('title_parts', fallback_parts))

    import kwplot
    with kwplot.BackendContext('agg'):
        measures.summary_plot(fnum=1, title=plot_title)
        current_fig = kwplot.autoplt().gcf()
        current_fig.savefig(str(curves_dir / 'summary_redo.png'))
[docs]
def _max_digits(max_num):
    """
    Number of decimal digits needed to print numbers up to ``max_num``.

    Use like this:
        your_var = 231
        max_num = 9180
        num_digits = _max_digits(max_num)
        f'{your_var:0{num_digits}d}'
        # or
        f'{your_var:0{_max_digits(max_num)}d}'

    Args:
        max_num (int | None): the largest number that will be formatted.
            Values below 1 are treated as 1.

    Returns:
        int: the digit count, or 8 when ``max_num`` is None.
    """
    if max_num is None:
        return 8
    # Count digits on the exact integer. A floating point log10 rounds up
    # just below large powers of ten (e.g. 10 ** 15 - 1) and reports one
    # digit too many.
    return len(str(max(int(max_num), 1)))
[docs]
def associate_images(dset1, dset2, key_fallback=None, valid_image_ids=None):
    """
    Builds an association between image-ids in two datasets.

    One use for this is if ``dset1`` is a truth dataset and ``dset2`` is a
    prediction dataset, and you need to know which images are in common so
    they can be scored.

    Images are associated by a "key": the image ``name`` when it is set,
    otherwise the field selected by ``key_fallback``. Videos are associated
    by name, and images are first matched inside each pair of same-named
    videos. Every image that is still unmatched afterwards is then matched
    by key alone.

    Args:
        dset1 (kwcoco.CocoDataset): a kwcoco dataset.

        dset2 (kwcoco.CocoDataset): another kwcoco dataset

        key_fallback (str):
            The fallback key to use if the image "name" is not specified.
            This can either be "file_name" or "id" or None.

        valid_image_ids (set | None): if given, filter out matches where
            the ``dset1`` (truth) image ids are not in this set.

    Returns:
        dict: a dictionary with two entries. ``matches['video']`` is a list
        with one dict per common video name holding ``vidname``,
        ``match_gids1`` and ``match_gids2``. ``matches['image']`` is a dict
        holding ``match_gids1`` and ``match_gids2`` for the remaining
        images. In every item the two id lists are aligned, i.e. position
        ``i`` of ``match_gids1`` corresponds to position ``i`` of
        ``match_gids2``. Results are ordered by their appearance in
        ``dset1``.

    Raises:
        Exception: if an image has no name and ``key_fallback`` is None.
        KeyError: if an image has no name and ``key_fallback`` is not one
            of the supported values.

    TODO:
        - [ ] port to kwcoco proper
        - [ ] use in kwcoco eval as a robust image/video association method

    Example:
        >>> import kwcoco
        >>> from kwcoco.demo.perterb import perterb_coco
        >>> dset1 = kwcoco.CocoDataset.demo('shapes2')
        >>> kwargs = {
        >>>     'box_noise': 0.5,
        >>>     'n_fp': (0, 10),
        >>>     'n_fn': (0, 10),
        >>> }
        >>> dset2 = perterb_coco(dset1, **kwargs)
        >>> matches = associate_images(dset1, dset2, key_fallback='file_name')
        >>> assert len(matches['image']['match_gids1'])
        >>> assert len(matches['image']['match_gids2'])
        >>> assert not len(matches['video'])

    Example:
        >>> import kwcoco
        >>> from kwcoco.demo.perterb import perterb_coco
        >>> dset1 = kwcoco.CocoDataset.demo('vidshapes2')
        >>> kwargs = {
        >>>     'box_noise': 0.5,
        >>>     'n_fp': (0, 10),
        >>>     'n_fn': (0, 10),
        >>> }
        >>> dset2 = perterb_coco(dset1, **kwargs)
        >>> matches = associate_images(dset1, dset2, key_fallback='file_name')
        >>> assert not len(matches['image']['match_gids1'])
        >>> assert not len(matches['image']['match_gids2'])
        >>> assert len(matches['video'])
    """
    def image_key(img):
        # Generate an image "key" that should be compatible between datasets
        name = img.get('name', None)
        if name is not None:
            return name
        if key_fallback is None:
            raise Exception('images require names to associate')
        if key_fallback in ('id', 'file_name'):
            return img[key_fallback]
        raise KeyError(key_fallback)

    # Lookup tables in both directions for each dataset.
    key_to_gid1 = {image_key(img): gid for gid, img in dset1.imgs.items()}
    key_to_gid2 = {image_key(img): gid for gid, img in dset2.imgs.items()}
    gid_to_key1 = {gid: key for key, gid in key_to_gid1.items()}
    gid_to_key2 = {gid: key for key, gid in key_to_gid2.items()}

    name_to_video1 = dset1.index.name_to_video
    name_to_video2 = dset2.index.name_to_video
    # Walk the video names in dset1 order (instead of iterating a set) so
    # the order of the returned matches is reproducible between runs.
    common_vidnames = [name for name in name_to_video1
                       if name in name_to_video2]

    video_matches = []
    all_match_gids1 = set()
    all_match_gids2 = set()

    for vidname in common_vidnames:
        vidid1 = name_to_video1[vidname]['id']
        vidid2 = name_to_video2[vidname]['id']
        gids1 = dset1.index.vidid_to_gids[vidid1]
        gids2 = dset2.index.vidid_to_gids[vidid2]
        # Ordered, de-duplicated keys of the first video; plain set for the
        # second because it is only used for membership tests.
        keys1 = list(dict.fromkeys(gid_to_key1[gid] for gid in gids1))
        keys2 = {gid_to_key2[gid] for gid in gids2}
        match_keys = [key for key in keys1 if key in keys2]
        match_gids1 = [key_to_gid1[key] for key in match_keys]
        match_gids2 = [key_to_gid2[key] for key in match_keys]
        all_match_gids1.update(match_gids1)
        all_match_gids2.update(match_gids2)
        video_matches.append({
            'vidname': vidname,
            'match_gids1': match_gids1,
            'match_gids2': match_gids2,
        })

    # Associate the images that were not matched through a common video
    unmatched_keys2 = {key for gid, key in gid_to_key2.items()
                       if gid not in all_match_gids2}
    remain_keys = [key for gid, key in gid_to_key1.items()
                   if gid not in all_match_gids1 and key in unmatched_keys2]

    image_matches = {
        'match_gids1': [key_to_gid1[key] for key in remain_keys],
        'match_gids2': [key_to_gid2[key] for key in remain_keys],
    }

    matches = {
        'image': image_matches,
        'video': video_matches,
    }

    if valid_image_ids is not None:
        # Filter invalid images. The pairs must be read from both id lists:
        # reading ``match_gids1`` twice would put dset1 ids into the
        # filtered ``match_gids2``.
        for item in video_matches + [image_matches]:
            kept_pairs = [
                (gid1, gid2)
                for gid1, gid2 in zip(item['match_gids1'], item['match_gids2'])
                if gid1 in valid_image_ids
            ]
            item['match_gids1'] = [gid1 for gid1, _ in kept_pairs]
            item['match_gids2'] = [gid2 for _, gid2 in kept_pairs]
    return matches
[docs]
def ensure_heuristic_coco_colors(coco_dset, force=False):
    """
    Ensure every category in a coco dataset has a color.

    Categories named in the heuristic ``CATEGORIES`` table receive the
    table's color; afterwards every category that still has no color is
    given a new distinct one. The table is currently empty, so in practice
    only the second step has an effect.

    Args:
        coco_dset (kwcoco.CocoDataset): object to modify
        force (bool): if True, overwrites existing colors if needed. Only
            applies to categories listed in the heuristic table.

    TODO:
        - [ ] Move this non-heuristic functionality to
            :func:`kwcoco.CocoDataset.ensure_class_colors`

    Example:
        >>> import kwcoco
        >>> coco_dset = kwcoco.CocoDataset.demo()
        >>> ensure_heuristic_coco_colors(coco_dset)
        >>> assert all(c['color'] for c in coco_dset.cats.values())
    """
    CATEGORIES = []
    for heuristic in CATEGORIES:
        target = coco_dset.index.name_to_cat.get(heuristic['name'], None)
        if target is None:
            # The dataset does not contain this heuristic category
            continue
        if force or target.get('color', None) is None:
            target['color'] = heuristic['color']
    # Fill in whatever is still uncolored with distinct colors
    _ensure_distinct_dict_colors(coco_dset.dataset['categories'])
[docs]
def ensure_heuristic_category_tree_colors(classes, force=False):
    """
    Ensure every node in a category tree has a color.

    Nodes named in the heuristic ``CATEGORIES`` table receive the table's
    color; afterwards every node that still has no color is given a new
    distinct one. The table is currently empty, so in practice only the
    second step has an effect.

    Args:
        classes (kwcoco.CategoryTree): object to modify
        force (bool): if True, overwrites existing colors if needed. Only
            applies to categories listed in the heuristic table.

    TODO:
        - [ ] Move this non-heuristic functionality to
            :func:`kwcoco.CategoryTree.ensure_colors`
        - [ ] Consolidate with ~/code/watch/geowatch/tasks/fusion/utils :: category_tree_ensure_color
        - [ ] Consolidate with ~/code/watch/geowatch/utils/kwcoco_extensions :: category_category_colors
        - [ ] Consolidate with ~/code/watch/geowatch/heuristics.py :: ensure_heuristic_category_tree_colors
        - [ ] Consolidate with ~/code/watch/geowatch/heuristics.py :: ensure_heuristic_coco_colors

    Example:
        >>> # xdoctest: +REQUIRES(module:kwutil)
        >>> import kwcoco
        >>> classes = kwcoco.CategoryTree.coerce(['ignore', 'positive', 'Active Construction', 'foobar', 'Unknown', 'baz'])
        >>> ensure_heuristic_category_tree_colors(classes)
        >>> assert all(d['color'] for n, d in classes.graph.nodes(data=True))
    """
    # Set any missing class color with the heuristic category
    CATEGORIES = []
    for heuristic in CATEGORIES:
        attrs = classes.graph.nodes.get(heuristic['name'], None)
        if attrs is None:
            # The tree does not contain this heuristic category
            continue
        if force or attrs.get('color', None) is None:
            attrs['color'] = heuristic['color']
    # Fill in whatever is still uncolored with distinct colors
    node_attrs = [attrs for _, attrs in classes.graph.nodes(data=True)]
    _ensure_distinct_dict_colors(node_attrs)
[docs]
def _ensure_distinct_dict_colors(data_dicts, force=False):
    """
    Assign a distinct color to every dictionary that lacks one (in place).

    Generalized part that could move to kwcoco.

    Args:
        data_dicts (List[dict]): dictionaries that may contain a "color"
            entry. A missing entry and a value of None are both treated as
            "no color". Items that already have a color are left unchanged.
        force (bool): accepted for signature compatibility; it is not used
            by the current implementation.
    """
    def _has_color(item):
        return item.get('color', None) is not None

    colored = [item for item in data_dicts if _has_color(item)]
    uncolored = [item for item in data_dicts if not _has_color(item)]
    if not uncolored:
        # Nothing to assign
        return

    import kwimage
    # New colors are chosen to be distinct from the ones already in use
    taken = [kwimage.Color(item['color']).as01() for item in colored]
    fresh = kwimage.Color.distinct(
        len(uncolored), existing=taken, legacy=False)
    for item, color in zip(uncolored, fresh):
        item['color'] = color
[docs]
def colorize_weights(weights):
    """
    Draw a weight map where values above one are highlighted in color.

    Normally weights will range between 0 and 1, but in some cases they may
    range higher. We handle this by coloring the 0-1 range in grayscale and
    the 1-infinity range in color.

    Values greater than one are mapped onto the "YlOrRd" matplotlib colormap
    with a piecewise linear function through the points 1 -> 0.0,
    10 -> 0.5, 100 -> 0.75 and max(largest weight, 1000) -> 1.0.

    This could move to kwplot, or even kwimage.Heatmap, but we want to figure
    out a better name to indicate this is for things that "should be" in the
    0-1 range, but there are cases where a weight of 1 can be exceeded.

    We should also be able to return a legend for this.

    Args:
        weights (ndarray): weight values to draw.
            NOTE(review): the row / column indexing below assumes a 2D
            array - confirm against callers.

    Returns:
        ndarray: the 3 channel canvas

    Example:
        >>> # xdoctest: +REQUIRES(module:kwplot)
        >>> import kwarray
        >>> weights = kwimage.gaussian_patch((32, 32))
        >>> weights = kwarray.normalize(weights)
        >>> weights[:16, :16] *= 10
        >>> weights[16:, :16] *= 100
        >>> weights[16:, 16:] *= 1000
        >>> weights[:16, 16:] *= 10000
        >>> canvas = colorize_weights(weights)
        >>> # xdoctest: +REQUIRES(--show)
        >>> canvas = kwimage.imresize(canvas, dsize=(512, 512), interpolation='nearest').clip(0, 1)
        >>> canvas = kwimage.draw_text_on_image(canvas, '0-10', org=(1, 1), border=True)
        >>> canvas = kwimage.draw_text_on_image(canvas, '0-100', org=(256, 1), border=True)
        >>> canvas = kwimage.draw_text_on_image(canvas, '0-1000', org=(256, 256), border=True)
        >>> canvas = kwimage.draw_text_on_image(canvas, '0-10000', org=(1, 256), border=True)
        >>> import kwplot
        >>> kwplot.plt.ion()
        >>> kwplot.imshow(canvas)

    Example:
        >>> # xdoctest: +REQUIRES(module:kwplot)
        >>> import kwarray
        >>> weights = kwimage.gaussian_patch((32, 32))
        >>> n = 512
        >>> weight_rows = [
        >>>     np.linspace(0, 1, n),
        >>>     np.linspace(0, 10, n),
        >>>     np.linspace(0, 100, n),
        >>>     np.linspace(0, 1000, n),
        >>>     np.linspace(0, 2000, n),
        >>>     np.linspace(0, 5000, n),
        >>>     np.linspace(0, 8000, n),
        >>>     np.linspace(0, 10000, n),
        >>>     np.linspace(0, 100000, n),
        >>>     np.linspace(0, 1000000, n),
        >>> ]
        >>> canvas = np.array([colorize_weights(row[None, :])[0] for row in weight_rows])
        >>> # xdoctest: +REQUIRES(--show)
        >>> canvas = kwimage.imresize(canvas, dsize=(512, 512), interpolation='nearest').clip(0, 1)
        >>> p = int(512 / len(weight_rows))
        >>> canvas = kwimage.draw_text_on_image(canvas, '0-1', org=(1, 1 + p * 0), border=True)
        >>> canvas = kwimage.draw_text_on_image(canvas, '0-10', org=(1, 1 + p * 1), border=True)
        >>> canvas = kwimage.draw_text_on_image(canvas, '0-100', org=(1, 1 + p * 2), border=True)
        >>> canvas = kwimage.draw_text_on_image(canvas, '0-1000', org=(1, 1 + p * 3), border=True)
        >>> canvas = kwimage.draw_text_on_image(canvas, '0-2000', org=(1, 1 + p * 4), border=True)
        >>> canvas = kwimage.draw_text_on_image(canvas, '0-5000', org=(1, 1 + p * 5), border=True)
        >>> canvas = kwimage.draw_text_on_image(canvas, '0-8000', org=(1, 1 + p * 6), border=True)
        >>> canvas = kwimage.draw_text_on_image(canvas, '0-10000', org=(1, 1 + p * 7), border=True)
        >>> canvas = kwimage.draw_text_on_image(canvas, '0-100000', org=(1, 1 + p * 8), border=True)
        >>> canvas = kwimage.draw_text_on_image(canvas, '0-1000000', org=(1, 1 + p * 9), border=True)
        >>> import kwplot
        >>> kwplot.plt.ion()
        >>> kwplot.imshow(canvas)
    """
    try:
        canvas = kwimage.atleast_3channels(weights.copy())
    except ValueError:
        # probably an integer?
        canvas = np.full((1, 1, 3), fill_value=weights)

    above_one = weights > 1.0
    if not np.any(above_one):
        # Everything is within the nominal range, no color is needed
        return canvas

    import matplotlib as mpl
    import matplotlib.cm  # NOQA
    from scipy import interpolate

    large_values = weights[above_one]
    # Define a function that maps values from [1,inf) to [0,1]
    # the last knot does depend on the inputs, which is fine.
    upper_knot = max(large_values.max(), 1000.0)
    to_unit_interval = interpolate.interp1d(
        x=[1.0, 10.0, 100.0, upper_knot],
        y=[0.0, 0.5, 0.75, 1.0])

    colormap = mpl.colormaps['YlOrRd']
    # Drop the alpha channel returned by the colormap
    rgb01 = colormap(to_unit_interval(large_values))[..., 0:3]

    rows, cols = np.where(above_one)
    canvas[rows, cols, :] = rgb01
    return canvas
[docs]
def _poc_online_binary_saliency_measures_demo():
    """
    Proof of concept for online segmentation measures.

    Defines a small accumulator class, feeds it ten random 32x32 truth /
    probability pairs and prints the combined measures.
    """
    import kwarray

    class BinarySegmentationMetrics:
        """
        Accumulates binary confusion measures one item at a time.

        TODO: expose and make similar to DetectionMetrics
        TODO: add multi-class variant
        """

        def __init__(self, thresh_bins=128, max_queue_size=5):
            from kwcoco.metrics.confusion_measures import MeasureCombiner
            self.thresh_bins = thresh_bins
            self.max_queue_size = max_queue_size
            self.left_bin_edges = np.linspace(0, 1, self.thresh_bins)
            self.combiner = MeasureCombiner(thresh_bins=thresh_bins)

        def add_item(self, true_mask, pred_probs, weights=None):
            """
            true_mask: np.ndarray of shape (H, W), bool or int (0 or 1)
            pred_probs: np.ndarray of shape (H, W), float in [0, 1]
            weights: optional per-pixel weights; defaults to all ones.
                The given array is never modified.
            """
            from kwcoco.metrics.confusion_vectors import BinaryConfusionVectors
            assert pred_probs.shape == true_mask.shape
            probs = pred_probs.copy()
            truth = true_mask.astype(bool)
            if weights is None:
                weights = np.ones_like(probs, dtype=np.float32)
            else:
                # Work on a copy: the NaN masking below writes into the
                # weights and must not change the caller's array.
                weights = np.array(weights, dtype=np.float32)

            # Pixels without a valid prediction get zero weight
            invalid_mask = np.isnan(probs)
            probs[invalid_mask] = 0
            weights[invalid_mask] = 0

            pred_score = probs.ravel()
            if self.left_bin_edges is not None:
                # pre-clip the scores to bins (makes combination more
                # efficient)
                rounded_idx = np.searchsorted(self.left_bin_edges, pred_score)
                pred_score = self.left_bin_edges[np.clip(rounded_idx, 0, len(self.left_bin_edges) - 1)]

            bin_cfns = BinaryConfusionVectors(kwarray.DataFrameArray({
                'is_true': truth.ravel(),
                'pred_score': pred_score,
                'weight': weights.ravel().astype(np.float32),
            }))
            self.combiner.submit(bin_cfns.measures())
            # Merge queued measures once more than ``max_queue_size`` are
            # pending
            if self.combiner.queue_size > self.max_queue_size:
                self.combiner.combine()

        def measures(self):
            return self.combiner.finalize()

    bin_sseg_metrics = BinarySegmentationMetrics(
        thresh_bins=128,
        max_queue_size=5,
    )

    rng = kwarray.ensure_rng()
    num_items = 10
    for _ in range(num_items):
        # Pretend we have some truth labels and prediction probabilities
        truth = (rng.rand(32, 32) > 0.5)
        probs = rng.rand(32, 32)
        bin_sseg_metrics.add_item(truth, probs)

    final_measures = bin_sseg_metrics.measures()
    print(f'final_measures = {ub.urepr(final_measures, nl=1)}')
if __name__ == '__main__':
    # Run the command line entry point when executed as a script.
    # NOTE(review): ``main`` is not defined in this part of the file;
    # presumably it is defined earlier - confirm.
    main()