# -*- coding: utf-8 -*-
"""
Generates "toydata" for demo and testing purposes.
This is the video version of the toydata generator and should be preferred to
the loose image version in toydata_image.
"""
from __future__ import absolute_import, division, print_function
from os.path import join
import numpy as np
import ubelt as ub
import kwarray
import kwimage
from kwcoco.demo.toypatterns import CategoryPatterns
try:
    from xdev import profile
except Exception:
    # xdev is an optional development dependency. Without it, fall back to a
    # no-op decorator so the ``@profile`` markers in this module stay valid.
    def profile(func):
        """Return ``func`` unchanged (stand-in for ``xdev.profile``)."""
        return func


# Version tag of this toydata generator.
TOYDATA_VIDEO_VERSION = 20
@profile
def random_video_dset(
        num_videos=1, num_frames=2, num_tracks=2, anchors=None,
        image_size=(600, 600), verbose=3, render=False, aux=None,
        multispectral=False, rng=None, dpath=None, max_speed=0.01,
        channels=None, **kwargs):
    """
    Create a toy Coco Video Dataset

    Each video is generated by :func:`random_single_video_dset` with
    non-overlapping image and track ids; multiple videos are merged with
    ``kwcoco.CocoDataset.union``.

    Args:
        num_videos (int) : number of videos (must be at least 1)

        num_frames (int) : number of images per video

        num_tracks (int) : number of tracks per video

        anchors (ndarray | None): base anchor sizes of the object boxes,
            forwarded to :func:`random_single_video_dset`.

        image_size (Tuple[int, int]):
            The width and height of the generated images

        render (bool | dict):
            if truthy the toy annotations are synthetically rendered. See
            :func:`render_toy_image` for details.

        rng (int | None | RandomState): random seed / state

        dpath (str): only used if render is truthy, place to write rendered
            images.

        verbose (int, default=3): verbosity mode (currently unused here)

        aux (bool): if True generates dummy auxiliary channels

        multispectral (bool): similar to aux, but does not have the concept of
            a "main" image.

        max_speed (float): max speed of movers

        channels (str): experimental new way to get MSI with specific
            band distributions.

        **kwargs : used for old backwards compatible argument names
            gsize - alias for image_size

    Returns:
        kwcoco.CocoDataset: the generated (and optionally rendered) dataset

    Raises:
        AssertionError: if ``num_videos`` is less than 1 or if unknown keyword
            arguments are given.

    SeeAlso:
        random_single_video_dset

    Example:
        >>> from kwcoco.demo.toydata_video import *  # NOQA
        >>> dset = random_video_dset(render=True, num_videos=3, num_frames=2,
        >>>                          num_tracks=5, image_size=(128, 128))
        >>> # xdoctest: +REQUIRES(--show)
        >>> dset.show_image(1, doclf=True)
        >>> dset.show_image(2, doclf=True)

    Ignore:
        from kwcoco.demo.toydata_video import *  # NOQA
        dset = random_video_dset(render=False, num_videos=3, num_frames=2,
            num_tracks=10)
        dset._tree()
        dset.imgs[1]

        dset = random_single_video_dset()
        dset._tree()
        dset.imgs[1]

        from kwcoco.demo.toydata_video import *  # NOQA
        dset = random_video_dset(render=True, num_videos=3, num_frames=2,
            num_tracks=10)
        print(dset.imgs[1])
        print('dset.bundle_dpath = {!r}'.format(dset.bundle_dpath))
        dset._tree()

        import xdev
        globals().update(xdev.get_func_kwargs(random_video_dset))
        num_videos = 2
    """
    if 'gsize' in kwargs:  # nocover
        if 0:
            # TODO: enable this warning
            import warnings
            warnings.warn('gsize is deprecated. Use image_size param instead',
                          DeprecationWarning)
        image_size = kwargs.pop('gsize')
    # Format the dict itself: ``.format(**kwargs)`` would raise IndexError
    # for the positional ``{}`` field and hide the real problem.
    assert len(kwargs) == 0, 'unknown kwargs={}'.format(kwargs)

    if num_videos < 1:
        raise AssertionError(
            'num_videos must be at least 1, got {!r}'.format(num_videos))

    rng = kwarray.ensure_rng(rng)
    subsets = []
    tid_start = 1
    gid_start = 1
    for vidid in range(1, num_videos + 1):
        # Rendering is deferred until all videos are merged.
        dset = random_single_video_dset(
            image_size=image_size, num_frames=num_frames,
            num_tracks=num_tracks, tid_start=tid_start, anchors=anchors,
            gid_start=gid_start, video_id=vidid, render=False, autobuild=False,
            aux=aux, multispectral=multispectral, max_speed=max_speed,
            channels=channels, rng=rng)
        try:
            # Continue the id sequences where this video left off.
            gid_start = dset.dataset['images'][-1]['id'] + 1
            tid_start = dset.dataset['annotations'][-1]['track_id'] + 1
        except IndexError:
            # The video has no images or no annotations; keep the previous
            # start ids.
            pass
        subsets.append(dset)

    if num_videos == 1:
        dset = subsets[0]
    else:
        import kwcoco
        assert len(subsets) > 1, '{}'.format(len(subsets))
        dset = kwcoco.CocoDataset.union(*subsets)

    # The dataset has been prepared, now we just render it and we have
    # a nice video dataset.
    renderkw = {
        'dpath': None,
    }
    if isinstance(render, dict):
        renderkw.update(render)
    else:
        if not render:
            renderkw = None

    if renderkw:
        if dpath is None:
            dpath = ub.ensure_app_cache_dir('kwcoco', 'demo_vidshapes')
        render_toy_dataset(dset, rng=rng, dpath=dpath, renderkw=renderkw)
        dset.fpath = join(dpath, 'data.kwcoco.json')

    dset._build_index()
    return dset
@profile
def random_single_video_dset(image_size=(600, 600), num_frames=5,
                             num_tracks=3, tid_start=1, gid_start=1,
                             video_id=1, anchors=None, rng=None, render=False,
                             dpath=None, autobuild=True, verbose=3, aux=None,
                             multispectral=False, max_speed=0.01,
                             channels=None, **kwargs):
    """
    Create the video scene layout of object positions.

    Note:
        Does not render the data unless specified.

    Args:
        image_size (Tuple[int, int]): size of the images. The string
            ``'random'`` samples each dimension uniformly from 200 to 800.

        num_frames (int): number of frames in this video

        num_tracks (int): number of tracks in this video

        tid_start (int, default=1): track-id start index

        gid_start (int, default=1): image-id start index

        video_id (int, default=1): video-id of this video

        anchors (ndarray | None): base anchor sizes of the object boxes we will
            generate.

        rng (RandomState): random state / seed

        render (bool | dict): if truthy, does the rendering according to
            provided params in the case of dict input.

        dpath (str | None): passed to :func:`render_toy_dataset` when
            rendering.

        autobuild (bool, default=True): prebuild coco lookup indexes

        verbose (int): verbosity level (currently unused here)

        aux (bool | List[str]): if specified generates auxiliary channels

        multispectral (bool): if specified simulates multispectral imagry
            This is similar to aux, but has no "main" file.

        max_speed (float):
            max speed of movers

        channels (str | None | ChannelSpec):
            if specified generates multispectral images with dummy channels

        **kwargs : used for old backwards compatible argument names
            gsize - alias for image_size

    Returns:
        kwcoco.CocoDataset: dataset containing one video

    Raises:
        ValueError: if more than one of multispectral, aux, or channels is
            specified.
        AssertionError: if unknown keyword arguments are given.

    TODO:
        - [ ] Need maximum allowed object overlap measure
        - [ ] Need better parameterized path generation

    Example:
        >>> from kwcoco.demo.toydata_video import *  # NOQA
        >>> anchors = np.array([ [0.3, 0.3],  [0.1, 0.1]])
        >>> dset = random_single_video_dset(render=True, num_frames=10, num_tracks=10, anchors=anchors, max_speed=0.2)
        >>> # xdoctest: +REQUIRES(--show)
        >>> # Show the tracks in a single image
        >>> import kwplot
        >>> kwplot.autompl()
        >>> annots = dset.annots()
        >>> tids = annots.lookup('track_id')
        >>> tid_to_aids = ub.group_items(annots.aids, tids)
        >>> paths = []
        >>> track_boxes = []
        >>> for tid, aids in tid_to_aids.items():
        >>>     boxes = dset.annots(aids).boxes.to_cxywh()
        >>>     path = boxes.data[:, 0:2]
        >>>     paths.append(path)
        >>>     track_boxes.append(boxes)
        >>> import kwplot
        >>> plt = kwplot.autoplt()
        >>> ax = plt.gca()
        >>> ax.cla()
        >>> #
        >>> import kwimage
        >>> colors = kwimage.Color.distinct(len(track_boxes))
        >>> for i, boxes in enumerate(track_boxes):
        >>>     color = colors[i]
        >>>     path = boxes.data[:, 0:2]
        >>>     boxes.draw(color=color, centers={'radius': 0.01}, alpha=0.5)
        >>>     ax.plot(path.T[0], path.T[1], 'x-', color=color)

    Example:
        >>> from kwcoco.demo.toydata_video import *  # NOQA
        >>> anchors = np.array([ [0.2, 0.2],  [0.1, 0.1]])
        >>> gsize = np.array([(600, 600)])
        >>> print(anchors * gsize)
        >>> dset = random_single_video_dset(render=True, num_frames=10,
        >>>                             anchors=anchors, num_tracks=10,
        >>>                             image_size='random')
        >>> # xdoctest: +REQUIRES(--show)
        >>> import kwplot
        >>> plt = kwplot.autoplt()
        >>> plt.clf()
        >>> gids = list(dset.imgs.keys())
        >>> pnums = kwplot.PlotNums(nSubplots=len(gids))
        >>> for gid in gids:
        >>>     dset.show_image(gid, pnum=pnums(), fnum=1, title=False)
        >>> pnums = kwplot.PlotNums(nSubplots=len(gids))

    Example:
        >>> from kwcoco.demo.toydata_video import *  # NOQA
        >>> dset = random_single_video_dset(num_frames=10, num_tracks=10, aux=True)
        >>> assert 'auxiliary' in dset.imgs[1]
        >>> assert dset.imgs[1]['auxiliary'][0]['channels']
        >>> assert dset.imgs[1]['auxiliary'][1]['channels']

    Example:
        >>> from kwcoco.demo.toydata_video import *  # NOQA
        >>> multispectral = True
        >>> dset = random_single_video_dset(num_frames=1, num_tracks=1, multispectral=True)
        >>> dset._check_json_serializable()
        >>> dset.dataset['images']
        >>> assert dset.imgs[1]['auxiliary'][1]['channels']
        >>> # test that we can render
        >>> render_toy_dataset(dset, rng=0, dpath=None, renderkw={})

    Ignore:
        import xdev
        globals().update(xdev.get_func_kwargs(random_single_video_dset))

        dset = random_single_video_dset(render=True, num_frames=10,
            anchors=anchors, num_tracks=10)
        dset._tree()
    """
    import pandas as pd
    import kwcoco
    from kwarray import distributions
    rng = kwarray.ensure_rng(rng)

    if 'gsize' in kwargs:  # nocover
        if 0:
            # TODO: enable this warning
            import warnings
            warnings.warn('gsize is deprecated. Use image_size param instead',
                          DeprecationWarning)
        image_size = kwargs.pop('gsize')
    # Format the dict itself: ``.format(**kwargs)`` would raise IndexError
    # for the positional ``{}`` field and hide the real problem.
    assert len(kwargs) == 0, 'unknown kwargs={}'.format(kwargs)

    image_ids = list(range(gid_start, num_frames + gid_start))
    track_ids = list(range(tid_start, num_tracks + tid_start))

    if isinstance(image_size, str):
        if image_size == 'random':
            image_size = (
                distributions.Uniform(200, 800),
                distributions.Uniform(200, 800),
            )

    coercable_width = image_size[0]
    coercable_height = image_size[1]
    if isinstance(coercable_width, distributions.Distribution):
        # The video gets one sampled size; each frame samples its own below.
        video_width = int(coercable_width.sample())
        video_height = int(coercable_height.sample())
        image_width_distri = coercable_width
        image_height_distri = coercable_height
    else:
        video_width = coercable_width
        video_height = coercable_height
        image_width_distri = distributions.Constant(coercable_width)
        image_height_distri = distributions.Constant(coercable_height)

    video_dsize = (video_width, video_height)
    video_window = kwimage.Boxes([[0, 0, video_width, video_height]], 'xywh')

    dset = kwcoco.CocoDataset(autobuild=False)
    dset.add_video(
        name='toy_video_{}'.format(video_id),
        width=video_width,
        height=video_height,
        id=video_id,
    )

    if bool(multispectral) + bool(aux) + bool(channels) > 1:
        raise ValueError('can only have one of multispectral, aux, or channels')

    # backwards compat
    no_main_image = False
    if channels is None:
        if aux is True:
            channels = 'disparity,flowx|flowy'
        if multispectral:
            channels = 'B1,B8,B8a,B10,B11'
            no_main_image = True
    else:
        no_main_image = True

    # Relative resolution of known streams; any other stream gets a random
    # affine transform instead.
    special_fusedbands_to_scale = {
        'disparity': 1,
        'flowx|flowy': 1,
        'B1': 1,
        'B8': 1 / 6,
        'B8a': 1 / 3,
        'B10': 1,
        'B11': 1 / 3,
    }

    if channels is not None:
        channels = kwcoco.ChannelSpec.coerce(channels)

    for frame_idx, gid in enumerate(image_ids):
        image_height = int(image_height_distri.sample())
        image_width = int(image_width_distri.sample())

        warp_vid_from_img = kwimage.Affine.coerce(
            scale=(video_width / image_width, video_height / image_height)
        )
        warp_img_from_vid = warp_vid_from_img.inv()

        img = {
            'id': gid,
            'file_name': '<todo-generate-{}-{}>'.format(video_id, frame_idx),
            'width': image_width,
            'height': image_height,
            'warp_img_to_vid': warp_vid_from_img.concise(),
            'frame_index': frame_idx,
            'video_id': video_id,
        }

        if no_main_image:
            # TODO: can we do better here?
            img['name'] = 'generated-{}-{}'.format(video_id, frame_idx)
            img['file_name'] = None

        if channels is not None:
            img['auxiliary'] = []
            for stream in channels.streams():
                scale = special_fusedbands_to_scale.get(stream.spec, None)
                if scale is not None:
                    warp_img_from_aux = kwimage.Affine.scale(scale=1 / scale)
                else:
                    warp_img_from_aux = kwimage.Affine.random(rng=rng)
                warp_aux_from_img = warp_img_from_aux.inv()
                warp_aux_from_vid = warp_aux_from_img @ warp_img_from_vid
                aux_window = video_window.warp(warp_aux_from_vid).quantize()
                aux_width = aux_window.width.item()
                aux_height = aux_window.height.item()
                auxitem = {
                    'file_name': '<todo-generate-{}-{}>'.format(video_id, frame_idx),
                    'channels': stream.spec,
                    'width': aux_width,
                    'height': aux_height,
                    'warp_aux_to_img': warp_img_from_aux.concise(),
                }
                # hack for dtype
                if stream.spec.startswith('B'):
                    auxitem['dtype'] = 'uint16'
                else:
                    auxitem['dtype'] = 'uint8'
                img['auxiliary'].append(auxitem)

        dset.add_image(**img)

    classes = ['star', 'superstar', 'eff']
    for catname in classes:
        dset.ensure_category(name=catname)

    catpats = CategoryPatterns.coerce(classes, rng=rng)

    # TODO: add ensure keypoint category to dset
    # Add newstyle keypoint categories
    dset.dataset['keypoint_categories'] = []
    for kpcat in catpats.keypoint_categories:
        dset.dataset['keypoint_categories'].append(kpcat)

    # Generate paths in a way that they are dependant on each other
    paths = random_multi_object_path(
        num_frames=num_frames,
        num_objects=num_tracks, rng=rng,
        max_speed=max_speed)

    for tid, path in zip(track_ids, paths):
        if anchors is None:
            anchors_ = anchors
        else:
            # Each track uses a single randomly chosen anchor.
            anchors_ = np.array([anchors[rng.randint(0, len(anchors))]])

        # Box scale
        video_boxes = kwimage.Boxes.random(
            num=num_frames, scale=1.0, format='cxywh', rng=rng,
            anchors=anchors_)

        # Smooth out varying box sizes
        alpha = rng.rand() * 0.1
        wh = pd.DataFrame(video_boxes.data[:, 2:4], columns=['w', 'h'])

        # Clamp the aspect ratio. Use ``.loc`` rather than chained indexing
        # (``wh['w'][mask] = ...``), which may write to a temporary copy and
        # does nothing under pandas Copy-on-Write.
        ar = wh['w'] / wh['h']
        min_ar = 0.25
        max_ar = 1 / min_ar
        wh.loc[ar < min_ar, 'w'] = wh['h'] * min_ar
        wh.loc[ar > max_ar, 'h'] = wh['w'] * min_ar

        box_dims = wh.ewm(alpha=alpha, adjust=False).mean()

        video_boxes.data[:, 0:2] = path
        video_boxes.data[:, 2:4] = box_dims.values
        # Boxes and paths are generated in normalized coordinates; scale them
        # up to the video size and shrink each box slightly about its center.
        video_boxes = video_boxes.scale(video_dsize, about='origin')
        video_boxes = video_boxes.scale(0.9, about='center')

        # TODO: need better path distributions; boxes are currently not
        # constrained to stay inside the video bounds.
        video_boxes = video_boxes.to_xywh()

        video_boxes.data = video_boxes.data.round(1)
        cidx = rng.randint(0, len(classes))
        video_dets = kwimage.Detections(
            boxes=video_boxes,
            class_idxs=np.array([cidx] * len(video_boxes)),
            classes=classes,
        )

        WITH_KPTS_SSEG = True
        if WITH_KPTS_SSEG:
            kpts = []
            ssegs = []
            ddims = video_boxes.data[:, 2:4].astype(int)[:, ::-1]
            offsets = video_boxes.data[:, 0:2].astype(int)
            cnames = [classes[cidx] for cidx in video_dets.class_idxs]
            for dims, xy_offset, cname in zip(ddims, offsets, cnames):
                info = catpats._todo_refactor_geometric_info(cname, xy_offset, dims)
                kpts.append(info['kpts'])
                # NOTE: segmentation dropout is currently disabled.
                ssegs.append(info['segmentation'])
            video_dets.data['keypoints'] = kwimage.PointsList(kpts)
            video_dets.data['segmentations'] = kwimage.SegmentationList(ssegs)

        for frame_index, video_det in enumerate(video_dets, start=0):
            frame_img = dset.dataset['images'][frame_index]
            # Detections live in video space; move them into the space of
            # the frame they belong to.
            warp_vid_from_img = kwimage.Affine.coerce(frame_img['warp_img_to_vid'])
            warp_img_from_vid = warp_vid_from_img.inv()
            image_det = video_det.warp(warp_img_from_vid)
            image_ann = list(image_det.to_coco(dset=dset, style='new'))[0]
            image_ann['track_id'] = tid
            image_ann['image_id'] = frame_img['id']
            dset.add_annotation(**image_ann)

    HACK_FIX_COCO_KPTS_FORMAT = True
    if HACK_FIX_COCO_KPTS_FORMAT:
        # Hack to fix coco formatting from Detections.to_coco
        kpt_cats = dset.keypoint_categories()
        for ann in dset.dataset['annotations']:
            for kpt in ann.get('keypoints', []):
                if 'keypoint_category_id' not in kpt:
                    kp_catname = kpt.pop('keypoint_category')
                    kpt['keypoint_category_id'] = kpt_cats.node_to_id[kp_catname]

    # The dataset has been prepared, now we just render it and we have
    # a nice video dataset.
    renderkw = {
        'dpath': dpath,
    }
    if isinstance(render, dict):
        renderkw.update(render)
    else:
        if not render:
            renderkw = None
    if renderkw is not None:
        render_toy_dataset(dset, rng=rng, dpath=dpath, renderkw=renderkw)
    if autobuild:
        dset._build_index()
    return dset
@profile
def render_toy_dataset(dset, rng, dpath=None, renderkw=None):
    """
    Create toydata_video renderings for a preconstructed coco dataset.

    Every image is rendered in memory with :func:`render_toy_image`, after
    which the main raster (if any) is written as a PNG and each auxiliary
    raster as a TIF. The image dictionaries are updated in place so that
    their "file_name" fields point at the files that were written.

    Args:
        dset (CocoDataset):
            A dataset that contains special "renderable" annotations. (e.g.
            the demo shapes). Each image can contain special fields that
            influence how an image will be rendered.

        rng (int | None | RandomState): random state

        dpath (str):
            The location to write the images to. If unspecified, it is written
            to the rendered folder inside the kwcoco cache directory.

        renderkw (dict): See :func:`render_toy_image` for details.
            The keys compress, blocksize, interleave, and options are
            additionally forwarded to the writer of auxiliary images.

    Returns:
        CocoDataset: the input dataset, modified in place

    Example:
        >>> from kwcoco.demo.toydata_video import *  # NOQA
        >>> import kwarray
        >>> rng = None
        >>> rng = kwarray.ensure_rng(rng)
        >>> num_tracks = 3
        >>> dset = random_video_dset(rng=rng, num_videos=3, num_frames=5,
        >>>                          num_tracks=num_tracks, image_size=(128, 128))
        >>> dset = render_toy_dataset(dset, rng)
        >>> # xdoctest: +REQUIRES(--show)
        >>> import kwplot
        >>> plt = kwplot.autoplt()
        >>> plt.clf()
        >>> gids = list(dset.imgs.keys())
        >>> pnums = kwplot.PlotNums(nSubplots=len(gids), nRows=num_tracks)
        >>> for gid in gids:
        >>>     dset.show_image(gid, pnum=pnums(), fnum=1, title=False)
        >>> pnums = kwplot.PlotNums(nSubplots=len(gids))
    """
    rng = kwarray.ensure_rng(rng)
    dset._build_index()

    hashid = dset._build_hashid()[0:24]

    if dpath is not None:
        bundle_dpath = dpath
    else:
        # No explicit location: use a cache folder keyed on the dataset hash.
        cache_name = 'rendered_{}'.format(hashid)
        cache_dpath = ub.ensure_app_cache_dir('kwcoco', 'rendered', cache_name)
        dset.fpath = join(cache_dpath, 'data.kwcoco.json')
        bundle_dpath = dset.bundle_dpath

    img_dpath = ub.ensuredir((bundle_dpath, '_assets/images'))

    if renderkw is None:
        renderkw = {}
    # Only these render options are forwarded to the image writer.
    imwrite_kw = ub.dict_isect(
        renderkw, {'compress', 'blocksize', 'interleave', 'options'})
    if imwrite_kw:
        # imwrite kw requires gdal; fail early if it is unavailable
        from osgeo import gdal  # NOQA

    for gid in dset.imgs.keys():
        # Render data inside the image
        img = render_toy_image(dset, gid, rng=rng, renderkw=renderkw)

        # Extract the data from memory and write it to disk
        fname = 'img_{:05d}.png'.format(gid)
        main_data = img.pop('imdata', None)
        if main_data is not None:
            main_fpath = join(img_dpath, fname)
            img['file_name'] = main_fpath
            img['channels'] = 'r|g|b'
            kwimage.imwrite(main_fpath, main_data)

        aux_items = img.pop('auxiliary', None)
        if aux_items is None:
            continue

        for aux_item in aux_items:
            aux_dpath = ub.ensuredir(
                (bundle_dpath, '_assets', 'aux',
                 'aux_' + aux_item['channels']))
            aux_fpath = ub.augpath(join(aux_dpath, fname), ext='.tif')
            aux_item['file_name'] = aux_fpath
            aux_data = aux_item.pop('imdata', None)
            try:
                # Prefer the gdal backend when it can be imported and used
                from osgeo import gdal  # NOQA
                kwimage.imwrite(
                    aux_fpath, aux_data, backend='gdal', space=None,
                    **imwrite_kw)
            except Exception:
                # Otherwise fall back to the default backend
                kwimage.imwrite(
                    aux_fpath, aux_data, space=None, **imwrite_kw)
        img['auxiliary'] = aux_items

    dset._build_index()
    return dset
@profile
def render_toy_image(dset, gid, rng=None, renderkw=None):
    """
    Modifies dataset inplace, rendering synthetic annotations.

    This does not write to disk. Instead this writes to placeholder values in
    the image dictionary ("imdata" on the image and on each auxiliary item).

    Args:
        dset (CocoDataset): coco dataset with renderable anotations / images
        gid (int): image to render
        rng (int | None | RandomState): random state
        renderkw (dict): rendering config
             gray (bool): gray or color images
             fg_scale (float): foreground noisyness (gauss std)
             bg_scale (float): background noisyness (gauss std)
             fg_intensity (float): foreground brightness (gauss mean)
             bg_intensity (float): background brightness (gauss mean)
             newstyle (bool): use new kwcoco datastructure formats
             with_kpts (bool): include keypoint info
             with_sseg (bool): include segmentation info

    Returns:
        Dict: the inplace-modified image dictionary

    Raises:
        KeyError: if an auxiliary item requests a dtype other than uint8 or
            uint16.

    Example:
        >>> from kwcoco.demo.toydata_video import *  # NOQA
        >>> image_size=(600, 600)
        >>> num_frames=5
        >>> verbose=3
        >>> rng = None
        >>> import kwarray
        >>> rng = kwarray.ensure_rng(rng)
        >>> aux = 'mx'
        >>> dset = random_single_video_dset(
        >>>     image_size=image_size, num_frames=num_frames, verbose=verbose, aux=aux, rng=rng)
        >>> print('dset.dataset = {}'.format(ub.repr2(dset.dataset, nl=2)))
        >>> gid = 1
        >>> renderkw = {}
        >>> render_toy_image(dset, gid, rng, renderkw=renderkw)
        >>> img = dset.imgs[gid]
        >>> canvas = img['imdata']
        >>> # xdoctest: +REQUIRES(--show)
        >>> import kwplot
        >>> kwplot.autompl()
        >>> kwplot.imshow(canvas, doclf=True, pnum=(1, 2, 1))
        >>> dets = dset.annots(gid=gid).detections
        >>> dets.draw()
        >>> auxdata = img['auxiliary'][0]['imdata']
        >>> aux_canvas = false_color(auxdata)
        >>> kwplot.imshow(aux_canvas, pnum=(1, 2, 2))
        >>> _ = dets.draw()

        >>> # xdoctest: +REQUIRES(--show)
        >>> img, anns = demodata_toy_img(image_size=(172, 172), rng=None, aux=True)
        >>> print('anns = {}'.format(ub.repr2(anns, nl=1)))
        >>> import kwplot
        >>> kwplot.autompl()
        >>> kwplot.imshow(img['imdata'], pnum=(1, 2, 1), fnum=1)
        >>> auxdata = img['auxiliary'][0]['imdata']
        >>> kwplot.imshow(auxdata, pnum=(1, 2, 2), fnum=1)
        >>> kwplot.show_if_requested()

    Example:
        >>> from kwcoco.demo.toydata_video import *  # NOQA
        >>> multispectral = True
        >>> dset = random_single_video_dset(num_frames=1, num_tracks=1, multispectral=True)
        >>> gid = 1
        >>> dset.imgs[gid]
        >>> rng = kwarray.ensure_rng(0)
        >>> renderkw = {'with_sseg': True}
        >>> img = render_toy_image(dset, gid, rng=rng, renderkw=renderkw)
    """
    rng = kwarray.ensure_rng(rng)
    config = {} if renderkw is None else renderkw

    gray = config.get('gray', 1)
    fg_scale = config.get('fg_scale', 0.5)
    bg_scale = config.get('bg_scale', 0.8)
    bg_intensity = config.get('bg_intensity', 0.1)
    fg_intensity = config.get('fg_intensity', 0.9)
    newstyle = config.get('newstyle', True)
    with_kpts = config.get('with_kpts', False)
    with_sseg = config.get('with_sseg', False)

    catnames = list(dset.name_to_cat.keys())
    catpats = CategoryPatterns.coerce(catnames, fg_scale=fg_scale,
                                      fg_intensity=fg_intensity, rng=rng)

    if with_kpts and newstyle:
        # TODO: add ensure keypoint category to dset
        # Replace the keypoint categories with the newstyle ones
        dset.dataset['keypoint_categories'] = list(
            catpats.keypoint_categories)

    img = dset.imgs[gid]
    dims = (img['height'], img['width'])
    annots = dset.annots(gid=gid)

    canvas, chan_to_auxinfo = render_background(
        img, rng, gray, bg_intensity, bg_scale)
    canvas, chan_to_auxinfo = render_foreground(
        canvas, chan_to_auxinfo, dset, annots, catpats, with_sseg,
        with_kpts, dims, newstyle, gray, rng)

    if canvas is not None:
        # Convert the float canvas to an 8-bit image with at least 3 channels
        canvas = kwimage.atleast_3channels((canvas * 255).astype(np.uint8))
        img.update({
            'imdata': canvas,
            'channels': 'gray' if gray else 'r|g|b',
        })

    # Maximum value and numpy type for each supported output dtype
    dtype_table = {
        'uint8': (int((2 ** 8) - 1), np.uint8),
        'uint16': (int((2 ** 16) - 1), np.uint16),
    }

    for auxinfo in img.get('auxiliary', []):
        # Postprocess the auxiliary data so it looks interesting
        # It would be really cool if we could do this based on what
        # the simulated channel was.
        raster = chan_to_auxinfo[auxinfo['channels']]['imdata']
        keep_mask = rng.rand(*raster.shape[0:2]) > 0.5
        raster = kwimage.fourier_mask(raster, keep_mask)
        # Rescale into the unit interval before quantizing
        raster = (raster - raster.min())
        raster = (raster / max(1e-8, raster.max()))
        raster = raster.clip(0, 1)

        _dtype = auxinfo.pop('dtype', 'uint8').lower()
        if _dtype not in dtype_table:
            raise KeyError(_dtype)
        max_value, np_dtype = dtype_table[_dtype]
        auxinfo['imdata'] = (raster * max_value).astype(np_dtype)
    return img
@profile
def render_foreground(imdata, chan_to_auxinfo, dset, annots, catpats,
                      with_sseg, with_kpts, dims, newstyle, gray, rng):
    """
    Draw the pattern of every annotation onto the main and auxiliary rasters.

    Args:
        imdata (ndarray | None): main image canvas, modified in place. If
            None, no main image is drawn.
        chan_to_auxinfo (dict | None): maps a channel code to an auxiliary
            dictionary whose 'imdata' raster is filled with the annotation
            segmentations. An optional 'warp_aux_to_img' entry gives the
            transform from auxiliary space to image space.
        dset (CocoDataset): used to resolve category ids to category names
        annots: annotations of the image; ``boxes`` and ``objs`` are used
        catpats (CategoryPatterns): renders the pattern for a category name
        with_sseg (bool): if True, store the segmentation on each annotation
        with_kpts (bool): if True, store the keypoints on each annotation
        dims (Tuple[int, int]): (height, width) passed to the pattern renderer
        newstyle (bool): passed to the pattern renderer
        gray (bool): if truthy, the rendered pattern is averaged over its
            channel axis before it is written into ``imdata``
        rng (RandomState): used to pick the fill value for auxiliary rasters

    Returns:
        Tuple[ndarray | None, dict | None]: ``(imdata, chan_to_auxinfo)``
    """
    boxes = annots.boxes

    FIX_OOB_RENDER_ATTEMPT = 1  # seems to work now
    if FIX_OOB_RENDER_ATTEMPT:
        tlbr_boxes = boxes.to_tlbr().quantize().data.astype(int)
    else:
        tlbr_boxes = boxes.to_tlbr().clip(0, 0, None, None).data.round(0).astype(int)

    # Render coco-style annotation dictionaries
    for ann, tlbr in zip(annots.objs, tlbr_boxes):
        catname = dset._resolve_to_cat(ann['category_id'])['name']
        tl_x, tl_y, br_x, br_y = tlbr

        if FIX_OOB_RENDER_ATTEMPT:
            # hack: grow degenerate (zero-extent) boxes by one pixel per side
            if tl_x == br_x:
                tl_x = tl_x - 1
                br_x = br_x + 1
            if tl_y == br_y:
                tl_y = tl_y - 1
                br_y = br_y + 1

        chip_index = tuple([slice(tl_y, br_y), slice(tl_x, br_x)])
        if imdata is None:
            chip = None
        else:
            # Use pad-infinite-slices to handle boxes that are partially out
            # of bounds. Requires that subsequent setting of the data is also
            # accounted for
            if FIX_OOB_RENDER_ATTEMPT:
                data_slice, padding = kwarray.embed_slice(chip_index, imdata.shape)
                # TODO: could have a kwarray function to expose this inverse slice
                # functionality. Also having a top-level call to apply an embedded
                # slice would be good
                chip = kwarray.padded_slice(imdata, chip_index)
                # Region of the padded chip that maps back into the image
                inverse_slice = (
                    slice(padding[0][0], chip.shape[0] - padding[0][1]),
                    slice(padding[1][0], chip.shape[1] - padding[1][1]),
                )
            else:
                chip = imdata[chip_index]

        size = (br_x - tl_x, br_y - tl_y)
        xy_offset = (tl_x, tl_y)

        if chip is None or chip.size:
            # todo: no need to make kpts / sseg if not requested
            info = catpats.render_category(
                catname, chip, xy_offset, dims, newstyle=newstyle,
                size=size)

            if imdata is not None:
                fgdata = info['data']
                if gray:
                    fgdata = fgdata.mean(axis=2, keepdims=True)

                if FIX_OOB_RENDER_ATTEMPT:
                    imdata[data_slice] = fgdata[inverse_slice]
                else:
                    imdata[tl_y:br_y, tl_x:br_x, :] = fgdata

            if with_sseg:
                ann['segmentation'] = info['segmentation']
            if with_kpts:
                ann['keypoints'] = info['keypoints']

            if chan_to_auxinfo is not None:
                coco_sseg = ann.get('segmentation', None)
                if coco_sseg:
                    seg = kwimage.Segmentation.coerce(coco_sseg)
                    seg = seg.to_multi_polygon()
                    for chankey, auxinfo in chan_to_auxinfo.items():
                        val = rng.uniform(0.2, 1.0)
                        # transform annotation into aux space if it is
                        # different
                        warp_aux_to_img = auxinfo.get('warp_aux_to_img', None)
                        if warp_aux_to_img is not None:
                            # The segmentation is in image space, so the
                            # inverse of the aux-to-img transform is needed
                            # to land in aux space.
                            warp_img_from_aux = kwimage.Affine.coerce(warp_aux_to_img)
                            mat = warp_img_from_aux.inv().matrix
                            seg_ = seg.warp(mat)
                            auxinfo['imdata'] = seg_.fill(auxinfo['imdata'], value=val)
                        else:
                            auxinfo['imdata'] = seg.fill(auxinfo['imdata'], value=val)
    return imdata, chan_to_auxinfo
def false_color(twochan):
    """
    Reduce an array to something that can be displayed as an image.

    TODO: the function ensure_false_color will eventually be ported to kwimage
    use that instead.

    Args:
        twochan (ndarray): array with shape (H, W) or (H, W, C)

    Returns:
        ndarray: for a single channel, the input reshaped to (H, W). For
            multiple channels, the first principal component of the channels
            rendered as a 3-channel heatmap.
    """
    dims = twochan.shape[0:2]
    if twochan.ndim == 2:
        in_channels = 1
    else:
        in_channels = twochan.shape[2]

    if in_channels > 1:
        # Import the submodule explicitly: a bare ``import sklearn`` does not
        # guarantee that ``sklearn.decomposition`` has been loaded. Importing
        # here also keeps sklearn optional for single channel inputs.
        from sklearn.decomposition import PCA
        model = PCA(1)
        X = twochan.reshape(-1, in_channels)
        X_ = model.fit_transform(X)
        gray = X_.reshape(dims)
        viz = kwimage.make_heatmask(gray, with_alpha=1)[:, :, 0:3]
    else:
        gray = twochan.reshape(dims)
        viz = gray
    return viz
@profile
def render_background(img, rng, gray, bg_intensity, bg_scale):
    """
    Create the noisy background rasters for an image.

    Args:
        img (dict): image dictionary. Reads 'width', 'height', 'file_name'
            and 'auxiliary'. Each auxiliary item gets a new 'imdata' entry.
        rng (RandomState): random state used to sample the noise
        gray (bool): if truthy, the main raster has one channel sampled from
            a normal distribution, otherwise it has three channels sampled
            uniformly from 0 to 1.
        bg_intensity (float): mean of the gray background noise
        bg_scale (float): standard deviation of the gray background noise

    Returns:
        Tuple[ndarray | None, dict]: the main raster (None when the image has
            no 'file_name') and a mapping from channel code to the auxiliary
            item that holds its raster.
    """
    # This is 2x as fast for image_size=(300,300)
    width, height = img['width'], img['height']

    if img.get('file_name', None) is None:
        # No main image to render
        imdata = None
    elif gray:
        imdata = kwarray.standard_normal(
            (height, width, 1), mean=bg_intensity, std=bg_scale,
            rng=rng, dtype=np.float32)
        np.clip(imdata, 0, 1, out=imdata)
    else:
        # hack because 3 channels is slower with standard_normal
        imdata = kwarray.uniform(
            0, 1, (height, width, 3), rng=rng, dtype=np.float32)
        np.clip(imdata, 0, 1, out=imdata)

    chan_to_auxinfo = {}
    for aux_item in img.get('auxiliary', []):
        import kwcoco
        chan_code = aux_item['channels']
        # TODO: Need to incorporate ChannelSpec abstraction here
        num_bands = kwcoco.ChannelSpec.coerce(chan_code).numel()
        # TODO: make non-aligned auxiliary information?
        aux_shape = (
            aux_item.get('height', height),
            aux_item.get('width', width),
            num_bands,
        )
        # Start from faint noise instead of zeros
        aux_item['imdata'] = kwarray.uniform(
            0, 0.01, aux_shape, rng=rng, dtype=np.float32)
        chan_to_auxinfo[chan_code] = aux_item

    return imdata, chan_to_auxinfo
@profile
def random_multi_object_path(num_objects, num_frames, rng=None, max_speed=0.01):
    """
    Generate paths for several objects that move in relation to each other.

    The paths come from a boids simulation. An alternative torch-based
    implementation is kept in the code but is currently disabled.

    Args:
        num_objects (int): number of paths to generate
        num_frames (int): number of steps in each path
        rng (int | None | RandomState): random seed / state
        max_speed (float): max speed of the simulated boids

    Returns:
        the result of ``Boids.paths(num_frames)`` (presumably one
        (num_frames, 2) path per object -- TODO confirm against Boids)

    Ignore:
        >>> from kwcoco.demo.toydata_video import *  # NOQA
        >>> #
        >>> import kwarray
        >>> import kwplot
        >>> plt = kwplot.autoplt()
        >>> #
        >>> num_objects = 5
        >>> num_frames = 100
        >>> rng = kwarray.ensure_rng(0)
        >>> #
        >>> from kwcoco.demo.toydata_video import *  # NOQA
        >>> paths = random_multi_object_path(num_objects, num_frames, rng, max_speed=0.1)
        >>> #
        >>> from mpl_toolkits.mplot3d import Axes3D  # NOQA
        >>> ax = plt.gca(projection='3d')
        >>> ax.cla()
        >>> #
        >>> for path in paths:
        >>>     time = np.arange(len(path))
        >>>     ax.plot(time, path.T[0] * 1, path.T[1] * 1, 'o-')
        >>> ax.set_xlim(0, num_frames)
        >>> ax.set_ylim(-.01, 1.01)
        >>> ax.set_zlim(-.01, 1.01)
    """
    import kwarray
    rng = kwarray.ensure_rng(rng)

    USE_BOIDS = 1
    if not USE_BOIDS:
        # Disabled alternative: move the objects with gradient descent on a
        # hand-crafted loss.
        import torch
        from torch.nn import functional as F

        # Note: this branch replaces the given max_speed by a random
        # per-object speed limit.
        max_speed = rng.rand(num_objects, 1) * 0.01
        max_speed = np.concatenate([max_speed, max_speed], axis=1)
        max_speed = torch.from_numpy(max_speed)

        # TODO: can we do better?
        torch.optim.SGD

        class Positions(torch.nn.Module):
            def __init__(model):
                super().__init__()
                _pos = torch.from_numpy(rng.rand(num_objects, 2))
                model.pos = torch.nn.Parameter(_pos)

            def forward(model, noise):
                loss_parts = {}

                # Push objects away from each other
                pairwise_dists = torch.nn.functional.pdist(model.pos)
                repulsion = (1 / (pairwise_dists ** 2)).sum() / num_objects
                loss_parts['repulsive'] = 0.01 * repulsion.sum()

                # Push objects in random directions
                loss_parts['random'] = (model.pos * noise).sum()

                # Push objects away from the boundary
                margin = 0
                x = model.pos
                y = F.softplus((x - 0.5) + margin)
                y = torch.max(F.softplus(-(x - 0.5) + margin), y)
                loss_parts['boundary'] = y.sum() * 2

                return sum(loss_parts.values())

        model = Positions()
        optim = torch.optim.SGD(list(model.parameters()), lr=1.00, momentum=0.9)

        snapshots = []
        for _ in range(num_frames):
            optim.zero_grad()
            noise = torch.from_numpy(rng.rand(2))
            loss = model.forward(noise)
            loss.backward()

            if max_speed is not None:
                # Enforce a per-object speed limit
                model.pos.grad.data[:] = torch.min(model.pos.grad, max_speed)
                model.pos.grad.data[:] = torch.max(model.pos.grad, -max_speed)

            optim.step()

            # Enforce boundary conditions
            model.pos.data.clamp_(0, 1)
            snapshots.append(model.pos.data.numpy().copy())

        # Stack the per-frame positions along a new frame axis
        paths = np.concatenate([p[:, None] for p in snapshots], axis=1)
        return paths

    from kwcoco.demo.boids import Boids
    boid_config = {
        'perception_thresh': 0.2,
        'max_speed': max_speed,
        'max_force': 0.001,
        'damping': 0.99,
    }
    flock = Boids(num_objects, rng=rng, **boid_config).initialize()
    paths = flock.paths(num_frames)
    return paths
# path = np.array(positions) % 1
def random_path(num, degree=1, dimension=2, rng=None, mode='boid'):
    """
    Create a random path using one of several methods.

    Args:
        num (int): number of points in the path
        degree (int, default=1): degree of curvieness of the path
            (only used by the bezier mode)
        dimension (int, default=2): number of spatial dimensions
            (only used by the bezier mode)
        mode (str): can be boid, walk, or bezier
        rng (RandomState, default=None): seed

    Returns:
        ndarray: the points of the path, one per row

    Raises:
        KeyError: if ``mode`` is not one of the supported modes

    References:
        https://github.com/dhermes/bezier

    Example:
        >>> from kwcoco.demo.toydata_video import *  # NOQA
        >>> num = 10
        >>> dimension = 2
        >>> degree = 3
        >>> rng = None
        >>> path = random_path(num, degree, dimension, rng, mode='boid')
        >>> # xdoctest: +REQUIRES(--show)
        >>> import kwplot
        >>> plt = kwplot.autoplt()
        >>> kwplot.multi_plot(xdata=path[:, 0], ydata=path[:, 1], fnum=1, doclf=1, xlim=(0, 1), ylim=(0, 1))
        >>> kwplot.show_if_requested()

    Example:
        >>> # xdoctest: +REQUIRES(--3d)
        >>> # xdoctest: +REQUIRES(module:bezier)
        >>> import kwarray
        >>> import kwplot
        >>> plt = kwplot.autoplt()
        >>> #
        >>> num= num_frames = 100
        >>> rng = kwarray.ensure_rng(0)
        >>> #
        >>> from kwcoco.demo.toydata_video import *  # NOQA
        >>> paths = []
        >>> paths.append(random_path(num, degree=3, dimension=3, mode='bezier'))
        >>> paths.append(random_path(num, degree=2, dimension=3, mode='bezier'))
        >>> paths.append(random_path(num, degree=4, dimension=3, mode='bezier'))
        >>> #
        >>> from mpl_toolkits.mplot3d import Axes3D  # NOQA
        >>> ax = plt.gca(projection='3d')
        >>> ax.cla()
        >>> #
        >>> for path in paths:
        >>>     time = np.arange(len(path))
        >>>     ax.plot(time, path.T[0] * 1, path.T[1] * 1, 'o-')
        >>> ax.set_xlim(0, num_frames)
        >>> ax.set_ylim(-.01, 1.01)
        >>> ax.set_zlim(-.01, 1.01)
        >>> ax.set_xlabel('x')
        >>> ax.set_ylabel('y')
        >>> ax.set_zlabel('z')
    """
    rng = kwarray.ensure_rng(rng)

    if mode == 'boid':
        from kwcoco.demo.boids import Boids
        flock = Boids(1, rng=rng).initialize()
        return flock.paths(num)[0]

    if mode == 'walk':
        # TODO: can we do better?
        import torch
        torch.optim.SGD

        class Position(torch.nn.Module):
            def __init__(self):
                super().__init__()
                self.pos = torch.nn.Parameter(torch.from_numpy(rng.rand(2)))

            def forward(self, noise):
                return (self.pos * noise).sum()

        walker = Position()
        optim = torch.optim.SGD(list(walker.parameters()), lr=0.01, momentum=0.9)

        steps = []
        for _ in range(num):
            optim.zero_grad()
            noise = torch.from_numpy(rng.rand(2))
            loss = walker.forward(noise)
            loss.backward()
            optim.step()
            steps.append(walker.pos.data.numpy().copy())
        # Wrap the positions into the unit interval
        return np.array(steps) % 1

    if mode == 'bezier':
        import bezier
        # Create random bezier control points
        nodes_f = rng.rand(degree + 1, dimension).T  # F-contiguous
        curve = bezier.Curve(nodes_f, degree=degree)
        if 0:
            # TODO: https://stackoverflow.com/questions/18244305/how-to-redistribute-points-evenly-over-a-curve
            depth = int(np.log2(num) + 1)

            def recsub(c, d):
                if d <= 0:
                    yield c
                else:
                    a, b = c.subdivide()
                    yield from recsub(a, d - 1)
                    yield from recsub(b, d - 1)
            subcurves = list(recsub(curve, d=depth))
            path_f = np.array([c.evaluate(0.0)[:, 0] for c in subcurves][0:num]).T
        else:
            # Evaluate path points at evenly spaced curve parameters
            s_vals = np.linspace(0, 1, num)
            path_f = curve.evaluate_multi(s_vals)
        return path_f.T  # C-contiguous

    raise KeyError(mode)