# -*- coding: utf-8 -*-
"""
An implementation and extension of the original MS-COCO API [1]_.
Extends the format to also include line annotations.
Dataset Spec:
* Note: a formal spec has been defined in
.. code::
category = {
'id': int,
'name': str,
'supercategory': Optional[str],
'keypoints': Optional(List[str]),
'skeleton': Optional(List[Tuple[Int, Int]]),
}
image = {
'id': int,
'file_name': str
}
dataset = {
# these are object level categories
'categories': [category],
'images': [image]
...
],
'annotations': [
{
'id': Int,
'image_id': Int,
'category_id': Int,
'track_id': Optional[Int],
'bbox': [tl_x, tl_y, w, h], # optional (xywh format)
"score" : float, # optional
"prob" : List[float], # optional
"weight" : float, # optional
"caption": str, # an optional text caption for this annotation
"iscrowd" : <0 or 1>, # denotes if the annotation covers a single object (0) or multiple objects (1)
"keypoints" : [x1,y1,v1,...,xk,yk,vk], # or new dict-based format
'segmentation': <RunLengthEncoding | Polygon>, # formats are defined below
},
...
],
'licenses': [],
'info': [],
}
Polygon:
A flattened list of xy coordinates.
[x1, y1, x2, y2, ..., xn, yn]
or a list of flattened lists of xy coordinates if the CCs are disjoint
[[x1, y1, x2, y2, ..., xn, yn], [x1, y1, ..., xm, ym],]
Note: the original coco spec does not allow for holes in polygons.
We also allow a non-standard dictionary encoding of polygons
{'exterior': [(x1, y1)...],
'interiors': [[(x1, y1), ...], ...]}
RunLengthEncoding:
The RLE can be in a special bytes encoding or in a binary array
encoding. We reuse the original C functions are in [2]_ in
``kwimage.structs.Mask`` to provide a convenient way to abstract this
rather esoteric bytes encoding.
For pure python implementations see kwimage:
Converting from an image to RLE can be done via kwimage.run_length_encoding
Converting from RLE back to an image can be done via:
kwimage.decode_run_length
For compatibility with the COCO specs ensure the binary flags
for these functions are set to true.
Keypoints:
Annotation keypoints may also be specified in this non-standard (but
ultimately more general) way:
'annotations': [
{
'keypoints': [
{
'xy': <x1, y1>,
'visible': <0 or 1 or 2>,
'keypoint_category_id': <kp_cid>,
'keypoint_category': <kp_name, optional>, # this can be specified instead of an id
}, ...
]
}, ...
],
'keypoint_categories': [{
'name': <str>,
'id': <int>, # an id for this keypoint category
'supercategory': <kp_name> # name of coarser parent keypoint class (for hierarchical keypoints)
'reflection_id': <kp_cid> # specify only if the keypoint id would be swapped with another keypoint type
},...
]
In this scheme the "keypoints" property of each annotation (which used
to be a list of floats) is now specified as a list of dictionaries that
specify each keypoints location, id, and visibility explicitly. This
allows for things like non-unique keypoints, partial keypoint
annotations. This also removes the ordering requirement, which makes it
simpler to keep track of each keypoints class type.
We also have a new top-level dictionary to specify all the possible
keypoint categories.
Auxillary Channels:
For multimodal or multispectral images it is possible to specify
auxillary channels in an image dictionary as follows:
{
'id': int, 'file_name': str
'channels': <spec>, # a spec code that indicates the layout of these channels.
'auxillary': [ # information about auxillary channels
{
'file_name':
'channels': <spec>
}, ... # can have many auxillary channels with unique specs
]
}
Video Sequences:
For video sequences, we add the following video level index:
"videos": [
{ "id": <int>, "name": <video_name:str> },
]
Note that the videos might be given as encoded mp4/avi/etc.. files (in
which case the name should correspond to a path) or as a series of
frames in which case the images should be used to index the extracted
frames and information in them.
Then image dictionaries are augmented as follows:
{
'video_id': str # optional, if this image is a frame in a video sequence, this id is shared by all frames in that sequence.
'timestamp': int # optional, timestamp (ideally in flicks), used to identify the timestamp of the frame. Only applicable video inputs.
'frame_index': int # optional, ordinal frame index which can be used if timestamp is unknown.
}
And annotations are augmented as follows:
{
"track_id": <int | str | uuid> # optional, indicates association between annotations across frames
}
Notes:
The main object in this file is class:`CocoDataset`, which is composed of
several mixin classes. See the class and method documentation for more
details.
TODO:
- [ ] Use ijson to lazilly load pieces of the dataset in the background or
on demand. This will give us faster access to categories / images,
whereas we will always have to wait for annotations etc...
- [ ] Should img_root be changed to data root?
- [ ] Read video data, return numpy arrays (requires API for images)
- [ ] Spec for video URI, and convert to frames @ framerate function.
- [ ] remove videos
References:
.. [1] http://cocodataset.org/#format-data
.. [2] https://github.com/nightrome/cocostuffapi/blob/master/PythonAPI/pycocotools/mask.py
.. [3] https://www.immersivelimit.com/tutorials/create-coco-annotations-from-scratch/#coco-dataset-format
"""
from __future__ import absolute_import, division, print_function, unicode_literals
import warnings
from os.path import dirname
from os.path import splitext
from os.path import basename
from os.path import join
from os.path import exists
from collections import OrderedDict, defaultdict
import json
import numpy as np
import ubelt as ub
import six
import itertools as it
from six.moves import cStringIO as StringIO
import copy
# Does having __all__ prevent readthedocs from building mixins?
# __all__ = [
# 'CocoDataset',
# ]
_dict = OrderedDict
# These are the keys that are / should be supported by the API
SPEC_KEYS = [
'info',
'licenses',
'categories',
'keypoint_categories', # support only partially implemented
'videos', # support only partially implemented
'images',
'annotations',
]
INT_TYPES = (int, np.integer)
def _annot_type(ann):
"""
Returns what type of annotation ``ann`` is.
"""
return tuple(sorted(set(ann) & {'bbox', 'line', 'keypoints'}))
class ObjectList1D(ub.NiceRepr):
    """
    Vectorized access to lists of dictionary objects.

    Lightweight reference to a set of objects (e.g. annotations, images)
    that allows for convenient property access.

    Args:
        ids (List[int]): list of ids
        dset (CocoDataset): parent dataset
        key (str): main object name (e.g. 'images', 'annotations')

    Types:
        ObjT = Ann | Img | Cat  # can be one of these types
        ObjectList1D gives us access to a List[ObjT]

    Example:
        >>> import kwcoco
        >>> dset = kwcoco.CocoDataset.demo()
        >>> # Both annots and images are object lists
        >>> self = dset.annots()
        >>> self = dset.images()
        >>> # can call with a list of ids or not, for everything
        >>> self = dset.annots([1, 2, 11])
        >>> self = dset.images([1, 2, 3])
        >>> self.lookup('id')
        >>> self.lookup(['id'])
    """

    def __init__(self, ids, dset, key):
        self._key = key
        self._ids = ids
        self._dset = dset

    def __nice__(self):
        return 'num={!r}'.format(len(self))

    def __iter__(self):
        return iter(self._ids)

    def __len__(self):
        return len(self._ids)

    @property
    def _id_to_obj(self):
        # Mapping from object id to the underlying object dictionary
        return self._dset.index._id_lookup[self._key]

    @property
    def objs(self):
        """
        Returns:
            List: all object dictionaries
        """
        return list(ub.take(self._id_to_obj, self._ids))

    def _subset(self, subids):
        """
        Build a new object list of the same flavor referencing ``subids``.

        BUGFIX: subclasses are constructed as ``cls(ids, dset)``, but the
        base class also requires ``key``, so the previous
        ``self.__class__(subids, self._dset)`` raised a TypeError when
        called on a direct ObjectList1D instance.
        """
        if type(self) is ObjectList1D:
            return ObjectList1D(subids, self._dset, self._key)
        return self.__class__(subids, self._dset)

    def take(self, idxs):
        """
        Take a subset by index

        Example:
            >>> self = CocoDataset.demo().annots()
            >>> assert len(self.take([0, 2, 3])) == 3
        """
        subids = list(ub.take(self._ids, idxs))
        return self._subset(subids)

    def compress(self, flags):
        """
        Take a subset by flags

        Example:
            >>> self = CocoDataset.demo().images()
            >>> assert len(self.compress([True, False, True])) == 2
        """
        subids = list(ub.compress(self._ids, flags))
        return self._subset(subids)

    def peek(self):
        """
        Return the first object dictionary
        """
        return ub.peek(self._id_to_obj.values())

    def lookup(self, key, default=ub.NoParam, keepid=False):
        """
        Lookup a list of object attributes

        Args:
            key (str | Iterable): name of the property you want to lookup
                can also be a list of names, in which case we return a dict
            default : if specified, uses this value if it doesn't exist
                in an ObjT.
            keepid: if True, return a mapping from ids to the property

        Returns:
            List[ObjT]: a list of whatever type the object is
            Dict[str, ObjT]

        Example:
            >>> import kwcoco
            >>> dset = kwcoco.CocoDataset.demo()
            >>> self = dset.annots()
            >>> self.lookup('id')
            >>> key = ['id']
            >>> default = None
            >>> self.lookup(key=['id', 'image_id'])
            >>> self.lookup(key='foo', default=None, keepid=True)
            >>> self.lookup(key=['foo'], default=None, keepid=True)
            >>> self.lookup(key=['id', 'image_id'], keepid=True)
        """
        # Note: while the old _lookup code was slightly faster than this, the
        # difference is extremely negligable (179us vs 178us).
        if ub.iterable(key):
            return {k: self.lookup(k, default, keepid) for k in key}
        else:
            return self.get(key, default=default, keepid=keepid)

    def get(self, key, default=ub.NoParam, keepid=False):
        """
        Lookup a list of object attributes

        Args:
            key (str): name of the property you want to lookup
            default : if specified, uses this value if it doesn't exist
                in an ObjT.
            keepid: if True, return a mapping from ids to the property

        Returns:
            List[ObjT]: a list of whatever type the object is
            Dict[str, ObjT]

        Example:
            >>> import kwcoco
            >>> dset = kwcoco.CocoDataset.demo()
            >>> self = dset.annots()
            >>> self.get('id')
            >>> self.get(key='foo', default=None, keepid=True)
        """
        _lut = self._id_to_obj
        if keepid:
            if default is ub.NoParam:
                attr_list = {_id: _lut[_id][key] for _id in self._ids}
            else:
                attr_list = {_id: _lut[_id].get(key, default) for _id in self._ids}
        else:
            if default is ub.NoParam:
                attr_list = [_lut[_id][key] for _id in self._ids]
            else:
                attr_list = [_lut[_id].get(key, default) for _id in self._ids]
        return attr_list

    def set(self, key, values):
        """
        Assign a value to each annotation

        Args:
            key (str): the annotation property to modify
            values (Iterable | scalar): an iterable of values to set for each
                annot in the dataset. If the item is not iterable, it is
                assigned to all objects.

        Example:
            >>> dset = CocoDataset.demo()
            >>> self = dset.annots()
            >>> self.set('my-key1', 'my-scalar-value')
            >>> self.set('my-key2', np.random.rand(len(self)))
            >>> print('dset.imgs = {}'.format(ub.repr2(dset.imgs, nl=1)))
            >>> self.get('my-key2')
        """
        if not ub.iterable(values):
            # Broadcast a scalar to every referenced object
            values = [values] * len(self)
        elif not isinstance(values, list):
            values = list(values)
        assert len(self) == len(values)
        self._set(key, values)

    def _set(self, key, values):
        """ faster less safe version of set """
        objs = ub.take(self._id_to_obj, self._ids)
        for obj, value in zip(objs, values):
            obj[key] = value

    def _lookup(self, key, default=ub.NoParam):
        """
        Deprecated alias of :func:`lookup`, kept for backwards compatibility.
        """
        return self.lookup(key, default=default)
class ObjectGroups(ub.NiceRepr):
    """
    An object for holding a groups of :class:`ObjectList1D` objects

    Args:
        groups (List[ObjectList1D]): the per-group object lists
        dset (CocoDataset): parent dataset
    """
    def __init__(self, groups, dset):
        self._groups = groups
        # BUGFIX: the dataset reference was previously dropped on the floor
        self._dset = dset

    def _lookup(self, key):
        # BUGFIX: this previously called itself (infinite recursion);
        # it is meant to delegate to the public lookup method.
        return self.lookup(key)

    def lookup(self, key, default=ub.NoParam):
        """
        Lookup ``key`` in each group.

        Returns:
            List[List]: one list of attribute values per group
        """
        return [group.lookup(key, default) for group in self._groups]

    def __nice__(self):
        # import timerit
        # mu = timerit.core._trychar('μ', 'm')
        # sigma = timerit.core._trychar('σ', 's')
        mu = 'm'
        sigma = 's'
        len_list = list(map(len, self._groups))
        num = len(self._groups)
        mean = np.mean(len_list)
        std = np.std(len_list)
        nice = 'n={!r}, {}={:.1f}, {}={:.1f}'.format(
            num, mu, mean, sigma, std)
        return nice
class Categories(ObjectList1D):
    """
    Vectorized access to category attributes

    Example:
        >>> from kwcoco.coco_dataset import Categories  # NOQA
        >>> import kwcoco
        >>> dset = kwcoco.CocoDataset.demo()
        >>> ids = list(dset.cats.keys())
        >>> self = Categories(ids, dset)
        >>> print('self.name = {!r}'.format(self.name))
        >>> print('self.supercategory = {!r}'.format(self.supercategory))
    """

    def __init__(self, ids, dset):
        super().__init__(ids, dset, 'categories')

    @property
    def cids(self):
        # Column of category ids
        return self.lookup('id')

    @property
    def name(self):
        # Column of category names
        return self.lookup('name')

    @property
    def supercategory(self):
        # Column of supercategory names (None when unspecified)
        return self.lookup('supercategory', None)
class Videos(ObjectList1D):
    """
    Vectorized access to video attributes

    Example:
        >>> from kwcoco.coco_dataset import Videos  # NOQA
        >>> import kwcoco
        >>> dset = kwcoco.CocoDataset.demo('vidshapes5')
        >>> ids = list(dset.index.videos.keys())
        >>> self = Videos(ids, dset)
        >>> print('self = {!r}'.format(self))
    """

    def __init__(self, ids, dset):
        super().__init__(ids, dset, 'videos')
class Images(ObjectList1D):
    """
    Vectorized access to image attributes
    """

    def __init__(self, ids, dset):
        super().__init__(ids, dset, 'images')

    @property
    def gids(self):
        # Column of image ids
        return self._ids

    @property
    def gname(self):
        # Column of image file names (relative to the dataset root)
        return self.lookup('file_name')

    @property
    def gpath(self):
        # Column of full image paths
        root = self._dset.img_root
        return [join(root, gname) for gname in self.gname]

    @property
    def width(self):
        return self.lookup('width')

    @property
    def height(self):
        return self.lookup('height')

    @property
    def size(self):
        """
        Column of (width, height) pairs.

        Example:
            >>> from kwcoco.coco_dataset import *
            >>> self = CocoDataset.demo().images()
            >>> self._dset._ensure_imgsize()
            >>> print(self.size)
            [(512, 512), (300, 250), (256, 256)]
        """
        widths = self.lookup('width')
        heights = self.lookup('height')
        return list(zip(widths, heights))

    @property
    def area(self):
        """
        Column of pixel areas (width * height).

        Example:
            >>> from kwcoco.coco_dataset import *
            >>> self = CocoDataset.demo().images()
            >>> self._dset._ensure_imgsize()
            >>> print(self.area)
            [262144, 75000, 65536]
        """
        widths = self.lookup('width')
        heights = self.lookup('height')
        return [w * h for w, h in zip(widths, heights)]

    @property
    def n_annots(self):
        """
        Column of per-image annotation counts.

        Example:
            >>> self = CocoDataset.demo().images()
            >>> print(ub.repr2(self.n_annots, nl=0))
            [9, 2, 0]
        """
        aids_per_img = ub.take(self._dset.gid_to_aids, self._ids)
        return [len(aids) for aids in aids_per_img]

    @property
    def aids(self):
        """
        Column of annotation-id collections, one per image.

        Example:
            >>> self = CocoDataset.demo().images()
            >>> print(ub.repr2(list(map(list, self.aids)), nl=0))
            [[1, 2, 3, 4, 5, 6, 7, 8, 9], [10, 11], []]
        """
        return list(ub.take(self._dset.gid_to_aids, self._ids))

    @property
    def annots(self):
        """
        Grouped annotation objects, one group per image.

        Example:
            >>> self = CocoDataset.demo().images()
            >>> print(self.annots)
            <AnnotGroups(n=3, m=3.7, s=3.9)>
        """
        groups = [self._dset.annots(aids) for aids in self.aids]
        return AnnotGroups(groups, self._dset)
class Annots(ObjectList1D):
    """
    Vectorized access to annotation attributes
    """

    def __init__(self, ids, dset):
        super().__init__(ids, dset, 'annotations')

    @property
    def aids(self):
        """ The annotation ids of this column of annotations """
        return self._ids

    @property
    def images(self):
        """
        Get the column of images

        Returns:
            Images
        """
        return self._dset.images(self.gids)

    @property
    def image_id(self):
        return self.lookup('image_id')

    @property
    def category_id(self):
        return self.lookup('category_id')

    @property
    def gids(self):
        """
        Get the column of image-ids

        Returns:
            List[int]: list of image ids
        """
        return self.lookup('image_id')

    @property
    def cids(self):
        """
        Get the column of category-ids

        Returns:
            List[int]
        """
        return self.lookup('category_id')

    @property
    def cnames(self):
        """
        Get the column of category names

        Returns:
            List[str]: category names (DOCFIX: was documented as List[int])
        """
        return [cat['name'] for cat in ub.take(self._dset.cats, self.cids)]

    @cnames.setter
    def cnames(self, cnames):
        """
        Args:
            cnames (List[str]):

        Example:
            >>> from kwcoco.coco_dataset import *  # NOQA
            >>> self = CocoDataset.demo().annots([1, 2, 11])
            >>> print('self.cnames = {!r}'.format(self.cnames))
            >>> print('self.cids = {!r}'.format(self.cids))
            >>> cnames = ['boo', 'bar', 'rocket']
            >>> list(map(self._dset.ensure_category, set(cnames)))
            >>> self.cnames = cnames
            >>> print('self.cnames = {!r}'.format(self.cnames))
            >>> print('self.cids = {!r}'.format(self.cids))
        """
        cats = map(self._dset._alias_to_cat, cnames)
        cids = (cat['id'] for cat in cats)
        self.set('category_id', cids)

    @property
    def detections(self):
        """
        Get the kwimage-style detection objects

        Returns:
            kwimage.Detections

        Example:
            >>> # xdoctest: +REQUIRES(module:kwimage)
            >>> from kwcoco.coco_dataset import *  # NOQA
            >>> self = CocoDataset.demo('shapes32').annots([1, 2, 11])
            >>> dets = self.detections
            >>> print('dets.data = {!r}'.format(dets.data))
            >>> print('dets.meta = {!r}'.format(dets.meta))
        """
        import kwimage
        anns = [self._id_to_obj[aid] for aid in self.aids]
        dets = kwimage.Detections.from_coco_annots(anns, dset=self._dset)
        # dets.data['aids'] = np.array(self.aids)
        return dets

    @property
    def boxes(self):
        """
        Get the column of kwimage-style bounding boxes

        Example:
            >>> self = CocoDataset.demo().annots([1, 2, 11])
            >>> print(self.boxes)
            <Boxes(xywh,
                array([[ 10,  10, 360, 490],
                       [350,   5, 130, 290],
                       [124,  96,  45,  18]]))>
        """
        import kwimage
        xywh = self.lookup('bbox')
        boxes = kwimage.Boxes(xywh, 'xywh')
        return boxes

    @boxes.setter
    def boxes(self, boxes):
        """
        Args:
            boxes (kwimage.Boxes):

        Example:
            >>> import kwimage
            >>> from kwcoco.coco_dataset import *  # NOQA
            >>> self = CocoDataset.demo().annots([1, 2, 11])
            >>> print('self.boxes = {!r}'.format(self.boxes))
            >>> boxes = kwimage.Boxes.random(3).scale(512).astype(int)
            >>> self.boxes = boxes
            >>> print('self.boxes = {!r}'.format(self.boxes))
        """
        anns = ub.take(self._dset.anns, self.aids)
        # Use a distinct name for the full list so the loop variable does
        # not shadow it (the old code reused ``xywh`` for both).
        xywh_list = boxes.to_xywh().data.tolist()
        for ann, xywh in zip(anns, xywh_list):
            ann['bbox'] = xywh

    @property
    def xywh(self):
        """
        Returns raw boxes

        Example:
            >>> self = CocoDataset.demo().annots([1, 2, 11])
            >>> print(self.xywh)
        """
        xywh = self.lookup('bbox')
        return xywh
class AnnotGroups(ObjectGroups):
    """ Groups of :class:`Annots` objects """

    @property
    def cids(self):
        # One list of category ids per group of annotations
        return self.lookup('category_id')
class ImageGroups(ObjectGroups):
    """ Groups of :class:`Images` objects """
    pass
class MixinCocoDepricate(object):
    """
    These functions are marked for deprication and may be removed at any time
    """

    def lookup_imgs(self, filename=None):
        """
        Linear search for an images with specific attributes

        # DEPRICATE

        Ignore:
            filename = '201503.20150525.101841191.573975.png'
            list(self.lookup_imgs(filename))
            gid = 64940
            img = self.imgs[gid]
            img['file_name'] = filename
        """
        # NOTE: ``warnings`` is already imported at module level
        warnings.warn('DEPRECATED: this method name may be recycled and '
                      'do something different in a later version',
                      DeprecationWarning)
        for img in self.imgs.values():
            if filename is not None:
                fpath = img['file_name']
                fname = basename(fpath)
                fname_noext = splitext(fname)[0]
                if filename in [fpath, fname, fname_noext]:
                    print('img = {!r}'.format(img))
                    yield img

    def lookup_anns(self, has=None):
        """
        Linear search for an annotations with specific attributes

        # DEPRICATE

        Ignore:
            list(self.lookup_anns(has='radius'))
            gid = 112888
            img = self.imgs[gid]
            img['file_name'] = filename
        """
        warnings.warn('DEPRECATED: this method name may be recycled and '
                      'do something different in a later version',
                      DeprecationWarning)
        for ann in self.anns.values():
            if has is not None:
                # BUGFIX: annotations are dictionaries, so key membership
                # must be tested with ``in``; the old ``hasattr(ann, has)``
                # was always False and this generator never yielded.
                if has in ann:
                    print('ann = {!r}'.format(ann))
                    yield ann

    def _mark_annotated_images(self):
        """
        Mark any image that explicitly has annotations.

        # DEPRICATE
        """
        warnings.warn('DEPRECATED: this method should not be used', DeprecationWarning)
        for gid, img in self.imgs.items():
            aids = self.gid_to_aids.get(gid, [])
            # If there is at least one annotation, always mark as has_annots
            if len(aids) > 0:
                assert img.get('has_annots', ub.NoParam) in [ub.NoParam, True], (
                    'image with annots was explictly labeled as non-True!')
                img['has_annots'] = True
            else:
                # Otherwise set has_annots to null if it has not been
                # explicitly labeled
                if 'has_annots' not in img:
                    img['has_annots'] = None

    def _find_bad_annotations(self):
        """
        Return annotations with missing or dangling image / category ids.
        """
        warnings.warn('DEPRECATED: this method should not be used', DeprecationWarning)
        to_remove = []
        for ann in self.dataset['annotations']:
            if ann['image_id'] is None or ann['category_id'] is None:
                to_remove.append(ann)
            elif (ann['image_id'] not in self.imgs or
                  ann['category_id'] not in self.cats):
                # BUGFIX: an annotation with both a dangling image id and a
                # dangling category id was previously appended twice.
                to_remove.append(ann)
        return to_remove

    def _remove_keypoint_annotations(self, rebuild=True):
        """
        Remove annotations with keypoints only

        Example:
            >>> self = CocoDataset.demo()
            >>> self._remove_keypoint_annotations()
        """
        warnings.warn('DEPRECATED: this method should not be used', DeprecationWarning)
        to_remove = []
        for ann in self.dataset['annotations']:
            roi_shape = ann.get('roi_shape', None)
            if roi_shape is None:
                if 'keypoints' in ann and ann.get('bbox', None) is None:
                    to_remove.append(ann)
            elif roi_shape == 'keypoints':
                to_remove.append(ann)
        print('Removing {} keypoint annotations'.format(len(to_remove)))
        self.remove_annotations(to_remove)
        if rebuild:
            self._build_index()

    def _remove_bad_annotations(self, rebuild=True):
        """
        Remove annotations with null image or category ids.
        """
        warnings.warn('DEPRECATED: this method should not be used', DeprecationWarning)
        to_remove = []
        for ann in self.dataset['annotations']:
            if ann['image_id'] is None or ann['category_id'] is None:
                to_remove.append(ann)
        print('Removing {} bad annotations'.format(len(to_remove)))
        self.remove_annotations(to_remove)
        if rebuild:
            self._build_index()

    def _remove_radius_annotations(self, rebuild=False):
        """
        Remove annotations that carry a legacy 'radius' key.
        """
        warnings.warn('DEPRECATED: this method should not be used', DeprecationWarning)
        to_remove = []
        for ann in self.dataset['annotations']:
            if 'radius' in ann:
                to_remove.append(ann)
        print('Removing {} radius annotations'.format(len(to_remove)))
        self.remove_annotations(to_remove)
        if rebuild:
            self._build_index()

    def _remove_empty_images(self):
        """
        Remove images that have no annotations.
        """
        warnings.warn('DEPRECATED: this method should not be used', DeprecationWarning)
        to_remove = []
        for gid in self.imgs.keys():
            aids = self.gid_to_aids.get(gid, [])
            if not aids:
                to_remove.append(self.imgs[gid])
        print('Removing {} empty images'.format(len(to_remove)))
        for img in to_remove:
            self.dataset['images'].remove(img)
        self._build_index()
class MixinCocoAttrs(object):
    """
    Expose methods to construct object lists / groups
    """

    def annots(self, aids=None, gid=None):
        """
        Return vectorized annotation objects

        Args:
            aids (List[int]): annotation ids to reference, if unspecified
                all annotations are returned.
            gid (int): return all annotations that belong to this image id.
                mutually exclusive with `aids` arg.

        Returns:
            Annots: vectorized annotation object

        Example:
            >>> import kwcoco
            >>> self = kwcoco.CocoDataset.demo()
            >>> annots = self.annots()
            >>> print(annots)
            <Annots(num=11)>
            >>> sub_annots = annots.take([1, 2, 3])
            >>> print(sub_annots)
            <Annots(num=3)>
            >>> print(ub.repr2(sub_annots.get('bbox', None)))
            [
                [350, 5, 130, 290],
                None,
                None,
            ]
        """
        if aids is None:
            if gid is not None:
                aids = sorted(self.index.gid_to_aids[gid])
            else:
                aids = sorted(self.index.anns.keys())
        return Annots(aids, self)

    def images(self, gids=None):
        """
        Return vectorized image objects

        Args:
            gids (List[int]): image ids to reference, if unspecified
                all images are returned.

        Returns:
            Images: vectorized images object

        Example:
            >>> self = CocoDataset.demo()
            >>> images = self.images()
            >>> print(images)
            <Images(num=3)>
        """
        if gids is None:
            gids = sorted(self.index.imgs.keys())
        return Images(gids, self)

    def categories(self, cids=None):
        """
        Return vectorized category objects

        Args:
            cids (List[int]): category ids to reference, if unspecified
                all categories are returned.

        Example:
            >>> self = CocoDataset.demo()
            >>> categories = self.categories()
            >>> print(categories)
            <Categories(num=8)>
        """
        if cids is None:
            cids = sorted(self.index.cats.keys())
        return Categories(cids, self)

    def videos(self, vidids=None):
        """
        Return vectorized video objects

        Args:
            vidids (List[int]): video ids to reference, if unspecified
                all videos are returned.

        Example:
            >>> self = CocoDataset.demo('vidshapes2')
            >>> videos = self.videos()
            >>> print(videos)
            >>> videos.lookup('name')
            >>> videos.lookup('id')
            >>> print('videos.objs = {}'.format(ub.repr2(videos.objs[0:2], nl=1)))
        """
        if vidids is None:
            vidids = sorted(self.index.videos.keys())
        return Videos(vidids, self)
class MixinCocoStats(object):
    """
    Methods for getting stats about the dataset
    """

    @property
    def n_annots(self):
        """ int: number of annotations in the dataset """
        return len(self.dataset['annotations'])

    @property
    def n_images(self):
        """ int: number of images in the dataset """
        return len(self.dataset['images'])

    @property
    def n_cats(self):
        """ int: number of categories in the dataset """
        return len(self.dataset['categories'])

    @property
    def n_videos(self):
        """ int: number of videos in the dataset """
        return len(self.dataset.get('videos', []))

    def keypoint_annotation_frequency(self):
        """
        Example:
            >>> from kwcoco.coco_dataset import *
            >>> self = CocoDataset.demo('shapes', rng=0)
            >>> hist = self.keypoint_annotation_frequency()
            >>> hist = ub.odict(sorted(hist.items()))
            >>> # FIXME: for whatever reason demodata generation is not deterministic when seeded
            >>> print(ub.repr2(hist))  # xdoc: +IGNORE_WANT
            {
                'bot_tip': 6,
                'left_eye': 14,
                'mid_tip': 6,
                'right_eye': 14,
                'top_tip': 6,
            }
        """
        # NOTE(review): this assumes the dict-based keypoint format where
        # each keypoint carries a 'keypoint_category_id'; the original
        # flat-list COCO format is not handled here.
        ann_kpcids = [kp['keypoint_category_id']
                      for ann in self.dataset['annotations']
                      for kp in ann.get('keypoints', [])]
        kpcid_to_name = {kpcat['id']: kpcat['name']
                         for kpcat in self.dataset['keypoint_categories']}
        kpcid_to_num = ub.dict_hist(ann_kpcids,
                                    labels=list(kpcid_to_name.keys()))
        kpname_to_num = ub.map_keys(kpcid_to_name, kpcid_to_num)
        return kpname_to_num

    def category_annotation_frequency(self):
        """
        Reports the number of annotations of each category

        Example:
            >>> from kwcoco.coco_dataset import *
            >>> self = CocoDataset.demo()
            >>> hist = self.category_annotation_frequency()
            >>> print(ub.repr2(hist))
            {
                'astroturf': 0,
                'human': 0,
                'astronaut': 1,
                'astronomer': 1,
                'helmet': 1,
                'rocket': 1,
                'mouth': 2,
                'star': 5,
            }
        """
        catname_to_nannots = ub.map_keys(
            lambda x: None if x is None else self.cats[x]['name'],
            ub.map_vals(len, self.cid_to_aids))
        # Sort by count first, then by name, for a stable readable report
        catname_to_nannots = ub.odict(sorted(catname_to_nannots.items(),
                                             key=lambda kv: (kv[1], kv[0])))
        return catname_to_nannots

    def category_annotation_type_frequency(self):
        """
        Reports the number of annotations of each type for each category

        Example:
            >>> self = CocoDataset.demo()
            >>> hist = self.category_annotation_type_frequency()
            >>> print(ub.repr2(hist))
        """
        # DOCFIX: the example above previously called
        # category_annotation_frequency, which never exercised this method.
        catname_to_nannot_types = {}
        for cid, aids in self.cid_to_aids.items():
            name = self.cats[cid]['name']
            hist = ub.dict_hist(map(_annot_type, ub.take(self.anns, aids)))
            # Unwrap single-geometry tuples for a cleaner report
            catname_to_nannot_types[name] = ub.map_keys(
                lambda k: k[0] if len(k) == 1 else k, hist)
        return catname_to_nannot_types

    def basic_stats(self):
        """
        Reports number of images, annotations, and categories.

        Example:
            >>> import kwcoco
            >>> self = kwcoco.CocoDataset.demo()
            >>> print(ub.repr2(self.basic_stats()))
            {
                'n_anns': 11,
                'n_imgs': 3,
                'n_videos': 0,
                'n_cats': 8,
            }
            >>> from kwcoco.demo.toydata import *  # NOQA
            >>> dset = random_video_dset(render=True, num_frames=2, num_tracks=10, rng=0)
            >>> print(ub.repr2(dset.basic_stats()))
            {
                'n_anns': 20,
                'n_imgs': 2,
                'n_videos': 1,
                'n_cats': 3,
            }
        """
        return ub.odict([
            ('n_anns', self.n_annots),
            ('n_imgs', self.n_images),
            ('n_videos', self.n_videos),
            ('n_cats', self.n_cats),
        ])

    def extended_stats(self):
        """
        Reports number of images, annotations, and categories.

        Example:
            >>> self = CocoDataset.demo()
            >>> print(ub.repr2(self.extended_stats()))
        """
        def mapping_stats(xid_to_yids):
            # Distribution statistics over the group sizes of a mapping
            import kwarray
            n_yids = list(ub.map_vals(len, xid_to_yids).values())
            return kwarray.stats_dict(n_yids, n_extreme=True)
        return ub.odict([
            ('annots_per_img', mapping_stats(self.gid_to_aids)),
            # ('cats_per_img', mapping_stats(self.cid_to_gids)),
            ('annots_per_cat', mapping_stats(self.cid_to_aids)),
        ])

    def boxsize_stats(self, anchors=None, perclass=True, gids=None, aids=None,
                      verbose=0, clusterkw={}, statskw={}):
        """
        Compute statistics about bounding box sizes.

        Also computes anchor boxes using kmeans if ``anchors`` is specified.

        Args:
            anchors (int): if specified also computes box anchors
            perclass (bool): if True also computes stats for each category
            gids (List[int], default=None):
                if specified only compute stats for these image ids.
            aids (List[int], default=None):
                if specified only compute stats for these annotation ids.
            verbose (int): verbosity level
            clusterkw (dict): kwargs for :class:`sklearn.cluster.KMeans` used
                if computing anchors.
            statskw (dict): kwargs for :func:`kwarray.stats_dict`

        Returns:
            Dict[str, Dict[str, Dict | ndarray]]

        Example:
            >>> import kwcoco
            >>> self = kwcoco.CocoDataset.demo('shapes32')
            >>> infos = self.boxsize_stats(anchors=4, perclass=False)
            >>> print(ub.repr2(infos, nl=-1, precision=2))
            >>> infos = self.boxsize_stats(gids=[1], statskw=dict(median=True))
            >>> print(ub.repr2(infos, nl=-1, precision=2))
        """
        import kwarray
        cname_to_box_sizes = defaultdict(list)

        if bool(gids) and bool(aids):
            raise ValueError('specifying gids and aids is mutually exclusive')

        if gids is not None:
            aids = ub.flatten(ub.take(self.index.gid_to_aids, gids))
        if aids is not None:
            anns = ub.take(self.anns, aids)
        else:
            anns = self.dataset['annotations']

        # Gather [w, h] per category for every annotation with a bbox
        for ann in anns:
            if 'bbox' in ann:
                cname = self.cats[ann['category_id']]['name']
                cname_to_box_sizes[cname].append(ann['bbox'][2:4])
        cname_to_box_sizes = ub.map_vals(np.array, cname_to_box_sizes)

        def _boxes_info(box_sizes):
            # Summarize one array of [w, h] rows; optionally cluster anchors
            box_info = {
                'stats': kwarray.stats_dict(box_sizes, axis=0, **statskw)
            }
            if anchors:
                from sklearn import cluster
                defaultkw = {
                    'n_clusters': anchors,
                    'n_init': 20,
                    'max_iter': 10000,
                    'tol': 1e-6,
                    'algorithm': 'elkan',
                    'verbose': verbose
                }
                kmkw = ub.dict_union(defaultkw, clusterkw)
                algo = cluster.KMeans(**kmkw)
                algo.fit(box_sizes)
                anchor_sizes = algo.cluster_centers_
                box_info['anchors'] = anchor_sizes
            return box_info

        infos = {}
        if perclass:
            cid_to_info = {}
            for cname, box_sizes in cname_to_box_sizes.items():
                if verbose:
                    print('compute {} bbox stats'.format(cname))
                cid_to_info[cname] = _boxes_info(box_sizes)
            infos['perclass'] = cid_to_info

        if verbose:
            print('compute all bbox stats')
        all_sizes = np.vstack(list(cname_to_box_sizes.values()))
        all_info = _boxes_info(all_sizes)
        infos['all'] = all_info
        return infos
class _NextId(object):
""" Helper class to tracks unused ids for new items """
def __init__(self, parent):
self.parent = parent
self.unused = {
'categories': None,
'images': None,
'annotations': None,
'videos': None,
}
def _update_unused(self, key):
""" Scans for what the next safe id can be for ``key`` """
item_list = self.parent.dataset[key]
max_id = max(item['id'] for item in item_list) if item_list else 0
next_id = max(max_id + 1, len(item_list))
self.unused[key] = next_id
def get(self, key):
""" Get the next safe item id for ``key`` """
if self.unused[key] is None:
self._update_unused(key)
new_id = self.unused[key]
self.unused[key] += 1
return new_id
[docs]class MixinCocoDraw(object):
    """
    Matplotlib / display functionality

    Note:
        These methods lazily import the optional ``matplotlib``, ``kwplot``
        and ``kwimage`` packages inside each call.
    """
    [docs] def imread(self, gid):
        """
        Loads the pixel data for a particular image.

        Args:
            gid (int): the image id to load

        Returns:
            the result of ``self.load_image(gid)``
        """
        return self.load_image(gid)
    [docs] def draw_image(self, gid):
        """
        Use kwimage to draw all annotations on an image and return the pixels
        as a numpy array.

        Args:
            gid (int): the image id to draw

        Returns:
            ndarray: canvas
        Example:
            >>> import kwcoco
            >>> self = kwcoco.CocoDataset.demo('shapes8')
            >>> canvas = self.draw_image(1)
            >>> # Now you can dump the annotated image to disk / whatever
            >>> # xdoctest: +REQUIRES(--show)
            >>> import kwplot
            >>> kwplot.autompl()
            >>> kwplot.imshow(canvas)
        """
        import kwimage
        # Load the raw image pixels
        canvas = self.load_image(gid)
        # Get annotation IDs from this image
        aids = self.index.gid_to_aids[gid]
        # Grab relevant annotation dictionaries
        anns = [self.anns[aid] for aid in aids]
        # Transform them into a kwimage.Detections datastructure
        # if 1:
        dset = self
        try:
            classes = dset.object_categories()
        except Exception:
            # Fallback when no category-tree accessor is available
            classes = list(dset.name_to_cat.keys())
        try:
            kp_classes = dset.keypoint_categories()
        except Exception:
            # hack: no keypoint categories are defined, so strip keypoint
            # data from (copies of) the annotations before drawing
            anns = [ann.copy() for ann in anns]
            for ann in anns:
                ann.pop('keypoints', None)
            kp_classes = None
        cats = dset.dataset['categories']
        # dets = kwimage.Detections.from_coco_annots(anns, dset=self)
        dets = kwimage.Detections.from_coco_annots(
            anns, classes=classes, cats=cats, kp_classes=kp_classes)
        canvas = dets.draw_on(canvas)
        return canvas
    [docs] def show_image(self, gid=None, aids=None, aid=None, **kwargs):
        """
        Use matplotlib to show an image with annotations overlaid

        Args:
            gid (int): image to show
            aids (list): aids to highlight within the image
            aid (int): a specific aid to focus on. If gid is not given,
                look up gid based on this aid.
            **kwargs:
                show_annots, show_aid, show_catname, show_kpname,
                show_segmentation, title, show_gid, show_filename,
                show_boxes,

        Returns:
            the matplotlib axes the image was drawn on

        Ignore:
            # Programmatically collect the kwargs for docs generation
            import xinspect
            import kwcoco
            kwargs = xinspect.get_kwargs(kwcoco.CocoDataset.show_image)
            print(ub.repr2(list(kwargs.keys()), nl=1, si=1))
        """
        import matplotlib as mpl
        from matplotlib import pyplot as plt
        # from PIL import Image
        import kwimage
        import kwplot
        figkw = {k: kwargs[k] for k in ['fnum', 'pnum', 'doclf', 'docla']
                 if k in kwargs}
        if figkw:
            kwplot.figure(**figkw)
        # If no image was requested, infer it from the focus annotation
        if gid is None:
            primary_ann = self.anns[aid]
            gid = primary_ann['image_id']
        show_all = kwargs.get('show_all', True)
        show_annots = kwargs.get('show_annots', True)
        highlight_aids = set()
        if aid is not None:
            highlight_aids.add(aid)
        if aids is not None:
            highlight_aids.update(aids)
        img = self.imgs[gid]
        aids = self.gid_to_aids.get(img['id'], [])
        # Collect annotation overlays
        colored_segments = defaultdict(list)
        keypoints = []
        rects = []
        texts = []
        sseg_masks = []
        sseg_polys = []
        if show_annots:
            for aid in aids:
                ann = self.anns[aid]
                # Parse keypoints into xys / kpnames / isvisible first,
                # because the bbox fallback below may need the xys
                if 'keypoints' in ann:
                    cid = ann['category_id']
                    if ann['keypoints'] is not None and len(ann['keypoints']) > 0:
                        # TODO: rely on kwimage.Points to parse multiple format info?
                        kpts_data = ann['keypoints']
                        if isinstance(ub.peek(kpts_data), dict):
                            # New dict-based keypoint format
                            xys = np.array([p['xy'] for p in kpts_data])
                            isvisible = np.array([p.get('visible', True) for p in kpts_data])
                            kpnames = None
                            # kpnames = []
                            # for p in kpts_data:
                            # if 'keypoint_category_id' in p:
                            # pass
                            # pass
                            # NOTE(review): redundant; duplicates the isvisible
                            # assignment a few lines above
                            isvisible = np.array([p.get('visible', True) for p in kpts_data])
                        else:
                            # Original flat [x1, y1, v1, ...] keypoint encoding
                            try:
                                kpnames = self._lookup_kpnames(cid)
                            except KeyError:
                                kpnames = None
                            kpts = np.array(ann['keypoints']).reshape(-1, 3)
                            isvisible = kpts.T[2] > 0
                            xys = kpts.T[0:2].T[isvisible]
                    else:
                        kpnames = None
                        xys = None
                else:
                    kpnames = None
                    xys = None
                # Note standard coco bbox is [x,y,width,height]
                if 'bbox' in ann:
                    x1, y1 = ann['bbox'][0:2]
                elif 'line' in ann:
                    x1, y1 = ann['line'][0:2]
                elif 'keypoints' in ann:
                    x1, y1 = xys.min(axis=0)
                else:
                    raise Exception('no bbox, line, or keypoint position')
                cid = ann.get('category_id', None)
                if cid is not None:
                    cat = self.cats[cid]
                    catname = cat['name']
                else:
                    cat = None
                    catname = ann.get('category_name', 'None')
                textkw = {
                    'horizontalalignment': 'left',
                    'verticalalignment': 'top',
                    'backgroundcolor': (0, 0, 0, .3),
                    'color': 'white',
                    'fontproperties': mpl.font_manager.FontProperties(
                        size=6, family='monospace'),
                }
                annot_text_parts = []
                if kwargs.get('show_aid', show_all):
                    annot_text_parts.append('aid={}'.format(aid))
                if kwargs.get('show_catname', show_all):
                    annot_text_parts.append(catname)
                annot_text = ' '.join(annot_text_parts)
                texts.append((x1, y1, annot_text, textkw))
                color = 'orange' if aid in highlight_aids else 'blue'
                if 'obox' in ann:
                    # Oriented bounding box
                    segs = np.array(ann['obox']).reshape(-1, 3)[:, 0:2]
                    for pt1, pt2 in ub.iter_window(segs, wrap=True):
                        colored_segments[color].append([pt1, pt2])
                elif 'bbox' in ann:
                    [x, y, w, h] = ann['bbox']
                    rect = mpl.patches.Rectangle((x, y), w, h, facecolor='none',
                                                 edgecolor=color)
                    rects.append(rect)
                if 'line' in ann:
                    x1, y1, x2, y2 = ann['line']
                    pt1, pt2 = (x1, y1), (x2, y2)
                    colored_segments[color].append([pt1, pt2])
                if 'keypoints' in ann:
                    if xys is not None and len(xys):
                        keypoints.append(xys)
                        if kwargs.get('show_kpname', True):
                            if kpnames is not None:
                                for (kp_x, kp_y), kpname in zip(xys, kpnames):
                                    texts.append((kp_x, kp_y, kpname, textkw))
                if 'segmentation' in ann and kwargs.get('show_segmentation', True):
                    sseg = ann['segmentation']
                    # Respect the 'color' attribute of categories
                    if cat is not None:
                        catcolor = cat.get('color', None)
                    else:
                        catcolor = None
                    # NOTE(review): constant flag; the else branch below is
                    # dead code kept from when kwimage was optional
                    HAVE_KWIMAGE = True
                    if HAVE_KWIMAGE:
                        if catcolor is not None:
                            catcolor = kwplot.Color(catcolor).as01()
                        # TODO: Unify masks and polygons into a kwimage
                        # segmentation class
                        sseg = kwimage.Segmentation.coerce(sseg).data
                        if isinstance(sseg, kwimage.Mask):
                            m = sseg.to_c_mask()
                            sseg_masks.append((m.data, catcolor))
                        else:
                            # TODO: interior
                            multipoly = sseg.to_multi_polygon()
                            for poly in multipoly.data:
                                poly_xys = poly.data['exterior'].data
                                polykw = {}
                                if catcolor is not None:
                                    polykw['color'] = catcolor
                                poly = mpl.patches.Polygon(poly_xys, **polykw)
                                try:
                                    # hack: stash the area for draw-order
                                    # sorting further below
                                    poly.area = sseg.to_shapely().area
                                except Exception:
                                    pass
                                sseg_polys.append(poly)
                    else:
                        # print('sseg = {!r}'.format(sseg))
                        if isinstance(sseg, dict):
                            # Handle COCO-RLE-segmentations; convert to raw binary masks
                            sseg = dict(sseg)
                            if 'shape' not in sseg and 'size' in sseg:
                                # NOTE: size here is actually h/w unlike almost
                                # everywhere else
                                sseg['shape'] = sseg['size']
                            if isinstance(sseg['counts'], (six.binary_type, six.text_type)):
                                mask = kwimage.Mask(sseg, 'bytes_rle').to_c_mask().data
                            else:
                                mask = kwimage.Mask(sseg, 'array_rle').to_c_mask().data
                            sseg_masks.append((mask, catcolor))
                        elif isinstance(sseg, list):
                            # Handle COCO-polygon-segmentation
                            # If the segmentation is a list of polygons
                            if not (len(sseg) and isinstance(sseg[0], list)):
                                sseg = [sseg]
                            for flat in sseg:
                                poly_xys = np.array(flat).reshape(-1, 2)
                                polykw = {}
                                if catcolor is not None:
                                    polykw['color'] = catcolor
                                poly = mpl.patches.Polygon(poly_xys, **polykw)
                                sseg_polys.append(poly)
                        else:
                            raise TypeError(type(sseg))
        # Show image
        np_img = self.load_image(img)
        np_img = kwimage.atleast_3channels(np_img)
        np_img01 = None
        # Normalize high bit-depth integer images into [0, 1] for display
        if np_img.dtype.kind in {'i', 'u'}:
            if np_img.max() > 255:
                np_img01 = np_img / np_img.max()
        fig = plt.gcf()
        ax = fig.gca()
        ax.cla()
        if sseg_masks:
            if np_img01 is None:
                np_img01 = kwimage.ensure_float01(np_img)
            layers = []
            layers.append(kwimage.ensure_alpha_channel(np_img01))
            distinct_colors = kwplot.Color.distinct(len(sseg_masks))
            for (mask, _catcolor), col in zip(sseg_masks, distinct_colors):
                if _catcolor is not None:
                    col = kwimage.ensure_float01(np.array(_catcolor)).tolist()
                col = np.array(col + [1])[None, None, :]
                alpha_mask = col * mask[:, :, None]
                alpha_mask[..., 3] = mask * 0.5
                layers.append(alpha_mask)
            # NOTE(review): ub.Timer with a label presumably prints timing
            # info; looks like leftover instrumentation — confirm
            with ub.Timer('overlay'):
                masked_img = kwimage.overlay_alpha_layers(layers[::-1])
            ax.imshow(masked_img)
        else:
            if np_img01 is not None:
                ax.imshow(np_img01)
            else:
                ax.imshow(np_img)
        title = kwargs.get('title', None)
        if title is None:
            title_parts = []
            if kwargs.get('show_gid', True):
                title_parts.append('gid={}'.format(gid))
            if kwargs.get('show_filename', True):
                title_parts.append(img['file_name'])
            title = ' '.join(title_parts)
        if title:
            ax.set_title(title)
        if sseg_polys:
            # print('sseg_polys = {!r}'.format(sseg_polys))
            # NOTE(review): 'if True' is a dead conditional kept from a refactor
            if True:
                # hack: show smaller polygons first.
                if len(sseg_polys):
                    # Descending area: larger polygons are drawn first so
                    # smaller ones stay visible on top
                    areas = np.array([getattr(p, 'area', np.inf) for p in sseg_polys])
                    sortx = np.argsort(areas)[::-1]
                    sseg_polys = list(ub.take(sseg_polys, sortx))
            poly_col = mpl.collections.PatchCollection(
                sseg_polys, 2, alpha=0.4)
            ax.add_collection(poly_col)
        # Show all annotations inside it
        if kwargs.get('show_boxes', True):
            for (x1, y1, catname, textkw) in texts:
                ax.text(x1, y1, catname, **textkw)
            for color, segments in colored_segments.items():
                line_col = mpl.collections.LineCollection(segments, 2, color=color)
                ax.add_collection(line_col)
            rect_col = mpl.collections.PatchCollection(rects, match_original=True)
            ax.add_collection(rect_col)
            if keypoints:
                xs, ys = np.vstack(keypoints).T
                ax.plot(xs, ys, 'bo')
        return ax
[docs]class MixinCocoAddRemove(object):
    """
    Mixin functions to dynamically add / remove annotations images and
    categories while maintaining lookup indexes.
    """
    [docs] def add_video(self, name, id=None, **kw):
        """
        Add a video to the dataset (dynamically updates the index)

        Args:
            name (str): Unique name for this video.
            id (None or int): ADVANCED. Force using this video id.
            **kw : stores arbitrary key/value pairs in this new video

        Returns:
            int: the video id assigned to the new video

        Example:
            >>> import kwcoco
            >>> self = kwcoco.CocoDataset()
            >>> print('self.index.videos = {}'.format(ub.repr2(self.index.videos, nl=1)))
            >>> print('self.index.imgs = {}'.format(ub.repr2(self.index.imgs, nl=1)))
            >>> print('self.index.vidid_to_gids = {!r}'.format(self.index.vidid_to_gids))
            >>> vidid1 = self.add_video('foo', id=3)
            >>> vidid2 = self.add_video('bar')
            >>> vidid3 = self.add_video('baz')
            >>> print('self.index.videos = {}'.format(ub.repr2(self.index.videos, nl=1)))
            >>> print('self.index.imgs = {}'.format(ub.repr2(self.index.imgs, nl=1)))
            >>> print('self.index.vidid_to_gids = {!r}'.format(self.index.vidid_to_gids))
            >>> gid1 = self.add_image('foo1.jpg', video_id=vidid1)
            >>> gid2 = self.add_image('foo2.jpg', video_id=vidid1)
            >>> gid3 = self.add_image('foo3.jpg', video_id=vidid1)
            >>> self.add_image('bar1.jpg', video_id=vidid2)
            >>> print('self.index.videos = {}'.format(ub.repr2(self.index.videos, nl=1)))
            >>> print('self.index.imgs = {}'.format(ub.repr2(self.index.imgs, nl=1)))
            >>> print('self.index.vidid_to_gids = {!r}'.format(self.index.vidid_to_gids))
            >>> self.remove_images([gid2])
            >>> print('self.index.vidid_to_gids = {!r}'.format(self.index.vidid_to_gids))
        """
        if id is None:
            id = self._next_ids.get('videos')
        video = ub.odict()
        video['id'] = id
        video['name'] = name
        video.update(**kw)
        self.dataset['videos'].append(video)
        self.index._add_video(id, video)
        # self._invalidate_hashid(['videos'])
        return id
    [docs] def add_image(self, file_name, id=None, **kw):
        """
        Add an image to the dataset (dynamically updates the index)

        Args:
            file_name (str): relative or absolute path to image
            id (None or int): ADVANCED. Force using this image id.
            **kw : stores arbitrary key/value pairs in this new image

        Returns:
            int: the image id assigned to the new image

        Example:
            >>> self = CocoDataset.demo()
            >>> import kwimage
            >>> gname = kwimage.grab_test_image_fpath('paraview')
            >>> gid = self.add_image(gname)
            >>> assert self.imgs[gid]['file_name'] == gname
        """
        if id is None:
            id = self._next_ids.get('images')
        elif self.imgs and id in self.imgs:
            raise IndexError('Image id={} already exists'.format(id))
        img = _dict()
        img['id'] = int(id)
        img['file_name'] = str(file_name)
        img.update(**kw)
        # NOTE: the index is updated before the dataset append; the index
        # raises ValueError on duplicate file names, leaving the raw
        # dataset list unmodified in that case
        self.index._add_image(id, img)
        self.dataset['images'].append(img)
        self._invalidate_hashid()
        return id
    [docs] def add_annotation(self, image_id, category_id=None, bbox=None, id=None, **kw):
        """
        Add an annotation to the dataset (dynamically updates the index)

        Args:
            image_id (int): image_id to add to
            category_id (int): category_id to add to
            bbox (list or kwimage.Boxes): bounding box in xywh format
            id (None or int): ADVANCED. Force using this annotation id.
            **kw : stores arbitrary key/value pairs in this new image

        Returns:
            int: the annotation id assigned to the new annotation

        Example:
            >>> self = CocoDataset.demo()
            >>> image_id = 1
            >>> cid = 1
            >>> bbox = [10, 10, 20, 20]
            >>> aid = self.add_annotation(image_id, cid, bbox)
            >>> assert self.anns[aid]['bbox'] == bbox
        Example:
            >>> # Attempt to annot without a category or bbox
            >>> import kwcoco
            >>> self = kwcoco.CocoDataset.demo()
            >>> image_id = 1
            >>> aid = self.add_annotation(image_id)
            >>> assert None in self.index.cid_to_aids
        """
        if id is None:
            id = self._next_ids.get('annotations')
        elif self.anns and id in self.anns:
            raise IndexError('Annot id={} already exists'.format(id))
        ann = _dict()
        ann['id'] = int(id)
        ann['image_id'] = int(image_id)
        ann['category_id'] = None if category_id is None else int(category_id)
        if bbox is not None:
            try:
                import kwimage
                # Coerce kwimage.Boxes into the raw coco xywh list format
                if isinstance(bbox, kwimage.Boxes):
                    bbox = bbox.to_xywh().data.tolist()
            except ImportError:
                pass
            ann['bbox'] = bbox
        # assert not set(kw).intersection(set(ann))
        ann.update(**kw)
        self.dataset['annotations'].append(ann)
        self.index._add_annotation(id, image_id, category_id, ann)
        self._invalidate_hashid(['annotations'])
        return id
    [docs] def add_category(self, name, supercategory=None, id=None, **kw):
        """
        Adds a category

        Args:
            name (str): name of the new category
            supercategory (str, optional): parent of this category
            id (int, optional): use this category id, if it was not taken
            **kw : stores arbitrary key/value pairs in this new image

        Returns:
            int: the category id assigned to the new category

        Raises:
            ValueError: if the category name already exists
            IndexError: if the requested category id already exists

        Example:
            >>> self = CocoDataset.demo()
            >>> prev_n_cats = self.n_cats
            >>> cid = self.add_category('dog', supercategory='object')
            >>> assert self.cats[cid]['name'] == 'dog'
            >>> assert self.n_cats == prev_n_cats + 1
            >>> import pytest
            >>> with pytest.raises(ValueError):
            >>>     self.add_category('dog', supercategory='object')
        """
        index = self.index
        if index.cats and name in index.name_to_cat:
            raise ValueError('Category name={!r} already exists'.format(name))
        if id is None:
            id = self._next_ids.get('categories')
        elif index.cats and id in index.cats:
            raise IndexError('Category id={} already exists'.format(id))
        cat = _dict()
        cat['id'] = int(id)
        cat['name'] = str(name)
        if supercategory:
            cat['supercategory'] = supercategory
        cat.update(**kw)
        # Add to raw data structure
        self.dataset['categories'].append(cat)
        # And add to the indexes
        index._add_category(id, name, cat)
        self._invalidate_hashid(['categories'])
        return id
    [docs] def ensure_image(self, file_name, id=None, **kw):
        """
        Like add_image, but returns the existing image id if it already
        exists instead of failing. In this case all metadata is ignored.

        Args:
            file_name (str): relative or absolute path to image
            id (None or int): ADVANCED. Force using this image id.
            **kw : stores arbitrary key/value pairs in this new image

        Returns:
            int: the existing or new image id
        """
        try:
            id = self.add_image(file_name=file_name, id=id, **kw)
        except ValueError:
            # add_image raises ValueError (from the index) when an image
            # with this file_name already exists; return its id instead
            img = self.index.file_name_to_img[file_name]
            id = img['id']
        return id
    [docs] def ensure_category(self, name, supercategory=None, id=None, **kw):
        """
        Like add_category, but returns the existing category id if it already
        exists instead of failing. In this case all metadata is ignored.

        Returns:
            int: the existing or new category id
        """
        try:
            id = self.add_category(name=name, supercategory=supercategory,
                                   id=id, **kw)
        except ValueError:
            # Category name already exists; return its id instead
            cat = self.index.name_to_cat[name]
            id = cat['id']
        return id
    [docs] def add_annotations(self, anns):
        """
        Faster less-safe multi-item alternative to ``add_annotation``.

        Note:
            The annotation dictionaries are appended as-is; no id or type
            coercion/validation is performed here.

        Args:
            anns (List[Dict]): list of annotation dictionaries

        Example:
            >>> self = CocoDataset.demo()
            >>> anns = [self.anns[aid] for aid in [2, 3, 5, 7]]
            >>> self.remove_annotations(anns)
            >>> assert self.n_annots == 7 and self._check_index()
            >>> self.add_annotations(anns)
            >>> assert self.n_annots == 11 and self._check_index()
        """
        self.dataset['annotations'].extend(anns)
        self.index._add_annotations(anns)
        self._invalidate_hashid(['annotations'])
    [docs] def add_images(self, imgs):
        """
        Faster less-safe multi-item alternative to ``add_image``.

        Note:
            THIS FUNCTION WAS DESIGNED FOR SPEED, AS SUCH IT DOES NOT CHECK IF
            THE IMAGE-IDs or FILE_NAMES ARE DUPLICATED AND WILL BLINDLY ADD
            DATA EVEN IF IT IS BAD. THE SINGLE IMAGE VERSION IS SLOWER BUT
            SAFER.

        Args:
            imgs (List[Dict]): list of image dictionaries

        Example:
            >>> imgs = CocoDataset.demo().dataset['images']
            >>> self = CocoDataset()
            >>> self.add_images(imgs)
            >>> assert self.n_images == 3 and self._check_index()
        """
        self.dataset['images'].extend(imgs)
        self.index._add_images(imgs)
        self._invalidate_hashid(['images'])
    [docs] def clear_images(self):
        """
        Removes all images and annotations (but not categories)

        Example:
            >>> self = CocoDataset.demo()
            >>> self.clear_images()
            >>> print(ub.repr2(self.basic_stats(), nobr=1, nl=0, si=1))
            n_anns: 0, n_imgs: 0, n_videos: 0, n_cats: 8
        """
        # self.dataset['images'].clear()
        # self.dataset['annotations'].clear()
        # NOTE: del lst[:] rather than lst.clear(), presumably for
        # Python 2 compatibility
        del self.dataset['images'][:]
        del self.dataset['annotations'][:]
        self.index._remove_all_images()
        self._invalidate_hashid(['images', 'annotations'])
    [docs] def clear_annotations(self):
        """
        Removes all annotations (but not images and categories)

        Example:
            >>> self = CocoDataset.demo()
            >>> self.clear_annotations()
            >>> print(ub.repr2(self.basic_stats(), nobr=1, nl=0, si=1))
            n_anns: 0, n_imgs: 3, n_videos: 0, n_cats: 8
        """
        # self.dataset['annotations'].clear()
        del self.dataset['annotations'][:]
        self.index._remove_all_annotations()
        self._invalidate_hashid(['annotations'])
    # Aliases for backwards compatibility
    remove_all_images = clear_images
    remove_all_annotations = clear_annotations
    [docs] def remove_annotation(self, aid_or_ann):
        """
        Remove a single annotation from the dataset

        If you have multiple annotations to remove its more efficient to remove
        them in batch with ``self.remove_annotations``

        Args:
            aid_or_ann (dict | int): annotation dict or id to remove

        Example:
            >>> import kwcoco
            >>> self = kwcoco.CocoDataset.demo()
            >>> aids_or_anns = [self.anns[2], 3, 4, self.anns[1]]
            >>> self.remove_annotations(aids_or_anns)
            >>> assert len(self.dataset['annotations']) == 7
            >>> self._check_index()
        """
        # Do the simple thing, its O(n) anyway,
        remove_ann = self._resolve_to_ann(aid_or_ann)
        self.dataset['annotations'].remove(remove_ann)
        # NOTE: this invalidates the entire index rather than surgically
        # removing one entry
        self.index.clear()
        self._invalidate_hashid(['annotations'])
    [docs] def remove_annotations(self, aids_or_anns, verbose=0, safe=True):
        """
        Remove multiple annotations from the dataset.

        Args:
            aids_or_anns (List): list of annotation dicts or ids
            safe (bool, default=True): if True, we perform checks to remove
                duplicates and non-existing identifiers.

        Returns:
            Dict: num_removed: information on the number of items removed

        Example:
            >>> import kwcoco
            >>> self = kwcoco.CocoDataset.demo()
            >>> prev_n_annots = self.n_annots
            >>> aids_or_anns = [self.anns[2], 3, 4, self.anns[1]]
            >>> self.remove_annotations(aids_or_anns)  # xdoc: +IGNORE_WANT
            {'annotations': 4}
            >>> assert len(self.dataset['annotations']) == prev_n_annots - 4
            >>> self._check_index()
        """
        remove_info = {'annotations': None}
        # Do nothing if given no input
        if aids_or_anns:
            # build mapping from aid to index O(n)
            # TODO: it would be nice if this mapping was as part of the index.
            aid_to_index = {
                ann['id']: index
                for index, ann in enumerate(self.dataset['annotations'])
            }
            remove_aids = list(map(self._resolve_to_id, aids_or_anns))
            if safe:
                remove_aids = sorted(set(remove_aids))
            remove_info['annotations'] = len(remove_aids)
            # Lookup the indices to remove, sort in descending order
            if verbose > 1:
                print('Removing {} annotations'.format(len(remove_aids)))
            # NOTE(review): the indices are not explicitly sorted here;
            # presumably _delitems handles ordering — confirm
            remove_idxs = list(ub.take(aid_to_index, remove_aids))
            _delitems(self.dataset['annotations'], remove_idxs)
            self.index._remove_annotations(remove_aids, verbose=verbose)
            self._invalidate_hashid(['annotations'])
        return remove_info
    [docs] def remove_categories(self, cat_identifiers, keep_annots=False, verbose=0,
                          safe=True):
        """
        Remove categories and all annotations in those categories.
        Currently does not change any hierarchy information

        Args:
            cat_identifiers (List): list of category dicts, names, or ids
            keep_annots (bool, default=False):
                if True, keeps annotations, but removes category labels.
            safe (bool, default=True): if True, we perform checks to remove
                duplicates and non-existing identifiers.

        Returns:
            Dict: num_removed: information on the number of items removed

        Example:
            >>> self = CocoDataset.demo()
            >>> cat_identifiers = [self.cats[1], 'rocket', 3]
            >>> self.remove_categories(cat_identifiers)
            >>> assert len(self.dataset['categories']) == 5
            >>> self._check_index()
        """
        remove_info = {'annotations': None, 'categories': None}
        if cat_identifiers:
            if verbose > 1:
                print('Removing annots of removed categories')
            if safe:
                # Silently skip identifiers that cannot be resolved
                remove_cids = set()
                for identifier in cat_identifiers:
                    try:
                        cid = self._resolve_to_cid(identifier)
                        remove_cids.add(cid)
                    except Exception:
                        pass
                remove_cids = sorted(remove_cids)
            else:
                remove_cids = list(map(self._resolve_to_cid, cat_identifiers))
            # First remove any annotation that belongs to those categories
            if self.cid_to_aids:
                remove_aids = list(it.chain(*[self.cid_to_aids[cid]
                                              for cid in remove_cids]))
            else:
                # Fallback when the index is not built: linear scan
                remove_aids = [ann['id'] for ann in self.dataset['annotations']
                               if ann['category_id'] in remove_cids]
            if keep_annots:
                # Simply remove category information instead of removing the
                # entire annotation.
                for aid in remove_aids:
                    self.anns[aid].pop('category_id')
            else:
                rminfo = self.remove_annotations(remove_aids, verbose=verbose)
                remove_info.update(rminfo)
            remove_info['categories'] = len(remove_cids)
            if verbose > 1:
                print('Removing {} category entries'.format(len(remove_cids)))
            cid_to_index = {
                cat['id']: index
                for index, cat in enumerate(self.dataset['categories'])
            }
            # Lookup the indices to remove, sort in descending order
            remove_idxs = list(ub.take(cid_to_index, remove_cids))
            _delitems(self.dataset['categories'], remove_idxs)
            self.index._remove_categories(remove_cids, verbose=verbose)
            self._invalidate_hashid(['categories', 'annotations'])
        return remove_info
    [docs] def remove_images(self, gids_or_imgs, verbose=0, safe=True):
        """
        Remove images and all annotations in those images.

        Args:
            gids_or_imgs (List): list of image dicts, names, or ids
            safe (bool, default=True): if True, we perform checks to remove
                duplicates and non-existing identifiers.

        Returns:
            Dict: num_removed: information on the number of items removed

        Example:
            >>> from kwcoco.coco_dataset import *
            >>> self = CocoDataset.demo()
            >>> assert len(self.dataset['images']) == 3
            >>> gids_or_imgs = [self.imgs[2], 'astro.png']
            >>> self.remove_images(gids_or_imgs)  # xdoc: +IGNORE_WANT
            {'annotations': 11, 'images': 2}
            >>> assert len(self.dataset['images']) == 1
            >>> self._check_index()
            >>> gids_or_imgs = [3]
            >>> self.remove_images(gids_or_imgs)
            >>> assert len(self.dataset['images']) == 0
            >>> self._check_index()
        """
        remove_info = {'annotations': None, 'images': None}
        if gids_or_imgs:
            if verbose > 1:
                print('Removing annots of removed images')
            remove_gids = list(map(self._resolve_to_gid, gids_or_imgs))
            if safe:
                remove_gids = sorted(set(remove_gids))
            # First remove any annotation that belongs to those images
            if self.gid_to_aids:
                remove_aids = list(it.chain(*[self.gid_to_aids[gid]
                                              for gid in remove_gids]))
            else:
                # Fallback when the index is not built: linear scan
                remove_aids = [ann['id'] for ann in self.dataset['annotations']
                               if ann['image_id'] in remove_gids]
            rminfo = self.remove_annotations(remove_aids, verbose=verbose)
            remove_info.update(rminfo)
            remove_info['images'] = len(remove_gids)
            if verbose > 1:
                print('Removing {} image entries'.format(len(remove_gids)))
            gid_to_index = {
                img['id']: index
                for index, img in enumerate(self.dataset['images'])
            }
            # Lookup the indices to remove, sort in descending order
            remove_idxs = list(ub.take(gid_to_index, remove_gids))
            _delitems(self.dataset['images'], remove_idxs)
            self.index._remove_images(remove_gids, verbose=verbose)
            self._invalidate_hashid(['images', 'annotations'])
        return remove_info
    [docs] def remove_annotation_keypoints(self, kp_identifiers):
        """
        Removes all keypoints with a particular category

        Args:
            kp_identifiers (List): list of keypoint category dicts, names, or ids

        Returns:
            Dict: num_removed: information on the number of items removed

        Note:
            Assumes the dict-style keypoint encoding; annotations without a
            'keypoints' entry would raise KeyError here.
        """
        # kpnames = {k['name'] for k in remove_kpcats}
        # TODO: needs optimization
        remove_kpcats = list(map(self._resolve_to_kpcat, kp_identifiers))
        kpcids = {k['id'] for k in remove_kpcats}
        num_kps_removed = 0
        for ann in self.dataset['annotations']:
            remove_idxs = [
                kp_idx for kp_idx, kp in enumerate(ann['keypoints'])
                if kp['keypoint_category_id'] in kpcids
            ]
            num_kps_removed += len(remove_idxs)
            _delitems(ann['keypoints'], remove_idxs)
        remove_info = {'annotation_keypoints': num_kps_removed}
        return remove_info
    [docs] def remove_keypoint_categories(self, kp_identifiers):
        """
        Removes all keypoints of a particular category as well as all
        annotation keypoints with those ids.

        Args:
            kp_identifiers (List): list of keypoint category dicts, names, or ids

        Returns:
            Dict: num_removed: information on the number of items removed

        Example:
            >>> self = CocoDataset.demo('shapes', rng=0)
            >>> kp_identifiers = ['left_eye', 'mid_tip']
            >>> remove_info = self.remove_keypoint_categories(kp_identifiers)
            >>> print('remove_info = {!r}'.format(remove_info))
            >>> # FIXME: for whatever reason demodata generation is not deterministic when seeded
            >>> # assert remove_info == {'keypoint_categories': 2, 'annotation_keypoints': 16, 'reflection_ids': 1}
            >>> assert self._resolve_to_kpcat('right_eye')['reflection_id'] is None
        """
        remove_info = {
            'keypoint_categories': None,
            'annotation_keypoints': None
        }
        remove_kpcats = list(map(self._resolve_to_kpcat, kp_identifiers))
        _ann_remove_info = self.remove_annotation_keypoints(remove_kpcats)
        remove_info.update(_ann_remove_info)
        remove_kpcids = {k['id'] for k in remove_kpcats}
        for kpcat in remove_kpcats:
            self.dataset['keypoint_categories'].remove(kpcat)
        # handle reflection ids: categories whose reflection partner was
        # removed lose their reflection link
        remove_reflect_ids = 0
        for kpcat in self.dataset['keypoint_categories']:
            if kpcat.get('reflection_id', None) in remove_kpcids:
                kpcat['reflection_id'] = None
                remove_reflect_ids += 1
        remove_info['reflection_ids'] = remove_reflect_ids
        remove_info['keypoint_categories'] = len(remove_kpcats)
        return remove_info
    [docs] def set_annotation_category(self, aid_or_ann, cid_or_cat):
        """
        Sets the category of a single annotation

        Args:
            aid_or_ann (dict | int): annotation dict or id
            cid_or_cat (dict | int): category dict or id

        Example:
            >>> import kwcoco
            >>> self = kwcoco.CocoDataset.demo()
            >>> old_freq = self.category_annotation_frequency()
            >>> aid_or_ann = aid = 2
            >>> cid_or_cat = new_cid = self.ensure_category('kitten')
            >>> self.set_annotation_category(aid, new_cid)
            >>> new_freq = self.category_annotation_frequency()
            >>> print('new_freq = {}'.format(ub.repr2(new_freq, nl=1)))
            >>> print('old_freq = {}'.format(ub.repr2(old_freq, nl=1)))
            >>> assert sum(new_freq.values()) == sum(old_freq.values())
            >>> assert new_freq['kitten'] == 1
        """
        new_cid = self._resolve_to_cid(cid_or_cat)
        ann = self._resolve_to_ann(aid_or_ann)
        aid = ann['id']
        # Surgically update the index (when built) instead of rebuilding
        if self.index:
            if 'category_id' in ann:
                old_cid = ann['category_id']
                self.cid_to_aids[old_cid].remove(aid)
        ann['category_id'] = new_cid
        if self.index:
            self.cid_to_aids[new_cid].add(aid)
        self._invalidate_hashid(['annotations'])
[docs]class CocoIndex(object):
"""
Fast lookup index for the COCO dataset with dynamic modification
Attributes:
imgs (Dict[int, dict]):
mapping between image ids and the image dictionaries
anns (Dict[int, dict]):
mapping between annotation ids and the annotation dictionaries
cats (Dict[int, dict]):
mapping between category ids and the category dictionaries
"""
# _set = ub.oset # many operations are much slower for oset
_set = set
def __init__(index):
index.anns = None
index.imgs = None
index.videos = None
index.cats = None
index._id_lookup = None
index.gid_to_aids = None
index.cid_to_aids = None
index.vidid_to_gids = None
index.name_to_video = None
index.name_to_cat = None
index.file_name_to_img = None
index._CHECKS = True
# index.name_to_video = None # TODO
# index.kpcid_to_aids = None # TODO
def __bool__(index):
return index.anns is not None
__nonzero__ = __bool__ # python 2 support
# On-demand lookup tables
    @property
    def cid_to_gids(index):
        """
        An on-demand dict-like view mapping each category id to the set of
        image ids that contain at least one annotation of that category.
        Values are recomputed from ``cid_to_aids`` on every lookup.

        Example:
            >>> import kwcoco
            >>> self = dset = kwcoco.CocoDataset()
            >>> self.index.cid_to_gids
        """
        from scriptconfig.dict_like import DictLike
        class ProxyCidToGids(DictLike):
            def __init__(self, index):
                self.index = index
            def getitem(self, cid):
                # Derive image ids from the annotation index lazily
                aids = self.index.cid_to_aids[cid]
                gids = {self.index.anns[aid]['image_id'] for aid in aids}
                return gids
            def keys(self):
                return self.index.cid_to_aids.keys()
        cid_to_gids = ProxyCidToGids(index=index)
        return cid_to_gids
def _add_video(index, vidid, video):
if index.videos is not None:
# name = video['name']
# if index._CHECKS:
# if name in index.name_to_video:
# raise ValueError(
# 'video with name={} already exists'.format(name))
index.videos[vidid] = video
if vidid not in index.vidid_to_gids:
index.vidid_to_gids[vidid] = index._set()
# index.name_to_video[name] = video
    def _add_image(index, gid, img):
        """
        Register a new image in the index (no-op when the index is unbuilt).
        Raises ValueError when ``_CHECKS`` is on and the file name is a
        duplicate.

        Example:
            >>> # Test adding image to video that doesnt exist
            >>> import kwcoco
            >>> self = dset = kwcoco.CocoDataset()
            >>> dset.add_image(file_name='frame1', video_id=1)
            >>> dset.add_image(file_name='frame2', video_id=1)
            >>> dset._check_pointers()
            >>> dset._check_index()
            >>> print('dset.index.vidid_to_gids = {!r}'.format(dset.index.vidid_to_gids))
            >>> assert len(dset.index.vidid_to_gids) == 1
            >>> dset.add_video(name='foo-vid', id=1)
            >>> assert len(dset.index.vidid_to_gids) == 1
            >>> dset._check_pointers()
            >>> dset._check_index()
        """
        if index.imgs is not None:
            file_name = img['file_name']
            if index._CHECKS:
                if file_name in index.file_name_to_img:
                    raise ValueError(
                        'image with file_name={} already exists'.format(
                            file_name))
            index.imgs[gid] = img
            index.gid_to_aids[gid] = index._set()
            index.file_name_to_img[file_name] = img
            # Maintain the video index; tolerate images that reference a
            # video which has not been added yet
            if 'video_id' in img:
                vidid = img['video_id']
                try:
                    index.vidid_to_gids[vidid].add(gid)
                except KeyError:
                    # Should warning messages contain data-specific info?
                    # msg = ('Adding image-id={} to '
                    #        'non-existing video-id={}').format(gid, vidid)
                    msg = 'Adding image to non-existing video'
                    warnings.warn(msg)
                    # Create the bucket now; _add_video will reuse it later
                    index.vidid_to_gids[vidid] = index._set()
                    index.vidid_to_gids[vidid].add(gid)
    def _add_images(index, imgs):
        """
        Batch variant of :func:`_add_image` (no-op when the index is unbuilt).

        Note:
            THIS FUNCTION WAS DESIGNED FOR SPEED, AS SUCH IT DOES NOT CHECK IF
            THE IMAGE-IDs or FILE_NAMES ARE DUPLICATED AND WILL BLINDLY ADD
            DATA EVEN IF IT IS BAD. THE SINGLE IMAGE VERSION IS SLOWER BUT
            SAFER.

        Ignore:
            # If we did do checks, what would be the fastest way?
            import kwcoco
            x = kwcoco.CocoDataset()
            for i in range(1000):
                x.add_image(file_name=str(i))
            y = kwcoco.CocoDataset()
            for i in range(1000, 2000):
                y.add_image(file_name=str(i))
            imgs = list(y.imgs.values())
            new_file_name_to_img = {img['file_name']: img for img in imgs}
            import ubelt as ub
            ti = ub.Timerit(100, bestof=10, verbose=2)
            for timer in ti.reset('set intersection'):
                with timer:
                    # WINNER
                    bool(set(x.index.file_name_to_img) & set(new_file_name_to_img))
            for timer in ti.reset('dict contains'):
                with timer:
                    any(f in x.index.file_name_to_img
                        for f in new_file_name_to_img.keys())
        """
        if index.imgs is not None:
            gids = [img['id'] for img in imgs]
            new_imgs = dict(zip(gids, imgs))
            index.imgs.update(new_imgs)
            index.file_name_to_img.update(
                {img['file_name']: img for img in imgs})
            for gid in gids:
                index.gid_to_aids[gid] = index._set()
            # NOTE(review): unlike _add_image, an unknown video_id here
            # raises KeyError instead of warning, and nothing is updated
            # when vidid_to_gids is empty — confirm this is intended
            if index.vidid_to_gids:
                vidid_to_gids = ub.group_items(
                    [g['id'] for g in imgs],
                    [g.get('video_id', None) for g in imgs]
                )
                vidid_to_gids.pop(None, None)
                for vidid, gids in vidid_to_gids.items():
                    index.vidid_to_gids[vidid].update(gids)
def _add_annotation(index, aid, gid, cid, ann):
if index.anns is not None:
index.anns[aid] = ann
# Note: it should be ok to have None's here
index.gid_to_aids[gid].add(aid)
index.cid_to_aids[cid].add(aid)
def _add_annotations(index, anns):
if index.anns is not None:
aids = [ann['id'] for ann in anns]
gids = [ann['image_id'] for ann in anns]
cids = [ann['category_id'] for ann in anns]
new_anns = dict(zip(aids, anns))
index.anns.update(new_anns)
for gid, cid, aid in zip(gids, cids, aids):
index.gid_to_aids[gid].add(aid)
index.cid_to_aids[cid].add(aid)
def _add_category(index, cid, name, cat):
if index.cats is not None:
index.cats[cid] = cat
index.cid_to_aids[cid] = index._set()
index.name_to_cat[name] = cat
def _remove_all_annotations(index):
# Keep the category and image indexes alive
if index.anns is not None:
index.anns.clear()
for _ in index.gid_to_aids.values():
_.clear()
for _ in index.cid_to_aids.values():
_.clear()
def _remove_all_images(index):
# Keep the category indexes alive
if index.imgs is not None:
index.imgs.clear()
index.anns.clear()
index.gid_to_aids.clear()
index.file_name_to_img.clear()
for _ in index.cid_to_aids.values():
_.clear()
for _ in index.vidid_to_gids.values():
_.clear()
def _remove_annotations(index, remove_aids, verbose=0):
if index.anns is not None:
if verbose > 1:
print('Updating annotation index')
# This is faster for simple set cid_to_aids
for aid in remove_aids:
ann = index.anns.pop(aid)
gid = ann['image_id']
cid = ann['category_id']
index.cid_to_aids[cid].remove(aid)
index.gid_to_aids[gid].remove(aid)
def _remove_categories(index, remove_cids, verbose=0):
# dynamically update the category index
if index.cats is not None:
for cid in remove_cids:
cat = index.cats.pop(cid)
del index.cid_to_aids[cid]
del index.name_to_cat[cat['name']]
if verbose > 2:
print('Updated category index')
def _remove_images(index, remove_gids, verbose=0):
# dynamically update the image index
if index.imgs is not None:
for gid in remove_gids:
img = index.imgs.pop(gid)
vidid = img.get('video_id', None)
if vidid in index.vidid_to_gids:
index.vidid_to_gids[vidid].remove(gid)
del index.gid_to_aids[gid]
del index.file_name_to_img[img['file_name']]
if verbose > 2:
print('Updated image index')
[docs] def clear(index):
index.anns = None
index.imgs = None
index.videos = None
index.cats = None
index._id_lookup = None
index.gid_to_aids = None
index.vidid_to_gids = None
index.cid_to_aids = None
index.name_to_cat = None
index.file_name_to_img = None
# index.name_to_video = None
# index.kpcid_to_aids = None # TODO
[docs] def build(index, parent):
"""
Build all id-to-obj reverse indexes from scratch.
Args:
parent (CocoDataset): the dataset to index
Notation:
aid - Annotation ID
gid - imaGe ID
cid - Category ID
vidid - Video ID
Example:
>>> from kwcoco.demo.toydata import * # NOQA
>>> parent = CocoDataset.demo('vidshapes1', num_frames=4, rng=1)
>>> index = parent.index
>>> index.build(parent)
"""
# create index
anns, cats, imgs = {}, {}, {}
videos = {}
# Build one-to-one index-lookup maps
for cat in parent.dataset.get('categories', []):
cid = cat['id']
if cid in cat:
warnings.warn(
'Categories have the same id in {}:\n{} and\n{}'.format(
parent, cats[cid], cat))
cats[cid] = cat
for video in parent.dataset.get('videos', []):
vidid = video['id']
if vidid in videos:
warnings.warn(
'Video has the same id in {}:\n{} and\n{}'.format(
parent, videos[vidid], video))
videos[vidid] = video
for img in parent.dataset.get('images', []):
gid = img['id']
if gid in imgs:
warnings.warn(
'Images have the same id in {}:\n{} and\n{}'.format(
parent, imgs[gid], img))
imgs[gid] = img
for ann in parent.dataset.get('annotations', []):
aid = ann['id']
if aid in anns:
warnings.warn(
'Annotations at index {} and {} '
'have the same id in {}:\n{} and\n{}'.format(
parent.dataset['annotations'].index(anns[aid]),
parent.dataset['annotations'].index(ann),
parent, anns[aid], ann))
anns[aid] = ann
# Build one-to-many lookup maps
vidid_to_gids = ub.group_items(
[g['id'] for g in imgs.values()],
[g.get('video_id', None) for g in imgs.values()]
)
vidid_to_gids.pop(None, None)
# Ensure that the values are cast to the appropriate set type
vidid_to_gids = ub.map_vals(index._set, vidid_to_gids)
if 0:
# The following is slightly slower, but it is also many fewer lines
# Not sure if its correct to replace the else block or not
aids = [d['id'] for d in anns.values()]
gid_to_aids = ub.group_items(aids, (d['image_id'] for d in anns.values()))
cid_to_aids = ub.group_items(aids, (d.get('category_id', None) for d in anns.values()))
cid_to_aids.pop(None, None)
gid_to_aids = ub.map_vals(index._set, gid_to_aids)
cid_to_aids = ub.map_vals(index._set, cid_to_aids)
vidid_to_gids = ub.map_vals(index._set, vidid_to_gids)
else:
gid_to_aids = defaultdict(index._set)
cid_to_aids = defaultdict(index._set)
for ann in anns.values():
try:
aid = ann['id']
gid = ann['image_id']
except KeyError:
raise KeyError('Annotation does not have ids {}'.format(ann))
if not isinstance(aid, INT_TYPES):
raise TypeError('bad aid={} type={}'.format(aid, type(aid)))
if not isinstance(gid, INT_TYPES):
raise TypeError('bad gid={} type={}'.format(gid, type(gid)))
gid_to_aids[gid].add(aid)
if gid not in imgs:
warnings.warn('Annotation {} in {} references '
'unknown image_id'.format(ann, parent))
try:
cid = ann['category_id']
except KeyError:
warnings.warn('Annotation {} in {} is missing '
'a category_id'.format(ann, parent))
else:
cid_to_aids[cid].add(aid)
if not isinstance(cid, INT_TYPES) and cid is not None:
raise TypeError('bad cid={} type={}'.format(cid, type(cid)))
if cid not in cats and cid is not None:
warnings.warn('Annotation {} in {} references '
'unknown category_id'.format(ann, parent))
# Fix one-to-zero cases
for cid in cats.keys():
if cid not in cid_to_aids:
cid_to_aids[cid] = index._set()
for gid in imgs.keys():
if gid not in gid_to_aids:
gid_to_aids[gid] = index._set()
for vidid in videos.keys():
if vidid not in vidid_to_gids:
vidid_to_gids[vidid] = index._set()
# create class members
index._id_lookup = {
'categories': cats,
'images': imgs,
'annotations': anns,
'videos': videos,
}
index.anns = anns
index.imgs = imgs
index.cats = cats
index.videos = videos
# Remove defaultdict like behavior
gid_to_aids.default_factory = None
# Actually, its important to have defaultdict like behavior for
# categories so we can allow for the category_id=None case
# cid_to_aids.default_factory = None
# vidid_to_gids.default_factory = None
index.gid_to_aids = gid_to_aids
index.cid_to_aids = cid_to_aids
index.vidid_to_gids = vidid_to_gids
index.name_to_cat = {cat['name']: cat for cat in index.cats.values()}
index.file_name_to_img = {
img['file_name']: img for img in index.imgs.values()}
class MixinCocoIndex(object):
    """
    Give the dataset top level access to index attributes
    """

    @property
    def anns(self):
        """Alias for ``self.index.anns``."""
        return self.index.anns

    @property
    def imgs(self):
        """Alias for ``self.index.imgs``."""
        return self.index.imgs

    @property
    def cats(self):
        """Alias for ``self.index.cats``."""
        return self.index.cats

    @property
    def videos(self):
        """Alias for ``self.index.videos``."""
        return self.index.videos

    @property
    def gid_to_aids(self):
        """Alias for ``self.index.gid_to_aids``."""
        return self.index.gid_to_aids

    @property
    def cid_to_aids(self):
        """Alias for ``self.index.cid_to_aids``."""
        return self.index.cid_to_aids

    @property
    def name_to_cat(self):
        """Alias for ``self.index.name_to_cat``."""
        return self.index.name_to_cat
class CocoDataset(ub.NiceRepr, MixinCocoAddRemove, MixinCocoStats,
                  MixinCocoAttrs, MixinCocoDraw, MixinCocoExtras,
                  MixinCocoIndex, MixinCocoDepricate):
    """
    Notes:
        A keypoint annotation
            {
                "image_id" : int,
                "category_id" : int,
                "keypoints" : [x1,y1,v1,...,xk,yk,vk],
                "score" : float,
            }
            Note that ``v[i]`` is a visibility flag, where v=0: not labeled,
            v=1: labeled but not visible, and v=2: labeled and visible.

        A bounding box annotation
            {
                "image_id" : int,
                "category_id" : int,
                "bbox" : [x,y,width,height],
                "score" : float,
            }

        We also define a non-standard "line" annotation (which
        our fixup scripts will interpret as the diameter of a circle to
        convert into a bounding box)

        A line* annotation (note this is a non-standard field)
            {
                "image_id" : int,
                "category_id" : int,
                "line" : [x1,y1,x2,y2],
                "score" : float,
            }

        Lastly, note that our datasets will sometimes specify multiple bbox,
        line, and/or, keypoints fields. In this case we may also specify a
        field roi_shape, which denotes which field is the "main" annotation
        type.

    Attributes:
        dataset (Dict): raw json data structure. This is the base dictionary
            that contains {'annotations': List, 'images': List,
            'categories': List}

        index (CocoIndex): an efficient lookup index into the coco data
            structure. The index defines its own attributes like
            ``anns``, ``cats``, ``imgs``, etc. See :class:`CocoIndex` for more
            details on which attributes are available.

        fpath (PathLike | None):
            if known, this stores the filepath the dataset was loaded from

        tag (str):
            A tag indicating the name of the dataset.

        img_root (PathLike | None) :
            If known, this is the root path that all image file names are
            relative to. This can also be manually overwritten by the user.

        hashid (str | None) :
            If computed, this will be a hash uniquely identifying the dataset.
            To ensure this is computed see :func:`_build_hashid`.

    References:
        http://cocodataset.org/#format
        http://cocodataset.org/#download

    CommandLine:
        python -m kwcoco.coco_dataset CocoDataset --show

    Example:
        >>> dataset = demo_coco_data()
        >>> self = CocoDataset(dataset, tag='demo')
        >>> # xdoctest: +REQUIRES(--show)
        >>> self.show_image(gid=2)
        >>> from matplotlib import pyplot as plt
        >>> plt.show()
    """
def __init__(self, data=None, tag=None, img_root=None, autobuild=True):
"""
Args:
data (str | dict):
Either a filepath to a coco json file, or a dictionary
containing the actual coco json structure. For a more generally
coercable constructor see func:`CocoDataset.coerce`.
tag (str) :
Name of the dataset for display purposes, and does not
influence behavior of the underlying data structure, although
it may be used via convinience methods. We attempt to
autopopulate this via information in ``data`` if available.
If unspecfied and ``data`` is a filepath this becomes the
basename.
img_root (str | None):
the root of the dataset that images / external data will be
assumed to be relative to. (in the future this name might
change to data_root). If unspecfied, we attempt to determine it
using information in ``data``. If ``data`` is a filepath, we
use the dirname of that path. If ``data`` is a dictionary, we
look for the "img_root" key. If unspecfied and we fail to
introspect then, we fallback to the current working directory.
"""
if data is None:
# TODO: rely on subset of SPEC keys
data = {
'categories': [],
'videos': [],
'images': [],
'annotations': [],
'licenses': [],
'info': [],
}
fpath = None
if isinstance(data, dict):
# Assumption: If data is a dict and are not explicitly given
# img_root, then we assume it is relative to the cwd.
assumed_root = '.'
elif isinstance(data, six.string_types):
fpath = data
key = basename(fpath)
data = json.load(open(fpath, 'r'))
# If data is a path it gives us the absolute location of the root
assumed_root = dirname(fpath)
if tag is None:
tag = key
else:
raise TypeError(
'data must be a dict or path to json file, '
'but got: {!r}'.format(type(data)))
if img_root is None:
if 'img_root' in data:
# allow image root to be specified in the dataset
# we refer to this as a json data "body root".
body_root = data['img_root']
if body_root is None:
body_root = ''
elif isinstance(body_root, six.string_types):
_tmp = ub.expandpath(body_root)
if exists(_tmp):
body_root = _tmp
else:
if isinstance(body_root, list) and body_root == []:
body_root = ''
else:
raise TypeError('body_root = {!r}'.format(body_root))
try:
img_root = join(assumed_root, body_root)
except Exception:
print('body_root = {!r}'.format(body_root))
print('assumed_root = {!r}'.format(assumed_root))
raise
else:
img_root = assumed_root
self.index = CocoIndex()
self.hashid = None
self.hashid_parts = None
self.fpath = fpath
self.tag = tag
self.dataset = data
self.img_root = ub.expandpath(img_root)
# Keep track of an unused id we may use
self._next_ids = _NextId(self)
if autobuild:
self._build_index()
[docs] @classmethod
def from_data(CocoDataset, data, img_root=None):
"""
Constructor from a json dictionary
"""
coco_dset = CocoDataset(data, img_root=img_root)
return coco_dset
[docs] @classmethod
def from_image_paths(CocoDataset, gpaths, img_root=None):
"""
Constructor from a list of images paths
Example:
>>> coco_dset = CocoDataset.from_image_paths(['a.png', 'b.png'])
>>> assert coco_dset.n_images == 2
"""
coco_dset = CocoDataset(img_root=img_root)
for gpath in gpaths:
coco_dset.add_image(gpath)
return coco_dset
[docs] @classmethod
def from_coco_paths(CocoDataset, fpaths, max_workers=0, verbose=1,
mode='thread', union='try'):
"""
Constructor from multiple coco file paths.
Loads multiple coco datasets and unions the result
Notes:
if the union operation fails, the list of individually loaded files
is returned instead.
Args:
fpaths (List[str]): list of paths to multiple coco files to be
loaded and unioned.
max_workers (int, default=0): number of worker threads / processes
verbose (int): verbosity level
mode (str): thread, process, or serial
union (str | bool, default='try'): If True, unions the result
datasets after loading. If False, just returns the result list.
If 'try', then try to preform the union, but return the result
list if it fails.
"""
# Can this be done better with asyncio?
from kwcoco.util import util_futures
jobs = util_futures.JobPool(mode, max_workers=max_workers)
for fpath in ub.ProgIter(fpaths, desc='submit load coco jobs', verbose=verbose):
jobs.submit(CocoDataset, fpath, autobuild=False)
results = [f.result() for f in ub.ProgIter(jobs.as_completed(),
desc='collect load coco jobs', total=len(jobs), verbose=verbose)]
if union:
try:
if verbose:
# TODO: it would be nice if we had a way to combine results on
# the fly, so we can work while the remaining io jobs are
# loading
print('combining results')
coco_dset = CocoDataset.union(*results)
except Exception as ex:
if union == 'try':
warnings.warn('Failed to union coco results: {!r}'.format(ex))
return results
else:
raise
else:
return coco_dset
else:
return results
[docs] def copy(self):
"""
Deep copies this object
Example:
>>> from kwcoco.coco_dataset import *
>>> self = CocoDataset.demo()
>>> new = self.copy()
>>> assert new.imgs[1] is new.dataset['images'][0]
>>> assert new.imgs[1] == self.dataset['images'][0]
>>> assert new.imgs[1] is not self.dataset['images'][0]
"""
new = copy.copy(self)
new.index = CocoIndex()
new.hashid_parts = copy.deepcopy(self.hashid_parts)
new.dataset = copy.deepcopy(self.dataset)
new._next_ids = _NextId(new)
new._build_index()
return new
def __nice__(self):
parts = []
parts.append('tag={}'.format(self.tag))
if self.dataset is not None:
info = ub.repr2(self.basic_stats(), kvsep='=', si=1, nobr=1, nl=0)
parts.append(info)
return ', '.join(parts)
    def dumps(self, indent=None, newlines=False):
        """
        Writes the dataset out to the json format

        Args:
            indent (int | str | None): indentation passed through to json

            newlines (bool) :
                if True, each annotation, image, category gets its own line

        Notes:
            Using newlines=True is similar to:
                print(ub.repr2(dset.dataset, nl=2, trailsep=False))
            However, the above may not output valid json if it contains
            ndarrays.

        Example:
            >>> from kwcoco.coco_dataset import *
            >>> import json
            >>> self = CocoDataset.demo()
            >>> text = self.dumps(newlines=True)
            >>> print(text)
            >>> self2 = CocoDataset(json.loads(text), tag='demo2')
            >>> assert self2.dataset == self.dataset
            >>> assert self2.dataset is not self.dataset
            >>> text = self.dumps(newlines=True)
            >>> print(text)
            >>> self2 = CocoDataset(json.loads(text), tag='demo2')
            >>> assert self2.dataset == self.dataset
            >>> assert self2.dataset is not self.dataset

        Ignore:
            for k in self2.dataset:
                if self.dataset[k] == self2.dataset[k]:
                    print('YES: k = {!r}'.format(k))
                else:
                    print('NO: k = {!r}'.format(k))
            self2.dataset['categories']
            self.dataset['categories']
        """
        def _json_dumps(data, indent=None):
            # Serialize ``data`` to a json string without forcing ascii
            fp = StringIO()
            json.dump(data, fp, indent=indent, ensure_ascii=False)
            fp.seek(0)
            text = fp.read()
            return text
        # Instead of using json to dump the whole thing make the text a bit
        # more pretty.
        if newlines:
            if indent is None:
                indent = ''
            if isinstance(indent, int):
                indent = ' ' * indent
            dict_lines = []
            # The well-known top-level keys are emitted first, in spec order
            main_keys = SPEC_KEYS
            other_keys = sorted(set(self.dataset.keys()) - set(main_keys))
            for key in main_keys:
                if key not in self.dataset:
                    continue
                # We know each main entry is a list, so make it such that
                # Each entry gets its own line
                value = self.dataset[key]
                value_lines = [_json_dumps(v) for v in value]
                if value_lines:
                    value_body = (',\n' + indent).join(value_lines)
                    value_repr = '[\n' + indent + value_body + '\n]'
                else:
                    value_repr = '[]'
                item_repr = '{}: {}'.format(_json_dumps(key), value_repr)
                dict_lines.append(item_repr)
            for key in other_keys:
                # Dont assume anything about other data
                value = self.dataset.get(key, [])
                value_repr = _json_dumps(value)
                item_repr = '{}: {}'.format(_json_dumps(key), value_repr)
                dict_lines.append(item_repr)
            text = '{\n' + ',\n'.join(dict_lines) + '\n}'
        else:
            # TODO: do main key sorting here as well
            text = _json_dumps(self.dataset, indent=indent)
        return text
[docs] def dump(self, file, indent=None, newlines=False):
"""
Writes the dataset out to the json format
Args:
file (PathLike | FileLike):
Where to write the data. Can either be a path to a file or an
open file pointer / stream.
newlines (bool) : if True, each annotation, image, category gets
its own line.
Example:
>>> import tempfile
>>> from kwcoco.coco_dataset import *
>>> self = CocoDataset.demo()
>>> file = tempfile.NamedTemporaryFile('w')
>>> self.dump(file)
>>> file.seek(0)
>>> text = open(file.name, 'r').read()
>>> print(text)
>>> file.seek(0)
>>> dataset = json.load(open(file.name, 'r'))
>>> self2 = CocoDataset(dataset, tag='demo2')
>>> assert self2.dataset == self.dataset
>>> assert self2.dataset is not self.dataset
>>> file = tempfile.NamedTemporaryFile('w')
>>> self.dump(file, newlines=True)
>>> file.seek(0)
>>> text = open(file.name, 'r').read()
>>> print(text)
>>> file.seek(0)
>>> dataset = json.load(open(file.name, 'r'))
>>> self2 = CocoDataset(dataset, tag='demo2')
>>> assert self2.dataset == self.dataset
>>> assert self2.dataset is not self.dataset
"""
if isinstance(file, six.string_types):
with open(file, 'w') as fp:
self.dump(fp, indent=indent, newlines=newlines)
else:
if newlines:
file.write(self.dumps(indent=indent, newlines=newlines))
else:
json.dump(self.dataset, file, indent=indent, ensure_ascii=False)
def _check_json_serializable(self, verbose=1):
"""
Debug which part of a coco dataset might not be json serializable
"""
from kwcoco.util.util_json import find_json_unserializable
bad_parts_gen = find_json_unserializable(self.dataset)
bad_parts = []
for part in bad_parts_gen:
if verbose == 3:
print('part = {!r}'.format(part))
elif verbose and len(bad_parts) == 0:
# print out the first one we find
print('Found at least one bad part = {!r}'.format(part))
bad_parts.append(part)
if verbose:
# if bad_parts:
# print(ub.repr2(bad_parts))
summary = 'There are {} total errors'.format(len(bad_parts))
print('summary = {}'.format(summary))
return bad_parts
    def _check_integrity(self):
        """ perform all checks """
        # the index must agree with a freshly built one
        self._check_index()
        # annotations must reference existing images / categories
        self._check_pointers()
        # NOTE(review): presumably ``missing_images`` checks that referenced
        # image files exist on disk — confirm against its definition.
        assert len(self.missing_images()) == 0
def _check_index(self):
"""
Example:
>>> import kwcoco
>>> self = kwcoco.CocoDataset.demo()
>>> self._check_index()
>>> # Force a failure
>>> self.index.anns.pop(1)
>>> self.index.anns.pop(2)
>>> import pytest
>>> with pytest.raises(AssertionError):
>>> self._check_index()
"""
# We can verify our index invariants by copying the raw dataset and
# checking if the newly constructed index is the same as this index.
new_dataset = copy.deepcopy(self.dataset)
new = self.__class__(new_dataset, autobuild=False)
new._build_index()
checks = {}
checks['anns'] = self.index.anns == new.index.anns
checks['imgs'] = self.index.imgs == new.index.imgs
checks['cats'] = self.index.cats == new.index.cats
checks['gid_to_aids'] = self.index.gid_to_aids == new.index.gid_to_aids
checks['cid_to_aids'] = self.index.cid_to_aids == new.index.cid_to_aids
checks['name_to_cat'] = self.index.name_to_cat == new.index.name_to_cat
checks['file_name_to_img'] = self.index.file_name_to_img == new.index.file_name_to_img
checks['vidid_to_gids'] = self.index.vidid_to_gids == new.index.vidid_to_gids
failed_checks = {k: v for k, v in checks.items() if not v}
if any(failed_checks):
raise AssertionError(
'Failed index checks: {}'.format(list(failed_checks)))
return True
def _check_pointers(self, verbose=1):
"""
Check that all category and image ids referenced by annotations exist
TODO:
- [ ] Check video_id attr in images
"""
if not self.index:
raise Exception('Build index before running pointer check')
errors = []
annots = self.dataset['annotations']
iter_ = ub.ProgIter(annots, desc='check annots', enabled=verbose)
for ann in iter_:
aid = ann['id']
cid = ann['category_id']
gid = ann['image_id']
if cid not in self.cats:
if cid is not None:
errors.append('aid={} references bad cid={}'.format(aid, cid))
else:
if self.cats[cid]['id'] != cid:
errors.append('cid={} has a bad index'.format(cid))
if gid not in self.imgs:
errors.append('aid={} references bad gid={}'.format(aid, gid))
else:
if self.imgs[gid]['id'] != gid:
errors.append('gid={} has a bad index'.format(gid))
if 0:
# WIP
iter_ = ub.ProgIter(self.dataset['images'], desc='check images', enabled=verbose)
for img in iter_:
img['video_id']
pass
if errors:
raise Exception('\n'.join(errors))
elif verbose:
print('Pointers are consistent')
return True
def _build_index(self):
self.index.build(self)
    def union(self, *others, **kwargs):
        """
        Merges multiple :class:`CocoDataset` items into one. Names and
        associations are retained, but ids may be different.

        Args:
            self : note that :func:`union` can be called as an instance method
                or a class method. If it is a class method, then this is the
                class type, otherwise the instance will also be unioned with
                ``others``.
            *others : a series of CocoDatasets that we will merge
            **kwargs : constructor options for the new merged CocoDataset

        Returns:
            CocoDataset: a new merged coco dataset

        Example:
            >>> # Test union works with different keypoint categories
            >>> dset1 = CocoDataset.demo('shapes1')
            >>> dset2 = CocoDataset.demo('shapes2')
            >>> dset1.remove_keypoint_categories(['bot_tip', 'mid_tip', 'right_eye'])
            >>> dset2.remove_keypoint_categories(['top_tip', 'left_eye'])
            >>> dset_12a = CocoDataset.union(dset1, dset2)
            >>> dset_12b = dset1.union(dset2)
            >>> dset_21 = dset2.union(dset1)
            >>> def add_hist(h1, h2):
            >>>     return {k: h1.get(k, 0) + h2.get(k, 0) for k in set(h1) | set(h2)}
            >>> kpfreq1 = dset1.keypoint_annotation_frequency()
            >>> kpfreq2 = dset2.keypoint_annotation_frequency()
            >>> kpfreq_want = add_hist(kpfreq1, kpfreq2)
            >>> kpfreq_got1 = dset_12a.keypoint_annotation_frequency()
            >>> kpfreq_got2 = dset_12b.keypoint_annotation_frequency()
            >>> assert kpfreq_want == kpfreq_got1
            >>> assert kpfreq_want == kpfreq_got2

            >>> # Test disjoint gid datasets
            >>> import kwcoco
            >>> dset1 = kwcoco.CocoDataset.demo('shapes3')
            >>> for new_gid, img in enumerate(dset1.dataset['images'], start=10):
            >>>     for aid in dset1.gid_to_aids[img['id']]:
            >>>         dset1.anns[aid]['image_id'] = new_gid
            >>>     img['id'] = new_gid
            >>> dset1.index.clear()
            >>> dset1._build_index()
            >>> # ------
            >>> dset2 = kwcoco.CocoDataset.demo('shapes2')
            >>> for new_gid, img in enumerate(dset2.dataset['images'], start=100):
            >>>     for aid in dset2.gid_to_aids[img['id']]:
            >>>         dset2.anns[aid]['image_id'] = new_gid
            >>>     img['id'] = new_gid
            >>> dset1.index.clear()
            >>> dset2._build_index()
            >>> others = [dset1, dset2]
            >>> merged = kwcoco.CocoDataset.union(*others)
            >>> print('merged = {!r}'.format(merged))
            >>> print('merged.imgs = {}'.format(ub.repr2(merged.imgs, nl=1)))
            >>> assert set(merged.imgs) & set([10, 11, 12, 100, 101]) == set(merged.imgs)

            >>> # Test data is not preserved
            >>> dset2 = kwcoco.CocoDataset.demo('shapes2')
            >>> dset1 = kwcoco.CocoDataset.demo('shapes3')
            >>> others = (dset1, dset2)
            >>> cls = self = kwcoco.CocoDataset
            >>> merged = cls.union(*others)
            >>> print('merged = {!r}'.format(merged))
            >>> print('merged.imgs = {}'.format(ub.repr2(merged.imgs, nl=1)))
            >>> assert set(merged.imgs) & set([1, 2, 3, 4, 5]) == set(merged.imgs)

        TODO:
            - [ ] are supercategories broken?
            - [ ] reuse image ids where possible
            - [ ] reuse annotation / category ids where possible
            - [ ] disambiguate track-ids
            - [x] disambiguate video-ids
        """
        if self.__class__ is type:
            # Method called as classmethod
            cls = self
        else:
            # Method called as instancemethod
            cls = self.__class__
            others = (self,) + others

        def _coco_union(relative_dsets, common_root):
            """ union of dictionary based data structure """
            # TODO: rely on subset of SPEC keys
            merged = _dict([
                ('licenses', []),
                ('info', []),
                ('categories', []),
                ('videos', []),
                ('images', []),
                ('annotations', []),
            ])
            # TODO: need to handle keypoint_categories
            merged_cat_name_to_id = {}
            merged_kp_name_to_id = {}

            def update_ifnotin(d1, d2):
                """ copies keys from d2 that doent exist in d1 into d1 """
                for k, v in d2.items():
                    if k not in d1:
                        d1[k] = v
                return d1

            def _has_duplicates(items):
                # True if any item appears more than once in the iterable
                seen = set()
                for item in items:
                    if item in seen:
                        return True
                    seen.add(item)
                return False

            # Original image / video ids are only preserved when they are
            # already unique across every input dataset.
            _all_imgs = (img for _, d in relative_dsets for img in d['images'])
            _all_gids = (img['id'] for img in _all_imgs)
            preserve_gids = not _has_duplicates(_all_gids)

            _all_videos = (video for _, d in relative_dsets for video in (d.get('videos', None) or []))
            _all_vidids = (video['id'] for video in _all_videos)
            preserve_vidids = not _has_duplicates(_all_vidids)

            for subdir, old_dset in relative_dsets:
                # Create temporary indexes to map from old to new
                cat_id_map = {None: None}
                img_id_map = {}
                video_id_map = {}
                kpcat_id_map = {}

                # Add the licenses / info into the merged dataset
                # Licenses / info are unused in our datas, so this might not be
                # correct
                merged['licenses'].extend(old_dset.get('licenses', []))
                merged['info'].extend(old_dset.get('info', []))

                # Add the categories into the merged dataset
                for old_cat in old_dset['categories']:
                    new_id = merged_cat_name_to_id.get(old_cat['name'], None)
                    # The same category might exist in different datasets.
                    if new_id is None:
                        # Only add if it does not yet exist
                        new_id = len(merged_cat_name_to_id) + 1
                        merged_cat_name_to_id[old_cat['name']] = new_id
                        new_cat = _dict([
                            ('id', new_id),
                            ('name', old_cat['name']),
                            # ('supercategory', old_cat['supercategory']),
                        ])
                        update_ifnotin(new_cat, old_cat)
                        merged['categories'].append(new_cat)
                    cat_id_map[old_cat['id']] = new_id

                # Add the keypoint categories into the merged dataset
                if 'keypoint_categories' in old_dset:
                    if 'keypoint_categories' not in merged:
                        merged['keypoint_categories'] = []
                    old_id_to_name = {k['id']: k['name']
                                      for k in old_dset['keypoint_categories']}
                    postproc_kpcats = []
                    for old_kpcat in old_dset['keypoint_categories']:
                        new_id = merged_kp_name_to_id.get(old_kpcat['name'], None)
                        # The same kpcategory might exist in different datasets.
                        if new_id is None:
                            # Only add if it does not yet exist
                            new_id = len(merged_kp_name_to_id) + 1
                            merged_kp_name_to_id[old_kpcat['name']] = new_id
                            new_kpcat = _dict([
                                ('id', new_id),
                                ('name', old_kpcat['name']),
                            ])
                            update_ifnotin(new_kpcat, old_kpcat)

                            old_reflect_id = new_kpcat.get('reflection_id', None)
                            if old_reflect_id is not None:
                                # Temporarilly overwrite reflectid with name
                                reflect_name = old_id_to_name.get(old_reflect_id, None)
                                new_kpcat['reflection_id'] = reflect_name
                                postproc_kpcats.append(new_kpcat)

                            merged['keypoint_categories'].append(new_kpcat)
                        kpcat_id_map[old_kpcat['id']] = new_id

                    # Fix reflection ids
                    for kpcat in postproc_kpcats:
                        reflect_name = kpcat['reflection_id']
                        new_reflect_id = merged_kp_name_to_id.get(reflect_name, None)
                        kpcat['reflection_id'] = new_reflect_id

                # Add the videos into the merged dataset
                for old_video in old_dset.get('videos', []):
                    if preserve_vidids:
                        new_id = old_video['id']
                    else:
                        new_id = len(merged['videos']) + 1
                    new_video = _dict([
                        ('id', new_id),
                        ('name', join(subdir, old_video['name'])),
                    ])
                    # copy over other metadata
                    update_ifnotin(new_video, old_video)
                    video_id_map[old_video['id']] = new_video['id']
                    merged['videos'].append(new_video)

                # Add the images into the merged dataset
                for old_img in old_dset['images']:
                    if preserve_gids:
                        new_id = old_img['id']
                    else:
                        new_id = len(merged['images']) + 1
                    new_img = _dict([
                        ('id', new_id),
                        ('file_name', join(subdir, old_img['file_name'])),
                    ])
                    video_img_id = video_id_map.get(old_img.get('video_id'), None)
                    if video_img_id is not None:
                        new_img['video_id'] = video_img_id
                    # copy over other metadata
                    update_ifnotin(new_img, old_img)
                    img_id_map[old_img['id']] = new_img['id']
                    merged['images'].append(new_img)

                # Add the annotations into the merged dataset
                for old_annot in old_dset['annotations']:
                    old_cat_id = old_annot['category_id']
                    old_img_id = old_annot['image_id']
                    new_cat_id = cat_id_map.get(old_cat_id, ub.NoParam)
                    new_img_id = img_id_map.get(old_img_id, None)

                    if new_cat_id is ub.NoParam:
                        # NOTE: category_id is allowed to be None
                        warnings.warn('annot {} in {} has bad category-id {}'.format(
                            old_annot, subdir, old_cat_id))
                        # raise Exception
                    if new_img_id is None:
                        warnings.warn('annot {} in {} has bad image-id {}'.format(
                            old_annot, subdir, old_img_id))
                        # sanity check:
                        # if any(img['id'] == old_img_id for img in old_dset['images']):
                        #     raise Exception('Image id {} does not exist in {}'.format(old_img_id, subdir))
                    new_annot = _dict([
                        ('id', len(merged['annotations']) + 1),
                        ('image_id', new_img_id),
                        ('category_id', new_cat_id),
                    ])
                    update_ifnotin(new_annot, old_annot)
                    if kpcat_id_map:
                        # Need to copy keypoint dict to not clobber original
                        # dset
                        if 'keypoints' in new_annot:
                            old_keypoints = new_annot['keypoints']
                            new_keypoints = copy.deepcopy(old_keypoints)
                            for kp in new_keypoints:
                                kp['keypoint_category_id'] = kpcat_id_map.get(
                                    kp['keypoint_category_id'], None)
                            new_annot['keypoints'] = new_keypoints
                    merged['annotations'].append(new_annot)
            return merged

        # handle soft data roots
        from os.path import normpath
        soft_dset_roots = [dset.img_root for dset in others]
        soft_dset_roots = [normpath(r) if r is not None else None for r in soft_dset_roots]
        if ub.allsame(soft_dset_roots):
            soft_img_root = ub.peek(soft_dset_roots)
        else:
            soft_img_root = None

        # Handle hard coded data roots
        from os.path import normpath
        hard_dset_roots = [dset.dataset.get('img_root', None) for dset in others]
        hard_dset_roots = [normpath(r) if r is not None else None for r in hard_dset_roots]
        if ub.allsame(hard_dset_roots):
            common_root = ub.peek(hard_dset_roots)
            relative_dsets = [('', d.dataset) for d in others]
        else:
            common_root = None
            relative_dsets = [(d.img_root, d.dataset) for d in others]

        merged = _coco_union(relative_dsets, common_root)

        if common_root is not None:
            merged['img_root'] = common_root

        new_dset = cls(merged, **kwargs)

        if common_root is None and soft_img_root is not None:
            new_dset.img_root = soft_img_root
        return new_dset
[docs] def subset(self, gids, copy=False, autobuild=True):
"""
Return a subset of the larger coco dataset by specifying which images
to port. All annotations in those images will be taken.
Args:
gids (List[int]): image-ids to copy into a new dataset
copy (bool, default=False): if True, makes a deep copy of
all nested attributes, otherwise makes a shallow copy.
autobuild (bool, default=True): if True will automatically
build the fast lookup index.
Example:
>>> self = CocoDataset.demo()
>>> gids = [1, 3]
>>> sub_dset = self.subset(gids)
>>> assert len(self.gid_to_aids) == 3
>>> assert len(sub_dset.gid_to_aids) == 2
Example:
>>> self = CocoDataset.demo()
>>> sub1 = self.subset([1])
>>> sub2 = self.subset([2])
>>> sub3 = self.subset([3])
>>> others = [sub1, sub2, sub3]
>>> rejoined = CocoDataset.union(*others)
>>> assert len(sub1.anns) == 9
>>> assert len(sub2.anns) == 2
>>> assert len(sub3.anns) == 0
>>> assert rejoined.basic_stats() == self.basic_stats()
"""
new_dataset = _dict([(k, []) for k in self.dataset])
new_dataset['categories'] = self.dataset['categories']
new_dataset['info'] = self.dataset.get('info', [])
new_dataset['licenses'] = self.dataset.get('licenses', [])
if 'keypoint_categories' in self.dataset:
new_dataset['keypoint_categories'] = self.dataset['keypoint_categories']
if 'videos' in self.dataset:
# TODO: Take only videos with image support?
new_dataset['videos'] = self.dataset['videos']
gids = sorted(set(gids))
sub_aids = sorted([aid for gid in gids
for aid in self.gid_to_aids.get(gid, [])])
new_dataset['annotations'] = list(ub.take(self.anns, sub_aids))
new_dataset['images'] = list(ub.take(self.imgs, gids))
new_dataset['img_root'] = self.dataset.get('img_root', None)
if copy:
from copy import deepcopy
new_dataset = deepcopy(new_dataset)
sub_dset = CocoDataset(new_dataset, img_root=self.img_root,
autobuild=autobuild)
return sub_dset
def _delitems(items, remove_idxs, thresh=750):
"""
Args:
items (List): list which will be modified
remove_idxs (List[int]): integers to remove (MUST BE UNIQUE)
"""
if len(remove_idxs) > thresh:
# Its typically faster to just make a new list when there are
# lots and lots of items to remove.
keep_idxs = sorted(set(range(len(items))) - set(remove_idxs))
newlist = [items[idx] for idx in keep_idxs]
items[:] = newlist
else:
# However, when there are a few hundred items to remove, del is faster.
for idx in sorted(remove_idxs, reverse=True):
del items[idx]
def demo_coco_data():
    """
    Simple data for testing

    Ignore:
        # code for getting a segmentation polygon
        kwimage.grab_test_image_fpath('astro')
        labelme /home/joncrall/.cache/kwimage/demodata/astro.png
        cat /home/joncrall/.cache/kwimage/demodata/astro.json

    Example:
        >>> # xdoctest: +REQUIRES(--show)
        >>> from kwcoco.coco_dataset import demo_coco_data, CocoDataset
        >>> dataset = demo_coco_data()
        >>> self = CocoDataset(dataset, tag='demo')
        >>> import kwplot
        >>> kwplot.autompl()
        >>> self.show_image(gid=1)
        >>> kwplot.show_if_requested()
    """
    import kwimage
    from kwimage.im_demodata import _TEST_IMAGES
    from os.path import commonprefix, relpath

    keys = ['astro', 'carl', 'stars']
    urls = {k: _TEST_IMAGES[k]['url'] for k in keys}
    fpaths = {k: kwimage.grab_test_image_fpath(k) for k in keys}
    img_root = commonprefix(list(fpaths.values()))

    # Make file names relative to the common root for consistent testing
    images = [
        {'id': num, 'file_name': relpath(fpaths[key], img_root),
         'url': urls[key]}
        for num, key in enumerate(keys, start=1)
    ]

    dataset = {
        'img_root': img_root,
        'categories': [
            {
                'id': 1, 'name': 'astronaut',
                'supercategory': 'human',
            },
            {'id': 2, 'name': 'rocket', 'supercategory': 'object'},
            {'id': 3, 'name': 'helmet', 'supercategory': 'object'},
            {
                'id': 4, 'name': 'mouth',
                'supercategory': 'human',
                'keypoints': [
                    'mouth-right-corner',
                    'mouth-right-bot',
                    'mouth-left-bot',
                    'mouth-left-corner',
                ],
                'skeleton': [[0, 1]],
            },
            {
                'id': 5, 'name': 'star',
                'supercategory': 'object',
                'keypoints': ['star-center'],
                'skeleton': [],
            },
            {'id': 6, 'name': 'astronomer', 'supercategory': 'human'},
            {'id': 7, 'name': 'astroturf', 'supercategory': 'object'},
            {
                'id': 8, 'name': 'human',
                'keypoints': ['left-eye', 'right-eye'],
                'skeleton': [[0, 1]],
            },
        ],
        'images': images,
        'annotations': [
            {'id': 1, 'image_id': 1, 'category_id': 1,
             'bbox': [10, 10, 360, 490],
             'keypoints': [247, 101, 2, 202, 100, 2],
             'segmentation': [[
                 40, 509, 26, 486, 20, 419, 28, 334, 51, 266, 85, 229, 102,
                 216, 118, 197, 125, 176, 148, 151, 179, 147, 182, 134, 174,
                 128, 166, 115, 156, 94, 155, 64, 162, 48, 193, 34, 197, 26,
                 210, 21, 231, 14, 265, 24, 295, 49, 300, 90, 297, 111, 280,
                 126, 277, 132, 266, 137, 264, 152, 255, 164, 256, 174, 283,
                 195, 301, 220, 305, 234, 338, 262, 350, 286, 360, 326, 363,
                 351, 324, 369, 292, 404, 280, 448, 276, 496, 280, 511]],
             },
            {'id': 2, 'image_id': 1, 'category_id': 2,
             'bbox': [350, 5, 130, 290]},
            {'id': 3, 'image_id': 1, 'category_id': 3,
             'line': [326, 369, 500, 500]},
            {'id': 4, 'image_id': 1, 'category_id': 4,
             'keypoints': [
                 202, 139, 2,
                 215, 150, 2,
                 229, 150, 2,
                 244, 142, 2,
             ]},
            {'id': 5, 'image_id': 1, 'category_id': 5,
             'keypoints': [37, 65, 1]},
            {'id': 6, 'image_id': 1, 'category_id': 5,
             'keypoints': [37, 16, 1]},
            {'id': 7, 'image_id': 1, 'category_id': 5,
             'keypoints': [3, 9, 1]},
            {'id': 8, 'image_id': 1, 'category_id': 5,
             'keypoints': [2, 111, 1]},
            {'id': 9, 'image_id': 1, 'category_id': 5,
             'keypoints': [2, 60, 1]},
            {'id': 10, 'image_id': 2, 'category_id': 6,
             'bbox': [37, 6, 230, 240]},
            {'id': 11, 'image_id': 2, 'category_id': 4,
             'bbox': [124, 96, 45, 18]}
        ],
        'licenses': [],
        'info': [],
    }
    return dataset
if __name__ == '__main__':
    # CommandLine:
    #     xdoctest kwcoco.coco_dataset all
    import xdoctest
    xdoctest.doctest_module(__file__)