"""
An implementation and extension of the original MS-COCO API [CocoFormat]_.
Extends the format to also include line annotations.
The following describes psuedo-code for the high level spec (some of which may
not be have full support in the Python API). A formal json-schema is defined in
:mod:`kwcoco.coco_schema`.
Note:
The main object in this file is :class:`CocoDataset`, which is composed of
several mixin classes. See the class and method documentation for more
details.
An informal description of the spec given in: `coco_schema_informal.rst <coco_schema_informal.rst>`_.
For a formal description of the spec see the `coco_schema.json
<coco_schema.json>`_, which is generated by :py:mod`kwcoco/coco_schema`.
TODO:
- [ ] Use ijson (modified to support NaN) to lazilly load pieces of the
dataset in the background or on demand. This will give us faster access
to categories / images, whereas we will always have to wait for
annotations etc...
- [X] Should img_root be changed to bundle_dpath?
- [ ] Read video data, return numpy arrays (requires API for images)
- [ ] Spec for video URI, and convert to frames @ framerate function.
- [x] Document channel spec
- [x] Document sensor-channel spec
- [X] Add remove videos method
- [ ] Efficiency: Make video annotations more efficient by only tracking
keyframes, provide an API to obtain a dense or interpolated
annotation on an intermediate frame.
- [ ] Efficiency: Allow each section of the kwcoco file to be written as a
separate json file. Perhaps allow genric pointer support? Might get
messy.
- [ ] Reroot needs to be redesigned very carefully.
- [ ] Allow parts of the kwcoco file to be references to other json files.
- [X] Add top-level track table
- [ ] Fully transition to integer track ids (in progress)
References:
.. [CocoFormat] http://cocodataset.org/#format-data
.. [PyCocoToolsMask] https://github.com/nightrome/cocostuffapi/blob/master/PythonAPI/pycocotools/mask.py
.. [CocoTutorial] https://www.immersivelimit.com/tutorials/create-coco-annotations-from-scratch/#coco-dataset-format
"""
import copy
import sys
import itertools as it
import numbers
import os
import ubelt as ub
import warnings
from packaging.version import parse as Version
from collections import OrderedDict, defaultdict
from os.path import (dirname, basename, join, exists, isdir, relpath)
from functools import partial
# Vectorized ORM-Like containers
from kwcoco.coco_objects1d import (
Categories, Videos, Images, Annots, Tracks
)
from kwcoco.abstract_coco_dataset import AbstractCocoDataset
from kwcoco import exceptions
from kwcoco._helpers import (
SortedSet, UniqueNameRemapper, _NextId,
_delitems, _lut_image_frame_index, _lut_annot_frame_index,
_load_and_postprocess, _image_corruption_check
)
from kwcoco._helpers import _ID_Remapper
import json as pjson
from types import ModuleType
# The ujson library is faster than Python's json, but the API has some
# limitations and requires a minimum version. Currently we only use it to read,
# we have to wait for https://github.com/ultrajson/ultrajson/pull/518 to land
# before we use it to write.
try:
import ujson
except ImportError:
ujson = None
KWCOCO_USE_UJSON = bool(os.environ.get('KWCOCO_USE_UJSON'))
if ujson is not None and Version(ujson.__version__) >= Version('5.2.0') and KWCOCO_USE_UJSON:
json_r: ModuleType = ujson
json_w: ModuleType = pjson
else:
json_r: ModuleType = pjson
json_w: ModuleType = pjson
if sys.version_info <= (3, 6):
_dict = OrderedDict
else:
_dict = dict
__docstubs__ = """
from kwcoco.coco_image import CocoImage
"""
# These are the keys that are / should be supported by the API
SPEC_KEYS = [
'info',
'licenses',
'categories',
'keypoint_categories', # support only partially implemented
'videos',
'images',
'tracks',
'annotations',
]
[docs]
class MixinCocoDepricate:
"""
These functions are marked for deprication and will be removed
"""
[docs]
def keypoint_annotation_frequency(self):
"""
DEPRECATED
Example:
>>> import kwcoco
>>> import ubelt as ub
>>> self = kwcoco.CocoDataset.demo('shapes', rng=0)
>>> hist = self.keypoint_annotation_frequency()
>>> hist = ub.odict(sorted(hist.items()))
>>> # FIXME: for whatever reason demodata generation is not determenistic when seeded
>>> print(ub.urepr(hist)) # xdoc: +IGNORE_WANT
{
'bot_tip': 6,
'left_eye': 14,
'mid_tip': 6,
'right_eye': 14,
'top_tip': 6,
}
"""
ub.schedule_deprecation(
'kwcoco', name='keypoint_annotation_frequency', type='method',
deprecate='0.3.4', error='1.0.0', remove='1.1.0',
migration=(
'Implement this functionality explicitly. '
'It is too niche for a the core API.'
'Or propose a better way on '
'https://gitlab.kitware.com/computer-vision/kwcoco/-/issues '
)
)
ann_kpcids = [kp['keypoint_category_id']
for ann in self.dataset['annotations']
for kp in ann.get('keypoints', [])]
kpcid_to_name = {kpcat['id']: kpcat['name']
for kpcat in self.dataset['keypoint_categories']}
kpcid_to_num = ub.dict_hist(ann_kpcids,
labels=list(kpcid_to_name.keys()))
kpname_to_num = ub.map_keys(kpcid_to_name, kpcid_to_num)
return kpname_to_num
[docs]
def category_annotation_type_frequency(self):
"""
DEPRECATED
Reports the number of annotations of each type for each category
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo()
>>> hist = self.category_annotation_frequency()
>>> print(ub.urepr(hist))
"""
catname_to_nannot_types = {}
ub.schedule_deprecation(
'kwcoco', name='category_annotation_type_frequency', type='method',
deprecate='0.3.4', error='1.0.0', remove='1.1.0',
migration=(
'Implement this functionality explicitly. '
'It is too niche for a the core API.'
'Or propose a better way on '
'https://gitlab.kitware.com/computer-vision/kwcoco/-/issues '
)
)
def _annot_type(ann):
"""
Returns what type of annotation ``ann`` is.
"""
return tuple(sorted(set(ann) & {'bbox', 'line', 'keypoints'}))
for cid, aids in self.index.cid_to_aids.items():
name = self.cats[cid]['name']
hist = ub.dict_hist(map(_annot_type, ub.take(self.anns, aids)))
catname_to_nannot_types[name] = ub.map_keys(
lambda k: k[0] if len(k) == 1 else k, hist)
return catname_to_nannot_types
[docs]
def imread(self, gid):
"""
DEPRECATED: use load_image or delayed_image
Loads a particular image
"""
ub.schedule_deprecation(
'kwcoco', name='imread', type='method',
deprecate='0.3.4', error='1.0.0', remove='1.1.0',
migration=(
'use `self.coco_image(gid).imdelay().finalize()`.'
)
)
return self.load_image(gid)
[docs]
class MixinCocoAccessors:
"""
TODO: better name
"""
[docs]
def delayed_load(self, gid, channels=None, space='image'):
"""
Experimental method
Args:
gid (int): image id to load
channels (kwcoco.FusedChannelSpec): specific channels to load.
if unspecified, all channels are loaded.
space (str):
can either be "image" for loading in image space, or
"video" for loading in video space.
TODO:
- [X] Currently can only take all or none of the channels from each
base-image / auxiliary dict. For instance if the main image is
r|g|b you can't just select g|b at the moment.
- [X] The order of the channels in the delayed load should
match the requested channel order.
- [X] TODO: add nans to bands that don't exist or throw an error
Example:
>>> import kwcoco
>>> gid = 1
>>> #
>>> self = kwcoco.CocoDataset.demo('vidshapes8-multispectral')
>>> delayed = self.delayed_load(gid)
>>> print('delayed = {!r}'.format(delayed))
>>> print('delayed.finalize() = {!r}'.format(delayed.finalize()))
>>> #
>>> self = kwcoco.CocoDataset.demo('shapes8')
>>> delayed = self.delayed_load(gid)
>>> print('delayed = {!r}'.format(delayed))
>>> print('delayed.finalize() = {!r}'.format(delayed.finalize()))
>>> crop = delayed.crop((slice(0, 3), slice(0, 3)))
>>> crop.finalize()
>>> # TODO: should only select the "red" channel
>>> self = kwcoco.CocoDataset.demo('shapes8')
>>> delayed = self.delayed_load(gid, channels='r')
>>> import kwcoco
>>> gid = 1
>>> #
>>> self = kwcoco.CocoDataset.demo('vidshapes8-multispectral')
>>> delayed = self.delayed_load(gid, channels='B1|B2', space='image')
>>> print('delayed = {!r}'.format(delayed))
>>> delayed = self.delayed_load(gid, channels='B1|B2|B11', space='image')
>>> print('delayed = {!r}'.format(delayed))
>>> delayed = self.delayed_load(gid, channels='B8|B1', space='video')
>>> print('delayed = {!r}'.format(delayed))
>>> delayed = self.delayed_load(gid, channels='B8|foo|bar|B1', space='video')
>>> print('delayed = {!r}'.format(delayed))
"""
ub.schedule_deprecation(
'kwcoco', 'delayed_load', 'method',
migration='Use ".coco_image(gid).imdelay(...)" instead.',
deprecate='0.7.3', error='1.0.0', remove='1.1.0',
)
coco_img = self.coco_image(gid)
delayed = coco_img.imdelay(channels=channels, space=space)
return delayed
[docs]
def load_image(self, gid_or_img, channels=None):
"""
Reads an image from disk and
Args:
gid_or_img (int | dict): image id or image dict
channels (str | None): if specified, load data from auxiliary
channels instead
Returns:
np.ndarray : the image
Note:
Prefer to use the CocoImage methods instead
"""
try:
import kwimage
gpath = self.get_image_fpath(gid_or_img, channels=channels)
np_img = kwimage.imread(gpath)
except Exception:
img = self._resolve_to_img(gid_or_img)
np_img = self.delayed_load(img['id'], channels=channels).finalize()
return np_img
return np_img
[docs]
def get_image_fpath(self, gid_or_img, channels=None):
"""
Returns the full path to the image
Args:
gid_or_img (int | dict): image id or image dict
channels (str | None): if specified, return a path to data
containing auxiliary channels instead
Note:
Prefer to use the CocoImage methods instead
Returns:
PathLike: full path to the image
"""
if channels is not None:
gpath = self.get_auxiliary_fpath(gid_or_img, channels)
else:
img = self._resolve_to_img(gid_or_img)
gpath = ub.Path(self.bundle_dpath) / img['file_name']
return gpath
[docs]
def _get_img_auxiliary(self, gid_or_img, channels):
""" returns the auxiliary dictionary for a specific channel """
img = self._resolve_to_img(gid_or_img)
found = None
if 'auxiliary' in img:
auxlist = img['auxiliary']
elif 'assets' in img:
auxlist = img['assets']
else:
raise KeyError('no auxiliary data')
for aux in auxlist:
if aux['channels'] == channels:
found = aux
break
if found is None:
raise Exception(
'Image does not have auxiliary channels={}'.format(channels))
return found
[docs]
def get_auxiliary_fpath(self, gid_or_img, channels):
"""
Returns the full path to auxiliary data for an image
Args:
gid_or_img (int | dict): an image or its id
channels (str): the auxiliary channel to load (e.g. disparity)
Note:
Prefer to use the CocoImage methods instead
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo('shapes8', aux=True)
>>> self.get_auxiliary_fpath(1, 'disparity')
"""
aux = self._get_img_auxiliary(gid_or_img, channels)
fpath = ub.Path(self.bundle_dpath) / aux['file_name']
return fpath
[docs]
def load_annot_sample(self, aid_or_ann, image=None, pad=None):
"""
Reads the chip of an annotation. Note this is much less efficient than
using a sampler, but it doesn't require disk cache.
Maybe deprecate?
Args:
aid_or_int (int | dict): annot id or dict
image (ArrayLike | None): preloaded image
(note: this process is inefficient unless image is specified)
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo()
>>> sample = self.load_annot_sample(2, pad=100)
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> kwplot.imshow(sample['im'])
>>> kwplot.show_if_requested()
"""
import kwarray
import numpy as np
ann = self._resolve_to_ann(aid_or_ann)
if image is None:
image = self.load_image(ann['image_id'])
x, y, w, h = ann['bbox']
in_slice = (
slice(int(y), int(np.ceil(y + h))),
slice(int(x), int(np.ceil(x + w))),
)
data_sliced, transform = kwarray.padded_slice(image, in_slice, pad=pad,
return_info=True)
sample = {
'im': data_sliced,
'transform': transform,
}
return sample
[docs]
def _resolve_to_id(self, id_or_dict):
"""
Ensures output is an id
"""
if isinstance(id_or_dict, numbers.Integral):
resolved_id = id_or_dict
else:
resolved_id = id_or_dict['id']
return resolved_id
[docs]
def _resolve_to_cid(self, id_or_name_or_dict):
"""
Ensures output is an category id
Note:
this does not resolve aliases (yet), for that see _alias_to_cat
TODO:
we could maintain an alias index to make this fast
"""
if isinstance(id_or_name_or_dict, numbers.Integral):
resolved_id = id_or_name_or_dict
elif isinstance(id_or_name_or_dict, str):
resolved_id = self.index.name_to_cat[id_or_name_or_dict]['id']
else:
resolved_id = id_or_name_or_dict['id']
return resolved_id
[docs]
def _resolve_to_gid(self, id_or_name_or_dict):
"""
Ensures output is an category id
"""
if isinstance(id_or_name_or_dict, numbers.Integral):
resolved_id = id_or_name_or_dict
elif isinstance(id_or_name_or_dict, str):
resolved_id = self.index.file_name_to_img[id_or_name_or_dict]['id']
else:
resolved_id = id_or_name_or_dict['id']
return resolved_id
[docs]
def _resolve_to_vidid(self, id_or_name_or_dict):
"""
Ensures output is an video id
"""
if isinstance(id_or_name_or_dict, numbers.Integral):
resolved_id = id_or_name_or_dict
elif isinstance(id_or_name_or_dict, str):
resolved_id = self.index.name_to_video[id_or_name_or_dict]['id']
else:
resolved_id = id_or_name_or_dict['id']
return resolved_id
[docs]
def _resolve_to_trackid(self, id_or_name_or_dict):
if isinstance(id_or_name_or_dict, numbers.Integral):
resolved_id = id_or_name_or_dict
elif isinstance(id_or_name_or_dict, str):
resolved_id = self.index.name_to_track[id_or_name_or_dict]['id']
else:
resolved_id = id_or_name_or_dict['id']
return resolved_id
[docs]
def _resolve_to_ann(self, aid_or_ann):
"""
Ensures output is an annotation dictionary
"""
if isinstance(aid_or_ann, numbers.Integral):
resolved_ann = None
if self.anns is not None:
resolved_ann = self.anns[aid_or_ann]
else:
for ann in self.dataset['annotations']:
if ann['id'] == aid_or_ann:
resolved_ann = ann
break
if not resolved_ann:
raise IndexError(
'aid {} not in dataset'.format(aid_or_ann))
else:
resolved_ann = aid_or_ann
return resolved_ann
[docs]
def _resolve_to_img(self, gid_or_img):
"""
Ensures output is an image dictionary
"""
if isinstance(gid_or_img, numbers.Integral):
resolved_img = None
if self.imgs is not None:
resolved_img = self.imgs[gid_or_img]
else:
for img in self.dataset['images']:
if img['id'] == gid_or_img:
resolved_img = img
break
if not resolved_img:
raise IndexError(
'gid {} not in dataset'.format(gid_or_img))
else:
resolved_img = gid_or_img
return resolved_img
[docs]
def _resolve_to_kpcat(self, kp_identifier):
"""
Lookup a keypoint-category dict via its name or id
Args:
kp_identifier (int | str | dict): either the keypoint category
name, alias, or its keypoint_category_id.
Returns:
Dict: keypoint category dictionary
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo('shapes')
>>> kpcat1 = self._resolve_to_kpcat(1)
>>> kpcat2 = self._resolve_to_kpcat('left_eye')
>>> assert kpcat1 is kpcat2
>>> import pytest
>>> with pytest.raises(KeyError):
>>> self._resolve_to_cat('human')
"""
if 'keypoint_categories' not in self.dataset:
raise NotImplementedError('Must have newstyle keypoints to use')
# TODO: add keypoint categories to the index and optimize
if isinstance(kp_identifier, numbers.Integral):
kpcat = None
for _kpcat in self.dataset['keypoint_categories']:
if _kpcat['id'] == kp_identifier:
kpcat = _kpcat
if kpcat is None:
raise KeyError('unable to find keypoint category')
elif isinstance(kp_identifier, str):
kpcat = None
for _kpcat in self.dataset['keypoint_categories']:
if _kpcat['name'] == kp_identifier:
kpcat = _kpcat
if kpcat is None:
for _kpcat in self.dataset['keypoint_categories']:
alias = _kpcat.get('alias', {})
alias = alias if ub.iterable(alias) else {alias}
if kp_identifier in alias:
kpcat = _kpcat
if kpcat is None:
raise KeyError('unable to find keypoint category')
elif isinstance(kp_identifier, dict):
kpcat = kp_identifier
else:
raise TypeError(type(kp_identifier))
return kpcat
[docs]
def _resolve_to_cat(self, cat_identifier):
"""
Lookup a coco-category dict via its name, alias, or id.
Args:
cat_identifier (int | str | dict): either the category name,
alias, or its category_id.
Raises:
KeyError: if the category doesn't exist.
Note:
If the index is not built, the method will work but may be slow.
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo()
>>> cat = self._resolve_to_cat('human')
>>> import pytest
>>> assert self._resolve_to_cat(cat['id']) is cat
>>> assert self._resolve_to_cat(cat) is cat
>>> with pytest.raises(KeyError):
>>> self._resolve_to_cat(32)
>>> self.index.clear()
>>> assert self._resolve_to_cat(cat['id']) is cat
>>> with pytest.raises(KeyError):
>>> self._resolve_to_cat(32)
"""
if isinstance(cat_identifier, numbers.Integral):
if self.cats:
cat = self.cats[cat_identifier]
else:
# If the index is not built
found = None
for cat in self.dataset['categories']:
if cat['id'] == cat_identifier:
found = cat
break
if found is None:
raise KeyError(
'Cannot find a category with id={}'.format(cat_identifier))
elif isinstance(cat_identifier, str):
cat = self._alias_to_cat(cat_identifier)
elif isinstance(cat_identifier, dict):
cat = cat_identifier
else:
raise TypeError(type(cat_identifier))
return cat
[docs]
def _alias_to_cat(self, alias_catname):
"""
Lookup a coco-category via its name or an "alias" name.
In production code, use :func:`_resolve_to_cat` instead.
Args:
alias_catname (str): category name or alias
Returns:
dict: coco category dictionary
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo()
>>> cat = self._alias_to_cat('human')
>>> import pytest
>>> with pytest.raises(KeyError):
>>> self._alias_to_cat('person')
>>> cat['alias'] = ['person']
>>> self._alias_to_cat('person')
>>> cat['alias'] = 'person'
>>> self._alias_to_cat('person')
>>> assert self._alias_to_cat(None) is None
"""
if alias_catname is None:
return None
if self.name_to_cat and alias_catname in self.name_to_cat:
fixed_catname = alias_catname
fixed_cat = self.name_to_cat[fixed_catname]
else:
# Try to find an alias
fixed_catname = None
fixed_cat = None
for cat in self.dataset['categories']:
alias_list = cat.get('alias', [])
if isinstance(alias_list, str):
alias_list = [alias_list]
assert isinstance(alias_list, list)
alias_list = alias_list + [cat['name']]
for alias in alias_list:
if alias_catname.lower() == alias.lower():
fixed_cat = cat
fixed_catname = cat['name']
break
if fixed_cat is not None:
break
if fixed_cat is None:
raise KeyError('Unknown category: {}'.format(alias_catname))
return fixed_cat
[docs]
def category_graph(self):
"""
Construct a networkx category hierarchy
Returns:
networkx.DiGraph:
graph: a directed graph where category names are the nodes,
supercategories define edges, and items in each category dict
(e.g. category id) are added as node properties.
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo()
>>> graph = self.category_graph()
>>> assert 'astronaut' in graph.nodes()
>>> assert 'keypoints' in graph.nodes['human']
Ignore:
import graphid
graphid.util.show_nx(graph)
"""
# TODO: should supercategories that don't exist as nodes be added here?
import networkx as nx
graph = nx.DiGraph()
for cat in self.dataset['categories']:
graph.add_node(cat['name'], **cat)
if cat.get('supercategory', None) is not None:
u = cat['supercategory']
v = cat['name']
if u != v:
graph.add_edge(u, v)
return graph
[docs]
def object_categories(self):
"""
Construct a consistent CategoryTree representation of object classes
Returns:
kwcoco.CategoryTree: category data structure
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo()
>>> classes = self.object_categories()
>>> print('classes = {}'.format(classes))
"""
from kwcoco.category_tree import CategoryTree
graph = self.category_graph()
classes = CategoryTree(graph)
return classes
[docs]
def keypoint_categories(self):
"""
Construct a consistent CategoryTree representation of keypoint classes
Returns:
kwcoco.CategoryTree: category data structure
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo()
>>> classes = self.keypoint_categories()
>>> print('classes = {}'.format(classes))
"""
from kwcoco.category_tree import CategoryTree
try:
if self.index.kpcats is not None:
kpcats = self.index.kpcats.values()
else:
kpcats = iter(self.dataset['keypoint_categories'])
except KeyError:
catnames = self._keypoint_category_names()
classes = CategoryTree.coerce(catnames)
else:
import networkx as nx
graph = nx.DiGraph()
for cat in kpcats:
graph.add_node(cat['name'], **cat)
if cat.get('supercategory', None) is not None:
graph.add_edge(cat['supercategory'], cat['name'])
classes = CategoryTree(graph)
return classes
[docs]
def _keypoint_category_names(self):
"""
Construct keypoint categories names.
Uses new-style if possible, otherwise this falls back on old-style.
Returns:
List[str]:
names - list of keypoint category names
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo()
>>> names = self._keypoint_category_names()
>>> print(names)
"""
kpcats = self.dataset.get('keypoint_categories', None)
if kpcats is not None:
return [c['name'] for c in self.dataset['keypoint_categories']]
else:
names = []
cats = sorted(self.dataset['categories'], key=lambda c: c['id'])
for cat in cats:
if 'keypoints' in cat:
names.extend(cat['keypoints'])
return names
[docs]
def _lookup_kpnames(self, cid):
""" Get the keypoint categories for a certain class """
kpnames = None
orig_cat = self.cats[cid]
while kpnames is None:
# Extract keypoint names for each annotation
cat = self.cats[cid]
parent = cat.get('supercategory', None)
if 'keypoints' in cat:
kpnames = cat['keypoints']
elif parent is not None:
cid = self.name_to_cat[cat['supercategory']]['id']
else:
raise KeyError('could not find keypoint names for cid={}, cat={}, orig_cat={}'.format(cid, cat, orig_cat))
return kpnames
[docs]
def _coco_image(self, gid):
ub.schedule_deprecation(
'kwcoco', '_coco_image', 'method',
migration='Use "coco_image" instead.',
deprecate='0.5.9', error='1.0.0', remove='1.1.0',
)
return self.coco_image(gid)
[docs]
def coco_image(self, gid):
"""
Args:
gid (int): image id
Returns:
CocoImage
"""
# Experimental
from kwcoco.coco_image import CocoImage
img = self.index.imgs[gid]
image = CocoImage(img, dset=self)
return image
[docs]
class MixinCocoConstructors:
"""
Classmethods for constructing CocoDataset objects
"""
[docs]
@classmethod
def coerce(cls, key, sqlview=False, **kw):
"""
Attempt to transform the input into the intended CocoDataset.
Args:
key : this can either be an instance of a CocoDataset, a
string URI pointing to an on-disk dataset, or a special
key for creating demodata.
sqlview (bool | str):
If truthy, will return the dataset as a cached sql view, which
can be quicker to load and use in some instances. Can be given
as a string, which sets the backend that is used: either sqlite
or postgresql. Defaults to False.
**kw: passed to whatever constructor is chosen (if any)
Returns:
AbstractCocoDataset | kwcoco.CocoDataset | kwcoco.CocoSqlDatabase
Example:
>>> # test coerce for various input methods
>>> import kwcoco
>>> from kwcoco.coco_sql_dataset import assert_dsets_allclose
>>> dct_dset = kwcoco.CocoDataset.coerce('special:shapes8')
>>> copy1 = kwcoco.CocoDataset.coerce(dct_dset)
>>> copy2 = kwcoco.CocoDataset.coerce(dct_dset.fpath)
>>> assert assert_dsets_allclose(dct_dset, copy1)
>>> assert assert_dsets_allclose(dct_dset, copy2)
>>> # xdoctest: +REQUIRES(module:sqlalchemy)
>>> sql_dset = dct_dset.view_sql()
>>> copy3 = kwcoco.CocoDataset.coerce(sql_dset)
>>> copy4 = kwcoco.CocoDataset.coerce(sql_dset.fpath)
>>> assert assert_dsets_allclose(dct_dset, sql_dset)
>>> assert assert_dsets_allclose(dct_dset, copy3)
>>> assert assert_dsets_allclose(dct_dset, copy4)
"""
import kwcoco
if isinstance(key, cls):
self = key
if isinstance(key, os.PathLike):
key = str(key)
if isinstance(key, str):
import uritools
dset_fpath = ub.expandpath(key)
# Parse the the "file" URI scheme
# https://tools.ietf.org/html/rfc8089
result = uritools.urisplit(dset_fpath)
if result.scheme == 'sqlite' or result.path.endswith('.sqlite'):
from kwcoco.coco_sql_dataset import CocoSqlDatabase
self = CocoSqlDatabase(dset_fpath).connect()
elif result.path.endswith('.json') or '.json' in result.path or '.kwcoco' in result.path:
if sqlview:
from kwcoco.coco_sql_dataset import CocoSqlDatabase
kw['backend'] = sqlview
self = CocoSqlDatabase.coerce(dset_fpath, **kw)
else:
self = kwcoco.CocoDataset(dset_fpath, **kw)
elif result.scheme == 'special':
self = cls.demo(key=key, **kw)
else:
# This case can be env-dependant in the unlikely case where you
# have a file with the same name as a demo key. But hey, you
# are using the coerce function, be more explicit if you want
# predictable behavior.
if exists(dset_fpath):
self = kwcoco.CocoDataset(dset_fpath, **kw)
else:
self = cls.demo(key=key, **kw)
elif type(key).__name__ == 'CocoSqlDatabase':
self = key
elif type(key).__name__ == 'CocoDataset':
self = key
elif type(key).__name__ == 'CocoSampler':
self = key.dset
else:
raise TypeError(type(key))
return self
[docs]
@classmethod
def demo(cls, key='photos', **kwargs):
"""
Create a toy coco dataset for testing and demo puposes
Args:
key (str):
Either 'photos' (default), 'shapes', or 'vidshapes'. There are
also special sufixes that can control behavior.
Basic options that define which flavor of demodata to generate
are: `photos`, `shapes`, and `vidshapes`. A numeric suffix e.g.
`vidshapes8` can be specified to indicate the size of the
generated demo dataset. There are other special suffixes that
are available. See the code in this function for explicit
details on what is allowed.
TODO: better documentation for these demo datasets.
As a quick summary: the vidshapes key is the most robust and
mature demodata set, and here are several useful variants of
the vidshapes key.
(1) vidshapes8 - the 8 suffix is the number of videos in this case.
(2) vidshapes8-multispectral - generate 8 multispectral videos.
(3) vidshapes8-msi - msi is an alias for multispectral.
(4) vidshapes8-frames5 - generate 8 videos with 5 frames each.
(5) vidshapes2-tracks5 - generate 2 videos with 5 tracks each.
(6) vidshapes2-speed0.1-frames7 - generate 2 videos with 7
frames where the objects move with with a speed of 0.1.
**kwargs : if key is shapes, these arguments are passed to toydata
generation. The Kwargs section of this docstring documents a
subset of the available options. For full details, see
:func:`demodata_toy_dset` and :func:`random_video_dset`.
Notable options are:
bundle_dpath (PathLike): location to write the demo bundle
fpath (PathLike): location to write the demo kwcoco file
Kwargs:
image_size (Tuple[int, int]): width / height size of the images
dpath (str | PathLike):
path to the directory where any generated demo bundles will be
written to. Defaults to using kwcoco cache dir.
aux (bool): if True generates dummy auxiliary channels
rng (int | RandomState | None):
random number generator or seed
verbose (int): verbosity mode. Defaults to 3.
Example:
>>> # Basic demodata keys
>>> print(CocoDataset.demo('photos', verbose=1))
>>> print(CocoDataset.demo('shapes', verbose=1))
>>> print(CocoDataset.demo('vidshapes', verbose=1))
>>> # Varaints of demodata keys
>>> print(CocoDataset.demo('shapes8', verbose=0))
>>> print(CocoDataset.demo('shapes8-msi', verbose=0))
>>> print(CocoDataset.demo('shapes8-frames1-speed0.2-msi', verbose=0))
Example:
>>> import kwcoco
>>> dset = kwcoco.CocoDataset.demo('vidshapes5', num_frames=5,
>>> verbose=0, rng=None)
>>> dset = kwcoco.CocoDataset.demo('vidshapes5', num_frames=5,
>>> num_tracks=4, verbose=0, rng=44)
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> pnums = kwplot.PlotNums(nSubplots=len(dset.index.imgs))
>>> fnum = 1
>>> for gx, gid in enumerate(dset.index.imgs.keys()):
>>> canvas = dset.draw_image(gid=gid)
>>> kwplot.imshow(canvas, pnum=pnums[gx], fnum=fnum)
>>> #dset.show_image(gid=gid, pnum=pnums[gx])
>>> kwplot.show_if_requested()
Example:
>>> import kwcoco
>>> dset = kwcoco.CocoDataset.demo('vidshapes5-aux', num_frames=1,
>>> verbose=0, rng=None)
Example:
>>> import kwcoco
>>> dset = kwcoco.CocoDataset.demo('vidshapes1-multispectral', num_frames=5,
>>> verbose=0, rng=None)
>>> # This is the first use-case of image names
>>> assert len(dset.index.file_name_to_img) == 0, (
>>> 'the multispectral demo case has no "base" image')
>>> assert len(dset.index.name_to_img) == len(dset.index.imgs) == 5
>>> dset.remove_images([1])
>>> assert len(dset.index.name_to_img) == len(dset.index.imgs) == 4
>>> dset.remove_videos([1])
>>> assert len(dset.index.name_to_img) == len(dset.index.imgs) == 0
"""
import parse
import re
kwargs.pop('autobuild', None)
if key.startswith('special:'):
key = key.split(':')[1]
if key.startswith('shapes'):
from kwcoco.demo import toydata_image
res = parse.parse('shapes{num_imgs:d}', key)
if res:
kwargs['n_imgs'] = int(res.named['num_imgs'])
if 'rng' not in kwargs and 'n_imgs' in kwargs:
kwargs['rng'] = kwargs['n_imgs']
self = toydata_image.demodata_toy_dset(**kwargs)
self.tag = key
elif key.startswith('vidshapes'):
from kwcoco.demo import toydata_video
verbose = kwargs.get('verbose', 1)
res = parse.parse('vidshapes{num_videos:d}', key)
if res is None:
res = parse.parse('vidshapes{num_videos:d}-{suffix}', key)
if res is None:
res = parse.parse('vidshapes-{suffix}', key)
"""
The rule is that the suffix will be split by the '-' character
and any registered pattern or alias will impact the kwargs
for random_video_dset
"""
suff_parts = []
if verbose > 3:
print('res = {!r}'.format(res))
if res:
if 'num_videos' in res.named:
kwargs['num_videos'] = int(res.named['num_videos'])
if 'suffix' in res.named:
suff_parts = [p for p in res.named['suffix'].split('-') if p]
if verbose > 3:
print('suff_parts = {!r}'.format(suff_parts))
# The allowed suffix patterns and aliases are defined here
vidkw = {
'render': True,
'num_videos': 1,
'num_frames': 2,
'num_tracks': 2,
'anchors': None,
'image_size': (600, 600),
'aux': None,
'multispectral': None,
'multisensor': False,
'max_speed': 0.01,
'verbose': verbose,
}
vidkw_aliases = {
'num_frames': {'frames'},
'num_tracks': {'tracks'},
'num_videos': {'videos'},
'max_speed': {'speed'},
'image_size': {'gsize'},
'multispectral': {'msi'},
}
alias_to_key = {k: v for v, ks in vidkw_aliases.items() for k in ks}
# These are the variables the vidshapes generator accepts
for part in suff_parts:
match = re.search(r'[\d]', part)
if match is None:
value = True
key = part
else:
key = part[:match.span()[0]]
value = part[match.span()[0]:]
key = alias_to_key.get(key, key)
if key == 'image_size':
value = int(value)
if key == 'num_frames':
value = int(value)
if key == 'num_tracks':
value = int(value)
if key == 'num_videos':
value = int(value)
if key == 'max_speed':
value = float(value)
if key == 'multispectral':
value = bool(value)
if key == 'multisensor':
value = bool(value)
if key == 'render':
value = bool(value)
# if key.startswith('rand'):
# pass
if key in {'randgsize', 'randsize', 'image_sizerandom'}:
key = 'image_size'
value = 'random'
if key == 'amazon':
# Hack to put in amazon background
key = 'render'
if isinstance(vidkw[key], dict):
value = vidkw[key]
else:
value = {}
value['background'] = 'amazon'
vidkw[key] = value
vidkw.update(kwargs)
if 'background' in vidkw:
vidkw['render'] = {
'background': vidkw.pop('background'),
}
if isinstance(vidkw['image_size'], int):
vidkw['image_size'] = (vidkw['image_size'], vidkw['image_size'])
use_cache = vidkw.pop('use_cache', True)
if 'rng' not in vidkw:
# Make rng determined by other params by default
vidkw['rng'] = int(ub.hash_data(sorted(vidkw.items()))[0:8], 16)
depends_items = vidkw.copy()
depends_items.pop('verbose', None)
depends = ub.hash_data(sorted(depends_items.items()), base='abc')[0:14]
if verbose > 0:
print('vidkw = {}'.format(ub.urepr(vidkw, nl=1)))
if verbose > 3:
print('depends = {!r}'.format(depends))
tag = key + '_' + depends
dpath = vidkw.pop('dpath', None)
fpath = vidkw.pop('fpath', None)
bundle_dpath = vidkw.get('bundle_dpath', None)
if fpath is not None:
bundle_dpath = ub.Path(fpath).parent
elif dpath is not None:
bundle_dpath = dpath
if bundle_dpath is None:
# Even if the cache is off, we still will need this because it
# will write rendered data to disk. Perhaps we can make this
# optional in the future.
dpath = ub.Path.appdir('kwcoco', 'demo_vidshapes').ensuredir()
bundle_dpath = vidkw['bundle_dpath'] = join(dpath, tag)
cache_dpath = join(bundle_dpath, '_cache')
if fpath is None:
fpath = join(bundle_dpath, 'data.kwcoco.json')
stamp = ub.CacheStamp(
'vidshape_stamp_v{:03d}'.format(toydata_video.TOYDATA_VIDEO_VERSION),
dpath=cache_dpath,
depends=depends, enabled=use_cache,
product=[fpath], verbose=verbose,
meta=vidkw
)
if verbose > 3:
print('stamp = {!r}'.format(stamp))
if stamp.expired():
vidkw['dpath'] = bundle_dpath
vidkw.pop('bundle_dpath', None)
self = toydata_video.random_video_dset(**vidkw)
if verbose > 3:
print('self.fpath = {!r}'.format(self.fpath))
print('self.bundle_dpath = {!r}'.format(self.bundle_dpath))
self.fpath = fpath
if fpath is not None:
ub.Path(fpath).parent.ensuredir()
self.dump(fpath, newlines=True)
stamp.renew()
else:
self = cls(data=fpath, bundle_dpath=bundle_dpath)
elif key == 'photos':
dataset = demo_coco_data()
self = cls(dataset, tag=key)
else:
raise KeyError(key)
return self
[docs]
@classmethod
def random(cls, rng=None):
"""
Creates a random CocoDataset according to distribution parameters
TODO:
- [ ] parametarize
"""
from kwcoco.demo.toydata_video import random_single_video_dset
dset = random_single_video_dset(num_frames=5, num_tracks=3,
tid_start=1, rng=rng)
return dset
[docs]
class MixinCocoHashing:
"""
Mixin for creating and maintaining hashids (i.e. content identifiers)
"""
[docs]
def _build_hashid(self, hash_pixels=False, verbose=0):
"""
Construct a hash that uniquely identifies the state of this dataset.
Args:
hash_pixels (bool):
If False the image data is not included in the hash, which can
speed up computation, but is not 100% robust. Defaults to
False.
verbose (int): verbosity level
Returns:
str: the hashid
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo()
>>> self._build_hashid(hash_pixels=True, verbose=3)
...
>>> # Shorten hashes for readability
>>> import ubelt as ub
>>> walker = ub.IndexableWalker(self.hashid_parts)
>>> for path, val in walker:
>>> if isinstance(val, str):
>>> walker[path] = val[0:8]
>>> # Note: this may change in different versions of kwcoco
>>> print('self.hashid_parts = ' + ub.urepr(self.hashid_parts))
>>> print('self.hashid = {!r}'.format(self.hashid[0:8]))
self.hashid_parts = {
'annotations': {
'json': 'c1d1b9c3',
'num': 11,
},
'images': {
'pixels': '88e37cc3',
'json': '9b8e8be3',
'num': 3,
},
'categories': {
'json': '82d22e00',
'num': 8,
},
}
self.hashid = 'bf69bf15'
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo()
>>> self._build_hashid(hash_pixels=True, verbose=3)
>>> self.hashid_parts
>>> # Test that when we modify the dataset only the relevant
>>> # hashid parts are recomputed.
>>> orig = self.hashid_parts['categories']['json']
>>> self.add_category('foobar')
>>> assert 'categories' not in self.hashid_parts
>>> self.hashid_parts
>>> self.hashid_parts['images']['json'] = 'should not change'
>>> self._build_hashid(hash_pixels=True, verbose=3)
>>> assert self.hashid_parts['categories']['json']
>>> assert self.hashid_parts['categories']['json'] != orig
>>> assert self.hashid_parts['images']['json'] == 'should not change'
"""
# Construct nested container that we will populate with hashable
# info corresponding to each type of data that we track.
hashid_parts = self.hashid_parts
if hashid_parts is None:
hashid_parts = OrderedDict()
# TODO: hashid for videos? Maybe?
# Ensure hashid_parts has the proper root structure
# TODO: rely on subet of SPEC_KEYS
parts = ['annotations', 'images', 'categories']
for part in parts:
if not hashid_parts.get(part, None):
hashid_parts[part] = OrderedDict()
rebuild_parts = []
reuse_parts = []
gids = None
if hash_pixels:
if not hashid_parts['images'].get('pixels', None):
gids = sorted(self.imgs.keys())
gpaths = [join(self.bundle_dpath, gname)
for gname in self.images(gids).lookup('file_name')]
gpath_sha512s = [
ub.hash_file(gpath, hasher='sha512')
for gpath in ub.ProgIter(gpaths, desc='hashing images',
verbose=verbose)
]
hashid_parts['images']['pixels'] = ub.hash_data(gpath_sha512s)
rebuild_parts.append('images.pixels')
else:
reuse_parts.append('images.pixels')
# Hash individual components
with ub.Timer(label='hash coco parts', verbose=verbose > 1):
# Dumping annots to json takes the longest amount of time
# However, its faster than hashing the data directly
def _ditems(d):
# return sorted(d.items())
return list(d.items()) if isinstance(d, OrderedDict) else sorted(d.items())
if not hashid_parts['annotations'].get('json', None):
aids = sorted(self.anns.keys())
_anns_ordered = (self.anns[aid] for aid in aids)
anns_ordered = [_ditems(ann) for ann in _anns_ordered]
try:
anns_text = json_w.dumps(anns_ordered)
except TypeError:
if __debug__:
for ann in anns_ordered:
try:
json_w.dumps(ann)
except TypeError:
print('FAILED TO ENCODE ann = {!r}'.format(ann))
break
raise
hashid_parts['annotations']['json'] = ub.hash_data(
anns_text, hasher='sha512')
hashid_parts['annotations']['num'] = len(aids)
rebuild_parts.append('annotations.json')
else:
reuse_parts.append('annotations.json')
if not hashid_parts['images'].get('json', None):
if gids is None:
gids = sorted(self.imgs.keys())
imgs_text = json_w.dumps(
[_ditems(self.imgs[gid]) for gid in gids])
hashid_parts['images']['json'] = ub.hash_data(
imgs_text, hasher='sha512')
hashid_parts['images']['num'] = len(gids)
rebuild_parts.append('images.json')
else:
reuse_parts.append('images.json')
if not hashid_parts['categories'].get('json', None):
cids = sorted(self.cats.keys())
cats_text = json_w.dumps(
[_ditems(self.cats[cid]) for cid in cids])
hashid_parts['categories']['json'] = ub.hash_data(
cats_text, hasher='sha512')
hashid_parts['categories']['num'] = len(cids)
rebuild_parts.append('categories.json')
else:
reuse_parts.append('categories.json')
# TODO:
# if not hashid_parts['tracks'].get('json', None):
# cids = sorted(self.index.tracks.keys())
# cats_text = json_w.dumps(
# [_ditems(self.index.tracks[cid]) for cid in cids])
# hashid_parts['tracks']['json'] = ub.hash_data(
# cats_text, hasher='sha512')
# hashid_parts['tracks']['num'] = len(cids)
# rebuild_parts.append('tracks.json')
# else:
# reuse_parts.append('tracks.json')
if verbose > 1:
if reuse_parts:
print('Reused hashid_parts: {}'.format(reuse_parts))
print('Rebuilt hashid_parts: {}'.format(rebuild_parts))
hashid = ub.hash_data(hashid_parts)
self.hashid = hashid
self.hashid_parts = hashid_parts
return hashid
[docs]
def _invalidate_hashid(self, parts=None):
"""
Called whenever the coco dataset is modified. It is possible to specify
which parts were modified so unmodified parts can be reused next time
the hash is constructed.
TODO:
- [ ] Rename to _notify_modification --- or something like that
"""
self._state['was_modified'] = True
self.hashid = None
if parts is not None and self.hashid_parts is not None:
for part in parts:
self.hashid_parts.pop(part, None)
else:
self.hashid_parts = None
[docs]
def _cached_hashid(self):
"""
Under Construction.
The idea is to cache the hashid when we are sure that the dataset was
loaded from a file and has not been modified. We can record the
modification time of the file (because we know it hasn't changed in
memory), and use that as a key to the cache. If the modification time
on the file is different than the one recorded in the cache, we know
the cache could be invalid, so we recompute the hashid.
TODO:
- [ ] This is reasonably stable, elevate this to a public API function.
"""
cache_miss = True
enable_cache = (
self._state['was_loaded'] and
not self._state['was_modified']
)
if enable_cache:
coco_fpath = ub.Path(self.fpath)
enable_cache = coco_fpath.exists()
if enable_cache:
cache_dpath = (coco_fpath.parent / '_cache')
cache_fname = coco_fpath.name + '.hashid.cache'
hashid_sidecar_fpath = cache_dpath / cache_fname
# Generate current lookup key
fpath_stat = coco_fpath.stat()
status_key = {
'st_size': fpath_stat.st_size,
'st_mtime': fpath_stat.st_mtime
}
if hashid_sidecar_fpath.exists():
cached_data = json_r.loads(hashid_sidecar_fpath.read_text())
if cached_data['status_key'] == status_key:
self.hashid = cached_data['hashid']
self.hashid_parts = cached_data['hashid_parts']
cache_miss = False
if cache_miss:
self._build_hashid()
if enable_cache:
hashid_cache_data = {
'hashid': self.hashid,
'hashid_parts': self.hashid_parts,
'status_key': status_key,
}
try:
hashid_sidecar_fpath.parent.ensuredir()
hashid_sidecar_fpath.write_text(json_w.dumps(hashid_cache_data))
except PermissionError as ex:
warnings.warn(f'Cannot write a cached hashid: {repr(ex)}')
return self.hashid
[docs]
@classmethod
def _cached_hashid_for(cls, fpath):
"""
Lookup the cached hashid for a kwcoco json file if it exists.
"""
coco_fpath = ub.Path(fpath)
enable_cache = coco_fpath.exists()
if enable_cache:
cache_dpath = (coco_fpath.parent / '_cache')
cache_fname = coco_fpath.name + '.hashid.cache'
hashid_sidecar_fpath = cache_dpath / cache_fname
# Generate current lookup key
fpath_stat = coco_fpath.stat()
status_key = {
'st_size': fpath_stat.st_size,
'st_mtime': fpath_stat.st_mtime
}
if hashid_sidecar_fpath.exists():
cached_data = json_r.loads(hashid_sidecar_fpath.read_text())
if cached_data['status_key'] == status_key:
cached_data['hashid']
cached_data['hashid_parts']
cache_miss = False
if cache_miss:
# TODO: fixme?
# self._build_hashid()
# if enable_cache:
# hashid_cache_data = {
# 'hashid': self.hashid,
# 'hashid_parts': self.hashid_parts,
# 'status_key': status_key,
# }
# hashid_sidecar_fpath.parent.ensuredir()
# hashid_sidecar_fpath.write_text(json_w.dumps(hashid_cache_data))
raise Exception("cache miss")
[docs]
class MixinCocoObjects:
"""
Expose methods to construct object lists / groups.
This is an alternative vectorized ORM-like interface to the coco dataset
"""
[docs]
def annots(self, annot_ids=None, image_id=None, track_id=None, video_id=None, trackid=None, aids=None, gid=None):
"""
Return vectorized annotation objects
Args:
annot_ids (List[int] | None):
annotation ids to reference, if unspecified all annotations are
returned. An alias is "aids", which may be removed in the future.
image_id (int | None):
return all annotations that belong to this image id.
Mutually exclusive with other arguments.
An alias is "gids", which may be removed in the future.
track_id (int | None):
return all annotations that belong to this track.
mutually exclusive with other arguments.
An alias is "trackid", which may be removed in the future.
video_id (int | None):
return all annotations that belong to this video.
mutually exclusive with other arguments.
Returns:
kwcoco.coco_objects1d.Annots: vectorized annotation object
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo()
>>> annots = self.annots()
>>> print(annots)
<Annots(num=11)>
>>> sub_annots = annots.take([1, 2, 3])
>>> print(sub_annots)
<Annots(num=3)>
>>> print(ub.urepr(sub_annots.get('bbox', None)))
[
[350, 5, 130, 290],
None,
None,
]
"""
if image_id is None:
image_id = gid
if annot_ids is None:
annot_ids = aids
if trackid is not None:
ub.schedule_deprecation(
'kwcoco', 'trackid', 'argument of CocoDataset.annots',
migration='Use "track_id" instead.',
deprecate='0.5.9', error='1.0.0', remove='1.1.0',
)
track_id = trackid
if image_id is not None:
annot_ids = sorted(self.index.gid_to_aids[image_id])
if track_id is not None:
annot_ids = self.index.trackid_to_aids[track_id]
if video_id is not None:
annot_ids = sorted(ub.flatten([
self.index.gid_to_aids[gid]
for gid in self.index.vidid_to_gids[video_id]]))
if annot_ids is None:
annot_ids = sorted(self.index.anns.keys())
return Annots(annot_ids, self)
[docs]
def images(self, image_ids=None, video_id=None, names=None, gids=None, vidid=None):
"""
Return vectorized image objects
Args:
image_ids (List[int] | None): image ids to reference, if unspecified
all images are returned. An alias is `gids`.
video_id (int | None): returns all images that belong to this video id.
mutually exclusive with `image_ids` arg.
names (List[str] | None):
lookup images by their names.
Returns:
kwcoco.coco_objects1d.Images: vectorized image object
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo()
>>> images = self.images()
>>> print(images)
<Images(num=3)>
>>> self = kwcoco.CocoDataset.demo('vidshapes2')
>>> video_id = 1
>>> images = self.images(video_id=video_id)
>>> assert all(v == video_id for v in images.lookup('video_id'))
>>> print(images)
<Images(num=2)>
"""
if image_ids is None:
image_ids = gids
if vidid is not None:
ub.schedule_deprecation(
'kwcoco', 'vidid', 'argument of CocoDataset.images',
migration='Use "video_id" instead.',
deprecate='0.5.0', error='1.0.0', remove='1.1.0',
)
video_id = vidid
if video_id is not None:
image_ids = self.index.vidid_to_gids[video_id]
if names is not None:
image_ids = [self.index.name_to_img[name]['id'] for name in names]
if image_ids is None:
image_ids = sorted(self.index.imgs.keys())
return Images(image_ids, self)
[docs]
def categories(self, category_ids=None, cids=None):
"""
Return vectorized category objects
Args:
category_ids (List[int] | None):
category ids to reference, if unspecified all categories are
returned. The `cids` argument is an alias.
Returns:
kwcoco.coco_objects1d.Categories: vectorized category object
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo()
>>> categories = self.categories()
>>> print(categories)
<Categories(num=8)>
"""
if category_ids is None:
category_ids = cids
if cids is not None:
ub.schedule_deprecation(
'kwcoco', 'cids', 'argument of CocoDataset.categories',
migration='Use "category_ids" instead.',
deprecate='0.7.3', error='1.0.0', remove='1.1.0',
)
if category_ids is None:
category_ids = sorted(self.index.cats.keys())
return Categories(category_ids, self)
[docs]
def videos(self, video_ids=None, names=None, vidids=None):
"""
Return vectorized video objects
Args:
video_ids (List[int] | None):
video ids to reference, if unspecified all videos are returned.
The `vidids` argument is an alias. Mutually exclusive with
other args.
names (List[str] | None):
lookup videos by their name.
Mutually exclusive with other args.
Returns:
kwcoco.coco_objects1d.Videos: vectorized video object
TODO:
- [ ] This conflicts with what should be the property that
should redirect to ``index.videos``, we should resolve this
somehow. E.g. all other main members of the index (anns, imgs,
cats) have a toplevel dataset property, we don't have one for
videos because the name we would pick conflicts with this.
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo('vidshapes2')
>>> videos = self.videos()
>>> print(videos)
>>> videos.lookup('name')
>>> videos.lookup('id')
>>> print('videos.objs = {}'.format(ub.urepr(videos.objs[0:2], nl=1)))
"""
if video_ids is None:
video_ids = vidids
if video_ids is None:
video_ids = sorted(self.index.videos.keys())
if names is not None:
video_ids = [self.index.name_to_video[name]['id'] for name in names]
return Videos(video_ids, self)
[docs]
def tracks(self, track_ids=None, names=None):
"""
Return vectorized track objects
Args:
track_ids (List[int] | None):
track ids to reference, if unspecified all tracks are returned.
names (List[str] | None):
lookup tracks by their name.
Mutually exclusive with other args.
Returns:
kwcoco.coco_objects1d.Tracks: vectorized video object
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo('vidshapes2')
>>> tracks = self.tracks()
>>> print(tracks)
>>> tracks.lookup('name')
>>> tracks.lookup('id')
>>> print('tracks.objs = {}'.format(ub.urepr(tracks.objs[0:2], nl=1)))
"""
if track_ids is None:
track_ids = sorted(self.index.tracks.keys())
if names is not None:
track_ids = [self.index.name_to_track[name]['id'] for name in names]
return Tracks(track_ids, self)
[docs]
class MixinCocoStats:
"""
Methods for getting stats about the dataset
"""
@property
def n_annots(self):
""" The number of annotations in the dataset """
return len(self.dataset.get('annotations', []))
@property
def n_images(self):
""" The number of images in the dataset """
return len(self.dataset.get('images', []))
@property
def n_cats(self):
""" The number of categories in the dataset """
return len(self.dataset.get('categories', []))
@property
def n_tracks(self):
""" The number of tracks in the dataset """
return len(self.dataset.get('tracks', []))
@property
def n_videos(self):
""" The number of videos in the dataset """
return len(self.dataset.get('videos', []))
[docs]
def category_annotation_frequency(self):
"""
Reports the number of annotations of each category
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo()
>>> hist = self.category_annotation_frequency()
>>> print(ub.urepr(hist))
{
'astroturf': 0,
'human': 0,
'astronaut': 1,
'astronomer': 1,
'helmet': 1,
'rocket': 1,
'mouth': 2,
'star': 5,
}
"""
catname_to_nannots = ub.map_keys(
lambda x: None if x is None else self.cats[x]['name'],
ub.map_vals(len, self.index.cid_to_aids))
catname_to_nannots = ub.odict(sorted(catname_to_nannots.items(),
key=lambda kv: (kv[1], kv[0])))
return catname_to_nannots
[docs]
def validate(self, **config):
"""
Performs checks on this coco dataset.
Corresponds to the ``kwcoco validate`` CLI tool.
Args:
**config :
schema (default=True): if True, validate the json-schema
unique (default=True): if True, validate unique secondary keys
missing (default=True): if True, validate registered files exist
corrupted (default=False): if True, validate data in registered files
channels (default=True):
if True, validate that channels in auxiliary/asset items
are all unique.
require_relative (default=False):
if True, causes validation to fail if paths are
non-portable, i.e. all paths must be relative to the
bundle directory. if>0, paths must be relative to bundle
root. if>1, paths must be inside bundle root.
img_attrs (default='warn'):
if truthy, check that image attributes contain width and
height entries. If 'warn', then warn if they do not exist.
If 'error', then fail.
verbose (default=1): verbosity flag
workers (int): number of workers for parallel checks. defaults to 0
fastfail (default=False): if True raise errors immediately
Returns:
dict: result containing keys -
status (bool): False if any errors occurred
errors (List[str]): list of all error messages
missing (List): List of any missing images
corrupted (List): List of any corrupted images
SeeAlso:
:func:`_check_integrity` - performs internal checks
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo()
>>> import pytest
>>> with pytest.warns(UserWarning):
>>> result = self.validate()
>>> assert not result['errors']
>>> assert result['warnings']
"""
dset = self
result = {
'errors': [],
'warnings': [],
}
verbose = config.get('verbose', 1)
fastfail = config.get('fastfail', 0)
def _error(msg):
if verbose:
print(msg)
if fastfail:
raise Exception(msg)
result['errors'].append(msg)
def _warn(msg):
if verbose:
print(msg)
warnings.warn(msg, UserWarning)
result['warnings'].append(msg)
if config.get('schema', True):
import jsonschema
from kwcoco.coco_schema import COCO_SCHEMA
if verbose:
print('Validate json-schema')
try:
COCO_SCHEMA.validate(dset.dataset)
except jsonschema.exceptions.ValidationError as ex:
err = ex
print(f'err.absolute_path={err.absolute_path}')
msg = 'Failed to validate schema: {}'.format(str(err))
_error(msg)
def _check_unique(dset, table_key, col_key, required=True):
if verbose:
print('Check table {!r} has unique {!r}'.format(
table_key, col_key))
items = dset.dataset.get(table_key, [])
seen = set()
num_unset = 0
for obj in items:
value = obj.get(col_key, None)
if value is None:
num_unset += 1
else:
if value in seen:
msg = 'Duplicate {} {} = {!r}'.format(
table_key, col_key, value)
_error(msg)
else:
seen.add(value)
if num_unset > 0:
msg = ub.paragraph(
'''
Table {!r} is missing {} / {} values for column {!r}
'''
).format(table_key, num_unset, len(items), col_key)
if required:
_error(msg)
else:
_warn(msg)
def _check_attrs(dset, table_key, col_key, required=True):
if verbose:
print('Check table {!r} entries have attr {!r}'.format(
table_key, col_key))
items = dset.dataset.get(table_key, [])
num_unset = 0
for obj in items:
value = obj.get(col_key, None)
if value is None:
num_unset += 1
if num_unset > 0:
msg = ub.paragraph(
'''
Table {!r} is missing {} / {} values for column {!r}
'''
).format(table_key, num_unset, len(items), col_key)
if required:
_error(msg)
else:
_warn(msg)
def _check_subtable_attrs(dset, table_key, subtable_keys, col_key, required=True):
if verbose:
print('Check subtable {!r} / {!r} entries have attr {!r}'.format(
table_key, subtable_keys, col_key))
items = dset.dataset.get(table_key, [])
num_unset = 0
num_subobjs = 0
for obj in items:
# hack for asset/auxiliary
_ = ub.dict_isect(obj, subtable_keys)
sub_objs = ub.peek(_.values()) if _ else None
if sub_objs:
for sub_obj in sub_objs:
num_subobjs += 1
value = sub_obj.get(col_key, None)
if value is None:
num_unset += 1
if num_unset > 0:
msg = ub.paragraph(
'''
Subtable {!r}/{!r} is missing {} / {} values for column {!r}
'''
).format(table_key, subtable_keys, num_unset, num_subobjs, col_key)
if required:
_error(msg)
else:
_warn(msg)
if config.get('unique', True):
_check_unique(dset, table_key='categories', col_key='name')
_check_unique(dset, table_key='videos', col_key='name')
_check_unique(dset, table_key='images', col_key='name',
required=False)
_check_unique(dset, table_key='images', col_key='id')
_check_unique(dset, table_key='videos', col_key='id')
_check_unique(dset, table_key='annotations', col_key='id')
_check_unique(dset, table_key='categories', col_key='id')
if config.get('img_attrs', False):
required = config.get('img_attrs', False) == 'error'
# Refine this?
_check_attrs(dset, table_key='images', col_key='width', required=required)
_check_attrs(dset, table_key='images', col_key='height', required=required)
_check_subtable_attrs(dset, table_key='images', subtable_keys=['asset', 'auxiliary'], col_key='width', required=required)
_check_subtable_attrs(dset, table_key='images', subtable_keys=['asset', 'auxiliary'], col_key='height', required=required)
if config.get('annot_attrs', True):
required = config.get('annot_attrs', False) == 'error'
_check_attrs(dset, table_key='annotations', col_key='bbox', required=False)
if config.get('channels', True):
for img in self.dataset.get('images', []):
seen = set()
assets = img.get('auxiliary', []) + img.get('assets', [])
for obj in assets:
channels = obj.get('channels', None)
if channels is None:
gid = img['id']
_error('Asset in gid={} is missing channels, obj={}'.format(gid, obj))
else:
from kwcoco import FusedChannelSpec
channels = FusedChannelSpec.coerce(channels)
for chan in channels.as_list():
if chan in seen:
gid = img['id']
_error('The chan={} is specified more than once in gid={}'.format(chan, gid))
seen.add(chan)
if config.get('missing', True):
missing = dset.missing_images(check_aux=True, verbose=verbose)
if missing:
msg = ub.paragraph(
f'''
There are {len(missing)} missing images.
The first one is {missing[0][1]!r}.
''')
_error(msg)
result['missing'] = missing
if config.get('corrupted', False):
corrupted = dset.corrupted_images(check_aux=True, verbose=verbose,
workers=config.get('workers', 0))
if corrupted:
msg = ub.paragraph(
f'''
There are {len(corrupted)} corrupted images.
The first one is {corrupted[0][1]!r}.
''')
_error(msg)
result['corrupted'] = corrupted
if config.get('require_relative', False):
rr = config.get('require_relative', False)
for gid in dset.imgs.keys():
coco_img = dset.coco_image(gid)
for obj in coco_img.iter_asset_objs():
fname = ub.Path(obj['file_name'])
if rr > 0:
if fname.is_absolute():
_error('non relative fname = {!r}'.format(fname))
if rr > 1:
if '..' in fname.parts:
_error('non internal fname = {!r}'.format(fname))
return result
[docs]
def stats(self, **kwargs):
"""
Compute summary statistics to describe the dataset at a high level
This function corresponds to :mod:`kwcoco.cli.coco_stats`.
KWargs:
basic(bool): return basic stats', default=True
extended(bool): return extended stats', default=True
catfreq(bool): return category frequency stats', default=True
boxes(bool): return bounding box stats', default=False
annot_attrs(bool): return annotation attribute information', default=True
image_attrs(bool): return image attribute information', default=True
Returns:
dict: info
"""
config = kwargs
dset = self
info = {}
if config.get('basic', True):
info['basic'] = dset.basic_stats()
if config.get('extended', True):
info['extended'] = dset.extended_stats()
if config.get('catfreq', True):
info['catfreq'] = dset.category_annotation_frequency()
def varied(obj_lut):
attrs = ub.ddict(lambda: 0)
unique = ub.ddict(set)
for obj in obj_lut.values():
for key, value in obj.items():
if value:
attrs[key] += 1
try:
unique[key].add(value)
except TypeError:
unique[key].add(ub.hash_data(value, hasher='sha1'))
varied_attrs = {
key: {'num_unique': len(unique[key]), 'num_total': num}
for key, num in attrs.items()
}
return varied_attrs
if config.get('image_attrs', True):
image_attrs = varied(dset.index.imgs)
info['image_attrs'] = image_attrs
if config.get('annot_attrs', True):
annot_attrs = varied(dset.index.anns)
info['annot_attrs'] = annot_attrs
if config.get('video_attrs', True):
annot_attrs = varied(dset.index.videos)
info['video_attrs'] = annot_attrs
if config.get('boxes', False):
info['boxes'] = dset.boxsize_stats()
return info
[docs]
def basic_stats(self):
"""
Reports number of images, annotations, and categories.
SeeAlso:
:func:`kwcoco.coco_dataset.MixinCocoStats.basic_stats`
:func:`kwcoco.coco_dataset.MixinCocoStats.extended_stats`
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo()
>>> print(ub.urepr(self.basic_stats()))
{
'n_anns': 11,
'n_imgs': 3,
'n_videos': 0,
'n_cats': 8,
'n_tracks': 0,
}
>>> from kwcoco.demo.toydata_video import random_video_dset
>>> dset = random_video_dset(render=True, num_frames=2, num_tracks=10, rng=0)
>>> print(ub.urepr(dset.basic_stats()))
{
'n_anns': 20,
'n_imgs': 2,
'n_videos': 1,
'n_cats': 3,
'n_tracks': 10,
}
"""
n_tracks = self.n_tracks
if len(self.index.trackid_to_aids) > n_tracks + 3:
n_tracks = len(self.index.trackid_to_aids)
if None in self.index.trackid_to_aids:
n_tracks -= 1
info = ub.odict([
('n_anns', self.n_annots),
('n_imgs', self.n_images),
('n_videos', self.n_videos),
('n_cats', self.n_cats),
('n_tracks', n_tracks),
])
return info
[docs]
def extended_stats(self):
"""
Reports number of images, annotations, and categories.
SeeAlso:
:func:`kwcoco.coco_dataset.MixinCocoStats.basic_stats`
:func:`kwcoco.coco_dataset.MixinCocoStats.extended_stats`
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo()
>>> print(ub.urepr(self.extended_stats()))
"""
import kwarray
def mapping_stats(xid_to_yids):
n_yids = list(ub.map_vals(len, xid_to_yids).values())
return kwarray.stats_dict(n_yids, n_extreme=True)
if hasattr(self, '_all_rows_column_lookup'):
# Hack for SQL speed, still slow though
aid_to_cid = dict(self._all_rows_column_lookup(
'annotations', ['id', 'category_id']))
gid_to_cidfreq = {}
for gid, aids in self.index.gid_to_aids.items():
gid_to_cidfreq[gid] = ub.dict_hist(
ub.take(aid_to_cid, aids))
# for gid, aids in self.index.gid_to_aids.items():
# cids = self._column_lookup(
# 'annotations', 'category_id', rowids=aids)
# gid_to_cidfreq[gid] = ub.dict_hist(cids)
else:
# this is slow for sql.
gid_to_cidfreq = ub.map_vals(
lambda aids: ub.dict_hist([
self.anns[aid]['category_id'] for aid in aids]),
self.index.gid_to_aids)
gid_to_cids = {
gid: list(cidfreq.keys())
for gid, cidfreq in gid_to_cidfreq.items()
}
trackid_to_aids = ub.udict(self.index.trackid_to_aids) - {None}
img_has_anns = [int(len(aids) > 0) for gid, aids in self.index.gid_to_aids.items()]
imgs_with_annots = kwarray.stats_dict(img_has_anns, n_extreme=True, quantile=False)
return ub.odict([
('annots_per_img', mapping_stats(self.index.gid_to_aids)),
('imgs_per_cat', mapping_stats(self.index.cid_to_gids)),
('cats_per_img', mapping_stats(gid_to_cids)),
('annots_per_cat', mapping_stats(self.index.cid_to_aids)),
('imgs_per_video', mapping_stats(self.index.vidid_to_gids)),
('annots_per_track', mapping_stats(trackid_to_aids)),
('imgs_with_annots', imgs_with_annots),
])
[docs]
def boxsize_stats(self, anchors=None, perclass=True, gids=None, aids=None,
verbose=0, clusterkw={}, statskw={}):
"""
Compute statistics about bounding box sizes.
Also computes anchor boxes using kmeans if ``anchors`` is specified.
Args:
anchors (int | None): if specified also computes box anchors
via KMeans clustering
perclass (bool): if True also computes stats for each category
gids (List[int] | None):
if specified only compute stats for these image ids.
Defaults to None.
aids (List[int] | None):
if specified only compute stats for these annotation ids.
Defaults to None.
verbose (int): verbosity level
clusterkw (dict):
kwargs for :class:`sklearn.cluster.KMeans` used
if computing anchors.
statskw (dict): kwargs for :func:`kwarray.stats_dict`
Returns:
Dict[str, Dict[str, Dict | ndarray]]:
Stats are returned in width-height format.
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo('shapes32')
>>> infos = self.boxsize_stats(anchors=4, perclass=False)
>>> print(ub.urepr(infos, nl=-1, precision=2))
>>> infos = self.boxsize_stats(gids=[1], statskw=dict(median=True))
>>> print(ub.urepr(infos, nl=-1, precision=2))
"""
import kwarray
import numpy as np
cname_to_box_sizes = defaultdict(list)
if bool(gids) and bool(aids):
raise ValueError('specifying gids and aids is mutually exclusive')
if gids is not None:
aids = ub.flatten(ub.take(self.index.gid_to_aids, gids))
if aids is not None:
anns = ub.take(self.anns, aids)
else:
anns = self.dataset.get('annotations', [])
for ann in anns:
if 'bbox' in ann:
cname = self.cats[ann['category_id']]['name']
cname_to_box_sizes[cname].append(ann['bbox'][2:4])
cname_to_box_sizes = ub.map_vals(np.array, cname_to_box_sizes)
def _boxes_info(box_sizes):
box_info = {
'stats': kwarray.stats_dict(box_sizes, axis=0, **statskw)
}
if anchors:
from sklearn import cluster
defaultkw = {
'n_clusters': anchors,
'n_init': 20,
'max_iter': 10000,
'tol': 1e-6,
'algorithm': 'elkan',
'verbose': verbose
}
kmkw = ub.dict_union(defaultkw, clusterkw)
algo = cluster.KMeans(**kmkw)
algo.fit(box_sizes)
anchor_sizes = algo.cluster_centers_
box_info['anchors'] = anchor_sizes
return box_info
infos = {}
if perclass:
cid_to_info = {}
for cname, box_sizes in cname_to_box_sizes.items():
if verbose:
print('compute {} bbox stats'.format(cname))
cid_to_info[cname] = _boxes_info(box_sizes)
infos['perclass'] = cid_to_info
if verbose:
print('compute all bbox stats')
all_sizes = np.vstack(list(cname_to_box_sizes.values()))
all_info = _boxes_info(all_sizes)
infos['all'] = all_info
return infos
[docs]
def find_representative_images(self, gids=None):
r"""
Find images that have a wide array of categories.
Attempt to find the fewest images that cover all categories using
images that contain both a large and small number of annotations.
Args:
gids (None | List): Subset of image ids to consider when finding
representative images. Uses all images if unspecified.
Returns:
List: list of image ids determined to be representative
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo()
>>> gids = self.find_representative_images()
>>> print('gids = {!r}'.format(gids))
>>> gids = self.find_representative_images([3])
>>> print('gids = {!r}'.format(gids))
>>> self = kwcoco.CocoDataset.demo('shapes8')
>>> gids = self.find_representative_images()
>>> print('gids = {!r}'.format(gids))
>>> valid = {7, 1}
>>> gids = self.find_representative_images(valid)
>>> assert valid.issuperset(gids)
>>> print('gids = {!r}'.format(gids))
"""
import kwarray
if gids is None:
gids = sorted(self.imgs.keys())
gid_to_aids = self.index.gid_to_aids
else:
gid_to_aids = ub.dict_subset(self.index.gid_to_aids, gids)
all_cids = set(self.index.cid_to_aids.keys())
# Select representative images to draw such that each category
# appears at least once.
gid_to_cidfreq = ub.map_vals(
lambda aids: ub.dict_hist([self.anns[aid]['category_id']
for aid in aids]),
gid_to_aids)
gid_to_nannots = ub.map_vals(len, gid_to_aids)
gid_to_cids = {
gid: list(cidfreq.keys())
for gid, cidfreq in gid_to_cidfreq.items()
}
for gid, nannots in gid_to_nannots.items():
if nannots == 0:
# Add a dummy category to note images without any annotations
gid_to_cids[gid].append(-1)
all_cids.add(-1)
all_cids = list(all_cids)
# Solve setcover with different weight schemes to get a better
# representative sample.
candidate_sets = gid_to_cids.copy()
selected = {}
large_image_weights = gid_to_nannots
small_image_weights = ub.map_vals(lambda x: 1 / (x + 1), gid_to_nannots)
cover1 = kwarray.setcover(candidate_sets, items=all_cids)
selected.update(cover1)
candidate_sets = ub.dict_diff(candidate_sets, cover1)
cover2 = kwarray.setcover(
candidate_sets,
items=all_cids,
set_weights=large_image_weights)
selected.update(cover2)
candidate_sets = ub.dict_diff(candidate_sets, cover2)
cover3 = kwarray.setcover(
candidate_sets,
items=all_cids,
set_weights=small_image_weights)
selected.update(cover3)
candidate_sets = ub.dict_diff(candidate_sets, cover3)
selected_gids = sorted(selected.keys())
return selected_gids
[docs]
class MixinCocoDraw:
"""
Matplotlib / display functionality
"""
[docs]
def draw_image(self, gid, channels=None):
"""
Use kwimage to draw all annotations on an image and return the pixels
as a numpy array.
Args:
gid (int): image id to draw
channels (kwcoco.ChannelSpec): the channel to draw on
Returns:
ndarray: canvas
SeeAlso
:func:`kwcoco.coco_dataset.MixinCocoDraw.draw_image`
:func:`kwcoco.coco_dataset.MixinCocoDraw.show_image`
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo('shapes8')
>>> self.draw_image(1)
>>> # Now you can dump the annotated image to disk / whatever
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> kwplot.imshow(canvas)
"""
import kwimage
# Load the raw image pixels
coco_img = self.coco_image(gid)
delayed = coco_img.imdelay(space='image', channels=channels)
if delayed.channels is not None and delayed.channels.numel() > 3:
import warnings
warnings.warn('Requested drawing more than 3 channels. '
'Only picking first 3')
first_channels = coco_img.imdelay(space='image').channels.fuse()[0:3]
delayed = delayed.take_channels(first_channels)
canvas = delayed.finalize()
canvas = _normalize_intensity_if_needed(canvas)
# Get annotation IDs from this image
aids = self.index.gid_to_aids[gid]
# Grab relevant annotation dictionaries
anns = [self.anns[aid] for aid in aids]
# Transform them into a kwimage.Detections datastructure
dset = self
try:
classes = dset.object_categories()
except Exception:
classes = list(dset.name_to_cat.keys())
try:
kp_classes = dset.keypoint_categories()
except Exception:
# hack
anns = [ann.copy() for ann in anns]
for ann in anns:
ann.pop('keypoints', None)
kp_classes = None
cats = dset.dataset['categories']
# dets = kwimage.Detections.from_coco_annots(anns, dset=self)
dets = kwimage.Detections.from_coco_annots(
anns, classes=classes, cats=cats, kp_classes=kp_classes)
canvas = dets.draw_on(canvas)
return canvas
[docs]
def show_image(self, gid=None, aids=None, aid=None, channels=None, setlim=None, **kwargs):
"""
Use matplotlib to show an image with annotations overlaid
Args:
gid (int | None):
image id to show
aids (list | None):
aids to highlight within the image
aid (int | None):
a specific aid to focus on. If gid is not give, look up gid
based on this aid.
setlim (None | str):
if 'image' sets the limit to the image extent
**kwargs:
show_annots, show_aid, show_catname, show_kpname,
show_segmentation, title, show_gid, show_filename,
show_boxes,
SeeAlso
:func:`kwcoco.coco_dataset.MixinCocoDraw.draw_image`
:func:`kwcoco.coco_dataset.MixinCocoDraw.show_image`
Ignore:
# Programmatically collect the kwargs for docs generation
import xinspect
import kwcoco
kwargs = xinspect.get_kwargs(kwcoco.CocoDataset.show_image)
print(ub.urepr(list(kwargs.keys()), nl=1, si=1))
CommandLine:
xdoctest -m kwcoco.coco_dataset MixinCocoDraw.show_image --show
Example:
>>> # xdoctest: +REQUIRES(module:kwplot)
>>> import kwcoco
>>> dset = kwcoco.CocoDataset.demo('vidshapes8-msi')
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> # xdoctest: -REQUIRES(--show)
>>> dset.show_image(gid=1, channels='B8')
>>> # xdoctest: +REQUIRES(--show)
>>> kwplot.show_if_requested()
"""
import matplotlib as mpl
from matplotlib import pyplot as plt
import kwimage
import kwplot
import numpy as np
figkw = {k: kwargs[k] for k in ['fnum', 'pnum', 'doclf', 'docla']
if k in kwargs}
if figkw:
kwplot.figure(**figkw)
if gid is None:
primary_ann = self.anns[aid]
gid = primary_ann['image_id']
show_all = kwargs.get('show_all', True)
show_annots = kwargs.get('show_annots', True)
show_labels = kwargs.get('show_labels', True)
highlight_aids = set()
if aid is not None:
highlight_aids.add(aid)
if aids is not None:
highlight_aids.update(aids)
coco_img = self.coco_image(gid)
img = coco_img.img
aids = self.index.gid_to_aids.get(img['id'], [])
# Collect annotation overlays
colored_segments = defaultdict(list)
keypoints = []
rects = []
texts = []
sseg_masks = []
sseg_polys = []
if show_annots:
for aid in aids:
ann = self.anns[aid]
if 'keypoints' in ann:
cid = ann['category_id']
if ann['keypoints'] is not None and len(ann['keypoints']) > 0:
# TODO: rely on kwimage.Points to parse multiple format info?
kpts_data = ann['keypoints']
if isinstance(ub.peek(kpts_data), dict):
xys = np.array([p['xy'] for p in kpts_data])
isvisible = np.array([p.get('visible', True) for p in kpts_data])
kpnames = None
isvisible = np.array([p.get('visible', True) for p in kpts_data])
else:
try:
kpnames = self._lookup_kpnames(cid)
except KeyError:
kpnames = None
kpts = np.array(ann['keypoints']).reshape(-1, 3)
isvisible = kpts.T[2] > 0
xys = kpts.T[0:2].T[isvisible]
else:
kpnames = None
xys = None
else:
kpnames = None
xys = None
# Note standard coco bbox is [x,y,width,height]
if 'bbox' in ann:
x1, y1 = ann['bbox'][0:2]
elif 'line' in ann:
x1, y1 = ann['line'][0:2]
elif 'keypoints' in ann:
x1, y1 = xys.min(axis=0)
else:
raise Exception('no bbox, line, or keypoint position')
cid = ann.get('category_id', None)
if cid is not None:
cat = self.cats[cid]
catname = cat['name']
else:
cat = None
catname = ann.get('category_name', 'None')
textkw = {
'horizontalalignment': 'left',
'verticalalignment': 'top',
'backgroundcolor': (0, 0, 0, .3),
'color': 'white',
'fontproperties': mpl.font_manager.FontProperties(
size=6, family='monospace'),
}
annot_text_parts = []
if show_labels:
if kwargs.get('show_aid', show_all):
annot_text_parts.append('aid={}'.format(aid))
if kwargs.get('show_catname', show_all):
annot_text_parts.append(catname)
if annot_text_parts:
annot_text = ' '.join(annot_text_parts)
texts.append((x1, y1, annot_text, textkw))
color = 'orange' if aid in highlight_aids else 'blue'
if 'obox' in ann:
# Oriented bounding box
segs = np.array(ann['obox']).reshape(-1, 3)[:, 0:2]
for pt1, pt2 in ub.iter_window(segs, wrap=True):
colored_segments[color].append([pt1, pt2])
elif 'bbox' in ann:
[x, y, w, h] = ann['bbox']
rect = mpl.patches.Rectangle((x, y), w, h, facecolor='none',
edgecolor=color)
rects.append(rect)
if 'line' in ann:
x1, y1, x2, y2 = ann['line']
pt1, pt2 = (x1, y1), (x2, y2)
colored_segments[color].append([pt1, pt2])
if 'keypoints' in ann:
if xys is not None and len(xys):
keypoints.append(xys)
if show_labels and kwargs.get('show_kpname', True):
if kpnames is not None:
for (kp_x, kp_y), kpname in zip(xys, kpnames):
texts.append((kp_x, kp_y, kpname, textkw))
if ann.get('segmentation', None) is not None and kwargs.get('show_segmentation', True):
sseg = ann['segmentation']
# Respect the 'color' attribute of categories
if cat is not None:
catcolor = cat.get('color', None)
else:
catcolor = None
if catcolor is not None:
catcolor = kwplot.Color(catcolor).as01()
# TODO: Unify masks and polygons into a kwimage
# segmentation class
sseg = kwimage.Segmentation.coerce(sseg).data
if isinstance(sseg, kwimage.Mask):
m = sseg.to_c_mask()
sseg_masks.append((m.data, catcolor))
else:
# TODO: interior
multipoly = sseg.to_multi_polygon()
for poly in multipoly.data:
poly_xys = poly.data['exterior'].data
polykw = {}
if catcolor is not None:
polykw['color'] = catcolor
poly = mpl.patches.Polygon(poly_xys, **polykw)
try:
# hack
poly.area = sseg.to_shapely().area
except Exception:
pass
sseg_polys.append(poly)
# Show image
avail_channels = coco_img.channels
if channels is None and avail_channels is not None:
avail_spec = avail_channels.fuse().normalize()
num_chans = avail_spec.numel()
if num_chans == 0:
raise Exception('no channels!')
elif num_chans <= 2:
print('Auto choosing 1 channel')
channels = avail_spec.parsed[0]
elif num_chans > 3:
print('Auto choosing 3 channel')
sensible_defaults = [
'red|green|blue',
'r|g|b',
]
chosen = None
for sensible in sensible_defaults:
cand = avail_spec & sensible
if cand.numel() == 3:
chosen = cand
break
if chosen is None:
chosen = avail_spec[0:3]
channels = chosen
print('loading image')
delayed = coco_img.imdelay(space='image', channels=channels)
np_img = delayed.finalize()
print('loaded image')
np_img = kwimage.atleast_3channels(np_img)
np_img01 = None
if np_img.max() > 255:
np_img01 = _normalize_intensity_if_needed(np_img)
fig = plt.gcf()
ax = fig.gca()
ax.cla()
if sseg_masks:
if np_img01 is None:
np_img01 = kwimage.ensure_float01(np_img)
layers = []
layers.append(kwimage.ensure_alpha_channel(np_img01))
distinct_colors = kwplot.Color.distinct(len(sseg_masks))
for (mask, _catcolor), col in zip(sseg_masks, distinct_colors):
if _catcolor is not None:
col = kwimage.ensure_float01(np.array(_catcolor)).tolist()
col = np.array(col + [1])[None, None, :]
alpha_mask = col * mask[:, :, None]
alpha_mask[..., 3] = mask * 0.5
layers.append(alpha_mask)
masked_img = kwimage.overlay_alpha_layers(layers[::-1])
ax.imshow(masked_img)
else:
if np_img01 is not None:
ax.imshow(np_img01)
else:
ax.imshow(np_img)
title = kwargs.get('title', None)
if title is None:
title_parts = []
if kwargs.get('show_gid', True):
title_parts.append('gid={}'.format(gid))
if kwargs.get('show_filename', True):
title_parts.append(str(img['file_name']))
title = ' '.join(title_parts)
if title:
ax.set_title(title)
if sseg_polys:
# print('sseg_polys = {!r}'.format(sseg_polys))
# hack: show smaller polygons first.
if len(sseg_polys):
areas = np.array([getattr(p, 'area', np.inf) for p in sseg_polys])
sortx = np.argsort(areas)[::-1]
sseg_polys = list(ub.take(sseg_polys, sortx))
poly_col = mpl.collections.PatchCollection(
sseg_polys, match_original=True, alpha=0.4)
ax.add_collection(poly_col)
# Show all annotations inside it
if kwargs.get('show_boxes', True):
for (x1, y1, catname, textkw) in texts:
ax.text(x1, y1, catname, **textkw)
for color, segments in colored_segments.items():
line_col = mpl.collections.LineCollection(segments, linewidths=2, color=color)
ax.add_collection(line_col)
rect_col = mpl.collections.PatchCollection(rects, match_original=True)
ax.add_collection(rect_col)
if keypoints:
xs, ys = np.vstack(keypoints).T
ax.plot(xs, ys, 'bo')
if setlim:
if not isinstance(setlim, str):
setlim = 'image'
if setlim == 'image':
ax.set_xlim(0, img['width'])
ax.set_ylim(img['height'], 0)
else:
raise NotImplementedError(setlim)
return ax
[docs]
def _normalize_intensity_if_needed(canvas):
# We really need a function that can adapt to dynamic ranges
# in a robust way that ensures at least something is visible.
# Normalize intensity is pretty good, but has common edge cases.
import kwimage
max_value = canvas.max()
min_value = canvas.min()
if canvas.dtype.kind in ['u', 'i'] and max_value > 255 or min_value < 0:
canvas = kwimage.normalize_intensity(canvas)
elif max_value > 0 or min_value < 0:
canvas = kwimage.normalize_intensity(canvas)
return canvas
[docs]
class MixinCocoAddRemove:
"""
Mixin functions to dynamically add / remove annotations images and
categories while maintaining lookup indexes.
"""
[docs]
def add_video(self, name, id=None, **kw):
"""
Register a new video with the dataset
Args:
name (str): Unique name for this video.
id (None | int): ADVANCED. Force using this image id.
**kw : stores arbitrary key/value pairs in this new video
Returns:
int : the video id assigned to the new video
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset()
>>> print('self.index.videos = {}'.format(ub.urepr(self.index.videos, nl=1)))
>>> print('self.index.imgs = {}'.format(ub.urepr(self.index.imgs, nl=1)))
>>> print('self.index.vidid_to_gids = {!r}'.format(self.index.vidid_to_gids))
>>> vidid1 = self.add_video('foo', id=3)
>>> vidid2 = self.add_video('bar')
>>> vidid3 = self.add_video('baz')
>>> print('self.index.videos = {}'.format(ub.urepr(self.index.videos, nl=1)))
>>> print('self.index.imgs = {}'.format(ub.urepr(self.index.imgs, nl=1)))
>>> print('self.index.vidid_to_gids = {!r}'.format(self.index.vidid_to_gids))
>>> gid1 = self.add_image('foo1.jpg', video_id=vidid1, frame_index=0)
>>> gid2 = self.add_image('foo2.jpg', video_id=vidid1, frame_index=1)
>>> gid3 = self.add_image('foo3.jpg', video_id=vidid1, frame_index=2)
>>> gid4 = self.add_image('bar1.jpg', video_id=vidid2, frame_index=0)
>>> print('self.index.videos = {}'.format(ub.urepr(self.index.videos, nl=1)))
>>> print('self.index.imgs = {}'.format(ub.urepr(self.index.imgs, nl=1)))
>>> print('self.index.vidid_to_gids = {!r}'.format(self.index.vidid_to_gids))
>>> self.remove_images([gid2])
>>> print('self.index.vidid_to_gids = {!r}'.format(self.index.vidid_to_gids))
"""
if id is None:
id = self._next_ids.get('videos')
video = ub.odict()
video['id'] = id
video['name'] = name
video.update(**kw)
try:
self.dataset['videos'].append(video)
except KeyError:
self.dataset['videos'] = []
self.dataset['videos'].append(video)
self.index._add_video(id, video)
# self._invalidate_hashid(['videos'])
return id
[docs]
def add_image(self, file_name=None, id=None, **kw):
"""
Register a new image with the dataset
Args:
file_name (str | None): relative or absolute path to image.
if not given, then "name" must be specified and we will
expect that "auxiliary" assets are eventually added.
id (None | int): ADVANCED. Force using this image id.
name (str): a unique key to identify this image
width (int): base width of the image
height (int): base height of the image
channels (ChannelSpec): specification of base channels.
Only relevant if file_name is given.
auxiliary (List[Dict]): specification of auxiliary assets.
See :func:`CocoImage.add_asset` for details
video_id (int): id of parent video, if applicable
frame_index (int): frame index in parent video
timestamp (number | str): timestamp of frame index
warp_img_to_vid (Dict): this transform is used to align
the image to a video if it belongs to one.
**kw : stores arbitrary key/value pairs in this new image
Returns:
int : the image id assigned to the new image
SeeAlso:
:func:`add_image`
:func:`add_images`
:func:`ensure_image`
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo()
>>> import kwimage
>>> gname = kwimage.grab_test_image_fpath('paraview')
>>> gid = self.add_image(gname)
>>> assert self.imgs[gid]['file_name'] == gname
"""
if id is None:
id = self._next_ids.get('images')
elif self.imgs and id in self.imgs:
raise exceptions.DuplicateAddError('Image id={} already exists'.format(id))
img = _dict()
img['id'] = int(id)
try:
img['file_name'] = os.fspath(file_name)
except TypeError:
img['file_name'] = file_name
img.update(**kw)
self.index._add_image(id, img)
self.dataset['images'].append(img)
self._invalidate_hashid()
return id
[docs]
def add_asset(self, gid, file_name=None, channels=None, **kwargs):
"""
Adds an auxiliary / asset item to the image dictionary.
Args:
gid (int):
The image id to add the auxiliary/asset item to.
file_name (str | None):
The name of the file relative to the bundle directory. If
unspecified, imdata must be given.
channels (str | kwcoco.FusedChannelSpec):
The channel code indicating what each of the bands represents.
These channels should be disjoint wrt to the existing data in
this image (this is not checked).
**kwargs:
See :func:`CocoImage.add_asset` for more details
Example:
>>> import kwcoco
>>> dset = kwcoco.CocoDataset()
>>> gid = dset.add_image(name='my_image_name', width=200, height=200)
>>> dset.add_asset(gid, 'path/fake_B0.tif', channels='B0', width=200,
>>> height=200, warp_aux_to_img={'scale': 1.0})
"""
coco_img = self.coco_image(gid)
coco_img.add_asset(file_name=file_name, channels=channels, **kwargs)
# TODO: deprecated add_auxiliary_item for add_asset
add_auxiliary_item = add_asset
[docs]
def add_annotation(self, image_id, category_id=None, bbox=ub.NoParam,
segmentation=ub.NoParam, keypoints=ub.NoParam, id=None,
track_id=None, **kw):
"""
Register a new annotation with the dataset
Args:
image_id (int): image_id the annotation is added to.
category_id (int | None): category_id for the new annotation
bbox (list | kwimage.Boxes): bounding box in xywh format
segmentation (Dict | List | Any): keypoints in some
accepted format, see :func:`kwimage.Mask.to_coco` and
:func:`kwimage.MultiPolygon.to_coco`.
Extended types: `MaskLike | MultiPolygonLike`.
keypoints (Any): keypoints in some accepted
format, see :func:`kwimage.Keypoints.to_coco`.
Extended types: `KeypointsLike`.
id (None | int): Force using this annotation id. Typically you
should NOT specify this. A new unused id will be chosen and
returned.
track_id (int | str | None):
Some value used to associate annotations that belong to the
same "track". In the future we may remove support for strings.
**kw : stores arbitrary key/value pairs in this new image,
Common respected key/values include but are not limited to the
following:
score : float
prob : List[float]
weight (float): a weight, usually used to indicate if a ground
truth annotation is difficult / important. This generalizes
standard "is_hard" or "ignore" attributes in other formats.
caption (str): a text caption for this annotation
Returns:
int : the annotation id assigned to the new annotation
SeeAlso:
:func:`kwcoco.coco_dataset.MixinCocoAddRemove.add_annotation`
:func:`kwcoco.coco_dataset.MixinCocoAddRemove.add_annotations`
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo()
>>> image_id = 1
>>> cid = 1
>>> bbox = [10, 10, 20, 20]
>>> aid = self.add_annotation(image_id, cid, bbox)
>>> assert self.anns[aid]['bbox'] == bbox
Example:
>>> import kwimage
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo()
>>> new_det = kwimage.Detections.random(1, segmentations=True, keypoints=True)
>>> # kwimage datastructures have methods to convert to coco recognized formats
>>> new_ann_data = list(new_det.to_coco(style='new'))[0]
>>> image_id = 1
>>> aid = self.add_annotation(image_id, **new_ann_data)
>>> # Lookup the annotation we just added
>>> ann = self.index.anns[aid]
>>> print('ann = {}'.format(ub.urepr(ann, nl=-2)))
Example:
>>> # Attempt to add annot without a category or bbox
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo()
>>> image_id = 1
>>> aid = self.add_annotation(image_id)
>>> assert None in self.index.cid_to_aids
Example:
>>> # Attempt to add annot using various styles of kwimage structures
>>> import kwcoco
>>> import kwimage
>>> self = kwcoco.CocoDataset.demo()
>>> image_id = 1
>>> #--
>>> kw = {}
>>> kw['segmentation'] = kwimage.Polygon.random()
>>> kw['keypoints'] = kwimage.Points.random()
>>> aid = self.add_annotation(image_id, **kw)
>>> ann = self.index.anns[aid]
>>> print('ann = {}'.format(ub.urepr(ann, nl=2)))
>>> #--
>>> kw = {}
>>> kw['segmentation'] = kwimage.Mask.random()
>>> aid = self.add_annotation(image_id, **kw)
>>> ann = self.index.anns[aid]
>>> assert ann.get('segmentation', None) is not None
>>> print('ann = {}'.format(ub.urepr(ann, nl=2)))
>>> #--
>>> kw = {}
>>> kw['segmentation'] = kwimage.Mask.random().to_array_rle()
>>> aid = self.add_annotation(image_id, **kw)
>>> ann = self.index.anns[aid]
>>> assert ann.get('segmentation', None) is not None
>>> print('ann = {}'.format(ub.urepr(ann, nl=2)))
>>> #--
>>> kw = {}
>>> kw['segmentation'] = kwimage.Polygon.random().to_coco()
>>> kw['keypoints'] = kwimage.Points.random().to_coco()
>>> aid = self.add_annotation(image_id, **kw)
>>> ann = self.index.anns[aid]
>>> assert ann.get('segmentation', None) is not None
>>> assert ann.get('keypoints', None) is not None
>>> print('ann = {}'.format(ub.urepr(ann, nl=2)))
"""
try:
import kwimage
except ImportError:
kwimage = None
if id is None:
id = self._next_ids.get('annotations')
elif self.anns and id in self.anns:
raise IndexError('Annot id={} already exists'.format(id))
ann = _dict()
ann['id'] = int(id)
ann['image_id'] = int(image_id)
ann['category_id'] = None if category_id is None else int(category_id)
if kwimage is not None and hasattr(bbox, 'to_coco'):
# to_coco works different for boxes atm, might update in future
try:
ann['bbox'] = ub.peek(bbox.to_coco(style='new'))
except Exception:
ann['bbox'] = bbox.to_xywh().data.tolist()
elif bbox is not ub.NoParam:
ann['bbox'] = bbox
else:
assert bbox is ub.NoParam
if kwimage is not None and hasattr(keypoints, 'to_coco'):
ann['keypoints'] = keypoints.to_coco(style='new')
elif keypoints is not ub.NoParam:
ann['keypoints'] = keypoints
else:
assert keypoints is ub.NoParam
if kwimage is not None and hasattr(segmentation, 'to_coco'):
ann['segmentation'] = segmentation.to_coco(style='new')
elif segmentation is not ub.NoParam:
ann['segmentation'] = segmentation
else:
assert segmentation is ub.NoParam
if track_id is not None:
ann['track_id'] = track_id
ann.update(**kw)
self.dataset['annotations'].append(ann)
self.index._add_annotation(id, image_id, category_id, track_id, ann)
self._invalidate_hashid(['annotations'])
return id
[docs]
def add_category(self, name, supercategory=None, id=None, **kw):
"""
Register a new category with the dataset
Args:
name (str): name of the new category
supercategory (str | None): parent of this category
id (int | None): use this category id, if it was not taken
**kw : stores arbitrary key/value pairs in this new image
Returns:
int : the category id assigned to the new category
SeeAlso:
:func:`kwcoco.coco_dataset.MixinCocoAddRemove.add_category`
:func:`kwcoco.coco_dataset.MixinCocoAddRemove.ensure_category`
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo()
>>> prev_n_cats = self.n_cats
>>> cid = self.add_category('dog', supercategory='object')
>>> assert self.cats[cid]['name'] == 'dog'
>>> assert self.n_cats == prev_n_cats + 1
>>> import pytest
>>> with pytest.raises(ValueError):
>>> self.add_category('dog', supercategory='object')
"""
index = self.index
if index.cats and name in index.name_to_cat:
raise exceptions.DuplicateAddError('Category name={!r} already exists'.format(name))
if id is None:
id = self._next_ids.get('categories')
elif index.cats and id in index.cats:
raise exceptions.DuplicateAddError('Category id={} already exists'.format(id))
cat = _dict()
cat['id'] = int(id)
cat['name'] = str(name)
if supercategory:
cat['supercategory'] = supercategory
cat.update(**kw)
# Add to raw data structure
self.dataset['categories'].append(cat)
# And add to the indexes
index._add_category(id, name, cat)
self._invalidate_hashid(['categories'])
return id
[docs]
def add_track(self, name, id=None, **kw):
"""
Register a new track with the dataset
Args:
name (str): name of the new track
id (int | None): use this track id, if it was not taken
**kw : stores arbitrary key/value pairs in this new image
Returns:
int : the track id assigned to the new track
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo()
>>> prev_n_tracks = self.n_tracks
>>> track_id = self.add_track('dog')
>>> assert self.index.tracks[track_id]['name'] == 'dog'
>>> assert self.n_tracks == prev_n_tracks + 1
>>> import pytest
>>> with pytest.raises(ValueError):
>>> self.add_track('dog')
"""
index = self.index
if index.tracks and name in index.name_to_track:
raise exceptions.DuplicateAddError('Track name={!r} already exists'.format(name))
if id is None:
id = self._next_ids.get('tracks')
elif index.tracks and id in index.tracks:
raise exceptions.DuplicateAddError('Track id={} already exists'.format(id))
track = _dict()
track['id'] = int(id)
track['name'] = str(name)
track.update(**kw)
# Add to raw data structure
try:
self.dataset['tracks'].append(track)
except KeyError:
self.dataset['tracks'] = [track]
# And add to the indexes
index._add_track(id, name, track)
self._invalidate_hashid(['tracks'])
return id
[docs]
def ensure_video(self, name, id=None, **kw):
"""
Register a video if it is new or returns an existing id.
Like :func:`kwcoco.coco_dataset.MixinCocoAddRemove.add_video`, but
returns the existing video id if it already exists instead of failing.
In this case all metadata is ignored.
Args:
file_name (str): relative or absolute path to video
id (None | int): ADVANCED. Force using this video id.
**kw : stores arbitrary key/value pairs in this new video
Returns:
int: the existing or new video id
SeeAlso:
:func:`kwcoco.coco_dataset.MixinCocoAddRemove.add_video`
:func:`kwcoco.coco_dataset.MixinCocoAddRemove.ensure_video`
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo()
>>> id1 = self.ensure_video('video1')
>>> id2 = self.ensure_video('video1')
>>> assert id1 == id2
"""
try:
id = self.add_video(name=name, id=id, **kw)
except exceptions.DuplicateAddError:
obj = self.index.name_to_video[name]
id = obj['id']
return id
[docs]
def ensure_track(self, name, id=None, **kw):
"""
Register a track if it is new or returns an existing id.
Like :func:`kwcoco.coco_dataset.MixinCocoAddRemove.add_track`, but
returns the existing track id if it already exists instead of failing.
In this case all metadata is ignored.
Args:
file_name (str): relative or absolute path to track
id (None | int): ADVANCED. Force using this track id.
**kw : stores arbitrary key/value pairs in this new track
Returns:
int: the existing or new track id
SeeAlso:
:func:`kwcoco.coco_dataset.MixinCocoAddRemove.add_track`
:func:`kwcoco.coco_dataset.MixinCocoAddRemove.ensure_track`
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo()
>>> track_id1 = self.ensure_track('dog')
>>> track_id2 = self.ensure_track('dog')
>>> assert track_id1 == track_id2
"""
try:
id = self.add_track(name=name, id=id, **kw)
except exceptions.DuplicateAddError:
obj = self.index.name_to_track[name]
id = obj['id']
return id
[docs]
def ensure_image(self, file_name, id=None, **kw):
"""
Register an image if it is new or returns an existing id.
Like :func:`kwcoco.coco_dataset.MixinCocoAddRemove.add_image`, but
returns the existing image id if it already exists instead of failing.
In this case all metadata is ignored.
Args:
file_name (str): relative or absolute path to image
id (None | int): ADVANCED. Force using this image id.
**kw : stores arbitrary key/value pairs in this new image
Returns:
int: the existing or new image id
SeeAlso:
:func:`kwcoco.coco_dataset.MixinCocoAddRemove.add_image`
:func:`kwcoco.coco_dataset.MixinCocoAddRemove.add_images`
:func:`kwcoco.coco_dataset.MixinCocoAddRemove.ensure_image`
"""
try:
id = self.add_image(file_name=file_name, id=id, **kw)
except exceptions.DuplicateAddError:
img = self.index.file_name_to_img[file_name]
id = img['id']
return id
[docs]
def ensure_category(self, name, supercategory=None, id=None, **kw):
"""
Register a category if it is new or returns an existing id.
Like :func:`kwcoco.coco_dataset.MixinCocoAddRemove.add_category`, but
returns the existing category id if it already exists instead of
failing. In this case all metadata is ignored.
Returns:
int: the existing or new category id
SeeAlso:
:func:`kwcoco.coco_dataset.MixinCocoAddRemove.add_category`
:func:`kwcoco.coco_dataset.MixinCocoAddRemove.ensure_category`
"""
try:
id = self.add_category(name=name, supercategory=supercategory,
id=id, **kw)
except exceptions.DuplicateAddError:
cat = self.index.name_to_cat[name]
id = cat['id']
return id
[docs]
def add_annotations(self, anns):
"""
Faster less-safe multi-item alternative to add_annotation.
We assume the annotations are well formatted in kwcoco compliant
dictionaries, including the "id" field. No validation checks are
made when calling this function.
Args:
anns (List[Dict]): list of annotation dictionaries
SeeAlso:
:func:`add_annotation`
:func:`add_annotations`
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo()
>>> anns = [self.anns[aid] for aid in [2, 3, 5, 7]]
>>> self.remove_annotations(anns)
>>> assert self.n_annots == 7 and self._check_integrity()
>>> self.add_annotations(anns)
>>> assert self.n_annots == 11 and self._check_integrity()
"""
self.dataset['annotations'].extend(anns)
self.index._add_annotations(anns)
self._invalidate_hashid(['annotations'])
[docs]
def add_images(self, imgs):
"""
Faster less-safe multi-item alternative
We assume the images are well formatted in kwcoco compliant
dictionaries, including the "id" field. No validation checks are
made when calling this function.
Note:
THIS FUNCTION WAS DESIGNED FOR SPEED, AS SUCH IT DOES NOT CHECK IF
THE IMAGE-IDs or FILE_NAMES ARE DUPLICATED AND WILL BLINDLY ADD
DATA EVEN IF IT IS BAD. THE SINGLE IMAGE VERSION IS SLOWER BUT
SAFER.
Args:
imgs (List[Dict]): list of image dictionaries
SeeAlso:
:func:`kwcoco.coco_dataset.MixinCocoAddRemove.add_image`
:func:`kwcoco.coco_dataset.MixinCocoAddRemove.add_images`
:func:`kwcoco.coco_dataset.MixinCocoAddRemove.ensure_image`
Example:
>>> import kwcoco
>>> imgs = kwcoco.CocoDataset.demo().dataset['images']
>>> self = kwcoco.CocoDataset()
>>> self.add_images(imgs)
>>> assert self.n_images == 3 and self._check_integrity()
"""
self.dataset['images'].extend(imgs)
self.index._add_images(imgs)
self._invalidate_hashid(['images'])
[docs]
def clear_images(self):
"""
Removes all images and annotations (but not categories)
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo()
>>> self.clear_images()
>>> print(ub.urepr(self.basic_stats(), nobr=1, nl=0, si=1))
n_anns: 0, n_imgs: 0, n_videos: 0, n_cats: 8, n_tracks: 0
"""
# self.dataset['images'].clear()
# self.dataset['annotations'].clear()
del self.dataset['images'][:]
del self.dataset['annotations'][:]
self.index._remove_all_images()
self._invalidate_hashid(['images', 'annotations'])
[docs]
def clear_annotations(self):
"""
Removes all annotations and tracks (but not images and categories)
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo()
>>> self.clear_annotations()
>>> print(ub.urepr(self.basic_stats(), nobr=1, nl=0, si=1))
n_anns: 0, n_imgs: 3, n_videos: 0, n_cats: 8, n_tracks: 0
"""
# self.dataset['annotations'].clear()
self.dataset['annotations'].clear()
if 'tracks' in self.dataset:
self.dataset['tracks'].clear()
self.index._remove_all_annotations()
self._invalidate_hashid(['annotations', 'tracks'])
[docs]
def remove_annotation(self, aid_or_ann):
"""
Remove a single annotation from the dataset
If you have multiple annotations to remove its more efficient to remove
them in batch with
:func:`kwcoco.coco_dataset.MixinCocoAddRemove.remove_annotations`
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo()
>>> aids_or_anns = [self.anns[2], 3, 4, self.anns[1]]
>>> self.remove_annotations(aids_or_anns)
>>> assert len(self.dataset['annotations']) == 7
>>> self._check_integrity()
"""
# Do the simple thing, its O(n) anyway,
remove_ann = self._resolve_to_ann(aid_or_ann)
self.dataset['annotations'].remove(remove_ann)
self.index.clear()
self._invalidate_hashid(['annotations'])
[docs]
def remove_annotations(self, aids_or_anns, verbose=0, safe=True):
"""
Remove multiple annotations from the dataset.
Args:
anns_or_aids (List): list of annotation dicts or ids
safe (bool): if True, we perform checks to remove
duplicates and non-existing identifiers. Defaults to True.
Returns:
Dict: num_removed: information on the number of items removed
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo()
>>> prev_n_annots = self.n_annots
>>> aids_or_anns = [self.anns[2], 3, 4, self.anns[1]]
>>> self.remove_annotations(aids_or_anns) # xdoc: +IGNORE_WANT
{'annotations': 4}
>>> assert len(self.dataset['annotations']) == prev_n_annots - 4
>>> self._check_integrity()
"""
remove_info = {'annotations': None}
# Do nothing if given no input
if aids_or_anns:
# build mapping from aid to index O(n)
# TODO: it would be nice if this mapping was as part of the index.
aid_to_index = {
ann['id']: index
for index, ann in enumerate(self.dataset['annotations'])
}
remove_aids = list(map(self._resolve_to_id, aids_or_anns))
if safe:
remove_aids = sorted(set(remove_aids))
remove_info['annotations'] = len(remove_aids)
# Lookup the indices to remove, sort in descending order
if verbose > 1:
print('Removing {} annotations'.format(len(remove_aids)))
remove_idxs = list(ub.take(aid_to_index, remove_aids))
self.index._remove_annotations(remove_aids, verbose=verbose)
_delitems(self.dataset['annotations'], remove_idxs)
self._invalidate_hashid(['annotations'])
return remove_info
[docs]
def remove_categories(self, cat_identifiers, keep_annots=False, verbose=0,
safe=True):
"""
Remove categories and all annotations in those categories.
Currently does not change any hierarchy information
Args:
cat_identifiers (List): list of category dicts, names, or ids
keep_annots (bool):
if True, keeps annotations, but removes category labels.
Defaults to False.
safe (bool): if True, we perform checks to remove
duplicates and non-existing identifiers. Defaults to True.
Returns:
Dict: num_removed: information on the number of items removed
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo()
>>> cat_identifiers = [self.cats[1], 'rocket', 3]
>>> self.remove_categories(cat_identifiers)
>>> assert len(self.dataset['categories']) == 5
>>> self._check_integrity()
"""
remove_info = {'annotations': None, 'categories': None}
if cat_identifiers:
if verbose > 1:
print('Removing annots of removed categories')
if safe:
remove_cids = set()
for identifier in cat_identifiers:
try:
cid = self._resolve_to_cid(identifier)
remove_cids.add(cid)
except Exception:
pass
remove_cids = sorted(remove_cids)
else:
remove_cids = list(map(self._resolve_to_cid, cat_identifiers))
# First remove any annotation that belongs to those categories
if self.index.cid_to_aids:
remove_aids = list(it.chain(*[self.index.cid_to_aids[cid]
for cid in remove_cids]))
else:
remove_aids = [ann['id'] for ann in self.dataset['annotations']
if ann['category_id'] in remove_cids]
if keep_annots:
# Simply remove category information instead of removing the
# entire annotation.
for aid in remove_aids:
self.anns[aid].pop('category_id')
else:
rminfo = self.remove_annotations(remove_aids, verbose=verbose)
remove_info.update(rminfo)
remove_info['categories'] = len(remove_cids)
if verbose > 1:
print('Removing {} category entries'.format(len(remove_cids)))
id_to_index = {
cat['id']: index
for index, cat in enumerate(self.dataset['categories'])
}
# Lookup the indices to remove, sort in descending order
remove_idxs = list(ub.take(id_to_index, remove_cids))
_delitems(self.dataset['categories'], remove_idxs)
self.index._remove_categories(remove_cids, verbose=verbose)
self._invalidate_hashid(['categories', 'annotations'])
return remove_info
[docs]
def remove_tracks(self, track_identifiers, keep_annots=False, verbose=0,
safe=True):
"""
Remove tracks and all annotations in those tracks.
Args:
track_identifiers (List): list of track dicts, names, or ids
keep_annots (bool):
if True, keeps annotations, but removes tracks labels.
Defaults to False.
safe (bool): if True, we perform checks to remove
duplicates and non-existing identifiers. Defaults to True.
Returns:
Dict: num_removed: information on the number of items removed
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo('vidshapes1')
>>> for ann in self.dataset['annotations']:
... ann.pop('segmentation')
... ann.pop('keypoints')
>>> print('self.dataset = {}'.format(ub.urepr(self.dataset, nl=2)))
>>> track_identifiers = [2]
>>> assert len(self.dataset['tracks']) == 2
>>> self.remove_tracks(track_identifiers)
>>> print('self.dataset = {}'.format(ub.urepr(self.dataset, nl=2)))
>>> assert len(self.dataset['tracks']) == 1
>>> self._check_integrity()
"""
remove_info = {'annotations': None, 'tracks': None}
if track_identifiers:
if verbose > 1:
print('Removing annots of removed tracks')
if safe:
remove_trackids = set()
for identifier in track_identifiers:
try:
trackid = self._resolve_to_trackid(identifier)
remove_trackids.add(trackid)
except Exception:
pass
remove_trackids = sorted(remove_trackids)
else:
remove_trackids = list(map(self._resolve_to_trackid, track_identifiers))
# First remove any annotation that belongs to those tracks
if self.index.trackid_to_aids:
remove_aids = list(it.chain(*[self.index.trackid_to_aids[trackid]
for trackid in remove_trackids]))
else:
remove_aids = [ann['id'] for ann in self.dataset['annotations']
if ann['track_id'] in remove_trackids]
if keep_annots:
# Simply remove track information instead of removing the
# entire annotation.
for aid in remove_aids:
self.anns[aid].pop('track_id')
else:
rminfo = self.remove_annotations(remove_aids, verbose=verbose)
remove_info.update(rminfo)
remove_info['tracks'] = len(remove_trackids)
if verbose > 1:
print('Removing {} track entries'.format(len(remove_trackids)))
id_to_index = {
track['id']: index
for index, track in enumerate(self.dataset['tracks'])
}
# Lookup the indices to remove, sort in descending order
remove_idxs = list(ub.take(id_to_index, remove_trackids))
_delitems(self.dataset['tracks'], remove_idxs)
self.index._remove_tracks(remove_trackids, verbose=verbose)
self._invalidate_hashid(['tracks', 'annotations'])
return remove_info
[docs]
def remove_images(self, gids_or_imgs, verbose=0, safe=True):
"""
Remove images and any annotations contained by them
Args:
gids_or_imgs (List): list of image dicts, names, or ids
safe (bool): if True, we perform checks to remove
duplicates and non-existing identifiers.
Returns:
Dict: num_removed: information on the number of items removed
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo()
>>> assert len(self.dataset['images']) == 3
>>> gids_or_imgs = [self.imgs[2], 'astro.png']
>>> self.remove_images(gids_or_imgs) # xdoc: +IGNORE_WANT
{'annotations': 11, 'images': 2}
>>> assert len(self.dataset['images']) == 1
>>> self._check_integrity()
>>> gids_or_imgs = [3]
>>> self.remove_images(gids_or_imgs)
>>> assert len(self.dataset['images']) == 0
>>> self._check_integrity()
"""
remove_info = {'annotations': None, 'images': None}
if gids_or_imgs:
if verbose > 1:
print('Removing images')
remove_gids = list(map(self._resolve_to_gid, gids_or_imgs))
if safe:
remove_gids = sorted(set(remove_gids))
# First remove any annotation that belongs to those images
if self.index.gid_to_aids:
remove_aids = list(it.chain(*[self.index.gid_to_aids[gid]
for gid in remove_gids]))
else:
remove_aids = [ann['id'] for ann in self.dataset['annotations']
if ann['image_id'] in remove_gids]
rminfo = self.remove_annotations(remove_aids, verbose=verbose)
remove_info.update(rminfo)
remove_info['images'] = len(remove_gids)
if verbose > 1:
print('Removing {} image entries'.format(len(remove_gids)))
id_to_index = {
img['id']: index
for index, img in enumerate(self.dataset['images'])
}
# Lookup the indices to remove, sort in descending order
remove_idxs = list(ub.take(id_to_index, remove_gids))
_delitems(self.dataset['images'], remove_idxs)
self.index._remove_images(remove_gids, verbose=verbose)
self._invalidate_hashid(['images', 'annotations'])
return remove_info
[docs]
def remove_videos(self, vidids_or_videos, verbose=0, safe=True):
"""
Remove videos and any images / annotations contained by them
Args:
vidids_or_videos (List): list of video dicts, names, or ids
safe (bool):
if True, we perform checks to remove duplicates and
non-existing identifiers.
Returns:
Dict: num_removed: information on the number of items removed
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo('vidshapes8')
>>> assert len(self.dataset['videos']) == 8
>>> vidids_or_videos = [self.dataset['videos'][0]['id']]
>>> self.remove_videos(vidids_or_videos) # xdoc: +IGNORE_WANT
{'annotations': 4, 'images': 2, 'videos': 1}
>>> assert len(self.dataset['videos']) == 7
>>> self._check_integrity()
"""
remove_info = {'annotations': None, 'images': None, 'videos': None}
if vidids_or_videos:
if verbose > 1:
print('Removing videos')
remove_vidids = list(map(self._resolve_to_vidid, vidids_or_videos))
if safe:
remove_vidids = sorted(set(remove_vidids))
# First remove any annotation that belongs to those images
if self.index.vidid_to_gids:
remove_gids = list(it.chain(*[self.index.vidid_to_gids[vidid]
for vidid in remove_vidids]))
else:
remove_gids = [ann['id'] for ann in self.dataset['videos']
if ann['image_id'] in remove_gids]
rminfo = self.remove_images(remove_gids, verbose=verbose,
safe=safe)
remove_info.update(rminfo)
remove_info['videos'] = len(remove_vidids)
if verbose > 1:
print('Removing {} video entries'.format(len(remove_vidids)))
id_to_index = {
video['id']: index
for index, video in enumerate(self.dataset['videos'])
}
# Lookup the indices to remove, sort in descending order
remove_idxs = list(ub.take(id_to_index, remove_vidids))
_delitems(self.dataset['videos'], remove_idxs)
self.index._remove_videos(remove_vidids, verbose=verbose)
self._invalidate_hashid(['videos', 'images', 'annotations'])
return remove_info
[docs]
def remove_annotation_keypoints(self, kp_identifiers):
"""
Removes all keypoints with a particular category
Args:
kp_identifiers (List): list of keypoint category dicts, names, or ids
Returns:
Dict: num_removed: information on the number of items removed
"""
# kpnames = {k['name'] for k in remove_kpcats}
# TODO: needs optimization
remove_kpcats = list(map(self._resolve_to_kpcat, kp_identifiers))
kpcids = {k['id'] for k in remove_kpcats}
num_kps_removed = 0
for ann in self.dataset['annotations']:
remove_idxs = [
kp_idx for kp_idx, kp in enumerate(ann['keypoints'])
if kp['keypoint_category_id'] in kpcids
]
num_kps_removed += len(remove_idxs)
_delitems(ann['keypoints'], remove_idxs)
remove_info = {'annotation_keypoints': num_kps_removed}
return remove_info
[docs]
def remove_keypoint_categories(self, kp_identifiers):
"""
Removes all keypoints of a particular category as well as all
annotation keypoints with those ids.
Args:
kp_identifiers (List): list of keypoint category dicts, names, or ids
Returns:
Dict: num_removed: information on the number of items removed
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo('shapes', rng=0)
>>> kp_identifiers = ['left_eye', 'mid_tip']
>>> remove_info = self.remove_keypoint_categories(kp_identifiers)
>>> print('remove_info = {!r}'.format(remove_info))
>>> # FIXME: for whatever reason demodata generation is not determenistic when seeded
>>> # assert remove_info == {'keypoint_categories': 2, 'annotation_keypoints': 16, 'reflection_ids': 1}
>>> assert self._resolve_to_kpcat('right_eye')['reflection_id'] is None
"""
remove_info = {
'keypoint_categories': None,
'annotation_keypoints': None
}
remove_kpcats = list(map(self._resolve_to_kpcat, kp_identifiers))
_ann_remove_info = self.remove_annotation_keypoints(remove_kpcats)
remove_info.update(_ann_remove_info)
remove_kpcids = {k['id'] for k in remove_kpcats}
for kpcat in remove_kpcats:
self.dataset['keypoint_categories'].remove(kpcat)
# handle reflection ids
remove_reflect_ids = 0
for kpcat in self.dataset['keypoint_categories']:
if kpcat.get('reflection_id', None) in remove_kpcids:
kpcat['reflection_id'] = None
remove_reflect_ids += 1
remove_info['reflection_ids'] = remove_reflect_ids
remove_info['keypoint_categories'] = len(remove_kpcats)
return remove_info
[docs]
def set_annotation_category(self, aid_or_ann, cid_or_cat):
"""
Sets the category of a single annotation
Args:
aid_or_ann (dict | int): annotation dict or id
cid_or_cat (dict | int): category dict or id
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo()
>>> old_freq = self.category_annotation_frequency()
>>> aid_or_ann = aid = 2
>>> cid_or_cat = new_cid = self.ensure_category('kitten')
>>> self.set_annotation_category(aid, new_cid)
>>> new_freq = self.category_annotation_frequency()
>>> print('new_freq = {}'.format(ub.urepr(new_freq, nl=1)))
>>> print('old_freq = {}'.format(ub.urepr(old_freq, nl=1)))
>>> assert sum(new_freq.values()) == sum(old_freq.values())
>>> assert new_freq['kitten'] == 1
"""
new_cid = self._resolve_to_cid(cid_or_cat)
ann = self._resolve_to_ann(aid_or_ann)
aid = ann['id']
if self.index:
if 'category_id' in ann:
old_cid = ann['category_id']
self.index.cid_to_aids[old_cid].remove(aid)
ann['category_id'] = new_cid
if self.index:
self.index.cid_to_aids[new_cid].add(aid)
self._invalidate_hashid(['annotations'])
[docs]
class CocoIndex:
"""
Fast lookup index for the COCO dataset with dynamic modification
Attributes:
imgs (Dict[int, dict]):
mapping between image ids and the image dictionaries
anns (Dict[int, dict]):
mapping between annotation ids and the annotation dictionaries
cats (Dict[int, dict]):
mapping between category ids and the category dictionaries
tracks (Dict[int, dict]):
mapping between track ids and the track dictionaries
kpcats (Dict[int, dict]):
mapping between keypoint category ids and keypoint category
dictionaries
gid_to_aids (Dict[int, List[int]]):
mapping between an image-id and annotation-ids that belong to it
cid_to_aids (Dict[int, List[int]]):
mapping between an category-id and annotation-ids that belong to it
cid_to_gids (Dict[int, List[int]]):
mapping between an category-id and image-ids that contain
at least one annotation with this cateogry id.
trackid_to_aids (Dict[int, List[int]]):
mapping between a track-id and annotation-ids that belong to it
vidid_to_gids (Dict[int, List[int]]):
mapping between an video-id and images-ids that belong to it
name_to_video (Dict[str, dict]):
mapping between a video name and the video dictionary.
name_to_cat (Dict[str, dict]):
mapping between a category name and the category dictionary.
name_to_img (Dict[str, dict]):
mapping between a image name and the image dictionary.
name_to_track (Dict[str, dict]):
mapping between a track name and the track dictionary.
file_name_to_img (Dict[str, dict]):
mapping between a image file_name and the image dictionary.
"""
# _set = ub.oset # many operations are much slower for oset
_set = set
[docs]
def _images_set_sorted_by_frame_index(index, gids=None):
"""
Helper for ensuring that vidid_to_gids returns image ids ordered by
frame index.
"""
return SortedSet(gids, key=partial(_lut_image_frame_index, index.imgs))
# Backwards compat
_set_sorted_by_frame_index = _images_set_sorted_by_frame_index
[docs]
def _annots_set_sorted_by_frame_index(index, aids=None):
"""
Helper for ensuring that vidid_to_gids returns image ids ordered by
frame index.
"""
# if aids is None:
# return set()
# return set(aids)
return SortedSet(aids, key=partial(_lut_annot_frame_index, index.imgs, index.anns))
def __init__(index):
index.anns = None
index.imgs = None
index.videos = None
index.cats = None
index.tracks = None
index.kpcats = None
index._id_lookup = None
index.gid_to_aids = None
index.cid_to_aids = None
index.vidid_to_gids = None
index.trackid_to_aids = None
index.name_to_video = None
index.name_to_cat = None
index.name_to_img = None
index.name_to_track = None
index.file_name_to_img = None
index._CHECKS = True
# index.kpcid_to_aids = None # TODO
def __bool__(index):
return index.anns is not None
# On-demand lookup tables
@property
def cid_to_gids(index):
"""
Example:
>>> import kwcoco
>>> self = dset = kwcoco.CocoDataset()
>>> self.index.cid_to_gids
"""
from scriptconfig.dict_like import DictLike
class ProxyCidToGids(DictLike):
def __init__(self, index):
self.index = index
def getitem(self, cid):
aids = self.index.cid_to_aids[cid]
gids = {self.index.anns[aid]['image_id'] for aid in aids}
return gids
def keys(self):
return self.index.cid_to_aids.keys()
cid_to_gids = ProxyCidToGids(index=index)
return cid_to_gids
[docs]
def _add_video(index, vidid, video):
if index.videos is not None:
name = video['name']
if index._CHECKS:
if name in index.name_to_video:
raise exceptions.DuplicateAddError(
'video with name={} already exists'.format(name))
index.videos[vidid] = video
if vidid not in index.vidid_to_gids:
index.vidid_to_gids[vidid] = index._images_set_sorted_by_frame_index()
index.name_to_video[name] = video
[docs]
def _add_image(index, gid, img):
"""
Example:
>>> # Test adding image to video that doesnt exist
>>> import kwcoco
>>> self = dset = kwcoco.CocoDataset()
>>> dset.add_image(file_name='frame1', video_id=1, frame_index=0)
>>> dset.add_image(file_name='frame2', video_id=1, frame_index=0)
>>> dset._check_integrity()
>>> print('dset.index.vidid_to_gids = {!r}'.format(dset.index.vidid_to_gids))
>>> assert len(dset.index.vidid_to_gids) == 1
>>> dset.add_video(name='foo-vid', id=1)
>>> assert len(dset.index.vidid_to_gids) == 1
>>> dset._check_integrity()
"""
if index.imgs is not None:
file_name = img.get('file_name', None)
name = img.get('name', None)
if index._CHECKS:
if file_name is None and name is None:
raise exceptions.InvalidAddError(
'at least one of file_name or name must be specified')
if file_name in index.file_name_to_img:
raise exceptions.DuplicateAddError(
'image with file_name={} already exists'.format(
file_name))
if name in index.name_to_img:
raise exceptions.DuplicateAddError(
'image with name={} already exists'.format(name))
index.imgs[gid] = img
index.gid_to_aids[gid] = index._set()
if file_name is not None:
index.file_name_to_img[file_name] = img
if name is not None:
index.name_to_img[name] = img
if 'video_id' in img:
if img.get('frame_index', None) is None:
raise exceptions.InvalidAddError(
'Images with video-ids must have a frame_index')
vidid = img['video_id']
try:
index.vidid_to_gids[vidid].add(gid)
except KeyError:
# Should warning messages contain data-specific info?
# msg = ('Adding image-id={} to '
# 'non-existing video-id={}').format(gid, vidid)
msg = 'Adding image to non-existing video'
warnings.warn(msg)
index.vidid_to_gids[vidid] = index._images_set_sorted_by_frame_index()
index.vidid_to_gids[vidid].add(gid)
[docs]
def _add_images(index, imgs):
"""
See ../dev/bench/bench_add_image_check.py
Note:
THIS FUNCTION WAS DESIGNED FOR SPEED, AS SUCH IT DOES NOT CHECK IF
THE IMAGE-IDs or FILE_NAMES ARE DUPLICATED AND WILL BLINDLY ADD
DATA EVEN IF IT IS BAD. THE SINGLE IMAGE VERSION IS SLOWER BUT
SAFER.
"""
if index.imgs is not None:
gids = [img['id'] for img in imgs]
new_imgs = dict(zip(gids, imgs))
index.imgs.update(new_imgs)
index.file_name_to_img.update(
{img['file_name']: img for img in imgs
if img.get('file_name', None) is not None})
index.name_to_img.update(
{img['name']: img for img in imgs
if img.get('name', None) is not None})
for gid in gids:
index.gid_to_aids[gid] = index._set()
if index.vidid_to_gids:
vidid_to_gids = ub.group_items(
[g['id'] for g in imgs],
[g.get('video_id', None) for g in imgs]
)
vidid_to_gids.pop(None, None)
for vidid, gids in vidid_to_gids.items():
index.vidid_to_gids[vidid].update(gids)
[docs]
def _add_annotation(index, aid, gid, cid, tid, ann):
if index.anns is not None:
index.anns[aid] = ann
# Note: it should be ok to have None's here
index.gid_to_aids[gid].add(aid)
index.cid_to_aids[cid].add(aid)
# index.trackid_to_aids[tid].add(aid)
try:
index.trackid_to_aids[tid].add(aid)
except KeyError:
if tid is not None:
msg = 'Adding annotation to non-existing track'
warnings.warn(msg)
# Be careful to not apply sorting to trackless annotations
if tid is None:
index.trackid_to_aids[tid] = set()
else:
index.trackid_to_aids[tid] = index._annots_set_sorted_by_frame_index()
index.trackid_to_aids[tid].add(aid)
[docs]
def _add_annotations(index, anns):
if index.anns is not None:
aids = [ann['id'] for ann in anns]
gids = [ann['image_id'] for ann in anns]
cids = [ann['category_id'] for ann in anns]
tids = [ann.get('track_id', None) for ann in anns]
new_anns = dict(zip(aids, anns))
index.anns.update(new_anns)
for gid, cid, tid, aid in zip(gids, cids, tids, aids):
index.gid_to_aids[gid].add(aid)
index.cid_to_aids[cid].add(aid)
try:
index.trackid_to_aids[tid].add(aid)
except KeyError:
if tid is None:
index.trackid_to_aids[tid] = set()
else:
index.trackid_to_aids[tid] = index._annots_set_sorted_by_frame_index()
index.trackid_to_aids[tid].add(aid)
[docs]
def _add_category(index, cid, name, cat):
if index.cats is not None:
index.cats[cid] = cat
index.cid_to_aids[cid] = index._set()
index.name_to_cat[name] = cat
[docs]
def _add_track(index, trackid, name, track):
if index.tracks is not None:
index.tracks[trackid] = track
index.trackid_to_aids[trackid] = index._annots_set_sorted_by_frame_index()
index.name_to_track[name] = track
[docs]
def _remove_all_annotations(index):
# Keep the category and image indexes alive
if index.anns is not None:
for _ in index.gid_to_aids.values():
_.clear()
for _ in index.cid_to_aids.values():
_.clear()
# for _ in index.trackid_to_aids.values():
# _.clear()
index.anns.clear()
index.tracks.clear()
index.trackid_to_aids.clear()
index.name_to_track.clear()
[docs]
def _remove_all_images(index):
# Keep the category indexes alive
if index.imgs is not None:
index.imgs.clear()
index.anns.clear()
index.gid_to_aids.clear()
index.file_name_to_img.clear()
for _ in index.cid_to_aids.values():
_.clear()
for _ in index.vidid_to_gids.values():
_.clear()
[docs]
def _remove_annotations(index, remove_aids, verbose=0):
if index.anns is not None:
if verbose > 1:
print('Updating annotation index')
# This is faster for simple set cid_to_aids
for aid in remove_aids:
ann = index.anns[aid]
gid = ann['image_id']
cid = ann.get('category_id', None)
track_id = ann.get('track_id', None)
index.trackid_to_aids[track_id].remove(aid)
index.cid_to_aids[cid].remove(aid)
index.gid_to_aids[gid].remove(aid)
index.anns.pop(aid)
[docs]
def _remove_tracks(index, remove_trackids, verbose=0):
# dynamically update the track index
if index.tracks is not None:
for tid in remove_trackids:
track = index.tracks.pop(tid)
del index.trackid_to_aids[tid]
del index.name_to_track[track['name']]
if verbose > 2:
print('Updated track index')
[docs]
def _remove_categories(index, remove_cids, verbose=0):
# dynamically update the category index
if index.cats is not None:
for cid in remove_cids:
cat = index.cats.pop(cid)
del index.cid_to_aids[cid]
del index.name_to_cat[cat['name']]
if verbose > 2:
print('Updated category index')
[docs]
def _remove_images(index, remove_gids, verbose=0):
# dynamically update the image index
if index.imgs is not None:
for gid in remove_gids:
img = index.imgs[gid]
vidid = img.get('video_id', None)
if vidid in index.vidid_to_gids:
index.vidid_to_gids[vidid].remove(gid)
del index.gid_to_aids[gid]
gname = img.get('file_name', None)
if gname is not None:
del index.file_name_to_img[gname]
name = img.get('name', None)
if name is not None:
del index.name_to_img[name]
del index.imgs[gid]
if verbose > 2:
print('Updated image index')
[docs]
def _remove_videos(index, remove_vidids, verbose=0):
# dynamically update the video index
lut = index.videos
if lut is not None:
for item_id in remove_vidids:
item = lut.pop(item_id)
del index.vidid_to_gids[item_id]
if index.name_to_video is not None:
del index.name_to_video[item['name']]
if verbose > 2:
print('Updated video index')
[docs]
def clear(index):
index.anns = None
index.imgs = None
index.videos = None
index.cats = None
index.tracks = None
index.kpcats = None
index._id_lookup = None
index.gid_to_aids = None
index.vidid_to_gids = None
index.cid_to_aids = None
index.name_to_cat = None
index.file_name_to_img = None
index.name_to_video = None
index.trackid_to_aids = None
index.name_to_track = None
# index.kpcid_to_aids = None # TODO
[docs]
def build(index, parent):
"""
Build all id-to-obj reverse indexes from scratch.
Args:
parent (kwcoco.CocoDataset): the dataset to index
Notation:
aid - Annotation ID
gid - imaGe ID
cid - Category ID
vidid - Video ID
tid - Track ID
Example:
>>> import kwcoco
>>> parent = kwcoco.CocoDataset.demo('vidshapes1', num_frames=4, rng=1)
>>> index = parent.index
>>> index.build(parent)
"""
# create index
anns, cats, imgs = {}, {}, {}
videos = {}
tracks = {}
# Build one-to-one index-lookup maps
for cat in parent.dataset.get('categories', []):
cid = cat['id']
if cid in cat:
warnings.warn(
'Categories have the same id in {}:\n{} and\n{}'.format(
parent, cats[cid], cat))
cats[cid] = cat
for video in parent.dataset.get('videos', []):
vidid = video['id']
if vidid in videos:
warnings.warn(
'Video has the same id in {}:\n{} and\n{}'.format(
parent, videos[vidid], video))
videos[vidid] = video
for img in parent.dataset.get('images', []):
gid = img['id']
if gid in imgs:
warnings.warn(
'Images have the same id in {}:\n{} and\n{}'.format(
parent, imgs[gid], img))
imgs[gid] = img
for ann in parent.dataset.get('annotations', []):
aid = ann['id']
if aid in anns:
warnings.warn(
'Annotations at index {} and {} '
'have the same id in {}:\n{} and\n{}'.format(
parent.dataset['annotations'].index(anns[aid]),
parent.dataset['annotations'].index(ann),
parent, anns[aid], ann))
anns[aid] = ann
for track in parent.dataset.get('tracks', []):
tid = track['id']
if tid in track:
warnings.warn(
'Tracks have the same id in {}:\n{} and\n{}'.format(
parent, tracks[tid], track))
tracks[tid] = track
# Build one-to-many lookup maps
vidid_to_gids = ub.group_items(
[g['id'] for g in imgs.values()],
[g.get('video_id', None) for g in imgs.values()]
)
vidid_to_gids.pop(None, None)
if 0:
# The following is slightly slower, but it is also many fewer lines
# Not sure if its correct to replace the else block or not
aids = [d['id'] for d in anns.values()]
gid_to_aids = ub.group_items(aids, (d['image_id'] for d in anns.values()))
cid_to_aids = ub.group_items(aids, (d.get('category_id', None) for d in anns.values()))
trackid_to_aids = ub.group_items(aids, (d.get('track_id', None) for d in anns.values()))
cid_to_aids.pop(None, None)
gid_to_aids = ub.map_vals(index._set, gid_to_aids)
cid_to_aids = ub.map_vals(index._set, cid_to_aids)
trackid_to_aids = ub.map_vals(index._set, trackid_to_aids)
vidid_to_gids = ub.map_vals(index._images_set_sorted_by_frame_index, vidid_to_gids)
else:
gid_to_aids = defaultdict(index._set)
cid_to_aids = defaultdict(index._set)
trackid_to_aids = defaultdict(index._set)
for ann in anns.values():
try:
aid = ann['id']
gid = ann['image_id']
except KeyError:
raise KeyError('Annotation does not have ids {}'.format(ann))
if not isinstance(aid, numbers.Integral):
raise TypeError('bad aid={} type={}'.format(aid, type(aid)))
if not isinstance(gid, numbers.Integral):
raise TypeError('bad gid={} type={}'.format(gid, type(gid)))
gid_to_aids[gid].add(aid)
if gid not in imgs:
warnings.warn('Annotation {} in {} references '
'unknown image_id'.format(ann, parent))
try:
cid = ann['category_id']
except KeyError:
warnings.warn('Annotation {} in {} is missing '
'a category_id'.format(ann, parent))
else:
cid_to_aids[cid].add(aid)
if not isinstance(cid, numbers.Integral) and cid is not None:
raise TypeError('bad cid={} type={}'.format(cid, type(cid)))
if cid not in cats and cid is not None:
warnings.warn('Annotation {} in {} references '
'unknown category_id'.format(ann, parent))
trackid = ann.get('track_id', None)
trackid_to_aids[trackid].add(aid)
# Fix one-to-zero cases
for cid in cats.keys():
if cid not in cid_to_aids:
cid_to_aids[cid] = index._set()
for gid in imgs.keys():
if gid not in gid_to_aids:
gid_to_aids[gid] = index._set()
for vidid in videos.keys():
if vidid not in vidid_to_gids:
vidid_to_gids[vidid] = index._set()
# create class members
index._id_lookup = {
'categories': cats,
'images': imgs,
'annotations': anns,
'videos': videos,
'tracks': tracks,
}
index.anns = anns
index.imgs = imgs
index.cats = cats
index.kpcats = None # TODO
index.videos = videos
index.tracks = tracks
# Remove defaultdict like behavior
gid_to_aids.default_factory = None
# Actually, its important to have defaultdict like behavior for
# categories so we can allow for the category_id=None case
# cid_to_aids.default_factory = None
# vidid_to_gids.default_factory = None
# Ensure that the values are cast to the appropriate set type
# This needs to happen after index.imgs is populated
vidid_to_gids = ub.map_vals(index._images_set_sorted_by_frame_index, vidid_to_gids)
# Be careful to not apply sorting to trackless annotations
_notrack_aids = trackid_to_aids.pop(None, None)
trackid_to_aids = ub.map_vals(index._annots_set_sorted_by_frame_index, trackid_to_aids)
if _notrack_aids is not None:
trackid_to_aids[None] = set(_notrack_aids)
index.gid_to_aids = gid_to_aids
index.cid_to_aids = cid_to_aids
index.vidid_to_gids = vidid_to_gids
index.trackid_to_aids = trackid_to_aids
index.name_to_cat = {cat['name']: cat for cat in index.cats.values()}
index.file_name_to_img = {
img['file_name']: img for img in index.imgs.values()
if img.get('file_name', None) is not None
}
index.name_to_img = {
img['name']: img for img in index.imgs.values()
if img.get('name', None) is not None
}
index.name_to_video = {
video['name']: video for video in index.videos.values()
if video.get('name', None) is not None
}
index.name_to_track = {
track['name']: track for track in index.tracks.values()
if track.get('name', None) is not None
}
[docs]
class MixinCocoIndex:
"""
Give the dataset top level access to index attributes
"""
@property
def anns(self):
return self.index.anns
@property
def imgs(self):
return self.index.imgs
@property
def cats(self):
return self.index.cats
# NOTE: API Issue, overloads previous method
# @property
# def videos(self):
# return self.index.videos
@property
def gid_to_aids(self):
return self.index.gid_to_aids
@property
def cid_to_aids(self):
return self.index.cid_to_aids
@property
def name_to_cat(self):
return self.index.name_to_cat
[docs]
class CocoDataset(AbstractCocoDataset, MixinCocoAddRemove, MixinCocoStats,
MixinCocoObjects, MixinCocoDraw,
MixinCocoAccessors, MixinCocoConstructors, MixinCocoExtras,
MixinCocoHashing, MixinCocoIndex, MixinCocoDepricate,
ub.NiceRepr):
"""
The main coco dataset class with a json dataset backend.
Attributes:
dataset (Dict): raw json data structure. This is the base dictionary
that contains {'annotations': List, 'images': List,
'categories': List}
index (CocoIndex): an efficient lookup index into the coco data
structure. The index defines its own attributes like
``anns``, ``cats``, ``imgs``, ``gid_to_aids``,
``file_name_to_img``, etc. See :class:`CocoIndex` for more details
on which attributes are available.
fpath (PathLike | None):
if known, this stores the filepath the dataset was loaded from
tag (str | None):
A tag indicating the name of the dataset.
bundle_dpath (PathLike | None) :
If known, this is the root path that all image file names are
relative to. This can also be manually overwritten by the user.
hashid (str | None) :
If computed, this will be a hash uniquely identifing the dataset.
To ensure this is computed see
:func:`kwcoco.coco_dataset.MixinCocoExtras._build_hashid`.
References:
http://cocodataset.org/#format
http://cocodataset.org/#download
CommandLine:
python -m kwcoco.coco_dataset CocoDataset --show
Example:
>>> from kwcoco.coco_dataset import demo_coco_data
>>> import kwcoco
>>> import ubelt as ub
>>> # Returns a coco json structure
>>> dataset = demo_coco_data()
>>> # Pass the coco json structure to the API
>>> self = kwcoco.CocoDataset(dataset, tag='demo')
>>> # Now you can access the data using the index and helper methods
>>> #
>>> # Start by looking up an image by it's COCO id.
>>> image_id = 1
>>> img = self.index.imgs[image_id]
>>> print(ub.urepr(img, nl=1, sort=1))
{
'file_name': 'astro.png',
'id': 1,
'url': 'https://i.imgur.com/KXhKM72.png',
}
>>> #
>>> # Use the (gid_to_aids) index to lookup annotations in the iamge
>>> annotation_id = sorted(self.index.gid_to_aids[image_id])[0]
>>> ann = self.index.anns[annotation_id]
>>> print(ub.urepr((ub.udict(ann) - {'segmentation'}).sorted_keys(), nl=1))
{
'bbox': [10, 10, 360, 490],
'category_id': 1,
'id': 1,
'image_id': 1,
'keypoints': [247, 101, 2, 202, 100, 2],
}
>>> #
>>> # Use annotation category id to look up that information
>>> category_id = ann['category_id']
>>> cat = self.index.cats[category_id]
>>> print('cat = {}'.format(ub.urepr(cat, nl=1, sort=1)))
cat = {
'id': 1,
'name': 'astronaut',
'supercategory': 'human',
}
>>> #
>>> # Now play with some helper functions, like extended statistics
>>> extended_stats = self.extended_stats()
>>> # xdoctest: +IGNORE_WANT
>>> print('extended_stats = {}'.format(ub.urepr(extended_stats, nl=1, precision=2, sort=1)))
extended_stats = {
'annots_per_img': {'mean': 3.67, 'std': 3.86, 'min': 0.00, 'max': 9.00, 'nMin': 1, 'nMax': 1, 'shape': (3,)},
'imgs_per_cat': {'mean': 0.88, 'std': 0.60, 'min': 0.00, 'max': 2.00, 'nMin': 2, 'nMax': 1, 'shape': (8,)},
'cats_per_img': {'mean': 2.33, 'std': 2.05, 'min': 0.00, 'max': 5.00, 'nMin': 1, 'nMax': 1, 'shape': (3,)},
'annots_per_cat': {'mean': 1.38, 'std': 1.49, 'min': 0.00, 'max': 5.00, 'nMin': 2, 'nMax': 1, 'shape': (8,)},
'imgs_per_video': {'empty_list': True},
}
>>> # You can "draw" a raster of the annotated image with cv2
>>> canvas = self.draw_image(2)
>>> # Or if you have matplotlib you can "show" the image with mpl objects
>>> # xdoctest: +REQUIRES(--show)
>>> from matplotlib import pyplot as plt
>>> fig = plt.figure()
>>> ax1 = fig.add_subplot(1, 2, 1)
>>> self.show_image(gid=2)
>>> ax2 = fig.add_subplot(1, 2, 2)
>>> ax2.imshow(canvas)
>>> ax1.set_title('show with matplotlib')
>>> ax2.set_title('draw with cv2')
>>> plt.show()
"""
def __init__(self, data=None, tag=None, bundle_dpath=None, img_root=None,
fname=None, autobuild=True):
"""
Args:
data (str | PathLike | dict | None):
Either a filepath to a coco json file, or a dictionary
containing the actual coco json structure. For a more generally
coercable constructor see func:`CocoDataset.coerce`.
tag (str | None) :
Name of the dataset for display purposes, and does not
influence behavior of the underlying data structure, although
it may be used via convinience methods. We attempt to
autopopulate this via information in ``data`` if available.
If unspecfied and ``data`` is a filepath this becomes the
basename.
bundle_dpath (str | None):
the root of the dataset that images / external data will be
assumed to be relative to. If unspecfied, we attempt to
determine it using information in ``data``. If ``data`` is a
filepath, we use the dirname of that path. If ``data`` is a
dictionary, we look for the "img_root" key. If unspecfied and
we fail to introspect then, we fallback to the current working
directory.
img_root (str | None):
deprecated alias for bundle_dpath
"""
self._fpath = None
# Info about what was the origin of this object and if anything
# happened to it over its lifetime.
self._state = {
'was_loaded': False,
'was_saved': False,
'was_modified': 0,
}
if img_root is not None:
bundle_dpath = img_root
if data is None:
# TODO: rely on subset of SPEC keys
data = {
'categories': [],
'videos': [],
'images': [],
'annotations': [],
'tracks': [],
'licenses': [],
'info': [],
}
fpath = None
inferred_date_type = None
if isinstance(data, dict):
# Assumption: If data is a dict and are not explicitly given
# bundle_dpath, then we assume it is relative to the cwd.
assumed_root = '.'
inferred_date_type = 'json-dict'
elif isinstance(data, (str, os.PathLike)):
path = os.fspath(data)
if isdir(path):
# data was a pointer to hopefully a kwcoco bundle
if bundle_dpath is None:
bundle_dpath = path
else:
if bundle_dpath != path:
raise Exception('ambiguous')
inferred_date_type = 'bundle-path'
else:
# data was a pointer to hopefully a kwcoco filepath
# TODO: do some validation of that here
fpath = data
if bundle_dpath is None:
bundle_dpath = dirname(fpath)
inferred_date_type = 'file-path'
else:
raise TypeError(
'data must be a dict or path to json file, '
'but got: {!r}'.format(type(data)))
if fpath is None and bundle_dpath is not None and inferred_date_type == 'bundle-path':
# This should probably be reserved for a coercion method
# If we are givena bundle path, assume a standard name convention
if fname is None:
import glob
candidates = [
'data',
'data.json',
'data.kwcoco.json',
'data.kwcoco.zip', # Allow zipfiles
'data.kwcoco.json.zip',
'*.kwcoco.json',
'*.mscoco.json',
]
# Check for standard bundle manifest names
manifest_candidate_iter = iter(ub.oset(ub.flatten([
glob.glob(join(bundle_dpath, p))
for p in candidates])))
try:
fpath = ub.peek(manifest_candidate_iter)
except StopIteration:
fpath = join(bundle_dpath, 'data.kwcoco.json')
# raise Exception('No manifest in Dataset Bundle')
else:
remain = list(manifest_candidate_iter)
if len(remain) > 0:
raise Exception('Ambiguous Dataset Bundle {}: {}'.format(fpath, remain))
else:
fpath = join(bundle_dpath, fname)
key = basename(bundle_dpath)
if fpath is not None:
fname = basename(fpath)
if fname == 'data.kwcoco.json':
if bundle_dpath is not None:
bundle_dpath = dirname(fpath)
key = basename(bundle_dpath)
else:
key = fname
if bundle_dpath is not None:
assumed_root = bundle_dpath
if isinstance(data, (str, os.PathLike)):
if not exists(fpath):
raise Exception(ub.paragraph(
'''
Specified fpath={} does not exist. If you are trying
to create a new dataset fist create a CocoDataset without
any arguments, and then set the fpath attribute.
We may loosen this requirement in the future.
''').format(fpath))
self._state['was_loaded'] = True
# Test to see if the kwcoco file is compressed
import zipfile
if zipfile.is_zipfile(fpath):
with open(fpath, 'rb') as file:
with zipfile.ZipFile(file, 'r') as zfile:
members = zfile.namelist()
if len(members) != 1:
raise Exception(
'Currently only zipfiles with exactly 1 '
'kwcoco member are supported')
text = zfile.read(members[0]).decode('utf8')
data = json_r.loads(text)
else:
with open(fpath, 'r') as file:
data = json_r.load(file)
# If data is a path it gives us the absolute location of the root
if tag is None:
tag = key
# Backwards compat hack, allow the coco file to specify the
# bundle_dpath
# TODO: deprecate img_root
if isinstance(data, dict) and 'img_root' in data:
ub.schedule_deprecation(
'kwcoco', name='img_root', type='dataset member',
deprecate='0.6.3', error='1.0.0', remove='1.1.0',
migration=ub.paragraph(
'''
Ensure the location of the saved kwcoco file encodes the
bundle dpath or ensure bundle_dpath is correctly set in the
in-memory CocoDataset object.
''')
)
# allow image root to be specified in the dataset
# we refer to this as a json data "body root".
body_root = data.get('img_root', '')
if body_root is None:
body_root = ''
elif isinstance(body_root, str):
_tmp = ub.expandpath(body_root)
if exists(_tmp):
body_root = _tmp
else:
if isinstance(body_root, list) and body_root == []:
body_root = ''
else:
raise TypeError('body_root = {!r}'.format(body_root))
try:
bundle_dpath = join(assumed_root, body_root)
except Exception:
print('body_root = {!r}'.format(body_root))
print('assumed_root = {!r}'.format(assumed_root))
raise
if bundle_dpath is None:
bundle_dpath = assumed_root
bundle_dpath = ub.expandpath(bundle_dpath)
if fpath is None:
fpath = join(bundle_dpath, 'data.kwcoco.json')
self.index = CocoIndex()
self.hashid = None
self.hashid_parts = None
self.tag = tag
self.dataset = data
self.data_fpath = fpath
self.bundle_dpath = bundle_dpath
self.cache_dpath = None
self.assets_dpath = None
# Keep track of an unused id we may use
self._next_ids = _NextId(self)
self._infer_dirs()
if autobuild:
self._build_index()
@property
def fpath(self):
""" In the future we will deprecate img_root for bundle_dpath """
return self._fpath
@fpath.setter
def fpath(self, value):
# Cant use update fpath because of reroot checks.
# self._update_fpath(value)
self._fpath = value
self._infer_dirs()
[docs]
def _update_fpath(self, new_fpath):
# New method for more robustly updating the file path and bundle
# directory, still a WIP. Only works when the current dataset is
# already valid.
if new_fpath is None:
# Bundle directory is clobbered, so we should make everything
# absolute
self.reroot(absolute=True)
else:
old_fpath = self.fpath
if old_fpath is not None:
old_fpath_ = ub.Path(old_fpath)
new_fpath_ = ub.Path(new_fpath)
same_bundle = (
(old_fpath_.parent == new_fpath_.parent) or
(old_fpath_.resolve() == new_fpath_.resolve())
)
if not same_bundle:
# The bundle directory has changed, so we need to reroot
new_root = new_fpath_.parent
self.reroot(new_root)
self._fpath = new_fpath
self._infer_dirs()
[docs]
def _infer_dirs(self):
"""
Ignore:
self = dset
"""
data_fpath = self.fpath
if data_fpath is not None:
bundle_dpath = dirname(data_fpath)
assets_dpath = join(bundle_dpath, '_assets')
cache_dpath = join(bundle_dpath, '_cache')
# OPINION: Do we want conditions?
# data_fname = basename(data_fpath)
# bundle_conditions = {
# # 'name': data_fname == 'data.kwcoco.json',
# # 'ext': data_fname.endswith('.kwcoco.json'),
# # 'has_assets': exists(assets_dpath),
# }
# is_bundle = all(bundle_conditions.values())
self.bundle_dpath = bundle_dpath
self.assets_dpath = assets_dpath
self.cache_dpath = cache_dpath
[docs]
@classmethod
def from_data(CocoDataset, data, bundle_dpath=None, img_root=None):
"""
Constructor from a json dictionary
Returns:
CocoDataset:
"""
coco_dset = CocoDataset(data, bundle_dpath=bundle_dpath,
img_root=img_root)
return coco_dset
[docs]
@classmethod
def from_image_paths(CocoDataset, gpaths, bundle_dpath=None,
img_root=None):
"""
Constructor from a list of images paths.
This is a convinience method.
Args:
gpaths (List[str]): list of image paths
Returns:
CocoDataset:
Example:
>>> import kwcoco
>>> coco_dset = kwcoco.CocoDataset.from_image_paths(['a.png', 'b.png'])
>>> assert coco_dset.n_images == 2
"""
coco_dset = CocoDataset(bundle_dpath=bundle_dpath, img_root=img_root)
for gpath in gpaths:
coco_dset.add_image(gpath)
return coco_dset
[docs]
@classmethod
def from_class_image_paths(CocoDataset, root):
"""
Ingest classification data in the common format where images of
different categories are stored in folders with the category label.
Args:
root (str | PathLike):
the path to a directory containing class-subdirectories
Returns:
CocoDataset:
"""
import kwimage
root = ub.Path(root)
subdirs = [child for child in root.glob('*') if child.is_dir()]
coco_dset = CocoDataset(bundle_dpath=root)
for subdir in subdirs:
catname = subdir.name
cat_id = coco_dset.ensure_category(catname)
for gpath in subdir.glob('*'):
h, w = kwimage.load_image_shape(gpath)[0:2]
image_id = coco_dset.add_image(gpath)
coco_dset.add_annotation(
bbox=[0, 0, w, h], category_id=cat_id, image_id=image_id)
return coco_dset
[docs]
@classmethod
def coerce_multiple(cls, datas, workers=0, mode='process', verbose=1,
postprocess=None, ordered=True, **kwargs):
"""
Coerce multiple CocoDataset objects in parallel.
Args:
datas (List): list of kwcoco coercables to load
workers (int | str): number of worker threads / processes.
Can also accept coerceable workers.
mode (str): thread, process, or serial. Defaults to process.
verbose (int): verbosity level
postprocess (Callable | None):
A function taking one arg (the loaded dataset) to run on the
loaded kwcoco dataset in background workers. This can be more
efficient when postprocessing is independent per kwcoco file.
ordered (bool):
if True yields datasets in the same order as given. Otherwise
results are yielded as they become available. Defaults to True.
**kwargs:
arguments passed to the constructor
Yields:
CocoDataset
SeeAlso:
* load_multiple - like this function but is a strict file-path-only loader
CommandLine:
xdoctest -m kwcoco.coco_dataset CocoDataset.coerce_multiple
Example:
>>> import kwcoco
>>> dset1 = kwcoco.CocoDataset.demo('shapes1')
>>> dset2 = kwcoco.CocoDataset.demo('shapes2')
>>> dset3 = kwcoco.CocoDataset.demo('vidshapes8')
>>> dsets = [dset1, dset2, dset3]
>>> input_fpaths = [d.fpath for d in dsets]
>>> results = list(kwcoco.CocoDataset.coerce_multiple(input_fpaths, ordered=True))
>>> result_fpaths = [r.fpath for r in results]
>>> assert result_fpaths == input_fpaths
>>> # Test unordered
>>> results1 = list(kwcoco.CocoDataset.coerce_multiple(input_fpaths, ordered=False))
>>> result_fpaths = [r.fpath for r in results]
>>> assert set(result_fpaths) == set(input_fpaths)
>>> #
>>> # Coerce from existing datasets
>>> results2 = list(kwcoco.CocoDataset.coerce_multiple(dsets, ordered=True, workers=0))
>>> assert results2[0] is dsets[0]
"""
import kwcoco
from kwcoco.util.util_parallel import coerce_num_workers
_loader = kwcoco.CocoDataset.coerce
workers = coerce_num_workers(workers)
workers = min(workers, len(datas))
# Reuse coerce_multiple logic but overload the loader function.
yield from cls._load_multiple(_loader, datas, workers=workers,
mode=mode, verbose=verbose,
postprocess=postprocess, ordered=ordered,
**kwargs)
[docs]
@classmethod
def load_multiple(cls, fpaths, workers=0, mode='process', verbose=1,
postprocess=None, ordered=True, **kwargs):
"""
Load multiple CocoDataset objects in parallel.
Args:
fpaths (List[str | PathLike]):
list of paths to multiple coco files to be loaded
workers (int): number of worker threads / processes
mode (str): thread, process, or serial. Defaults to process.
verbose (int): verbosity level
postprocess (Callable | None):
A function taking one arg (the loaded dataset) to run on the
loaded kwcoco dataset in background workers and returns the
modified dataset. This can be more efficient when
postprocessing is independent per kwcoco file.
ordered (bool):
if True yields datasets in the same order as given. Otherwise
results are yielded as they become available. Defaults to True.
**kwargs:
arguments passed to the constructor
Yields:
CocoDataset
SeeAlso:
* coerce_multiple - like this function but accepts general
coercable inputs.
"""
import kwcoco
_loader = kwcoco.CocoDataset
# Reuse coerce_multiple logic but overload the loader function.
yield from cls._load_multiple(_loader, fpaths, workers=workers,
mode=mode, verbose=verbose,
postprocess=postprocess, ordered=ordered,
**kwargs)
[docs]
@classmethod
def _load_multiple(cls, _loader, inputs, workers=0, mode='process',
verbose=1, postprocess=None, ordered=True, **kwargs):
"""
Shared logic for multiprocessing loaders.
SeeAlso:
* coerce_multiple
* load_multiple
"""
_submit_prog = ub.ProgIter(inputs, desc='submit load kwcoco jobs',
enabled=workers > 0, verbose=verbose)
executor = ub.Executor(mode=mode, max_workers=workers)
with executor:
jobs = []
for job_idx, data in enumerate(_submit_prog):
job = executor.submit(
_load_and_postprocess,
data=data,
loader=_loader,
postprocess=postprocess, **kwargs)
job.job_idx = job_idx
jobs.append(job)
if ordered:
_jobiter = jobs
else:
from concurrent.futures import as_completed
_jobiter = as_completed(jobs)
_collect_prog = ub.ProgIter(_jobiter, total=len(jobs),
desc='loading kwcoco files',
verbose=verbose)
for job in _collect_prog:
# Clear the reference to this job
jobs[job.job_idx] = None
yield job.result()
[docs]
@classmethod
def from_coco_paths(CocoDataset, fpaths, max_workers=0, verbose=1,
mode='thread', union='try'):
"""
Constructor from multiple coco file paths.
Loads multiple coco datasets and unions the result
Note:
if the union operation fails, the list of individually loaded files
is returned instead.
Args:
fpaths (List[str]): list of paths to multiple coco files to be
loaded and unioned.
max_workers (int): number of worker threads / processes
verbose (int): verbosity level
mode (str): thread, process, or serial
union (str | bool): If True, unions the result
datasets after loading. If False, just returns the result list.
If 'try', then try to preform the union, but return the result
list if it fails. Default='try'
Note:
This may be deprecated. Use load_multiple or coerce_multiple and
then manually perform the union.
"""
results = CocoDataset.load_multiple(
fpaths, workers=max_workers, verbose=verbose, mode=mode,
ordered=False, autobuild=False)
results = list(results)
if union:
try:
if verbose:
# TODO: it would be nice if we had a way to combine results
# on the fly, so we can work while the remaining io jobs
# are loading
print('combining results')
coco_dset = CocoDataset.union(*results)
except Exception as ex:
if union == 'try':
warnings.warn(
'Failed to union coco results: {!r}'.format(ex))
return results
else:
raise
else:
return coco_dset
else:
return results
[docs]
def copy(self):
"""
Deep copies this object
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo()
>>> new = self.copy()
>>> assert new.imgs[1] is new.dataset['images'][0]
>>> assert new.imgs[1] == self.dataset['images'][0]
>>> assert new.imgs[1] is not self.dataset['images'][0]
"""
new = copy.copy(self)
new.index = CocoIndex()
new.hashid_parts = copy.deepcopy(self.hashid_parts)
new.dataset = copy.deepcopy(self.dataset)
new._next_ids = _NextId(new)
new._build_index()
return new
def __nice__(self):
parts = []
parts.append('tag={}'.format(self.tag))
if self.dataset is not None:
info = ub.urepr(self.basic_stats(), kvsep='=', si=1, nobr=1, nl=0)
parts.append(info)
return ', '.join(parts)
[docs]
def dumps(self, indent=None, newlines=False):
"""
Writes the dataset out to the json format
Args:
newlines (bool) :
if True, each annotation, image, category gets its own line
indent (int | str | None): indentation for the json file. See
:func:`json.dump` for details.
newlines (bool):
if True, each annotation, image, category gets its own line.
Note:
Using newlines=True is similar to:
print(ub.urepr(dset.dataset, nl=2, trailsep=False))
However, the above may not output valid json if it contains
ndarrays.
Example:
>>> import kwcoco
>>> import json
>>> self = kwcoco.CocoDataset.demo()
>>> text = self.dumps(newlines=True)
>>> print(text)
>>> self2 = kwcoco.CocoDataset(json.loads(text), tag='demo2')
>>> assert self2.dataset == self.dataset
>>> assert self2.dataset is not self.dataset
>>> text = self.dumps(newlines=True)
>>> print(text)
>>> self2 = kwcoco.CocoDataset(json.loads(text), tag='demo2')
>>> assert self2.dataset == self.dataset
>>> assert self2.dataset is not self.dataset
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.coerce('vidshapes1-msi-multisensor', verbose=3)
>>> self.remove_annotations(self.annots())
>>> text = self.dumps(newlines=0, indent=' ')
>>> print(text)
>>> text = self.dumps(newlines=True, indent=' ')
>>> print(text)
"""
from kwcoco.util import util_special_json
# Instead of using json to dump the whole thing make the text a bit
# more pretty.
try:
if newlines:
# TODO: make this work faster!
text = util_special_json._special_kwcoco_pretty_dumps_orig(self.dataset)
else:
# TODO: do main key sorting here as well
text = util_special_json._json_dumps(self.dataset, indent=indent)
except Exception as ex:
print('Failed to dump ex = {!r}'.format(ex))
self._check_json_serializable()
raise
return text
[docs]
def _compress_dump_to_fileptr(self, file, arcname=None, indent=None, newlines=False):
"""
Experimental method to save compressed kwcoco files, may be folded into
dump in the future.
"""
import zipfile
from kwcoco.util import util_archive
compression = util_archive._coerce_zipfile_compression('auto')
zipkw = {
'compression': compression,
}
if sys.version_info[0:2] >= (3, 7):
zipkw['compresslevel'] = None
if arcname is None:
if not self.fpath:
arcname = '_data.kwcoco.json'
else:
# Use the current name of the file to compress
arcname = basename(self.fpath)
if arcname.endswith('.zip'):
arcname = arcname[:-4]
if not arcname.endswith('.json'):
arcname = arcname + '.json'
with zipfile.ZipFile(file, 'w', **zipkw) as zfile:
# It would be nice if we could dump directly to a binary
# encoded format here. Then we could use self.dump(zfile.open())
text = self.dumps(indent=indent, newlines=newlines)
zfile.writestr(arcname, text.encode('utf8'))
[docs]
def _dump(self, file, indent, newlines, compress):
"""
Case where we are dumping to an open file pointer.
We assume this means the dataset has been written to disk.
"""
if compress:
self._compress_dump_to_fileptr(
file, indent=indent, newlines=newlines)
else:
if newlines:
file.write(self.dumps(indent=indent, newlines=newlines))
else:
try:
json_w.dump(self.dataset, file, indent=indent,
ensure_ascii=False)
except Exception as ex:
print('Failed to dump ex = {!r}'.format(ex))
if 'Circular reference detected' in str(ex):
raise
self._check_json_serializable()
raise
self._state['was_saved'] = True
[docs]
def dump(self, file=None, indent=None, newlines=False, temp_file='auto',
compress='auto'):
"""
Writes the dataset out to the json format
Args:
file (PathLike | IO | None):
Where to write the data. Can either be a path to a file or an
open file pointer / stream. If unspecified, it will be written
to the current ``fpath`` property.
indent (int | str | None): indentation for the json file. See
:func:`json.dump` for details.
newlines (bool):
if True, each annotation, image, category gets its own line.
temp_file (bool | str):
Argument to :func:`safer.open`. Ignored if ``file`` is not a
PathLike object. Defaults to 'auto', which is False on Windows
and True everywhere else.
compress (bool | str):
if True, dumps the kwcoco file as a compressed zipfile.
In this case a literal IO file object must be opened in binary
write mode. If auto, then it will default to False unless
it can introspect the file name and the name ends with .zip
Example:
>>> import kwcoco
>>> import ubelt as ub
>>> dpath = ub.Path.appdir('kwcoco/demo/dump').ensuredir()
>>> dset = kwcoco.CocoDataset.demo()
>>> dset.fpath = dpath / 'my_coco_file.json'
>>> # Calling dump writes to the current fpath attribute.
>>> dset.dump()
>>> assert dset.dataset == kwcoco.CocoDataset(dset.fpath).dataset
>>> assert dset.dumps() == dset.fpath.read_text()
>>> #
>>> # Using compress=True can save a lot of space and it
>>> # is transparent when reading files via CocoDataset
>>> dset.dump(compress=True)
>>> assert dset.dataset == kwcoco.CocoDataset(dset.fpath).dataset
>>> assert dset.dumps() != dset.fpath.read_text(errors='replace')
Example:
>>> import kwcoco
>>> import ubelt as ub
>>> # Compression auto-defaults based on the file name.
>>> dpath = ub.Path.appdir('kwcoco/demo/dump').ensuredir()
>>> dset = kwcoco.CocoDataset.demo()
>>> fpath1 = dset.fpath = dpath / 'my_coco_file.zip'
>>> dset.dump()
>>> fpath2 = dset.fpath = dpath / 'my_coco_file.json'
>>> dset.dump()
>>> assert fpath1.read_bytes()[0:8] != fpath2.read_bytes()[0:8]
"""
from kwcoco.util.util_json import coerce_indent
indent = coerce_indent(indent)
if file is None:
file = self.fpath
try:
fpath = os.fspath(file)
except TypeError:
input_was_pathlike = False
else:
input_was_pathlike = True
if compress == 'auto':
compress = False
if not input_was_pathlike:
fpath = getattr(file, 'name', None)
if fpath is not None:
if os.fspath(fpath).endswith('.zip'):
compress = True
mode = 'wb' if compress else 'w'
if input_was_pathlike:
import safer
if temp_file == 'auto':
temp_file = not ub.WIN32
with safer.open(fpath, mode, temp_file=temp_file) as fp:
self._dump(
fp, indent=indent, newlines=newlines, compress=compress)
else:
# We are likely dumping to a real file.
self._dump(
file, indent=indent, newlines=newlines, compress=compress)
[docs]
def _check_json_serializable(self, verbose=1):
"""
Debug which part of a coco dataset might not be json serializable
"""
from kwcoco.util.util_json import find_json_unserializable
bad_parts_gen = find_json_unserializable(self.dataset)
bad_parts = []
for part in bad_parts_gen:
if verbose == 3:
print('part = {!r}'.format(part))
elif verbose and len(bad_parts) == 0:
# print out the first one we find
print('Found at least one bad part = {!r}'.format(part))
bad_parts.append(part)
if verbose:
# if bad_parts:
# print(ub.urepr(bad_parts))
summary = 'There are {} total errors'.format(len(bad_parts))
print('summary = {}'.format(summary))
return bad_parts
[docs]
def _check_integrity(self):
""" perform most checks """
self._check_index()
self._check_pointers()
self._check_json_serializable()
self.validate(img_attrs=False, annot_attrs=False, unique=False)
# assert len(self.missing_images()) == 0
return True
[docs]
def _check_index(self):
"""
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo()
>>> self._check_index()
>>> # Force a failure
>>> self.index.anns.pop(1)
>>> self.index.anns.pop(2)
>>> import pytest
>>> with pytest.raises(AssertionError):
>>> self._check_index()
"""
# We can verify our index invariants by copying the raw dataset and
# checking if the newly constructed index is the same as this index.
new_dataset = copy.deepcopy(self.dataset)
new = self.__class__(new_dataset, autobuild=False)
new._build_index()
checks = {}
checks['anns'] = self.index.anns == new.index.anns
checks['imgs'] = self.index.imgs == new.index.imgs
checks['cats'] = self.index.cats == new.index.cats
checks['tracks'] = self.index.tracks == new.index.tracks
checks['gid_to_aids'] = self.index.gid_to_aids == new.index.gid_to_aids
checks['cid_to_aids'] = self.index.cid_to_aids == new.index.cid_to_aids
checks['name_to_cat'] = self.index.name_to_cat == new.index.name_to_cat
checks['name_to_img'] = self.index.name_to_img == new.index.name_to_img
checks['file_name_to_img'] = self.index.file_name_to_img == new.index.file_name_to_img
checks['name_to_track'] = self.index.name_to_track == new.index.name_to_track
one_to_many1 = self.index.trackid_to_aids
one_to_many2 = new.index.trackid_to_aids
missing2 = ub.dict_diff(one_to_many1, one_to_many2)
missing1 = ub.dict_diff(one_to_many2, one_to_many1)
common1 = ub.dict_isect(one_to_many1, one_to_many2)
common2 = ub.dict_isect(one_to_many2, one_to_many1)
checks['trackid_to_aids'] = all([
all(len(v) == 0 for v in missing1.values()),
all(len(v) == 0 for v in missing2.values()),
common1 == common2])
checks['vidid_to_gids'] = self.index.vidid_to_gids == new.index.vidid_to_gids
failed_checks = {k: v for k, v in checks.items() if not v}
if any(failed_checks):
raise AssertionError(
'Failed index checks: {}'.format(list(failed_checks)))
return True
[docs]
def _check_pointers(self, verbose=1):
"""
Check that all category and image ids referenced by annotations exist
"""
if not self.index:
raise Exception('Build index before running pointer check')
errors = []
warnings = []
annots = self.dataset['annotations']
iter_ = ub.ProgIter(annots, desc='check annots', enabled=verbose)
for ann in iter_:
aid = ann['id']
cid = ann['category_id']
gid = ann['image_id']
trackid = ann.get('track_id', None)
if cid not in self.index.cats:
if cid is not None:
errors.append('aid={} references bad cid={}'.format(aid, cid))
else:
if self.index.cats[cid]['id'] != cid:
errors.append('cid={} has a bad index'.format(cid))
if gid not in self.index.imgs:
errors.append('aid={} references bad gid={}'.format(aid, gid))
else:
if self.index.imgs[gid]['id'] != gid:
errors.append('gid={} has a bad index'.format(gid))
if trackid is not None:
if trackid not in self.index.tracks:
warnings.append('aid={} references bad track_id={}'.format(aid, trackid))
iter_ = ub.ProgIter(self.dataset['images'], desc='check images', enabled=verbose)
for img in iter_:
gid = img['id']
vidid = img.get('video_id', None)
if vidid is not None:
if vidid not in self.index.videos:
# Dont make this an error because a video dictionary is not
# strictly necessary for images to be linked via videos.
warnings.append('gid={} references bad video_id={}'.format(gid, vidid))
elif self.index.videos[vidid]['id'] != vidid:
errors.append('video_id={} has a bad index'.format(vidid))
if 'categories' in self.dataset:
iter_ = ub.ProgIter(self.dataset['categories'], desc='check categories', enabled=verbose)
for category in iter_:
category_id = category['id']
if self.index.cats[category_id]['id'] != category_id:
errors.append(f'category_id={category_id} has a bad index')
if 'tracks' in self.dataset:
iter_ = ub.ProgIter(self.dataset['tracks'], desc='check tracks', enabled=verbose)
for track in iter_:
track_id = track['id']
if self.index.tracks[track_id]['id'] != track_id:
errors.append(f'track_id={track_id} has a bad index')
if 'videos' in self.dataset:
iter_ = ub.ProgIter(self.dataset['videos'], desc='check videos', enabled=verbose)
for video in iter_:
video_id = video['id']
if self.index.videos[video_id]['id'] != video_id:
errors.append(f'video_id={video_id} has a bad index')
if errors:
raise Exception('\n'.join(errors))
elif verbose:
print('Pointers are consistent')
return True
[docs]
def _build_index(self):
self.index.build(self)
[docs]
def union(*others, disjoint_tracks=True, remember_parent=False, **kwargs):
"""
Merges multiple :class:`CocoDataset` items into one. Names and
associations are retained, but ids may be different.
Args:
*others : a series of CocoDatasets that we will merge.
Note, if called as an instance method, the "self" instance
will be the first item in the "others" list. But if called
like a classmethod, "others" will be empty by default.
disjoint_tracks (bool):
if True, we will assume track-names are disjoint and if two
datasets share the same track-name, we will disambiguate them.
Otherwise they will be copied over as-is. Defaults to True.
In most cases you do not want to set this to False.
remember_parent (bool):
if True, videos and images will save information about their
parent in the "union_parent" field.
**kwargs : constructor options for the new merged CocoDataset
Returns:
kwcoco.CocoDataset: a new merged coco dataset
CommandLine:
xdoctest -m kwcoco.coco_dataset CocoDataset.union
Example:
>>> import kwcoco
>>> # Test union works with different keypoint categories
>>> dset1 = kwcoco.CocoDataset.demo('shapes1')
>>> dset2 = kwcoco.CocoDataset.demo('shapes2')
>>> dset1.remove_keypoint_categories(['bot_tip', 'mid_tip', 'right_eye'])
>>> dset2.remove_keypoint_categories(['top_tip', 'left_eye'])
>>> dset_12a = kwcoco.CocoDataset.union(dset1, dset2)
>>> dset_12b = dset1.union(dset2)
>>> dset_21 = dset2.union(dset1)
>>> def add_hist(h1, h2):
>>> return {k: h1.get(k, 0) + h2.get(k, 0) for k in set(h1) | set(h2)}
>>> kpfreq1 = dset1.keypoint_annotation_frequency()
>>> kpfreq2 = dset2.keypoint_annotation_frequency()
>>> kpfreq_want = add_hist(kpfreq1, kpfreq2)
>>> kpfreq_got1 = dset_12a.keypoint_annotation_frequency()
>>> kpfreq_got2 = dset_12b.keypoint_annotation_frequency()
>>> assert kpfreq_want == kpfreq_got1
>>> assert kpfreq_want == kpfreq_got2
>>> # Test disjoint gid datasets
>>> dset1 = kwcoco.CocoDataset.demo('shapes3')
>>> for new_gid, img in enumerate(dset1.dataset['images'], start=10):
>>> for aid in dset1.gid_to_aids[img['id']]:
>>> dset1.anns[aid]['image_id'] = new_gid
>>> img['id'] = new_gid
>>> dset1.index.clear()
>>> dset1._build_index()
>>> # ------
>>> dset2 = kwcoco.CocoDataset.demo('shapes2')
>>> for new_gid, img in enumerate(dset2.dataset['images'], start=100):
>>> for aid in dset2.gid_to_aids[img['id']]:
>>> dset2.anns[aid]['image_id'] = new_gid
>>> img['id'] = new_gid
>>> dset1.index.clear()
>>> dset2._build_index()
>>> others = [dset1, dset2]
>>> merged = kwcoco.CocoDataset.union(*others)
>>> print('merged = {!r}'.format(merged))
>>> print('merged.imgs = {}'.format(ub.urepr(merged.imgs, nl=1)))
>>> assert set(merged.imgs) & set([10, 11, 12, 100, 101]) == set(merged.imgs)
>>> # Test data is not preserved
>>> dset2 = kwcoco.CocoDataset.demo('shapes2')
>>> dset1 = kwcoco.CocoDataset.demo('shapes3')
>>> others = (dset1, dset2)
>>> cls = self = kwcoco.CocoDataset
>>> merged = cls.union(*others)
>>> print('merged = {!r}'.format(merged))
>>> print('merged.imgs = {}'.format(ub.urepr(merged.imgs, nl=1)))
>>> assert set(merged.imgs) & set([1, 2, 3, 4, 5]) == set(merged.imgs)
>>> # Test track-ids are mapped correctly
>>> dset1 = kwcoco.CocoDataset.demo('vidshapes1')
>>> dset2 = kwcoco.CocoDataset.demo('vidshapes2')
>>> dset3 = kwcoco.CocoDataset.demo('vidshapes3')
>>> others = (dset1, dset2, dset3)
>>> for dset in others:
>>> [a.pop('segmentation', None) for a in dset.index.anns.values()]
>>> [a.pop('keypoints', None) for a in dset.index.anns.values()]
>>> cls = self = kwcoco.CocoDataset
>>> merged = cls.union(*others, disjoint_tracks=1)
>>> print('dset1.anns = {}'.format(ub.urepr(dset1.anns, nl=1)))
>>> print('dset2.anns = {}'.format(ub.urepr(dset2.anns, nl=1)))
>>> print('dset3.anns = {}'.format(ub.urepr(dset3.anns, nl=1)))
>>> print('merged.anns = {}'.format(ub.urepr(merged.anns, nl=1)))
Example:
>>> import kwcoco
>>> # Test empty union
>>> empty_union = kwcoco.CocoDataset.union()
>>> assert len(empty_union.index.imgs) == 0
TODO:
- [ ] are supercategories broken?
- [ ] reuse image ids where possible
- [ ] reuse annotation / category ids where possible
- [X] handle case where no inputs are given
- [x] disambiguate track-ids
- [x] disambiguate video-ids
"""
from os.path import normpath
# Dev Note:
# See ~/misc/tests/python/test_multiarg_classmethod.py
# for tests for how to correctly implement this method such that it can
# behave as a method or a classmethod
if len(others) > 0:
cls = type(others[0])
else:
cls = CocoDataset
# TODO: add an option such that the union will fail if the names
# are not already disjoint. Alternatively, it could be the case
# that a union between images with the same name really does
# mean that they are the same image.
unique_img_names = UniqueNameRemapper()
unique_video_names = UniqueNameRemapper()
unique_track_names = UniqueNameRemapper()
def update_ifnotin(d1, d2):
""" copies keys from d2 that doent exist in d1 into d1 """
for k, v in d2.items():
if k not in d1:
d1[k] = v
return d1
def _has_duplicates(items):
seen = set()
for item in items:
if item in seen:
return True
seen.add(item)
return False
def _coco_union(relative_dsets, common_root):
""" union of dictionary based data structure """
# TODO: rely on subset of SPEC keys
merged = _dict([
('licenses', []),
('info', []),
('categories', []),
('videos', []),
('images', []),
('tracks', []),
('annotations', []),
])
# TODO: need to handle keypoint_categories
merged_cat_name_to_id = {}
merged_kp_name_to_id = {}
# Check if the image-ids are unique and can be preserved
_all_imgs = (img for _, _, d in relative_dsets for img in d['images'])
_all_gids = (img['id'] for img in _all_imgs)
preserve_gids = not _has_duplicates(_all_gids)
# Check if the video-ids are unique and can be preserved
_all_videos = (video for _, _, d in relative_dsets
for video in (d.get('videos', None) or []))
_all_vidids = (video['id'] for video in _all_videos)
preserve_vidids = not _has_duplicates(_all_vidids)
_all_tracks = (track for _, _, d in relative_dsets
for track in (d.get('tracks', None) or []))
_all_trackids = (track['id'] for track in _all_tracks)
preserve_trackids = not _has_duplicates(_all_trackids)
if not disjoint_tracks:
# Only relevant if we are not changing track names
merged_track_name_to_id = {}
# Only useful while we still support the case where a track-ids
# doesn't have an associated track.
hacked_track_id_map = _ID_Remapper(reuse=False)
# If disjoint_tracks is True keep track of track-ids we've seen in
# so far in previous datasets and ensure we dont reuse them.
# TODO: do this Remapper class with other ids?
# track_id_map = _ID_Remapper(reuse=False)
for subdir, old_fpath, old_dset in relative_dsets:
# Create temporary indexes to map from old to new
cat_id_map = {None: None}
img_id_map = {}
video_id_map = {}
track_id_map = {}
kpcat_id_map = {}
# Add the licenses / info into the merged dataset
# Licenses / info are unused in our datas, so this might not be
# correct
merged['licenses'].extend(old_dset.get('licenses', []))
merged['info'].extend(old_dset.get('info', []))
# Add the categories into the merged dataset
for old_cat in old_dset['categories']:
new_id = merged_cat_name_to_id.get(old_cat['name'], None)
# The same category might exist in different datasets.
if new_id is None:
# Only add if it does not yet exist
new_id = len(merged_cat_name_to_id) + 1
merged_cat_name_to_id[old_cat['name']] = new_id
new_cat = _dict([
('id', new_id),
('name', old_cat['name']),
# ('supercategory', old_cat['supercategory']),
])
update_ifnotin(new_cat, old_cat)
merged['categories'].append(new_cat)
cat_id_map[old_cat['id']] = new_id
# Add the keypoint categories into the merged dataset
if 'keypoint_categories' in old_dset:
if 'keypoint_categories' not in merged:
merged['keypoint_categories'] = []
old_id_to_name = {k['id']: k['name']
for k in old_dset['keypoint_categories']}
postproc_kpcats = []
for old_kpcat in old_dset['keypoint_categories']:
new_id = merged_kp_name_to_id.get(old_kpcat['name'], None)
# The same kpcategory might exist in different datasets.
if new_id is None:
# Only add if it does not yet exist
new_id = len(merged_kp_name_to_id) + 1
merged_kp_name_to_id[old_kpcat['name']] = new_id
new_kpcat = _dict([
('id', new_id),
('name', old_kpcat['name']),
])
update_ifnotin(new_kpcat, old_kpcat)
old_reflect_id = new_kpcat.get('reflection_id', None)
if old_reflect_id is not None:
# Temporarilly overwrite reflectid with name
reflect_name = old_id_to_name.get(old_reflect_id, None)
new_kpcat['reflection_id'] = reflect_name
postproc_kpcats.append(new_kpcat)
merged['keypoint_categories'].append(new_kpcat)
kpcat_id_map[old_kpcat['id']] = new_id
# Fix reflection ids
for kpcat in postproc_kpcats:
reflect_name = kpcat['reflection_id']
new_reflect_id = merged_kp_name_to_id.get(reflect_name, None)
kpcat['reflection_id'] = new_reflect_id
# Add the tracks into the merged dataset
for old_track in old_dset.get('tracks', []):
if preserve_trackids:
new_id = old_track['id']
else:
new_id = len(merged['tracks']) + 1
if disjoint_tracks:
# Tracks with the same name are considered distinct
# and given new non-conflicting names
new_trackname = unique_track_names.remap(old_track['name'])
new_track = _dict([
('id', new_id),
('name', new_trackname),
])
# copy over other metadata
update_ifnotin(new_track, old_track)
track_id_map[old_track['id']] = new_track['id']
if remember_parent:
new_track['union_parent'] = old_fpath
merged['tracks'].append(new_track)
else:
# Tracks with the same name are considered the same
# across datasets.
if old_track['name'] not in unique_track_names:
new_trackname = unique_track_names.remap(old_track['name'])
new_track = _dict([
('id', new_id),
('name', new_trackname),
])
# copy over other metadata
update_ifnotin(new_track, old_track)
track_id_map[old_track['id']] = new_track['id']
if remember_parent:
new_track['union_parent'] = old_fpath
merged['tracks'].append(new_track)
merged_track_name_to_id[new_trackname] = new_id
else:
new_id = merged_track_name_to_id[old_track['name']]
track_id_map[old_track['id']] = new_id
# Add the tracks into the merged dataset
for old_video in old_dset.get('videos', []):
if preserve_vidids:
new_id = old_video['id']
else:
new_id = len(merged['videos']) + 1
new_vidname = unique_video_names.remap(old_video['name'])
new_video = _dict([
('id', new_id),
('name', new_vidname),
])
# copy over other metadata
update_ifnotin(new_video, old_video)
video_id_map[old_video['id']] = new_video['id']
if remember_parent:
new_video['union_parent'] = old_fpath
merged['videos'].append(new_video)
# Add the images into the merged dataset
for old_img in old_dset['images']:
if preserve_gids:
new_id = old_img['id']
else:
new_id = len(merged['images']) + 1
old_gname = old_img['file_name']
new_gname = None if old_gname is None else (
join(subdir, old_gname)
)
new_img = _dict([
('id', new_id),
('file_name', new_gname),
])
old_name = old_img.get('name', None)
if old_name is not None:
new_name = unique_img_names.remap(old_name)
new_img['name'] = new_name
if 'auxiliary' in old_img:
new_auxiliary = []
for old_aux in old_img['auxiliary']:
new_aux = old_aux.copy()
new_aux['file_name'] = join(subdir, old_aux['file_name'])
new_auxiliary.append(new_aux)
new_img['auxiliary'] = new_auxiliary
if 'assets' in old_img:
new_auxiliary = []
for old_aux in old_img['assets']:
new_aux = old_aux.copy()
new_aux['file_name'] = join(subdir, old_aux['file_name'])
new_auxiliary.append(new_aux)
new_img['assets'] = new_auxiliary
video_img_id = video_id_map.get(old_img.get('video_id'), None)
if video_img_id is not None:
new_img['video_id'] = video_img_id
# copy over other metadata
update_ifnotin(new_img, old_img)
img_id_map[old_img['id']] = new_img['id']
if remember_parent:
new_img['union_parent'] = old_fpath
merged['images'].append(new_img)
# Add the annotations into the merged dataset
for old_annot in old_dset['annotations']:
old_cat_id = old_annot['category_id']
old_img_id = old_annot['image_id']
new_cat_id = cat_id_map.get(old_cat_id, ub.NoParam)
new_img_id = img_id_map.get(old_img_id, None)
if new_cat_id is ub.NoParam:
# NOTE: category_id is allowed to be None
warnings.warn(f'annot {old_annot} in {subdir} has bad category-id {old_cat_id}')
if new_img_id is None:
warnings.warn(f'annot {old_annot} in {subdir} has bad image-id {old_img_id}')
new_annot = _dict([
('id', len(merged['annotations']) + 1),
('image_id', new_img_id),
('category_id', new_cat_id),
])
old_track_id = old_annot.get('track_id')
if old_track_id is not None:
new_track_id = track_id_map.get(old_track_id, None)
if new_track_id is None:
warnings.warn(ub.paragraph(
f'''
annot {old_annot} in {subdir} has track_id
{old_track_id} that does not have an associated
entry in the tracks table
'''))
if disjoint_tracks:
new_track_id = hacked_track_id_map.remap(old_track_id)
else:
new_track_id = old_track_id
new_annot['track_id'] = new_track_id
# if disjoint_tracks:
# old_track_id = old_annot.get('track_id', None)
# if old_track_id is not None:
# new_track_id = track_id_map.remap(old_track_id)
# new_annot['track_id'] = new_track_id
update_ifnotin(new_annot, old_annot)
if kpcat_id_map:
# Need to copy keypoint dict to not clobber original
# dset
if 'keypoints' in new_annot:
old_keypoints = new_annot['keypoints']
new_keypoints = copy.deepcopy(old_keypoints)
for kp in new_keypoints:
kp['keypoint_category_id'] = kpcat_id_map.get(
kp['keypoint_category_id'], None)
new_annot['keypoints'] = new_keypoints
merged['annotations'].append(new_annot)
# Mark that we are not allowed to use the same hacked track-ids
# again
if disjoint_tracks:
hacked_track_id_map.block_seen()
return merged
# New behavior is simplified and I believe it is correct
def longest_common_prefix(items, sep='/'):
"""
Example:
>>> items = [
>>> '/foo/bar/always/the/same/set1/img1.png',
>>> '/foo/bar/always/the/same/set1/img2.png',
>>> '/foo/bar/always/the/same/set2/img1.png',
>>> '/foo/bar/always/the/same/set2/img2.png',
>>> '/foo/baz/file1.txt',
>>> ]
>>> sep = '/'
>>> longest_common_prefix(items, sep=sep)
>>> longest_common_prefix(items[:-1], sep=sep)
"""
# I would use a trie, but I don't know if pygtrie can do this efficiently
# (not that this is efficient)
freq = defaultdict(lambda: 0)
for item in items:
path = tuple(item.split(sep))
for i in range(len(path)):
prefix = path[:i + 1]
freq[prefix] += 1
# Find the longest common prefix
if len(freq) == 0:
longest_prefix = ''
else:
value, freq = max(freq.items(), key=lambda kv: (kv[1], len(kv[0])))
longest_prefix = sep.join(value)
return longest_prefix
dset_roots = [dset.bundle_dpath for dset in others]
dset_roots = [normpath(r) if r is not None else None
for r in dset_roots]
items = [join('.', p) for p in dset_roots]
common_root = longest_common_prefix(items, sep=os.path.sep)
relative_dsets = [
(relpath(normpath(d.bundle_dpath), common_root),
str(d.fpath),
d.dataset) for d in others]
merged = _coco_union(relative_dsets, common_root)
kwargs['bundle_dpath'] = common_root
new_dset = cls(merged, **kwargs)
return new_dset
[docs]
def subset(self, gids, copy=False, autobuild=True):
"""
Return a subset of the larger coco dataset by specifying which images
to port. All annotations in those images will be taken.
Args:
gids (List[int]):
image-ids to copy into a new dataset
copy (bool):
if True, makes a deep copy of all nested attributes, otherwise
makes a shallow copy. Defaults to True.
autobuild (bool):
if True will automatically build the fast lookup index.
Defaults to True.
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo()
>>> gids = [1, 3]
>>> sub_dset = self.subset(gids)
>>> assert len(self.index.gid_to_aids) == 3
>>> assert len(sub_dset.gid_to_aids) == 2
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo('vidshapes2')
>>> gids = [1, 2]
>>> sub_dset = self.subset(gids, copy=True)
>>> assert len(sub_dset.index.videos) == 1
>>> assert len(self.index.videos) == 2
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo()
>>> sub1 = self.subset([1])
>>> sub2 = self.subset([2])
>>> sub3 = self.subset([3])
>>> others = [sub1, sub2, sub3]
>>> rejoined = kwcoco.CocoDataset.union(*others)
>>> assert len(sub1.anns) == 9
>>> assert len(sub2.anns) == 2
>>> assert len(sub3.anns) == 0
>>> assert rejoined.basic_stats() == self.basic_stats()
"""
new_dataset = _dict([(k, []) for k in self.dataset])
new_dataset['categories'] = self.dataset['categories']
new_dataset['info'] = self.dataset.get('info', [])
new_dataset['licenses'] = self.dataset.get('licenses', [])
chosen_gids = sorted(set(gids))
chosen_imgs = list(ub.take(self.imgs, chosen_gids))
new_dataset['images'] = chosen_imgs
if 'keypoint_categories' in self.dataset:
new_dataset['keypoint_categories'] = self.dataset['keypoint_categories']
if 'videos' in self.dataset:
# TODO: Take only videos with image support?
vidids = sorted(set(img.get('video_id', None)
for img in chosen_imgs) - {None})
chosen_vids = list(ub.take(self.index.videos, vidids))
new_dataset['videos'] = chosen_vids
sub_aids = sorted([aid for gid in chosen_gids
for aid in self.index.gid_to_aids.get(gid, [])])
new_dataset['annotations'] = list(ub.take(self.index.anns, sub_aids))
new_dataset['img_root'] = self.dataset.get('img_root', None)
if copy:
from copy import deepcopy
new_dataset = deepcopy(new_dataset)
sub_dset = CocoDataset(new_dataset, bundle_dpath=self.bundle_dpath,
autobuild=autobuild)
return sub_dset
[docs]
def view_sql(self, force_rewrite=False, memory=False, backend='sqlite',
sql_db_fpath=None):
"""
Create a cached SQL interface to this dataset suitable for large scale
multiprocessing use cases.
Args:
force_rewrite (bool):
if True, forces an update to any existing cache file on disk
memory (bool):
if True, the database is constructed in memory.
backend (str): sqlite or postgresql
sql_db_fpath (str | PathLike | None): overrides the database uri
Note:
This view cache is experimental and currently depends on the
timestamp of the file pointed to by ``self.fpath``. In other words
dont use this on in-memory datasets.
CommandLine:
KWCOCO_WITH_POSTGRESQL=1 xdoctest -m /home/joncrall/code/kwcoco/kwcoco/coco_dataset.py CocoDataset.view_sql
Example:
>>> # xdoctest: +REQUIRES(module:sqlalchemy)
>>> # xdoctest: +REQUIRES(env:KWCOCO_WITH_POSTGRESQL)
>>> # xdoctest: +REQUIRES(module:psycopg2)
>>> import kwcoco
>>> dset = kwcoco.CocoDataset.demo('vidshapes32')
>>> postgres_dset = dset.view_sql(backend='postgresql', force_rewrite=True)
>>> sqlite_dset = dset.view_sql(backend='sqlite', force_rewrite=True)
>>> list(dset.anns.keys())
>>> list(postgres_dset.anns.keys())
>>> list(sqlite_dset.anns.keys())
Ignore:
import timerit
ti = timerit.Timerit(100, bestof=10, verbose=2)
for timer in ti.reset('dct_dset'):
dset.annots().detections
for timer in ti.reset('postgresql'):
postgres_dset.annots().detections
for timer in ti.reset('sqlite'):
sqlite_dset.annots().detections
ub.udict(sql_dset.annots().objs[0]) - {'segmentation'}
ub.udict(dct_dset.annots().objs[0]) - {'segmentation'}
"""
from kwcoco.coco_sql_dataset import cached_sql_coco_view
if sql_db_fpath is None:
if memory:
sql_db_fpath = ':memory:'
sql_dset = cached_sql_coco_view(dset=self, sql_db_fpath=sql_db_fpath,
force_rewrite=force_rewrite,
backend=backend)
return sql_dset
[docs]
def demo_coco_data():
"""
Simple data for testing.
This contains several non-standard fields, which help ensure robustness of
functions tested with this data. For more compliant demodata see the
``kwcoco.demodata`` submodule.
Example:
>>> # xdoctest: +REQUIRES(--show)
>>> import kwcoco
>>> from kwcoco.coco_dataset import demo_coco_data
>>> dataset = demo_coco_data()
>>> self = kwcoco.CocoDataset(dataset, tag='demo')
>>> import kwplot
>>> kwplot.autompl()
>>> self.show_image(gid=1)
>>> kwplot.show_if_requested()
"""
import kwimage
from kwimage.im_demodata import _TEST_IMAGES
from os.path import commonprefix
# FIXME: be robust to broken urls
test_imgs_keys = ['astro', 'carl', 'stars']
urls = {k: _TEST_IMAGES[k]['url'] for k in test_imgs_keys}
gpaths = {k: kwimage.grab_test_image_fpath(k) for k in test_imgs_keys}
img_root = commonprefix(list(gpaths.values()))
gpath1, gpath2, gpath3 = ub.take(gpaths, test_imgs_keys)
url1, url2, url3 = ub.take(urls, test_imgs_keys)
# Make file names relative for consistent testing purpose
gname1 = relpath(gpath1, img_root)
gname2 = relpath(gpath2, img_root)
gname3 = relpath(gpath3, img_root)
dataset = {
'img_root': img_root,
'categories': [
{
'id': 1, 'name': 'astronaut',
'supercategory': 'human',
},
{'id': 2, 'name': 'rocket', 'supercategory': 'object'},
{'id': 3, 'name': 'helmet', 'supercategory': 'object'},
{
'id': 4, 'name': 'mouth',
'supercategory': 'human',
'keypoints': [
'mouth-right-corner', 'mouth-right-bot',
'mouth-left-bot', 'mouth-left-corner',
],
'skeleton': [[0, 1]],
},
{
'id': 5, 'name': 'star',
'supercategory': 'object',
'keypoints': ['star-center'],
'skeleton': [],
},
{'id': 6, 'name': 'astronomer', 'supercategory': 'human'},
{'id': 7, 'name': 'astroturf', 'supercategory': 'object'},
{
'id': 8, 'name': 'human',
'keypoints': ['left-eye', 'right-eye'],
'skeleton': [[0, 1]],
},
],
'images': [
{'id': 1, 'file_name': gname1, 'url': url1},
{'id': 2, 'file_name': gname2, 'url': url2},
{'id': 3, 'file_name': gname3, 'url': url3},
],
'annotations': [
{'id': 1, 'image_id': 1, 'category_id': 1,
'bbox': [10, 10, 360, 490],
'keypoints': [247, 101, 2, 202, 100, 2],
'segmentation': [[
40, 509, 26, 486, 20, 419, 28, 334, 51, 266, 85, 229, 102,
216, 118, 197, 125, 176, 148, 151, 179, 147, 182, 134, 174,
128, 166, 115, 156, 94, 155, 64, 162, 48, 193, 34, 197, 26,
210, 21, 231, 14, 265, 24, 295, 49, 300, 90, 297, 111, 280,
126, 277, 132, 266, 137, 264, 152, 255, 164, 256, 174, 283,
195, 301, 220, 305, 234, 338, 262, 350, 286, 360, 326, 363,
351, 324, 369, 292, 404, 280, 448, 276, 496, 280, 511]],
},
{'id': 2, 'image_id': 1, 'category_id': 2,
'bbox': [350, 5, 130, 290]},
{'id': 3, 'image_id': 1, 'category_id': 3,
'line': [326, 369, 500, 500]},
{'id': 4, 'image_id': 1, 'category_id': 4,
'keypoints': [
202, 139, 2, 215, 150, 2, 229, 150, 2, 244, 142, 2,
]},
{'id': 5, 'image_id': 1, 'category_id': 5,
'keypoints': [37, 65, 1]},
{'id': 6, 'image_id': 1, 'category_id': 5,
'keypoints': [37, 16, 1]},
{'id': 7, 'image_id': 1, 'category_id': 5,
'keypoints': [3, 9, 1]},
{'id': 8, 'image_id': 1, 'category_id': 5,
'keypoints': [2, 111, 1]},
{'id': 9, 'image_id': 1, 'category_id': 5,
'keypoints': [2, 60, 1]},
{'id': 10, 'image_id': 2, 'category_id': 6,
'bbox': [37, 6, 230, 240]},
{'id': 11, 'image_id': 2, 'category_id': 4,
'bbox': [156, 130, 45, 18]}
],
'licenses': [],
'info': [],
}
return dataset
if __name__ == '__main__':
r"""
CommandLine:
xdoctest kwcoco.coco_dataset all
"""
import xdoctest
xdoctest.doctest_module(__file__)