import ubelt as ub
import numpy as np
# [docs] -- sphinx HTML-docs scrape artifact; commented out so the module imports cleanly
def perterb_coco(coco_dset, **kwargs):
    """
    Perterbs a coco dataset by adding noise to boxes, scores, and classes,
    dropping true annotations (false negatives), and injecting random
    annotations (false positives).

    Args:
        coco_dset (kwcoco.CocoDataset): the true dataset (not modified)

        **kwargs:
            rng (int, default=0): seed or random state
            box_noise (float, default=0): std of noise added to box coords
            cls_noise (float, default=0): probability a class is re-assigned
            null_pred (bool, default=False): not implemented
            with_probs (bool, default=False): add a class "prob" vector to each annotation
            with_heatmaps (bool, default=False): add noisy per-class heatmap channels
            score_noise (float, default=0.2): overlap between true and false score ranges
            n_fp (int | Tuple[int, int], default=0): number (or inclusive range) of false positives per image
            n_fn (int | Tuple[int, int], default=0): number (or inclusive range) of false negatives per image
            hacked (int, default=1): forwarded to :func:`_demo_construct_probs`
            verbose (int, default=0): verbosity

    Returns:
        kwcoco.CocoDataset: a perterbed copy of ``coco_dset``

    Example:
        >>> from kwcoco.demo.perterb import *  # NOQA
        >>> from kwcoco.demo.perterb import _demo_construct_probs
        >>> import kwcoco
        >>> coco_dset = true_dset = kwcoco.CocoDataset.demo('shapes2')
        >>> kwargs = {
        >>>     'box_noise': 0.5,
        >>>     'n_fp': 3,
        >>>     'with_probs': 1,
        >>>     'with_heatmaps': 1,
        >>> }
        >>> pred_dset = perterb_coco(true_dset, **kwargs)
        >>> pred_dset._check_json_serializable()
        >>> # xdoctest: +REQUIRES(--show)
        >>> import kwplot
        >>> kwplot.autompl()
        >>> gid = 1
        >>> canvas = true_dset.delayed_load(gid).finalize()
        >>> canvas = true_dset.annots(gid=gid).detections.draw_on(canvas, color='green')
        >>> canvas = pred_dset.annots(gid=gid).detections.draw_on(canvas, color='blue')
        >>> kwplot.imshow(canvas)
    """
    import kwimage
    import kwarray
    from kwarray import distributions

    # Parse kwargs
    rng = kwarray.ensure_rng(kwargs.get('rng', 0))
    box_noise = kwargs.get('box_noise', 0)
    cls_noise = kwargs.get('cls_noise', 0)
    null_pred = kwargs.get('null_pred', False)
    with_probs = kwargs.get('with_probs', False)
    with_heatmaps = kwargs.get('with_heatmaps', False)
    verbose = kwargs.get('verbose', 0)
    # Specify an amount of overlap between true and false scores
    score_noise = kwargs.get('score_noise', 0.2)

    # Build random variables
    DiscreteUniform = distributions.DiscreteUniform.seeded(rng=rng)

    def _parse_arg(key, default):
        # Accept a scalar or an inclusive (low, high) pair; return an
        # exclusive-high tuple suitable for DiscreteUniform.
        value = kwargs.get(key, default)
        try:
            low, high = value
            return (low, high + 1)
        except (TypeError, ValueError):
            return (value, value + 1)

    n_fp_RV = DiscreteUniform(*_parse_arg('n_fp', 0))
    n_fn_RV = DiscreteUniform(*_parse_arg('n_fn', 0))

    box_noise_RV = distributions.Normal(0, box_noise, rng=rng)
    cls_noise_RV = distributions.Bernoulli(cls_noise, rng=rng)

    # The values of true and false scores start off with no overlap and
    # the overlap increases as the score noise increases.
    def _interp(v1, v2, alpha):
        return v1 * alpha + (1 - alpha) * v2
    mid = 0.5
    true_high = 1.0
    false_low = 0.0
    true_low = _interp(0, mid, score_noise)
    false_high = _interp(true_high, mid - 1e-3, score_noise)
    true_mean = _interp(0.5, .8, score_noise)
    false_mean = _interp(0.5, .2, score_noise)
    true_score_RV = distributions.TruncNormal(
        mean=true_mean, std=.5, low=true_low, high=true_high, rng=rng)
    false_score_RV = distributions.TruncNormal(
        mean=false_mean, std=.5, low=false_low, high=false_high, rng=rng)

    # Uniform sampler over the foreground category indexes
    classes = coco_dset.object_categories()
    cids = coco_dset.cats.keys()
    cidxs = [classes.id_to_idx[c] for c in cids]
    frgnd_cx_RV = distributions.CategoryUniform(cidxs, rng=rng)

    new_dset = coco_dset.copy()
    remove_aids = []     # true annotations to drop (false negatives)
    false_anns = []      # new annotations to add (false positives)
    index_invalidated = False
    allow_neg_boxes = 0  # if truthy, perterbed boxes may keep negative extent

    for gid in ub.ProgIter(coco_dset.imgs.keys(), desc='perterb imgs',
                           verbose=verbose):
        # Sample random variables
        n_fp_ = n_fp_RV()
        n_fn_ = n_fn_RV()

        true_annots = coco_dset.annots(gid=gid)
        aids = true_annots.aids
        for aid in aids:
            # Perterb box coordinates
            ann = new_dset.anns[aid]
            old_bbox = np.array(ann['bbox'])
            new_bbox = (old_bbox + box_noise_RV(4)).tolist()
            new_x, new_y, new_w, new_h = new_bbox
            if not allow_neg_boxes:
                new_w = max(new_w, 0)
                new_h = max(new_h, 0)

            old_cxywh = kwimage.Boxes([old_bbox], 'xywh').to_cxywh()
            new_cxywh = kwimage.Boxes([new_bbox], 'xywh').to_cxywh()
            old_sseg = kwimage.Segmentation.coerce(ann['segmentation'])
            # Compute the transform of the box so we can modify the
            # other attributes (TODO: we could use a random affine transform
            # for everything)
            offset = new_cxywh.data[0, 0:2] - old_cxywh.data[0, 0:2]
            scale = new_cxywh.data[0, 2:4] / old_cxywh.data[0, 2:4]
            old_to_new = kwimage.Affine.coerce(offset=offset, scale=scale)
            new_sseg = old_sseg.warp(old_to_new)

            # Overwrite the data
            ann['segmentation'] = new_sseg.to_coco(style='new')
            ann['bbox'] = [new_x, new_y, new_w, new_h]
            ann['score'] = float(true_score_RV(1)[0])

            if cls_noise_RV():
                # Perterb class predictions
                ann['category_id'] = classes.idx_to_id[frgnd_cx_RV()]
                index_invalidated = True

        # Drop true positive boxes (simulate false negatives)
        if n_fn_:
            drop_idxs = kwarray.shuffle(np.arange(len(aids)), rng=rng)[0:n_fn_]
            remove_aids.extend(list(ub.take(aids, drop_idxs)))

        # Add false positive boxes
        if n_fp_:
            try:
                img = coco_dset.imgs[gid]
                scale = (img['width'], img['height'])
            except KeyError:
                # Fall back to a fixed canvas size when the image has no
                # width/height metadata
                scale = 100
            false_boxes = kwimage.Boxes.random(num=n_fp_, scale=scale,
                                               rng=rng, format='cxywh')
            false_cxs = frgnd_cx_RV(n_fp_)
            false_scores = false_score_RV(n_fp_)
            false_dets = kwimage.Detections(
                boxes=false_boxes,
                class_idxs=false_cxs,
                scores=false_scores,
                classes=classes,
            )
            for ann in list(false_dets.to_coco('new')):
                ann['category_id'] = classes.node_to_id[ann.pop('category_name')]
                ann['image_id'] = gid
                x, y, w, h = ann['bbox']
                sseg = kwimage.MultiPolygon.random().scale((w, h)).translate((x, y))
                # NOTE(review): this stores a kwimage object; presumably
                # ``add_annotation`` coerces it to a serializable form -- the
                # doctest's ``_check_json_serializable`` suggests so; confirm.
                ann['segmentation'] = sseg
                false_anns.append(ann)

        if null_pred:
            raise NotImplementedError

    if index_invalidated:
        # Category assignments changed, so the cached index is stale
        new_dset.index.clear()
        new_dset._build_index()

    new_dset.remove_annotations(remove_aids)
    for ann in false_anns:
        new_dset.add_annotation(**ann)

    # Hack in the probs
    if with_probs:
        annots = new_dset.annots()
        pred_cids = annots.lookup('category_id')
        pred_cxs = np.array([classes.id_to_idx[cid] for cid in pred_cids])
        pred_scores = np.array(annots.lookup('score'))
        # Transform the scores for the assigned class into a predicted
        # probability for each class. (Currently a bit hacky).
        pred_probs = _demo_construct_probs(
            pred_cxs, pred_scores, classes, rng,
            hacked=kwargs.get('hacked', 1))
        for aid, prob in zip(annots.aids, pred_probs):
            new_dset.anns[aid]['prob'] = prob.tolist()

    # Hack in the per-class heatmaps
    if with_heatmaps:
        for gid in ub.ProgIter(new_dset.images(), desc='Perterb heatmaps',
                               verbose=verbose):
            annots = new_dset.annots(gid=gid)
            img = new_dset.index.imgs[gid]
            w = img['width']
            h = img['height']
            c = len(classes)

            # Build up basic prob masks: fill each annotation's segmentation
            # with 1.0 on its class channel.
            heatmaps = np.zeros((c, h, w), dtype=np.float32)
            for ann in annots.objs:
                poly = kwimage.Segmentation.coerce(ann['segmentation']).to_multi_polygon()
                cid = ann['category_id']
                cidx = classes.id_to_idx[cid]
                probs = heatmaps[cidx]
                poly.fill(probs, 1)

            # Add lots of noise to the data
            chan_datas = []
            dims = (h, w)
            for cidx in range(len(classes)):
                chan_data = heatmaps[cidx]
                chan_data += (rng.randn(*dims) * 0.1)
                # FIX: this was previously the no-op expression
                # ``chan_data + chan_data.clip(0, 1)``
                chan_data = chan_data.clip(0, 1)
                chan_data = kwimage.gaussian_blur(chan_data, sigma=1.2)
                chan_data = chan_data.clip(0, 1)
                mask = rng.randn(*dims)
                chan_data = chan_data * ((kwimage.fourier_mask(chan_data, mask)[..., 0]) + .5)
                chan_data += (rng.randn(*dims) * 0.1)
                chan_data = chan_data.clip(0, 1)
                chan_datas.append(chan_data)
            hwc_probs = np.stack(chan_datas, axis=2)

            coco_img = new_dset.coco_image(gid)
            chanspec = '|'.join(list(classes))
            coco_img.add_auxiliary_item(
                imdata=hwc_probs,
                channels=chanspec,
            )
    return new_dset
# [docs] -- sphinx HTML-docs scrape artifact; commented out so the module imports cleanly
def _demo_construct_probs(pred_cxs, pred_scores, classes, rng, hacked=1):
"""
Constructs random probabilities for demo data
Example:
>>> import kwcoco
>>> import kwarray
>>> rng = kwarray.ensure_rng(0)
>>> classes = kwcoco.CategoryTree.coerce(10)
>>> hacked = 1
>>> pred_cxs = rng.randint(0, 10, 10)
>>> pred_scores = rng.rand(10)
>>> probs = _demo_construct_probs(pred_cxs, pred_scores, classes, rng, hacked)
>>> probs.sum(axis=1)
"""
# Setup probs such that the assigned class receives a probability
# equal-(ish) to the assigned score.
# Its a bit tricky to setup hierarchical probs such that we get the
# scores in the right place. We punt and just make probs
# conditional. The right thing to do would be to do this, and then
# perterb ancestor categories such that the probability evenetually
# converges on the right value at that specific classes depth.
# import torch
# Ensure probs
pred_scores2 = pred_scores.clip(0, 1.0)
class_energy = rng.rand(len(pred_scores2), len(classes)).astype(np.float32)
is_mutex = 0
if hasattr(classes, 'is_mutex') and classes.is_mutex():
is_mutex = 1
if isinstance(classes, (list, tuple)):
is_mutex = 1
if is_mutex:
class_energy = class_energy / class_energy.sum(axis=1, keepdims=True)
for p, x, s in zip(class_energy, pred_cxs, pred_scores2):
# ensure sum to 1 when classes are known mutex
rest = p[0:x].sum() + p[x + 1:].sum()
if s <= 1:
p[:] = p * ((1 - s) / rest)
p[x] = s
else:
for p, x, s in zip(class_energy, pred_cxs, pred_scores2):
p[x] = s
if hacked:
# HACK! All that nice work we did is too slow for doctests
return class_energy
raise AssertionError('must be hacked')
# class_energy = torch.Tensor(class_energy)
# cond_logprobs = classes.conditional_log_softmax(class_energy, dim=1)
# cond_probs = torch.exp(cond_logprobs).numpy()
# # I was having a difficult time getting this right, so an
# # inefficient per-item non-vectorized implementation it is.
# # Note: that this implementation takes 70% of the time in this function
# # and is a bottleneck for the doctests. A vectorized implementation would
# # be nice.
# idx_to_ancestor_idxs = classes.idx_to_ancestor_idxs()
# idx_to_groups = {idx: group for group in classes.idx_groups for idx in group}
# def set_conditional_score(row, cx, score, idx_to_groups):
# group_cxs = np.array(idx_to_groups[cx])
# flags = group_cxs == cx
# group_row = row[group_cxs]
# # Ensure that that heriarchical probs sum to 1
# current = group_row[~flags]
# other = current * (1 - score) / current.sum()
# other = np.nan_to_num(other)
# group_row[~flags] = other
# group_row[flags] = score
# row[group_cxs] = group_row
# for row, cx, score in zip(cond_probs, pred_cxs, pred_scores2):
# set_conditional_score(row, cx, score, idx_to_groups)
# for ancestor_cx in idx_to_ancestor_idxs[cx]:
# if ancestor_cx != cx:
# # Hack all parent probs to 1.0 so conditional probs
# # turn into real probs.
# set_conditional_score(row, ancestor_cx, 1.0, idx_to_groups)
# # TODO: could add a fudge factor here so the
# # conditional prob is higher than score, but parent
# # probs are less than 1.0
# # TODO: could also maximize entropy of descendant nodes
# # so classes.decision2 would stop at this node
# # For each level the conditional probs must sum to 1
# if cond_probs.size > 0:
# for idxs in classes.idx_groups:
# level = cond_probs[:, idxs]
# totals = level.sum(axis=1)
# assert level.shape[1] == 1 or np.allclose(totals, 1.0), str(level) + ' : ' + str(totals)
# cond_logprobs = torch.Tensor(cond_probs).log()
# class_probs = classes._apply_logprob_chain_rule(cond_logprobs, dim=1).exp().numpy()
# class_probs = class_probs.reshape(-1, len(classes))
# # print([p[x] for p, x in zip(class_probs, pred_cxs)])
# # print(pred_scores2)
# return class_probs