Source code for kwcoco.demo.toydata_video

"""
Generates "toydata" for demo and testing purposes.

This is the video version of the toydata generator and should be preferred to
the loose image version in toydata_image.

"""
from os.path import join
import numpy as np
import ubelt as ub
import kwarray
import kwimage
from kwcoco.demo.toypatterns import CategoryPatterns


try:
    from xdev import profile
except Exception:
    # xdev is an optional developer tool. When it cannot be imported, fall
    # back to ``ub.identity`` so that the ``@profile`` decorations used
    # throughout this module still resolve.
    profile = ub.identity


# Version number of this toydata generator.
TOYDATA_VIDEO_VERSION = 21


@profile
def random_video_dset(
        num_videos=1, num_frames=2, num_tracks=2, anchors=None,
        image_size=(600, 600), verbose=3, render=False, aux=None,
        multispectral=False, multisensor=False, rng=None, dpath=None,
        max_speed=0.01, channels=None, **kwargs):
    """
    Create a toy Coco Video Dataset

    Each video is generated by :func:`random_single_video_dset` and the
    per-video datasets are merged with ``kwcoco.CocoDataset.union``. Image ids
    and track ids continue from one video to the next so they do not collide.

    Args:
        num_videos (int) : number of videos

        num_frames (int) : number of images per video

        num_tracks (int) : number of tracks per video

        anchors (ndarray | None): base anchor sizes, forwarded to
            :func:`random_single_video_dset`.

        image_size (Tuple[int, int]):
            The width and height of the generated images

        verbose (int, default=3): verbosity mode

        render (bool | dict):
            if truthy the toy annotations are synthetically rendered. See
            :func:`render_toy_image` for details.

        aux (bool): if True generates dummy auxiliary channels

        multispectral (bool): similar to aux, but does not have the concept
            of a "main" image.

        multisensor (bool): forwarded to :func:`random_single_video_dset`.

        rng (int | None | RandomState): random seed / state

        dpath (str): only used if render is truthy, place to write rendered
            images.

        max_speed (float): max speed of movers

        channels (str): experimental new way to get MSI with specific
            band distributions.

        **kwargs : used for old backwards compatible argument names
            gsize - alias for image_size

    Returns:
        kwcoco.CocoDataset: the generated (and optionally rendered) dataset

    Raises:
        ValueError: if unrecognized keyword arguments are given
        AssertionError: if ``num_videos`` is less than 1

    SeeAlso:
        random_single_video_dset

    Example:
        >>> from kwcoco.demo.toydata_video import *  # NOQA
        >>> dset = random_video_dset(render=True, num_videos=3, num_frames=2,
        >>>                          num_tracks=5, image_size=(128, 128))
        >>> # xdoctest: +REQUIRES(--show)
        >>> dset.show_image(1, doclf=True)
        >>> dset.show_image(2, doclf=True)

    Ignore:
        from kwcoco.demo.toydata_video import *  # NOQA
        dset = random_video_dset(render=False, num_videos=3, num_frames=2,
            num_tracks=10)
        dset._tree()
        dset.imgs[1]

        dset = random_single_video_dset()
        dset._tree()
        dset.imgs[1]

        from kwcoco.demo.toydata_video import *  # NOQA
        dset = random_video_dset(render=True, num_videos=3, num_frames=2,
            num_tracks=10)
        print(dset.imgs[1])
        print('dset.bundle_dpath = {!r}'.format(dset.bundle_dpath))
        dset._tree()

        import xdev
        globals().update(xdev.get_func_kwargs(random_video_dset))
        num_videos = 2
    """
    if 'gsize' in kwargs:  # nocover
        # TODO: emit a DeprecationWarning
        # ('gsize is deprecated. Use image_size param instead')
        image_size = kwargs.pop('gsize')
    if kwargs:
        raise ValueError('unknown kwargs={}'.format(kwargs))

    # Fail fast: previously this was only detected after the generation loop
    # and reported without any explanation.
    if num_videos < 1:
        raise AssertionError(
            'num_videos must be at least 1, got {!r}'.format(num_videos))

    rng = kwarray.ensure_rng(rng)
    subsets = []
    tid_start = 1
    gid_start = 1
    for vidid in range(1, num_videos + 1):
        if verbose > 2:
            print('generate vidid = {!r}'.format(vidid))
        dset = random_single_video_dset(
            image_size=image_size, num_frames=num_frames,
            num_tracks=num_tracks, tid_start=tid_start, anchors=anchors,
            gid_start=gid_start, video_id=vidid, render=False,
            autobuild=False, aux=aux, multispectral=multispectral,
            multisensor=multisensor, max_speed=max_speed, channels=channels,
            rng=rng, verbose=verbose)
        try:
            # Continue image / track ids where this video left off
            gid_start = dset.dataset['images'][-1]['id'] + 1
            tid_start = dset.dataset['annotations'][-1]['track_id'] + 1
        except IndexError:
            # The video has no images or no annotations; keep the counters
            pass
        subsets.append(dset)

    if num_videos == 1:
        dset = subsets[0]
    else:
        import kwcoco
        assert len(subsets) > 1, '{}'.format(len(subsets))
        dset = kwcoco.CocoDataset.union(*subsets)

    # The dataset has been prepared, now we just render it and we have
    # a nice video dataset.
    renderkw = {
        'dpath': None,
    }
    if isinstance(render, dict):
        renderkw.update(render)
    elif not render:
        renderkw = None

    if renderkw:
        if dpath is None:
            dpath = ub.ensure_app_cache_dir('kwcoco', 'demo_vidshapes')
        if verbose > 2:
            print('rendering')
        render_toy_dataset(dset, rng=rng, dpath=dpath, renderkw=renderkw,
                           verbose=verbose)
        dset.fpath = join(dpath, 'data.kwcoco.json')

    dset._build_index()
    return dset


@profile
def random_single_video_dset(image_size=(600, 600), num_frames=5,
                             num_tracks=3, tid_start=1, gid_start=1,
                             video_id=1, anchors=None, rng=None, render=False,
                             dpath=None, autobuild=True, verbose=3, aux=None,
                             multispectral=False, max_speed=0.01,
                             channels=None, multisensor=False, **kwargs):
    """
    Create the video scene layout of object positions.

    Note:
        Does not render the data unless specified.

    Args:
        image_size (Tuple[int, int] | str): size of the images, or the
            string ``'random'`` to sample each dimension uniformly from
            200 to 800.

        num_frames (int): number of frames in this video

        num_tracks (int): number of tracks in this video

        tid_start (int, default=1): track-id start index

        gid_start (int, default=1): image-id start index

        video_id (int, default=1): video-id of this video

        anchors (ndarray | None): base anchor sizes of the object boxes we
            will generate.

        rng (RandomState): random state / seed

        render (bool | dict): if truthy, does the rendering according to
            provided params in the case of dict input.

        dpath (str | None): forwarded to :func:`render_toy_dataset` when
            rendering is requested.

        autobuild (bool, default=True): prebuild coco lookup indexes

        verbose (int): verbosity level

        aux (bool | List[str]): if specified generates auxiliary channels

        multispectral (bool): if specified simulates multispectral imagry
            This is similar to aux, but has no "main" file.

        max_speed (float): max speed of movers

        channels (str | None | ChannelSpec): if specified generates
            multispectral images with dummy channels

        multisensor (bool): if True, generates demodata from "multiple
            sensors", in other words, observations may have different
            "bands".

        **kwargs : used for old backwards compatible argument names
            gsize - alias for image_size

    Returns:
        kwcoco.CocoDataset: the generated dataset

    Raises:
        ValueError: on unknown kwargs, an unknown ``image_size`` string, or
            when more than one of multispectral / aux / channels is given.

    TODO:
        - [ ] Need maximum allowed object overlap measure
        - [ ] Need better parameterized path generation

    Example:
        >>> from kwcoco.demo.toydata_video import *  # NOQA
        >>> anchors = np.array([ [0.3, 0.3],  [0.1, 0.1]])
        >>> dset = random_single_video_dset(render=True, num_frames=10, num_tracks=10, anchors=anchors, max_speed=0.2)
        >>> # xdoctest: +REQUIRES(--show)
        >>> # Show the tracks in a single image
        >>> import kwplot
        >>> kwplot.autompl()
        >>> annots = dset.annots()
        >>> tids = annots.lookup('track_id')
        >>> tid_to_aids = ub.group_items(annots.aids, tids)
        >>> paths = []
        >>> track_boxes = []
        >>> for tid, aids in tid_to_aids.items():
        >>>     boxes = dset.annots(aids).boxes.to_cxywh()
        >>>     path = boxes.data[:, 0:2]
        >>>     paths.append(path)
        >>>     track_boxes.append(boxes)
        >>> import kwplot
        >>> plt = kwplot.autoplt()
        >>> ax = plt.gca()
        >>> ax.cla()
        >>> #
        >>> import kwimage
        >>> colors = kwimage.Color.distinct(len(track_boxes))
        >>> for i, boxes in enumerate(track_boxes):
        >>>     color = colors[i]
        >>>     path = boxes.data[:, 0:2]
        >>>     boxes.draw(color=color, centers={'radius': 0.01}, alpha=0.5)
        >>>     ax.plot(path.T[0], path.T[1], 'x-', color=color)

    Example:
        >>> from kwcoco.demo.toydata_video import *  # NOQA
        >>> anchors = np.array([ [0.2, 0.2],  [0.1, 0.1]])
        >>> gsize = np.array([(600, 600)])
        >>> print(anchors * gsize)
        >>> dset = random_single_video_dset(render=True, num_frames=10,
        >>>                             anchors=anchors, num_tracks=10,
        >>>                             image_size='random')
        >>> # xdoctest: +REQUIRES(--show)
        >>> import kwplot
        >>> plt = kwplot.autoplt()
        >>> plt.clf()
        >>> gids = list(dset.imgs.keys())
        >>> pnums = kwplot.PlotNums(nSubplots=len(gids))
        >>> for gid in gids:
        >>>     dset.show_image(gid, pnum=pnums(), fnum=1, title=False)
        >>> pnums = kwplot.PlotNums(nSubplots=len(gids))

    Example:
        >>> from kwcoco.demo.toydata_video import *  # NOQA
        >>> dset = random_single_video_dset(num_frames=10, num_tracks=10, aux=True)
        >>> assert 'auxiliary' in dset.imgs[1]
        >>> assert dset.imgs[1]['auxiliary'][0]['channels']
        >>> assert dset.imgs[1]['auxiliary'][1]['channels']

    Example:
        >>> from kwcoco.demo.toydata_video import *  # NOQA
        >>> multispectral = True
        >>> dset = random_single_video_dset(num_frames=1, num_tracks=1, multispectral=True)
        >>> dset._check_json_serializable()
        >>> dset.dataset['images']
        >>> assert dset.imgs[1]['auxiliary'][1]['channels']
        >>> # test that we can render
        >>> render_toy_dataset(dset, rng=0, dpath=None, renderkw={})

    Example:
        >>> from kwcoco.demo.toydata_video import *  # NOQA
        >>> dset = random_single_video_dset(num_frames=4, num_tracks=1, multispectral=True, multisensor=True, image_size='random', rng=2338)
        >>> dset._check_json_serializable()
        >>> assert dset.imgs[1]['auxiliary'][1]['channels']
        >>> # Print before and after render
        >>> #print('multisensor-images = {}'.format(ub.repr2(dset.dataset['images'], nl=-2)))
        >>> #print('multisensor-images = {}'.format(ub.repr2(dset.dataset, nl=-2)))
        >>> print(ub.hash_data(dset.dataset))
        >>> # test that we can render
        >>> render_toy_dataset(dset, rng=0, dpath=None, renderkw={})
        >>> #print('multisensor-images = {}'.format(ub.repr2(dset.dataset['images'], nl=-2)))
        >>> # xdoctest: +REQUIRES(--show)
        >>> import kwplot
        >>> kwplot.autompl()
        >>> from kwcoco.demo.toydata_video import _draw_video_sequence  # NOQA
        >>> gids = [1, 2, 3, 4]
        >>> final = _draw_video_sequence(dset, gids)
        >>> print('dset.fpath = {!r}'.format(dset.fpath))
        >>> kwplot.imshow(final)

    Ignore:
        import xdev
        globals().update(xdev.get_func_kwargs(random_single_video_dset))

        dset = random_single_video_dset(render=True, num_frames=10,
            anchors=anchors, num_tracks=10)
        dset._tree()
    """
    import pandas as pd
    import kwcoco
    from kwarray import distributions
    rng = kwarray.ensure_rng(rng)

    if 'gsize' in kwargs:  # nocover
        # TODO: emit a DeprecationWarning
        # ('gsize is deprecated. Use image_size param instead')
        image_size = kwargs.pop('gsize')
    if kwargs:
        # Note: the message is built with the dict itself. Expanding it with
        # ``**kwargs`` would raise IndexError while formatting.
        raise ValueError('unknown kwargs={}'.format(kwargs))

    image_ids = list(range(gid_start, num_frames + gid_start))
    track_ids = list(range(tid_start, num_tracks + tid_start))

    if isinstance(image_size, str):
        if image_size == 'random':
            image_size = (
                distributions.Uniform(200, 800, rng=rng),
                distributions.Uniform(200, 800, rng=rng),
            )
        else:
            raise ValueError('unknown image_size={!r}'.format(image_size))

    coercable_width = image_size[0]
    coercable_height = image_size[1]
    if isinstance(coercable_width, distributions.Distribution):
        # The video size is one sample; each frame later draws its own size
        video_width = int(coercable_width.sample())
        video_height = int(coercable_height.sample())
        image_width_distri = coercable_width
        image_height_distri = coercable_height
    else:
        video_width = coercable_width
        video_height = coercable_height
        image_width_distri = distributions.Constant(coercable_width)
        image_height_distri = distributions.Constant(coercable_height)

    video_dsize = (video_width, video_height)
    video_window = kwimage.Boxes([[0, 0, video_width, video_height]], 'xywh')

    dset = kwcoco.CocoDataset(autobuild=False)
    dset.add_video(
        name='toy_video_{}'.format(video_id),
        width=video_width,
        height=video_height,
        id=video_id,
    )

    if bool(multispectral) + bool(aux) + bool(channels) > 1:
        raise ValueError('can only have one of multispectral, aux, or channels')

    # backwards compat
    no_main_image = False
    if channels is None:
        if aux is True:
            channels = 'disparity,flowx|flowy'
        if multispectral:
            channels = 'B1,B8,B8a,B10,B11'
            no_main_image = True
    else:
        no_main_image = True

    # Known fused bands get a fixed relative resolution; any other band gets
    # a random affine relation to the image.
    special_fusedbands_to_scale = {
        'disparity': 1,
        'flowx|flowy': 1,
        'B1': 1,
        'B8': 1 / 6,
        'B8a': 1 / 3,
        'B10': 1,
        'B11': 1 / 3,
    }

    sensor_to_channels = {}
    if channels is not None:
        channels = kwcoco.ChannelSpec.coerce(channels)
        sensor_to_channels['sensor1'] = channels

    if multisensor:
        assert channels is not None, 'multisensor requires channels'
        # todo: give users a way to specify (1) how many sensors, and (2)
        # what the channels for each sensor should be.
        sensor_to_channels['sensor2'] = kwcoco.ChannelSpec.coerce('r|g|b,disparity,gauss,B8|B11')
        sensor_to_channels['sensor3'] = kwcoco.ChannelSpec.coerce('r|g|b,flowx|flowy,distri,B10|B11')
        sensor_to_channels['sensor4'] = kwcoco.ChannelSpec.coerce('B11,X.2,Y:2:6')

    sensors = sorted(sensor_to_channels.keys())

    for frame_idx, gid in enumerate(image_ids):
        if verbose > 2:
            print('generate gid = {!r}'.format(gid))

        # Height is sampled before width; keep this order so seeded results
        # stay reproducible.
        image_height = int(image_height_distri.sample())
        image_width = int(image_width_distri.sample())

        warp_vid_from_img = kwimage.Affine.coerce(
            scale=(video_width / image_width, video_height / image_height)
        )
        warp_img_from_vid = warp_vid_from_img.inv()

        img = {
            'id': gid,
            'file_name': '<todo-generate-{}-{}>'.format(video_id, frame_idx),
            'width': image_width,
            'height': image_height,
            'warp_img_to_vid': warp_vid_from_img.concise(),
            'frame_index': frame_idx,
            'video_id': video_id,
        }

        if no_main_image:
            # TODO: can we do better here?
            img['name'] = 'generated-{}-{}'.format(video_id, frame_idx)
            img['file_name'] = None

        if sensors:
            frame_sensor_idx = rng.randint(0, len(sensors))
            frame_sensor = sensors[frame_sensor_idx]
            frame_channels = sensor_to_channels[frame_sensor]

            img['auxiliary'] = []
            for stream in frame_channels.streams():
                scale = special_fusedbands_to_scale.get(stream.spec, None)
                if scale is not None:
                    warp_img_from_aux = kwimage.Affine.scale(scale=1 / scale)
                else:
                    warp_img_from_aux = kwimage.Affine.random(rng=rng)
                warp_aux_from_img = warp_img_from_aux.inv()
                warp_aux_from_vid = warp_aux_from_img @ warp_img_from_vid
                aux_window = video_window.warp(warp_aux_from_vid).quantize()
                aux_width = aux_window.width.item()
                aux_height = aux_window.height.item()
                auxitem = {
                    'file_name': '<todo-generate-{}-{}>'.format(video_id, frame_idx),
                    'channels': stream.spec,
                    'width': aux_width,
                    'height': aux_height,
                    'warp_aux_to_img': warp_img_from_aux.concise(),
                }
                # hack for dtype
                if stream.spec.startswith('B'):
                    auxitem['dtype'] = 'uint16'
                else:
                    auxitem['dtype'] = 'uint8'
                img['auxiliary'].append(auxitem)

        dset.add_image(**img)

    classes = ['star', 'superstar', 'eff']
    for catname in classes:
        dset.ensure_category(name=catname)

    catpats = CategoryPatterns.coerce(classes, rng=rng)

    # TODO: add ensure keypoint category to dset
    # Add newstyle keypoint categories
    dset.dataset['keypoint_categories'] = list(catpats.keypoint_categories)

    # Generate paths in a way that they are dependant on each other
    paths = random_multi_object_path(
        num_frames=num_frames,
        num_objects=num_tracks, rng=rng,
        max_speed=max_speed)

    if verbose > 2:
        print('generate tracks')

    for tid, path in zip(track_ids, paths):
        if anchors is None:
            anchors_ = anchors
        else:
            anchors_ = np.array([anchors[rng.randint(0, len(anchors))]])

        # Box scale
        video_boxes = kwimage.Boxes.random(
            num=num_frames, scale=1.0, format='cxywh', rng=rng,
            anchors=anchors_)

        # Smooth out varying box sizes
        alpha = rng.rand() * 0.1
        wh = pd.DataFrame(video_boxes.data[:, 2:4], columns=['w', 'h'])

        ar = wh['w'] / wh['h']
        min_ar = 0.25
        max_ar = 1 / min_ar

        # Clamp extreme aspect ratios. Use ``.loc`` rather than chained
        # indexing so the assignment reliably writes into ``wh``.
        wh.loc[ar < min_ar, 'w'] = wh['h'] * 0.25
        wh.loc[ar > max_ar, 'h'] = wh['w'] * 0.25

        box_dims = wh.ewm(alpha=alpha, adjust=False).mean()

        video_boxes.data[:, 0:2] = path
        video_boxes.data[:, 2:4] = box_dims.values
        video_boxes = video_boxes.scale(video_dsize, about='origin')
        video_boxes = video_boxes.scale(0.9, about='center')

        # TODO: need better path distributions (boxes are currently allowed
        # to leave the image bounds)
        video_boxes = video_boxes.to_xywh()
        video_boxes.data = video_boxes.data.round(1)
        cidx = rng.randint(0, len(classes))
        video_dets = kwimage.Detections(
            boxes=video_boxes,
            class_idxs=np.array([cidx] * len(video_boxes)),
            classes=classes,
        )

        # Attach keypoints and segmentations for every box in the track
        kpts = []
        ssegs = []
        ddims = video_boxes.data[:, 2:4].astype(int)[:, ::-1]
        offsets = video_boxes.data[:, 0:2].astype(int)
        cnames = [classes[cidx] for cidx in video_dets.class_idxs]
        for dims, xy_offset, cname in zip(ddims, offsets, cnames):
            info = catpats._todo_refactor_geometric_info(cname, xy_offset, dims)
            kpts.append(info['kpts'])
            # Note: segmentation dropout is currently disabled
            ssegs.append(info['segmentation'])
        video_dets.data['keypoints'] = kwimage.PointsList(kpts)
        video_dets.data['segmentations'] = kwimage.SegmentationList(ssegs)

        start_frame = 0
        for frame_index, video_det in enumerate(video_dets, start=start_frame):
            frame_img = dset.dataset['images'][frame_index]
            # Detections were built in video space; move them to image space
            warp_vid_from_img = kwimage.Affine.coerce(frame_img['warp_img_to_vid'])
            warp_img_from_vid = warp_vid_from_img.inv()
            image_det = video_det.warp(warp_img_from_vid)
            image_ann = list(image_det.to_coco(dset=dset, style='new'))[0]
            image_ann['track_id'] = tid
            image_ann['image_id'] = frame_img['id']
            dset.add_annotation(**image_ann)

    # Hack to fix coco formatting from Detections.to_coco
    kpt_cats = dset.keypoint_categories()
    for ann in dset.dataset['annotations']:
        for kpt in ann.get('keypoints', []):
            if 'keypoint_category_id' not in kpt:
                kp_catname = kpt.pop('keypoint_category')
                kpt['keypoint_category_id'] = kpt_cats.node_to_id[kp_catname]

    # The dataset has been prepared, now we just render it and we have
    # a nice video dataset.
    renderkw = {
        'dpath': dpath,
    }
    if isinstance(render, dict):
        renderkw.update(render)
    elif not render:
        renderkw = None

    if renderkw is not None:
        if verbose > 2:
            print('rendering')
        render_toy_dataset(dset, rng=rng, dpath=dpath, renderkw=renderkw,
                           verbose=verbose)
    if autobuild:
        dset._build_index()
    return dset


def _draw_video_sequence(dset, gids):
    """
    Helper to draw a multi-sensor sequence

    Each requested image becomes one column: a text header followed by one
    normalized, labeled cell per channel. Columns are stacked side by side.

    Args:
        dset (CocoDataset): dataset containing the images
        gids (List[int]): image ids to draw, in order

    Returns:
        ndarray: the stacked canvas

    Ignore:
        from kwcoco.demo.toydata_video import _draw_video_sequence  # NOQA
        gids = [1, 2, 3]
        final = _draw_video_sequence(dset, gids)
        import kwplot
        kwplot.autompl()
        kwplot.imshow(final)
    """
    column_width = 256
    columns = []

    for coco_img in dset.images(gids).coco_images:
        band_names = coco_img.channels.fuse()
        # Load in video space and move the channel axis first so that
        # iterating yields one 2D band at a time.
        bands = coco_img.delay(space='video').finalize().transpose(2, 0, 1)

        labeled_bands = [
            (kwimage.normalize(band.astype(np.float32)).clip(0, 1), band_name)
            for band, band_name in zip(bands, band_names)
        ]

        header = kwimage.draw_header_text(
            image={'width': column_width}, fit=False,
            text='t={frame_index} gid={id}'.format(**coco_img.img),
            color='salmon')

        parts = [header]
        for normed, band_name in labeled_bands:
            canvas = kwimage.imresize(normed, dsize=(column_width, None))
            canvas = kwimage.atleast_3channels(canvas.clip(0, 1))
            canvas = kwimage.draw_text_on_image(
                canvas, band_name, (1, 1), valign='top',
                color='white', border=3)
            parts.append(canvas)

        parts = [kwimage.ensure_uint255(part) for part in parts]
        columns.append(kwimage.stack_images(parts, axis=0))

    return kwimage.stack_images(columns, axis=1)


@profile
def render_toy_dataset(dset, rng, dpath=None, renderkw=None, verbose=0):
    """
    Create toydata_video renderings for a preconstructed coco dataset.

    Args:
        dset (CocoDataset):
            A dataset that contains special "renderable" annotations. (e.g.
            the demo shapes). Each image can contain special fields that
            influence how an image will be rendered.

            Currently this process is simple, it just creates a noisy image
            with the shapes superimposed over where they should exist as
            indicated by the annotations. In the future this may become more
            sophisticated.

            Each item in `dset.dataset['images']` will be modified to add
            the "file_name" field indicating where the rendered data is
            written.

        rng (int | None | RandomState): random state

        dpath (str):
            The location to write the images to. If unspecified, it is
            written to the rendered folder inside the kwcoco cache
            directory.

        renderkw (dict): See :func:`render_toy_image` for details.
            Also takes imwrite keywords args only handled in this function.
            TODO better docs.

        verbose (int): the progress iterator is shown when this exceeds 2

    Returns:
        CocoDataset: the input dataset, modified inplace

    Example:
        >>> from kwcoco.demo.toydata_video import *  # NOQA
        >>> import kwarray
        >>> rng = None
        >>> rng = kwarray.ensure_rng(rng)
        >>> num_tracks = 3
        >>> dset = random_video_dset(rng=rng, num_videos=3, num_frames=5,
        >>>                          num_tracks=num_tracks, image_size=(128, 128))
        >>> dset = render_toy_dataset(dset, rng)
        >>> # xdoctest: +REQUIRES(--show)
        >>> import kwplot
        >>> plt = kwplot.autoplt()
        >>> plt.clf()
        >>> gids = list(dset.imgs.keys())
        >>> pnums = kwplot.PlotNums(nSubplots=len(gids), nRows=num_tracks)
        >>> for gid in gids:
        >>>     dset.show_image(gid, pnum=pnums(), fnum=1, title=False)
        >>> pnums = kwplot.PlotNums(nSubplots=len(gids))
    """
    import kwcoco
    rng = kwarray.ensure_rng(rng)
    dset._build_index()

    hashid = dset._build_hashid()[0:24]

    if dpath is not None:
        bundle_dpath = dpath
    else:
        # No location given: use a cache folder keyed on the dataset hash
        cache_dpath = ub.ensure_app_cache_dir(
            'kwcoco', 'rendered', 'rendered_{}'.format(hashid))
        dset.fpath = join(cache_dpath, 'data.kwcoco.json')
        bundle_dpath = dset.bundle_dpath

    img_dpath = ub.ensuredir((bundle_dpath, '_assets/images'))

    if renderkw is None:
        renderkw = {}

    # Only these render options are forwarded to the image writer
    imwrite_kw = ub.dict_isect(
        renderkw, {'compress', 'blocksize', 'interleave', 'options'})
    if imwrite_kw:
        # imwrite kw requries gdal
        from osgeo import gdal  # NOQA

    progress = ub.ProgIter(dset.imgs.keys(), desc='render gid',
                           verbose=verbose > 2)
    for gid in progress:
        # Render data inside the image
        img = render_toy_image(dset, gid, rng=rng, renderkw=renderkw)

        # Extract the data from memory and write it to disk
        fname = 'img_{:05d}.png'.format(gid)
        main_data = img.pop('imdata', None)
        if main_data is not None:
            main_fpath = join(img_dpath, fname)
            img.update({
                'file_name': main_fpath,
                'channels': 'r|g|b',
            })
            kwimage.imwrite(main_fpath, main_data)

        aux_items = img.pop('auxiliary', None)
        if aux_items is None:
            continue

        for aux_item in aux_items:
            chan_part = kwcoco.ChannelSpec.coerce(aux_item['channels']).as_path()
            aux_dpath = ub.ensuredir(
                (bundle_dpath, '_assets', 'aux', 'aux_' + chan_part))
            aux_fpath = ub.augpath(join(aux_dpath, fname), ext='.tif')
            ub.ensuredir(aux_dpath)
            aux_item['file_name'] = aux_fpath
            aux_data = aux_item.pop('imdata', None)
            try:
                # Prefer the gdal backend when it is usable
                from osgeo import gdal  # NOQA
                kwimage.imwrite(
                    aux_fpath, aux_data, backend='gdal', space=None,
                    **imwrite_kw)
            except Exception:
                # Otherwise retry with the default backend
                kwimage.imwrite(
                    aux_fpath, aux_data, space=None, **imwrite_kw)
        img['auxiliary'] = aux_items

    dset._build_index()
    return dset


@profile
def render_toy_image(dset, gid, rng=None, renderkw=None):
    """
    Modifies dataset inplace, rendering synthetic annotations.

    This does not write to disk. Instead this writes to placeholder values in
    the image dictionary.

    Args:
        dset (CocoDataset): coco dataset with renderable anotations / images
        gid (int): image to render
        rng (int | None | RandomState): random state
        renderkw (dict): rendering config
             gray (boo): gray or color images
             fg_scale (float): foreground noisyness (gauss std)
             bg_scale (float): background noisyness (gauss std)
             fg_intensity (float): foreground brightness (gauss mean)
             bg_intensity (float): background brightness (gauss mean)
             newstyle (bool): use new kwcoco datastructure formats
             with_kpts (bool): include keypoint info
             with_sseg (bool): include segmentation info

    Returns:
        Dict: the inplace-modified image dictionary

    Example:
        >>> from kwcoco.demo.toydata_video import *  # NOQA
        >>> image_size=(600, 600)
        >>> num_frames=5
        >>> verbose=3
        >>> rng = None
        >>> import kwarray
        >>> rng = kwarray.ensure_rng(rng)
        >>> aux = 'mx'
        >>> dset = random_single_video_dset(
        >>>     image_size=image_size, num_frames=num_frames, verbose=verbose, aux=aux, rng=rng)
        >>> print('dset.dataset = {}'.format(ub.repr2(dset.dataset, nl=2)))
        >>> gid = 1
        >>> renderkw = {}
        >>> render_toy_image(dset, gid, rng, renderkw=renderkw)
        >>> img = dset.imgs[gid]
        >>> canvas = img['imdata']
        >>> # xdoctest: +REQUIRES(--show)
        >>> import kwplot
        >>> kwplot.autompl()
        >>> kwplot.imshow(canvas, doclf=True, pnum=(1, 2, 1))
        >>> dets = dset.annots(gid=gid).detections
        >>> dets.draw()

        >>> auxdata = img['auxiliary'][0]['imdata']
        >>> aux_canvas = false_color(auxdata)
        >>> kwplot.imshow(aux_canvas, pnum=(1, 2, 2))
        >>> _ = dets.draw()

        >>> # xdoctest: +REQUIRES(--show)
        >>> img, anns = demodata_toy_img(image_size=(172, 172), rng=None, aux=True)
        >>> print('anns = {}'.format(ub.repr2(anns, nl=1)))
        >>> import kwplot
        >>> kwplot.autompl()
        >>> kwplot.imshow(img['imdata'], pnum=(1, 2, 1), fnum=1)
        >>> auxdata = img['auxiliary'][0]['imdata']
        >>> kwplot.imshow(auxdata, pnum=(1, 2, 2), fnum=1)
        >>> kwplot.show_if_requested()

    Example:
        >>> from kwcoco.demo.toydata_video import *  # NOQA
        >>> multispectral = True
        >>> dset = random_single_video_dset(num_frames=1, num_tracks=1, multispectral=True)
        >>> gid = 1
        >>> dset.imgs[gid]
        >>> rng = kwarray.ensure_rng(0)
        >>> renderkw = {'with_sseg': True}
        >>> img = render_toy_image(dset, gid, rng=rng, renderkw=renderkw)
    """
    import kwcoco
    rng = kwarray.ensure_rng(rng)

    opts = {} if renderkw is None else renderkw
    gray = opts.get('gray', 1)
    fg_scale = opts.get('fg_scale', 0.5)
    bg_scale = opts.get('bg_scale', 0.8)
    bg_intensity = opts.get('bg_intensity', 0.1)
    fg_intensity = opts.get('fg_intensity', 0.9)
    newstyle = opts.get('newstyle', True)
    with_kpts = opts.get('with_kpts', False)
    with_sseg = opts.get('with_sseg', False)

    catpats = CategoryPatterns.coerce(
        list(dset.name_to_cat.keys()), fg_scale=fg_scale,
        fg_intensity=fg_intensity, rng=rng)

    if with_kpts and newstyle:
        # TODO: add ensure keypoint category to dset
        # Add newstyle keypoint categories
        dset.dataset['keypoint_categories'] = list(catpats.keypoint_categories)

    img = dset.imgs[gid]
    dims = (img['height'], img['width'])
    annots = dset.annots(gid=gid)

    imdata, chan_to_auxinfo = render_background(
        img, rng, gray, bg_intensity, bg_scale)
    imdata, chan_to_auxinfo = render_foreground(
        imdata, chan_to_auxinfo, dset, annots, catpats, with_sseg, with_kpts,
        dims, newstyle, gray, rng)

    if imdata is not None:
        # Scale the rendered main image by 255 and store it as uint8
        imdata = kwimage.atleast_3channels((imdata * 255).astype(np.uint8))
        img.update({
            'imdata': imdata,
            'channels': 'gray' if gray else 'r|g|b',
        })

    # Output dtype name -> (numpy dtype, maximum integer value)
    dtype_table = {
        'uint8': (np.uint8, int((2 ** 8) - 1)),
        'uint16': (np.uint16, int((2 ** 16) - 1)),
    }

    for auxinfo in img.get('auxiliary', []):
        # Postprocess the auxiliary data so it looks interesting
        # It would be really cool if we could do this based on what
        # the simulated channel was.
        chankey = auxinfo['channels']
        auxdata = chan_to_auxinfo[chankey]['imdata']
        band_names = kwcoco.FusedChannelSpec.coerce(chankey).as_list()
        for band_idx, band_name in enumerate(band_names):
            band = auxdata[..., band_idx]
            if band_name == 'flowx':
                auxdata[..., band_idx] = np.gradient(band, axis=0)
            elif band_name == 'flowy':
                auxdata[..., band_idx] = np.gradient(band, axis=1)
            else:
                # Every other band gets a random fourier mask
                mask = rng.rand(*band.shape[0:2]) > 0.5
                auxdata[..., band_idx] = kwimage.fourier_mask(band, mask)[..., 0]

        auxdata = kwarray.normalize(auxdata).clip(0, 1)

        dtype_name = auxinfo.pop('dtype', 'uint8').lower()
        if dtype_name not in dtype_table:
            raise KeyError(dtype_name)
        np_dtype, max_value = dtype_table[dtype_name]
        auxinfo['imdata'] = (auxdata * max_value).astype(np_dtype)

    return img


@profile
def render_foreground(imdata, chan_to_auxinfo, dset, annots, catpats,
                      with_sseg, with_kpts, dims, newstyle, gray, rng):
    """
    Renders demo annoations on top of a demo background

    For each annotation the category pattern is rendered into the chip of
    ``imdata`` covered by the (quantized) annotation box, and, when the
    annotation has a segmentation, that segmentation is filled into every
    auxiliary image with a random value.

    Args:
        imdata (ndarray | None): main image canvas, written inplace. If None
            no main image is drawn, but annotation geometry can still be
            generated.
        chan_to_auxinfo (dict | None): maps a channel key to an auxiliary
            info dict whose 'imdata' entry is updated.
        dset (CocoDataset): used to resolve category ids to names
        annots: annotation group providing ``boxes`` and ``objs``
            (presumably a kwcoco Annots object -- confirm against caller)
        catpats (CategoryPatterns): renders the per-category patterns
        with_sseg (bool): if True, writes ann['segmentation']
        with_kpts (bool): if True, writes ann['keypoints']
        dims (Tuple[int, int]): passed through to ``render_category``
        newstyle (bool): passed through to ``render_category``
        gray (bool): if truthy the rendered foreground is averaged over its
            channel axis before being written
        rng (RandomState): used to draw the auxiliary fill values

    Returns:
        Tuple[ndarray | None, dict | None]: imdata and chan_to_auxinfo
    """
    boxes = annots.boxes
    tlbr_boxes = boxes.to_tlbr().quantize().data.astype(int)

    # Render coco-style annotation dictionaries
    for ann, tlbr in zip(annots.objs, tlbr_boxes):
        catname = dset._resolve_to_cat(ann['category_id'])['name']
        tl_x, tl_y, br_x, br_y = tlbr

        # hack: grow degenerate (zero width / height) boxes by one pixel on
        # each side so the chip is never empty along that axis
        if tl_x == br_x:
            tl_x = tl_x - 1
            br_x = br_x + 1

        if tl_y == br_y:
            tl_y = tl_y - 1
            br_y = br_y + 1

        chip_index = tuple([slice(tl_y, br_y), slice(tl_x, br_x)])
        if imdata is None:
            chip = None
        else:
            # data_slice / padding come from kwarray.embed_slice;
            # NOTE(review): presumably the in-bounds part of the slice and
            # the amount that falls outside the image -- confirm.
            data_slice, padding = kwarray.embed_slice(chip_index, imdata.shape)
            # TODO: could have a kwarray function to expose this inverse slice
            # functionality. Also having a top-level call to apply an embedded
            # slice would be good
            chip = kwarray.padded_slice(imdata, chip_index)
            # Slice that strips the padding off of the chip again
            inverse_slice = (
                slice(padding[0][0], chip.shape[0] - padding[0][1]),
                slice(padding[1][0], chip.shape[1] - padding[1][1]),
            )

        size = (br_x - tl_x, br_y - tl_y)
        xy_offset = (tl_x, tl_y)

        if chip is None or chip.size:
            # todo: no need to make kpts / sseg if not requested
            info = catpats.render_category(
                catname, chip, xy_offset, dims, newstyle=newstyle, size=size)

            if imdata is not None:
                fgdata = info['data']
                if gray:
                    fgdata = fgdata.mean(axis=2, keepdims=True)
                # Write the unpadded part of the rendered chip into the image
                imdata[data_slice] = fgdata[inverse_slice]

            if with_sseg:
                ann['segmentation'] = info['segmentation']
            if with_kpts:
                ann['keypoints'] = info['keypoints']

        if chan_to_auxinfo is not None:
            # chan_to_auxinfo.keys()
            coco_sseg = ann.get('segmentation', None)
            if coco_sseg:
                seg = kwimage.Segmentation.coerce(coco_sseg)
                seg = seg.to_multi_polygon()
                for chankey, auxinfo in chan_to_auxinfo.items():
                    # One random fill value per (annotation, aux image)
                    val = rng.uniform(0.2, 1.0)
                    # transform annotation into aux space
                    warp_aux_to_img = auxinfo.get('warp_aux_to_img', None)
                    if warp_aux_to_img is not None:
                        warp_aux_from_img = kwimage.Affine.coerce(warp_aux_to_img).inv().matrix
                        seg_ = seg.warp(warp_aux_from_img)
                    else:
                        seg_ = seg
                    c = kwimage.num_channels(auxinfo['imdata'])
                    if c < 4:
                        # hack work around bug in kwimage, where only first
                        # channel was filled
                        val = (val,) * c
                    auxinfo['imdata'] = seg_.fill(auxinfo['imdata'], value=val)
    return imdata, chan_to_auxinfo


@profile
def render_background(img, rng, gray, bg_intensity, bg_scale):
    """
    Generate the noisy background for an image and its auxiliary channels.

    Args:
        img (dict): coco image dictionary. Each item in its optional
            'auxiliary' list is given an 'imdata' entry inplace.
        rng (RandomState): random state
        gray (bool): if truthy the main background has 1 channel and is
            drawn from a normal distribution, otherwise it has 3 channels
            and is drawn uniformly.
        bg_intensity (float): mean of the gray background noise
        bg_scale (float): std of the gray background noise

    Returns:
        Tuple[ndarray | None, dict]: the main image data (None when the
            image has no 'file_name') and a mapping from channel key to
            the auxiliary info dict.
    """
    import kwcoco
    # This is 2x as fast for image_size=(300,300)
    width, height = img['width'], img['height']

    imdata = None
    if img.get('file_name', None) is not None:
        if gray:
            imdata = kwarray.standard_normal(
                (height, width, 1), mean=bg_intensity, std=bg_scale,
                rng=rng, dtype=np.float32)
        else:
            # hack because 3 channels is slower
            imdata = kwarray.uniform(
                0, 1, (height, width, 3), rng=rng, dtype=np.float32)
        np.clip(imdata, 0, 1, out=imdata)

    chan_to_auxinfo = {}
    for auxinfo in img.get('auxiliary', []):
        chankey = auxinfo['channels']
        num_bands = kwcoco.ChannelSpec.coerce(chankey).numel()
        # Auxiliary items default to the size of the main image
        auxshape = (
            auxinfo.get('height', height),
            auxinfo.get('width', width),
            num_bands,
        )
        if chankey == 'gauss':
            patches = [kwimage.gaussian_patch(auxshape[0:2])
                       for _ in range(num_bands)]
            auxdata = kwarray.normalize(np.stack(patches, axis=2)) * rng.rand()
        elif chankey == 'distri':
            # Random distribution currently broken, fix later
            scale = rng.randint(0, 100) * rng.rand()
            auxdata = kwarray.normalize(rng.exponential(scale, size=auxshape))
        else:
            auxdata = kwarray.uniform(
                0, 0.4, auxshape, rng=rng, dtype=np.float32)
        auxinfo['imdata'] = auxdata
        chan_to_auxinfo[chankey] = auxinfo

    return imdata, chan_to_auxinfo


def false_color(twochan):
    """
    Reduce an image with any number of channels to something displayable.

    TODO: the function ensure_false_color will eventually be ported to
    kwimage use that instead.

    Args:
        twochan (ndarray): a 2D (H, W) or 3D (H, W, C) array

    Returns:
        ndarray: for a single channel input, the data reshaped to (H, W).
            For multi-channel input, the channels are projected onto their
            first principal component and colorized with
            ``kwimage.make_heatmask``, keeping the first three channels.
    """
    dims = twochan.shape[0:2]
    in_channels = 1 if twochan.ndim == 2 else twochan.shape[2]

    if in_channels <= 1:
        # Nothing to reduce, just drop the trailing channel axis (if any)
        return twochan.reshape(dims)

    # Import the submodule explicitly: a bare ``import sklearn`` does not
    # guarantee that ``sklearn.decomposition`` is loaded. Importing lazily
    # also keeps the single-channel path free of the sklearn dependency.
    from sklearn.decomposition import PCA
    flat = twochan.reshape(-1, in_channels)
    gray = PCA(1).fit_transform(flat).reshape(dims)
    viz = kwimage.make_heatmask(gray, with_alpha=1)[:, :, 0:3]
    return viz


@profile
def random_multi_object_path(num_objects, num_frames, rng=None, max_speed=0.01):
    """
    Simulate the simultaneous motion of several objects over a number of frames.

    The motion comes from the boids simulation in :mod:`kwcoco.demo.boids`.
    A torch based alternative is kept in the body, but it is unreachable
    while ``USE_BOIDS`` is hard coded to a truthy value.

    Args:
        num_objects (int): number of objects to simulate

        num_frames (int): number of time steps, passed to ``Boids.paths``

        rng (int | None | RandomState): random seed / state, coerced with
            :func:`kwarray.ensure_rng`

        max_speed (float): forwarded to ``Boids`` as its ``max_speed`` option

    Returns:
        the result of ``Boids.paths(num_frames)``. The examples below iterate
        over it to get one path per object and read ``path.T[0]`` and
        ``path.T[1]`` as coordinates, so it is presumably an array shaped
        (num_objects, num_frames, 2) - confirm against ``Boids``.

    Ignore:
        >>> from kwcoco.demo.toydata_video import *  # NOQA
        >>> #
        >>> import kwarray
        >>> import kwplot
        >>> plt = kwplot.autoplt()
        >>> #
        >>> num_objects = 5
        >>> num_frames = 100
        >>> rng = kwarray.ensure_rng(0)
        >>> #
        >>> from kwcoco.demo.toydata_video import *  # NOQA
        >>> paths = random_multi_object_path(num_objects, num_frames, rng, max_speed=0.1)
        >>> #
        >>> from mpl_toolkits.mplot3d import Axes3D  # NOQA
        >>> ax = plt.gca(projection='3d')
        >>> ax.cla()
        >>> #
        >>> for path in paths:
        >>>     time = np.arange(len(path))
        >>>     ax.plot(time, path.T[0] * 1, path.T[1] * 1, 'o-')
        >>> ax.set_xlim(0, num_frames)
        >>> ax.set_ylim(-.01, 1.01)
        >>> ax.set_zlim(-.01, 1.01)

    Ignore:
        >>> from kwcoco.demo.toydata_video import *  # NOQA
        >>> num_objects = 1
        >>> num_frames = 2
        >>> rng = kwarray.ensure_rng(342)
        >>> paths = random_multi_object_path(num_objects, num_frames, rng, max_speed=0.1)
        >>> print('paths = {!r}'.format(paths))
        >>> print(ub.hash_data(paths))
    """
    import kwarray
    rng = kwarray.ensure_rng(rng)

    USE_BOIDS = 1
    if USE_BOIDS:
        from kwcoco.demo.boids import Boids
        flock = Boids(
            num_objects,
            rng=rng,
            perception_thresh=0.2,
            max_speed=max_speed,
            max_force=0.001,
            damping=0.99,
        ).initialize()
        return flock.paths(num_frames)

    # NOTE: everything below is unreachable while USE_BOIDS is truthy.
    import torch
    from torch.nn import functional as F

    # On this path the ``max_speed`` argument is discarded and replaced by a
    # random per-object limit that is shared by both coordinates.
    speed_col = rng.rand(num_objects, 1) * 0.01
    speed_limit = torch.from_numpy(
        np.concatenate([speed_col, speed_col], axis=1))

    # TODO: can we do better?
    class Positions(torch.nn.Module):
        def __init__(self):
            super().__init__()
            start = torch.from_numpy(rng.rand(num_objects, 2))
            self.pos = torch.nn.Parameter(start)

        def forward(self, noise):
            pos = self.pos

            # Push objects away from each other
            pair_dists = F.pdist(pos)
            repulsion = (1 / (pair_dists ** 2)).sum() / num_objects

            # Push objects away from the boundary
            margin = 0
            centered = pos - 0.5
            upper = F.softplus(centered + margin)
            wall = torch.max(F.softplus(-centered + margin), upper)

            loss_parts = {
                'repulsive': 0.01 * repulsion.sum(),
                # Push objects in random directions
                'random': (pos * noise).sum(),
                'boundary': wall.sum() * 2,
            }
            return sum(loss_parts.values())

    model = Positions()
    optim = torch.optim.SGD(list(model.parameters()), lr=1.00, momentum=0.9)

    positions = []
    for _ in range(num_frames):
        optim.zero_grad()
        noise = torch.from_numpy(rng.rand(2))
        loss = model.forward(noise)
        loss.backward()

        # Enforce a per-object speed limit by clamping the gradient
        model.pos.grad.data[:] = torch.min(model.pos.grad, speed_limit)
        model.pos.grad.data[:] = torch.max(model.pos.grad, -speed_limit)

        optim.step()

        # Enforce boundary conditions
        model.pos.data.clamp_(0, 1)
        positions.append(model.pos.data.numpy().copy())

    # Insert the frame axis after the object axis
    return np.concatenate([p[:, None] for p in positions], axis=1)
def random_path(num, degree=1, dimension=2, rng=None, mode='boid'):
    """
    Create a random path using one of several methods.

    Args:
        num (int): number of points in the path

        degree (int, default=1): degree of curviness of the path.
            Only used when ``mode='bezier'``.

        dimension (int, default=2): number of spatial dimensions.
            Used by the ``'walk'`` and ``'bezier'`` modes. The ``'boid'``
            mode does not read it.

        mode (str): can be boid, walk, or bezier

        rng (RandomState, default=None): seed

    Returns:
        ndarray: the points of the path, one row per point. In ``'walk'``
            mode the shape is (num, dimension) and the coordinates are
            wrapped into the unit interval. In ``'boid'`` mode this is the
            first entry of ``Boids.paths(num)``.

    Raises:
        KeyError: if ``mode`` is not one of the supported names

    References:
        https://github.com/dhermes/bezier

    Example:
        >>> from kwcoco.demo.toydata_video import *  # NOQA
        >>> num = 10
        >>> dimension = 2
        >>> degree = 3
        >>> rng = None
        >>> path = random_path(num, degree, dimension, rng, mode='boid')
        >>> # xdoctest: +REQUIRES(--show)
        >>> import kwplot
        >>> plt = kwplot.autoplt()
        >>> kwplot.multi_plot(xdata=path[:, 0], ydata=path[:, 1], fnum=1, doclf=1, xlim=(0, 1), ylim=(0, 1))
        >>> kwplot.show_if_requested()

    Example:
        >>> # xdoctest: +REQUIRES(--3d)
        >>> # xdoctest: +REQUIRES(module:bezier)
        >>> import kwarray
        >>> import kwplot
        >>> plt = kwplot.autoplt()
        >>> #
        >>> num= num_frames = 100
        >>> rng = kwarray.ensure_rng(0)
        >>> #
        >>> from kwcoco.demo.toydata_video import *  # NOQA
        >>> paths = []
        >>> paths.append(random_path(num, degree=3, dimension=3, mode='bezier'))
        >>> paths.append(random_path(num, degree=2, dimension=3, mode='bezier'))
        >>> paths.append(random_path(num, degree=4, dimension=3, mode='bezier'))
        >>> #
        >>> from mpl_toolkits.mplot3d import Axes3D  # NOQA
        >>> ax = plt.gca(projection='3d')
        >>> ax.cla()
        >>> #
        >>> for path in paths:
        >>>     time = np.arange(len(path))
        >>>     ax.plot(time, path.T[0] * 1, path.T[1] * 1, 'o-')
        >>> ax.set_xlim(0, num_frames)
        >>> ax.set_ylim(-.01, 1.01)
        >>> ax.set_zlim(-.01, 1.01)
        >>> ax.set_xlabel('x')
        >>> ax.set_ylabel('y')
        >>> ax.set_zlabel('z')
    """
    # Fail fast on an unsupported mode, before any other work is done.
    if mode not in ('boid', 'walk', 'bezier'):
        raise KeyError(mode)

    rng = kwarray.ensure_rng(rng)

    if mode == 'boid':
        from kwcoco.demo.boids import Boids
        boids = Boids(1, rng=rng).initialize()
        path = boids.paths(num)[0]
    elif mode == 'walk':
        # TODO: can we do better?
        import torch

        class Position(torch.nn.Module):
            def __init__(self):
                super().__init__()
                # Honor the requested dimensionality instead of assuming 2D
                start = torch.from_numpy(rng.rand(dimension))
                self.pos = torch.nn.Parameter(start)

            def forward(self, noise):
                return (self.pos * noise).sum()

        pos = Position()
        optim = torch.optim.SGD(list(pos.parameters()), lr=0.01, momentum=0.9)
        positions = []
        for _ in range(num):
            optim.zero_grad()
            # A fresh random direction pushes the point at every step
            noise = torch.from_numpy(rng.rand(dimension))
            loss = pos.forward(noise)
            loss.backward()
            optim.step()
            positions.append(pos.pos.data.numpy().copy())
        # Wrap the coordinates into the unit interval
        path = np.array(positions) % 1
    else:
        # mode == 'bezier'
        import bezier
        # Create random bezier control points
        nodes_f = rng.rand(degree + 1, dimension).T  # F-contiguous
        curve = bezier.Curve(nodes_f, degree=degree)
        # Evaluate path points at evenly spaced curve parameters.
        # TODO: redistribute the points evenly over the curve, see
        # https://stackoverflow.com/questions/18244305/how-to-redistribute-points-evenly-over-a-curve
        s_vals = np.linspace(0, 1, num)
        path_f = curve.evaluate_multi(s_vals)
        path = path_f.T  # C-contiguous
    return path