Source code for kwcoco.cli.coco_split

#!/usr/bin/env python
import ubelt as ub
import scriptconfig as scfg


[docs]class CocoSplitCLI(object): """ Splits a coco files into two parts base on some criteria. Useful for generating quick and dirty train/test splits, but in general users should opt for using ``kwcoco subset`` instead to explicitly construct these splits based on domain knowledge. """ name = 'split'
[docs] class CLIConfig(scfg.Config): """ Split a single COCO dataset into two sub-datasets. """ default = { 'src': scfg.Value(None, help='input dataset to split', position=1), 'dst1': scfg.Value('split1.mscoco.json', help='output path1'), 'dst2': scfg.Value('split2.mscoco.json', help='output path2'), 'factor': scfg.Value(3, help='ratio of items put in dset1 vs dset2'), 'rng': scfg.Value(None, help='random seed'), 'balance_categories': scfg.Value(True, help='if True tries to balance annotation categories across splits'), 'splitter': scfg.Value( 'auto', help=ub.paragraph( ''' Split method to use. Using "image" will randomly assign each image to a partition. Using "video" will randomly assign each video to a partition. Using "auto" chooses "video" if there are any, otherwise "image". '''), choices=['auto', 'image', 'video']) } epilog = """ Example Usage: kwcoco split --src special:shapes8 --dst1=learn.mscoco.json --dst2=test.mscoco.json --factor=3 --rng=42 """
[docs] @classmethod def main(cls, cmdline=True, **kw): """ Example: >>> from kwcoco.cli.coco_split import * # NOQA >>> import ubelt as ub >>> dpath = ub.Path.appdir('kwcoco/tests/cli/split').ensuredir() >>> kw = {'src': 'special:vidshapes8', >>> 'dst1': dpath / 'train.json', >>> 'dst2': dpath / 'test.json'} >>> cmdline = False >>> cls = CocoSplitCLI >>> cls.main(cmdline, **kw) """ import kwcoco import kwarray from kwcoco.util import util_sklearn config = cls.CLIConfig(kw, cmdline=cmdline) print('config = {}'.format(ub.repr2(dict(config), nl=1))) if config['src'] is None: raise Exception('must specify source: {}'.format(config['src'])) print('reading fpath = {!r}'.format(config['src'])) dset = kwcoco.CocoDataset.coerce(config['src']) splitter = config['splitter'] if splitter == 'auto': splitter = 'video' if dset.n_videos > 0 else 'image' images = dset.images() cids_per_image = images.annots.cids gids = images.lookup('id') if splitter == 'video': group_ids = images.lookup('video_id') elif splitter == 'image': group_ids = gids else: raise KeyError(splitter) final_group_ids = [] final_group_gids = [] final_group_cids = [] unique_cids = set(ub.flatten(cids_per_image)) | {0} distinct_cid = max(unique_cids) + 11 for group_id, gid, cids in zip(group_ids, gids, cids_per_image): if len(cids) == 0: final_group_ids.append(group_id) final_group_gids.append(gid) final_group_cids.append(distinct_cid) else: final_group_ids.extend([group_id] * len(cids)) final_group_gids.extend([gid] * len(cids)) final_group_cids.extend(cids) # Balanced category split rng = kwarray.ensure_rng(config['rng']) shuffle = rng is not None self = util_sklearn.StratifiedGroupKFold(n_splits=config['factor'], random_state=rng, shuffle=shuffle) if config['balance_categories']: split_idxs = list(self.split(X=final_group_gids, y=final_group_cids, groups=final_group_ids)) else: split_idxs = list(self.split(X=final_group_gids, y=final_group_gids, groups=final_group_ids)) idxs1, idxs2 = split_idxs[0] gids1 = sorted(ub.unique(ub.take(final_group_gids, idxs1))) gids2 = sorted(ub.unique(ub.take(final_group_gids, idxs2))) dset1 = dset.subset(gids1) dset2 = dset.subset(gids2) dset1.fpath = config['dst1'] print('Writing dset1 = {!r}'.format(dset1.fpath)) dset1.dump(dset1.fpath, newlines=True) dset2.fpath = config['dst2'] print('Writing dset2 = {!r}'.format(dset2.fpath)) dset2.dump(dset2.fpath, newlines=True)
_CLI = CocoSplitCLI if __name__ == '__main__': _CLI.main()