Source code for kwcoco.cli.coco_subset

#!/usr/bin/env python
import ubelt as ub
import scriptconfig as scfg


[docs] class CocoSubsetCLI(scfg.DataConfig): """ Take a subset of this dataset and write it to a new file """ __command__ = 'subset' src = scfg.Value(None, position=1, help='input dataset path') dst = scfg.Value(None, position=2, help='output dataset path') include_categories = scfg.Value(None, type=str, help=ub.paragraph( ''' a comma separated list of categories, if specified only images containing these categories will be included. ''')) # TODO: pattern matching? gids = scfg.Value(None, alias=['image_ids'], help=ub.paragraph( ''' A comma separated list of image ids. If specified, only consider these image ids. DEPRECATED. Can pass a YAML list of integer image ids directly to select_images. ''')) select_images = scfg.Value(None, type=str, help=ub.paragraph( ''' Can be a coercable YAML list of image ids, or... A json query (via the jq spec) that specifies which images belong in the subset. Note, this is a passed as the body of the following jq query format string to filter valid ids '.images[] | select({select_images}) | .id'. Examples for this argument are as follows: '.id < 3' will select all image ids less than 3. '.file_name | test(".*png")' will select only images with file names that end with png. '.file_name | test(".*png") | not' will select only images with file names that do not end with png. '.myattr == "foo"' will select only image dictionaries where the value of myattr is "foo". '.id < 3 and (.file_name | test(".*png"))' will select only images with id less than 3 that are also pngs. .myattr | in({"val1": 1, "val4": 1}) will take images where myattr is either val1 or val4. Requires the "jq" python library is installed. ''')) select_videos = scfg.Value(None, help=ub.paragraph( ''' Can be a coercable YAML list of video ids, or... A json query (via the jq spec) that specifies which videos belong in the subset. Note, this is a passed as the body of the following jq query format string to filter valid ids '.videos[] | select({select_videos}) | .id'. Examples for this argument are as follows: '.name | startswith("foo")' will select only videos where the name starts with foo. Only applicable for dataset that contain videos. Requires the "jq" python library is installed. ''')) channels = scfg.Value(None, help=ub.paragraph( ''' if specified select only images that contain these channels (specified as a delayed-image channel spec) ''')) # FIXME: this can cause issues if the src kwcoco points to files # outside of its bundle directory. Is there an inuitive way to handle # this? Maybe this followed by a move-assets operation? copy_assets = scfg.Value(False, help=ub.paragraph( ''' if True copy the assests to the new bundle directory ''')) compress = scfg.Value('auto', help='if True writes results with compression') absolute = scfg.Value('auto', help=ub.paragraph( ''' if True will reroot all paths to be absolute before writing. If "auto", becomes True if the dest dataset is written outside of the source bundle directory and copy_assets is False. ''')) # TODO: Add more filter criteria # # image size # image timestamp # image file name matches # annotations with segmentations / keypoints? # images/annotations that contain a special attribute? # images with a maximum / minimum number of annotations? # 'rng': scfg.Value(None, help='random seed'), __epilog__ = """ Example Usage: kwcoco subset --src special:shapes8 --dst=foo.kwcoco.json # Take only the even image-ids kwcoco subset --src special:shapes8 --dst=foo-even.kwcoco.json --select_images '.id % 2 == 0' # Take only the videos where the name ends with 2 kwcoco subset --src special:vidshapes8 --dst=vidsub.kwcoco.json --select_videos '.name | endswith("2")' """
[docs] @classmethod def main(cls, cmdline=True, **kw): """ CommandLine: xdoctest -m kwcoco.cli.coco_subset CocoSubsetCLI.main Example: >>> from kwcoco.cli.coco_subset import * # NOQA >>> import ubelt as ub >>> dpath = ub.Path.appdir('kwcoco/tests/cli/union').ensuredir() >>> kw = {'src': 'special:shapes8', >>> 'dst': dpath / 'subset.json', >>> 'include_categories': 'superstar'} >>> cmdline = False >>> cls = CocoSubsetCLI >>> cls.main(cmdline, **kw) """ import kwcoco config = cls.cli(data=kw, cmdline=cmdline, strict=True) print('config = {}'.format(ub.urepr(config, nl=1))) if config['src'] is None: raise Exception('must specify subset src: {}'.format(config['src'])) if config['dst'] is None: raise Exception('must specify subset dst: {}'.format(config['dst'])) print('reading fpath = {!r}'.format(config['src'])) dset = kwcoco.CocoDataset.coerce(config['src']) dst_fpath = ub.Path(config['dst']) if config['absolute'] == 'auto': if not config['copy_assets']: src_fpath = ub.Path(dset.fpath) src_bundle_dpath = src_fpath.absolute().parent dst_bundle_dpath = dst_fpath.absolute().parent # Destinations are different, we will need to force a reroot absolute = (src_bundle_dpath.resolve() != dst_bundle_dpath.resolve()) else: absolute = False else: absolute = config['absolute'] print(f'absolute={absolute}') new_dset = query_subset(dset, config) if absolute: new_dset.reroot(absolute=absolute) else: if config['copy_assets']: # a bit roundabout, but it seems to work new_dset.reroot(absolute=False) new_dset.fpath = dst_fpath print(f'new_dset.fpath={new_dset.fpath}') if config['copy_assets']: # Create a copy of the data, (currently only works for relative # kwcoco files) # TODO use kwutil copy manager from os.path import join, dirname import shutil print('Copying assets') # new_dset.reroot(new_dset.bundle_dpath, old_prefix=dset.bundle_dpath) tocopy = [] dstdirs = set() print(f'new_dset.bundle_dpath={new_dset.bundle_dpath}') for gid, new_img in new_dset.index.imgs.items(): old_img = dset.index.imgs[gid] if new_img.get('file_name', None) is not None: old_fpath = join(dset.bundle_dpath, old_img['file_name']) new_fpath = join(new_dset.bundle_dpath, new_img['file_name']) dstdirs.add(dirname(new_fpath)) tocopy.append((old_fpath, new_fpath)) new_aux_list = new_img.get('auxiliary', []) old_aux_list = old_img.get('auxiliary', []) for old_aux, new_aux in zip(old_aux_list, new_aux_list): old_fpath = join(dset.bundle_dpath, old_aux['file_name']) new_fpath = join(new_dset.bundle_dpath, new_aux['file_name']) dstdirs.add(dirname(new_fpath)) tocopy.append((old_fpath, new_fpath)) # Ensure directories for dpath in dstdirs: ub.ensuredir(dpath) pool = ub.JobPool(max_workers=4) for src, dst in tocopy: pool.submit(shutil.copy2, src, dst) for future in pool.as_completed(desc='copy assets'): future.result() print('Writing new_dset.fpath = {!r}'.format(new_dset.fpath)) dumpkw = { 'newlines': True, 'compress': config['compress'], } new_dset.dump(new_dset.fpath, **dumpkw)
__cli__ = CocoSubsetCLI
[docs] def query_subset(dset, config): """ Example: >>> # xdoctest: +REQUIRES(module:jq) >>> from kwcoco.cli.coco_subset import * # NOQA >>> import kwcoco >>> dset = kwcoco.CocoDataset.demo(verbose=0) >>> assert dset.n_images == 3 >>> # >>> config = CocoSubsetCLI(**{'select_images': '.id < 3'}) >>> new_dset = query_subset(dset, config) >>> assert new_dset.n_images == 2 >>> # >>> config = CocoSubsetCLI(**{'select_images': '.file_name | test(".*.png")'}) >>> new_dset = query_subset(dset, config) >>> assert all(n.endswith('.png') for n in new_dset.images().lookup('file_name')) >>> assert new_dset.n_images == 2 >>> # >>> config = CocoSubsetCLI(**{'select_images': '.file_name | test(".*.png") | not'}) >>> new_dset = query_subset(dset, config) >>> assert not any(n.endswith('.png') for n in new_dset.images().lookup('file_name')) >>> assert new_dset.n_images == 1 >>> # >>> config = CocoSubsetCLI(**{'select_images': '.id < 3 and (.file_name | test(".*.png"))'}) >>> new_dset = query_subset(dset, config) >>> assert new_dset.n_images == 1 >>> # >>> config = CocoSubsetCLI(**{'select_images': '.id < 3 or (.file_name | test(".*.png"))'}) >>> new_dset = query_subset(dset, config) >>> assert new_dset.n_images == 3 Example: >>> # xdoctest: +REQUIRES(module:jq) >>> from kwcoco.cli.coco_subset import * # NOQA >>> import kwcoco >>> dset = kwcoco.CocoDataset.demo('vidshapes8', verbose=0) >>> assert dset.n_videos == 8 >>> assert dset.n_images == 16 >>> config = CocoSubsetCLI(**{'select_videos': '.name == "toy_video_3"'}) >>> new_dset = query_subset(dset, config) >>> assert new_dset.n_images == 2 >>> assert new_dset.n_videos == 1 """ valid_gids = set(dset.imgs.keys()) if config['gids'] is not None: if isinstance(config['gids'], str): valid_gids &= set(map(int, config['gids'].split(','))) elif ub.iterable(config['gids']): valid_gids &= set(map(int, config['gids'])) else: raise KeyError(config['gids']) if config['include_categories'] is not None: catnames = config['include_categories'].split(',') chosen_cids = [] for cname in catnames: cid = dset._resolve_to_cat(cname)['id'] chosen_cids.append(cid) category_gids = set(ub.flatten(ub.take( dset.index.cid_to_gids, set(chosen_cids)))) valid_gids &= category_gids from kwcoco._helpers import _query_image_ids # TODO: more the rest of the filters into this helper and normalize them. valid_gids = _query_image_ids( coco_dset=dset, select_images=config['select_images'], select_videos=config['select_videos'], valid_image_ids=valid_gids) if config['channels'] is not None: from delayed_image.channel_spec import ChannelSpec requested_chans = ChannelSpec(config['channels']) valid_gids = [ gid for gid in valid_gids if (requested_chans & dset.coco_image(gid).channels).numel() == requested_chans.numel() ] new_dset = dset.subset(valid_gids) return new_dset
if __name__ == '__main__': __cli__.main()