Source code for kwcoco.coco_image

"""
Defines the CocoImage class which is an object oriented way of manipulating
data pointed to by a COCO image dictionary.

Notably this provides the ``.imdelay`` method for delayed image loading ( which
enables things like fast loading of subimage-regions / coarser scales in images
that contain tiles / overviews - e.g. Cloud Optimized Geotiffs or COGs (Medical
image formats may be supported in the future).

TODO:
    This file no longer is only images, it has logic for generic single-class
    objects. It should be refactored into coco_objects0d.py or something.
"""
import ubelt as ub
import os
import numpy as np
from os.path import join
from kwcoco.util.util_deprecate import deprecated_function_alias
from kwcoco.util.dict_proxy2 import AliasedDictProxy

try:
    from xdev import profile
except Exception:
    profile = ub.identity


__docstubs__ = """
from kwcoco.channel_spec import FusedChannelSpec
from kwcoco.coco_objects1d import Annots
"""


DEFAULT_RESOLUTION_KEYS = {
    'resolution',
    'target_gsd',  # only exists as a convinience for other projects. Remove in the future.
}


[docs] class _CocoObject(AliasedDictProxy, ub.NiceRepr): """ General coco scalar object """ __alias_to_primary__ = {} def __init__(self, obj, dset=None, bundle_dpath=None): self._proxy = obj self.dset = dset self._bundle_dpath = bundle_dpath @property def bundle_dpath(self): if self.dset is not None: return self.dset.bundle_dpath else: return self._bundle_dpath @bundle_dpath.setter def bundle_dpath(self, value): self._bundle_dpath = value
[docs] def detach(self): """ Removes references to the underlying coco dataset, but keeps special information such that it wont be needed. """ self._bundle_dpath = self.bundle_dpath self.dset = None return self
[docs] class CocoImage(_CocoObject): """ An object-oriented representation of a coco image. It provides helper methods that are specific to a single image. This operates directly on a single coco image dictionary, but it can optionally be connected to a parent dataset, which allows it to use CocoDataset methods to query about relationships and resolve pointers. This is different than the Images class in coco_object1d, which is just a vectorized interface to multiple objects. Example: >>> import kwcoco >>> dset1 = kwcoco.CocoDataset.demo('shapes8') >>> dset2 = kwcoco.CocoDataset.demo('vidshapes8-multispectral') >>> self = kwcoco.CocoImage(dset1.imgs[1], dset1) >>> print('self = {!r}'.format(self)) >>> print('self.channels = {}'.format(ub.urepr(self.channels, nl=1))) >>> self = kwcoco.CocoImage(dset2.imgs[1], dset2) >>> print('self.channels = {}'.format(ub.urepr(self.channels, nl=1))) >>> self.primary_asset() >>> assert 'auxiliary' in self """ __alias_to_primary__ = { # In the future we will switch assets to be primary. 'assets': 'auxiliary', } def __init__(self, img, dset=None): super().__init__(img, dset=dset) self.img = img self._video = None
[docs] @classmethod def from_gid(cls, dset, gid): img = dset.index.imgs[gid] self = cls(img, dset=dset) return self
@property def video(self): """ Helper to grab the video for this image if it exists """ if self._video is None and self.dset is not None: vidid = self.img.get('video_id', None) if vidid is None: video = None else: video = self.dset.index.videos[vidid] else: video = self._video return video @video.setter def video(self, value): # TODO: ducktype with an object self._video = value @property def name(self): return self['name']
[docs] def detach(self): """ Removes references to the underlying coco dataset, but keeps special information such that it wont be needed. """ self._video = self.video return super().detach()
@property def assets(self): """ Convinience wrapper around :func: `CocoImage.iter_assets`. """ return list(self.iter_assets()) @ub.memoize_property def datetime(self): """ Try to get datetime information for this image. Not always possible. Returns: datetime.datetime | None """ from kwutil import util_time candidate_keys = [ 'datetime', 'timestamp', 'date_captured' ] for k in candidate_keys: v = self.img.get(k) if v is not None: return util_time.coerce_datetime(v) raise KeyError(f'No keys {candidate_keys} to coerce datetime ' 'found in {list(self.img.keys())}')
[docs] def annots(self): """ Returns: Annots: a 1d annotations object referencing annotations in this image """ return self.dset.annots(image_id=self.img['id'])
def __nice__(self): """ Example: >>> from kwcoco.coco_image import * # NOQA >>> import kwcoco >>> with ub.CaptureStdout() as cap: ... dset = kwcoco.CocoDataset.demo('shapes8') >>> self = CocoImage(dset.dataset['images'][0], dset) >>> print('self = {!r}'.format(self)) >>> dset = kwcoco.CocoDataset.demo() >>> self = CocoImage(dset.dataset['images'][0], dset) >>> print('self = {!r}'.format(self)) """ from kwcoco.util.util_truncate import smart_truncate from functools import partial stats = ub.udict(self.stats()) stats = stats.map_values(str) stats = stats.map_values( partial(smart_truncate, max_length=32, trunc_loc=0.5)) return ub.urepr(stats, compact=1, nl=0)
[docs] def stats(self): """ """ key_attrname = [ ('wh', 'dsize'), ('n_chan', 'num_channels'), ('channels', 'channels'), ('name', 'name'), ] stats = {} for key, attrname in key_attrname: try: stats[key] = getattr(self, attrname) except Exception as ex: stats[key] = repr(ex) if 'channels' in stats: if stats['channels'] is not None: stats['channels'] = stats['channels'].spec return stats
def __contains__(self, key): if '_unstructured' in self._proxy: if AliasedDictProxy.__contains__(self, key): return True return key in self._proxy['_unstructured'] else: return AliasedDictProxy.__contains__(self, key)
[docs] def get(self, key, default=ub.NoParam): try: return self[key] except KeyError: if default is ub.NoParam: raise else: return default
[docs] def keys(self): """ Proxy getter attribute for underlying `self.img` dictionary """ if '_unstructured' in self._proxy: # SQL compatibility _keys = ub.flatten([self._proxy.keys(), self._proxy['_unstructured'].keys()]) return iter((k for k in _keys if k != '_unstructured')) else: return self._proxy.keys()
def __getitem__(self, key): """ Proxy getter attribute for underlying `self.img` dictionary CommandLine: xdoctest -m kwcoco.coco_image CocoImage.__getitem__ Example: >>> import pytest >>> # without _unstructured populated >>> import kwcoco >>> self = kwcoco.CocoImage({'foo': 1}) >>> assert self.get('foo') == 1 >>> assert self.get('foo', None) == 1 >>> # with _unstructured populated >>> self = kwcoco.CocoImage({'_unstructured': {'foo': 1}}) >>> assert self.get('foo') == 1 >>> assert self.get('foo', None) == 1 >>> # without _unstructured empty >>> self = kwcoco.CocoImage({}) >>> print('----') >>> with pytest.raises(KeyError): >>> self.get('foo') >>> assert self.get('foo', None) is None >>> # with _unstructured empty >>> self = kwcoco.CocoImage({'_unstructured': {'bar': 1}}) >>> with pytest.raises(KeyError): >>> self.get('foo') >>> assert self.get('foo', None) is None """ if AliasedDictProxy.__contains__(self, key): return AliasedDictProxy.__getitem__(self, key) else: _img = self._proxy if '_unstructured' in _img: # Workaround for sql-view, treat items in "_unstructured" as # if they are in the top level image. _extra = _img['_unstructured'] if key in _extra: return _extra[key] else: raise KeyError(key) else: raise KeyError(key) @property def channels(self): from kwcoco.channel_spec import FusedChannelSpec from kwcoco.channel_spec import ChannelSpec img_parts = [] for obj in self.iter_asset_objs(): obj_parts = obj.get('channels', None) if obj_parts is not None: # obj_chan = FusedChannelSpec.coerce(obj_parts).normalize() obj_chan = FusedChannelSpec.coerce(obj_parts) img_parts.append(obj_chan.spec) if not img_parts: return None # return ChannelSpec.coerce('*') spec = ChannelSpec(','.join(img_parts)) return spec @property def n_assets(self): """ The number of on-disk files associated with this coco image """ # Hacked because we have too many ways of having assets right now # really need a single assets table. has_main = int(self.get('file_name', None) is not None) num_group1 = len(self.img.get('assets', None) or []) num_group2 = len(self.img.get('auxiliary', None) or []) total = has_main + num_group1 + num_group2 return total @property def num_channels(self): return self.channels.numel() # return sum(map(len, self.channels.streams())) @property def dsize(self): width = self.img.get('width', None) height = self.img.get('height', None) return width, height
[docs] def primary_image_filepath(self, requires=None): dpath = ub.Path(self.bundle_dpath) fname = self.primary_asset()['file_name'] fpath = dpath / fname return fpath
[docs] def primary_asset(self, requires=None, as_dict=True): """ Compute a "main" image asset. Note: Uses a heuristic. * First, try to find the auxiliary image that has with the smallest distortion to the base image (if known via warp_aux_to_img) * Second, break ties by using the largest image if w / h is known * Last, if previous information not available use the first auxiliary image. Args: requires (List[str] | None): list of attribute that must be non-None to consider an object as the primary one. as_dict (bool): if True the return type is a raw dictionary. Otherwise use a newer object-oriented wrapper that should be duck-type swappable. In the future this default will change to False. Returns: None | dict : the asset dict or None if it is not found TODO: - [ ] Add in primary heuristics Example: >>> import kwarray >>> from kwcoco.coco_image import * # NOQA >>> rng = kwarray.ensure_rng(0) >>> def random_asset(name, w=None, h=None): >>> return {'file_name': name, 'width': w, 'height': h} >>> self = CocoImage({ >>> 'auxiliary': [ >>> random_asset('1'), >>> random_asset('2'), >>> random_asset('3'), >>> ] >>> }) >>> assert self.primary_asset()['file_name'] == '1' >>> self = CocoImage({ >>> 'auxiliary': [ >>> random_asset('1'), >>> random_asset('2', 3, 3), >>> random_asset('3'), >>> ] >>> }) >>> assert self.primary_asset()['file_name'] == '2' >>> # >>> # Test new objct oriented output >>> self = CocoImage({ >>> 'file_name': 'foo', >>> 'assets': [ >>> random_asset('1'), >>> random_asset('2'), >>> random_asset('3'), >>> ], >>> }) >>> assert self.primary_asset(as_dict=False) is self >>> self = CocoImage({ >>> 'assets': [ >>> random_asset('1'), >>> random_asset('3'), >>> ], >>> 'auxiliary': [ >>> random_asset('1'), >>> random_asset('2', 3, 3), >>> random_asset('3'), >>> ] >>> }) >>> assert self.primary_asset(as_dict=False)['file_name'] == '2' """ import kwimage if requires is None: requires = [] img = self.img has_base_image = img.get('file_name', None) is not None candidates = [] if has_base_image: obj = img if all(k in obj for k in requires): # Return the base image if we can if as_dict: return obj else: return self # Choose "best" auxiliary image based on a hueristic. eye = kwimage.Affine.eye().matrix asset_objs = img.get('auxiliary', img.get('assets', [])) or [] for idx, obj in enumerate(asset_objs): # Take frobenius norm to get "distance" between transform and # the identity. We want to find the auxiliary closest to the # identity transform. warp_aux_to_img = kwimage.Affine.coerce(obj.get('warp_aux_to_img', None)) fro_dist = np.linalg.norm(warp_aux_to_img - eye, ord='fro') w = obj.get('width', None) or 0 h = obj.get('height', None) or 0 if all(k in obj for k in requires): candidates.append({ 'idx': idx, 'area': w * h, 'fro_dist': fro_dist, 'obj': obj, }) if len(candidates) == 0: return None idx = ub.argmin( candidates, key=lambda val: ( val['fro_dist'], -val['area'], val['idx']) ) obj = candidates[idx]['obj'] if as_dict: return obj else: return CocoAsset(obj, bundle_dpath=self.bundle_dpath)
[docs] def iter_image_filepaths(self, with_bundle=True): """ Could rename to iter_asset_filepaths Args: with_bundle (bool): If True, prepends the bundle dpath to fully specify the path. Otherwise, just returns the registered string in the file_name attribute of each asset. Defaults to True. Yields: ub.Path """ dpath = ub.Path(self.bundle_dpath) for obj in self.iter_asset_objs(): fname = obj.get('file_name') fpath = dpath / fname yield fpath
[docs] def iter_assets(self): """ Iterate through assets (which could include the image itself it points to a file path). Object-oriented alternative to :func:`CocoImage.iter_asset_objs` Yields: CocoImage | CocoAsset: an asset object (or image object if it points to a file) Example: >>> import kwcoco >>> coco_img = kwcoco.CocoImage({'width': 128, 'height': 128}) >>> assert len(list(coco_img.iter_assets())) == 0 >>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral') >>> self = dset.coco_image(1) >>> assert len(list(self.iter_assets())) > 1 >>> dset = kwcoco.CocoDataset.demo('vidshapes8') >>> self = dset.coco_image(1) >>> assert list(self.iter_assets()) == [self] """ img = self.img has_base_image = img.get('file_name', None) is not None if has_base_image: yield self for obj in img.get('auxiliary', None) or []: yield CocoAsset(obj, self.bundle_dpath) for obj in img.get('assets', None) or []: yield CocoAsset(obj, self.bundle_dpath)
[docs] def iter_asset_objs(self): """ Iterate through base + auxiliary dicts that have file paths Note: In most cases prefer :func:`iter_assets` instead. Yields: dict: an image or auxiliary dictionary """ img = self.img has_base_image = img.get('file_name', None) is not None if has_base_image: yield img for obj in img.get('auxiliary', None) or []: yield obj for obj in img.get('assets', None) or []: yield obj
[docs] def find_asset(self, channels): """ Find the asset dictionary with the specified channels Args: channels (str | FusedChannelSpec): channel names the asset must have. Returns: CocoImage | CocoAsset Example: >>> import kwcoco >>> self = kwcoco.CocoImage({ >>> 'file_name': 'raw', >>> 'channels': 'red|green|blue', >>> 'assets': [ >>> {'file_name': '1', 'channels': 'spam'}, >>> {'file_name': '2', 'channels': 'eggs|jam'}, >>> ], >>> 'auxiliary': [ >>> {'file_name': '3', 'channels': 'foo'}, >>> {'file_name': '4', 'channels': 'bar|baz'}, >>> ] >>> }) >>> assert self.find_asset('blah') is None >>> assert self.find_asset('red|green|blue') is self >>> self.find_asset('foo')['file_name'] == '3' >>> self.find_asset('baz')['file_name'] == '4' """ obj = self.find_asset_obj(channels) if obj is not None: if obj is self.img: return self return CocoAsset(obj, bundle_dpath=self.bundle_dpath)
[docs] def find_asset_obj(self, channels): """ Find the asset dictionary with the specified channels In most cases use :func:`CocoImge.find_asset` instead. Example: >>> import kwcoco >>> coco_img = kwcoco.CocoImage({'width': 128, 'height': 128}) >>> coco_img.add_auxiliary_item( >>> 'rgb.png', channels='red|green|blue', width=32, height=32) >>> assert coco_img.find_asset_obj('red') is not None >>> assert coco_img.find_asset_obj('green') is not None >>> assert coco_img.find_asset_obj('blue') is not None >>> assert coco_img.find_asset_obj('red|blue') is not None >>> assert coco_img.find_asset_obj('red|green|blue') is not None >>> assert coco_img.find_asset_obj('red|green|blue') is not None >>> assert coco_img.find_asset_obj('black') is None >>> assert coco_img.find_asset_obj('r') is None Example: >>> # Test with concise channel code >>> import kwcoco >>> coco_img = kwcoco.CocoImage({'width': 128, 'height': 128}) >>> coco_img.add_auxiliary_item( >>> 'msi.png', channels='foo.0:128', width=32, height=32) >>> assert coco_img.find_asset_obj('foo') is None >>> assert coco_img.find_asset_obj('foo.3') is not None >>> assert coco_img.find_asset_obj('foo.3:5') is not None >>> assert coco_img.find_asset_obj('foo.3000') is None """ from kwcoco.channel_spec import FusedChannelSpec channels = FusedChannelSpec.coerce(channels) for obj in self.iter_asset_objs(): obj_channels = FusedChannelSpec.coerce(obj['channels']) if (obj_channels & channels).numel(): return obj
[docs] def _assets_key(self): """ Internal helper for transition from auxiliary -> assets in the image spec """ if 'auxiliary' in self: return 'auxiliary' elif 'assets' in self: return 'assets' else: return 'auxiliary'
[docs] def add_annotation(self, **ann): """ Adds an annotation to this image. This is a convinience method, and requires that this CocoImage is still connected to a parent dataset. Args: **ann: annotation attributes (e.g. bbox, category_id) Returns: int: the new annotation id SeeAlso: :func:`kwcoco.CocoDataset.add_annotation` """ if self.dset is None: raise RuntimeError( 'Can only add an annotation through a CocoImage ' 'if it is connected to its parent CocoDataset') return self.dset.add_annotation(image_id=self.img['id'], **ann)
[docs] def add_asset(self, file_name=None, channels=None, imdata=None, warp_aux_to_img=None, width=None, height=None, imwrite=False, image_id=None, **kw): """ Adds an auxiliary / asset item to the image dictionary. This operation can be done purely in-memory (the default), or the image data can be written to a file on disk (via the imwrite=True flag). Args: file_name (str | PathLike | None): The name of the file relative to the bundle directory. If unspecified, imdata must be given. channels (str | kwcoco.FusedChannelSpec | None): The channel code indicating what each of the bands represents. These channels should be disjoint wrt to the existing data in this image (this is not checked). imdata (ndarray | None): The underlying image data this auxiliary item represents. If unspecified, it is assumed file_name points to a path on disk that will eventually exist. If imdata, file_name, and the special imwrite=True flag are specified, this function will write the data to disk. warp_aux_to_img (kwimage.Affine | None): The transformation from this auxiliary space to image space. If unspecified, assumes this item is related to image space by only a scale factor. width (int | None): Width of the data in auxiliary space (inferred if unspecified) height (int | None): Height of the data in auxiliary space (inferred if unspecified) imwrite (bool): If specified, both imdata and file_name must be specified, and this will write the data to disk. Note: it it recommended that you simply call imwrite yourself before or after calling this function. This lets you better control imwrite parameters. image_id (int | None): An asset dictionary contains an image-id, but it should *not* be specified here. If it is, then it *must* agree with this image's id. **kw : stores arbitrary key/value pairs in this new asset. TODO: - [ ] Allow imwrite to specify an executor that is used to return a Future so the imwrite call does not block. Example: >>> from kwcoco.coco_image import * # NOQA >>> import kwcoco >>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral') >>> coco_img = dset.coco_image(1) >>> imdata = np.random.rand(32, 32, 5) >>> channels = kwcoco.FusedChannelSpec.coerce('Aux:5') >>> coco_img.add_asset(imdata=imdata, channels=channels) Example: >>> import kwcoco >>> dset = kwcoco.CocoDataset() >>> gid = dset.add_image(name='my_image_name', width=200, height=200) >>> coco_img = dset.coco_image(gid) >>> coco_img.add_asset('path/img1_B0.tif', channels='B0', width=200, height=200) >>> coco_img.add_asset('path/img1_B1.tif', channels='B1', width=200, height=200) >>> coco_img.add_asset('path/img1_B2.tif', channels='B2', width=200, height=200) >>> coco_img.add_asset('path/img1_TCI.tif', channels='r|g|b', width=200, height=200) """ from os.path import isabs, join # NOQA import kwimage from kwcoco.channel_spec import FusedChannelSpec img = self.img if imdata is None and file_name is None: raise ValueError('must specify file_name or imdata') # Check type of resolution inputs. if not isinstance(width, int) and width is not None: raise TypeError(f'"width" input is neither an int or None variable but type: "{type(width)}"') if not isinstance(height, int) and height is not None: raise TypeError(f'"height" input is neither an int or None variable but type: "{type(height)}"') # Infer resolution inputs from image data. if width is None and imdata is not None: width = imdata.shape[1] if height is None and imdata is not None: height = imdata.shape[0] if warp_aux_to_img is None: img_width = img.get('width', None) img_height = img.get('height', None) if img_width is None or img_height is None: raise ValueError('Parent image canvas has an unknown size. ' 'Need to set width/height') if width is None or height is None: raise ValueError('Unable to infer warp_aux_to_img without width') # Assume we can just scale up the auxiliary data to match the image # space unless the user says otherwise warp_aux_to_img = kwimage.Affine.scale(( img_width / width, img_height / height)) else: warp_aux_to_img = kwimage.Affine.coerce(warp_aux_to_img) # Normalize for json serializability if channels is not None: channels = FusedChannelSpec.coerce(channels).spec if file_name is not None: file_name = os.fspath(file_name) # Make the asset info dict parent_image_id = img.get('id', None) if image_id is not None: if parent_image_id is not None: assert image_id == parent_image_id, ( f'The specified image_id ({image_id}) did not match the ' f'parent image id ({parent_image_id}) proprty.' ) else: image_id = parent_image_id obj = { 'image_id': image_id, # for when assets move to their own table 'file_name': file_name, 'height': height, 'width': width, 'channels': channels, 'warp_aux_to_img': warp_aux_to_img.concise(), } if imdata is not None: if imwrite: if __debug__ and file_name is None: raise ValueError( 'file_name must be given if imwrite is True') # if self.dset is None: # fpath = file_name # if not isabs(fpath): # raise ValueError(ub.paragraph( # ''' # Got relative file_name, but no dataset is attached # to this coco image. Attatch a dataset or use an # absolute path. # ''')) # else: fpath = join(self.bundle_dpath, file_name) kwimage.imwrite(fpath, imdata) else: obj['imdata'] = imdata obj.update(**kw) assets_key = self._assets_key() asset_list = img.get(assets_key, None) if asset_list is None: asset_list = img[assets_key] = [] asset_list.append(obj) if self.dset is not None: self.dset._invalidate_hashid()
[docs] @profile def imdelay(self, channels=None, space='image', resolution=None, bundle_dpath=None, interpolation='linear', antialias=True, nodata_method=None, RESOLUTION_KEY=None): """ Perform a delayed load on the data in this image. The delayed load can load a subset of channels, and perform lazy warping operations. If the underlying data is in a tiled format this can reduce the amount of disk IO needed to read the data if only a small crop or lower resolution view of the data is needed. Note: This method is experimental and relies on the delayed load proof-of-concept. Args: gid (int): image id to load channels (kwcoco.FusedChannelSpec): specific channels to load. if unspecified, all channels are loaded. space (str): can either be "image" for loading in image space, or "video" for loading in video space. resolution (None | str | float): If specified, applies an additional scale factor to the result such that the data is loaded at this specified resolution. This requires that the image / video has a registered resolution attribute and that its units agree with this request. TODO: - [ ] This function could stand to have a better name. Maybe imread with a delayed=True flag? Or maybe just delayed_load? Example: >>> from kwcoco.coco_image import * # NOQA >>> import kwcoco >>> gid = 1 >>> # >>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral') >>> self = CocoImage(dset.imgs[gid], dset) >>> delayed = self.imdelay() >>> print('delayed = {!r}'.format(delayed)) >>> print('delayed.finalize() = {!r}'.format(delayed.finalize())) >>> print('delayed.finalize() = {!r}'.format(delayed.finalize())) >>> # >>> dset = kwcoco.CocoDataset.demo('shapes8') >>> delayed = dset.coco_image(gid).imdelay() >>> print('delayed = {!r}'.format(delayed)) >>> print('delayed.finalize() = {!r}'.format(delayed.finalize())) >>> print('delayed.finalize() = {!r}'.format(delayed.finalize())) >>> crop = delayed.crop((slice(0, 3), slice(0, 3))) >>> crop.finalize() >>> # TODO: should only select the "red" channel >>> dset = kwcoco.CocoDataset.demo('shapes8') >>> delayed = CocoImage(dset.imgs[gid], dset).imdelay(channels='r') >>> import kwcoco >>> gid = 1 >>> # >>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral') >>> delayed = dset.coco_image(gid).imdelay(channels='B1|B2', space='image') >>> print('delayed = {!r}'.format(delayed)) >>> print('delayed.finalize() = {!r}'.format(delayed.finalize())) >>> delayed = dset.coco_image(gid).imdelay(channels='B1|B2|B11', space='image') >>> print('delayed = {!r}'.format(delayed)) >>> print('delayed.finalize() = {!r}'.format(delayed.finalize())) >>> delayed = dset.coco_image(gid).imdelay(channels='B8|B1', space='video') >>> print('delayed = {!r}'.format(delayed)) >>> print('delayed.finalize() = {!r}'.format(delayed.finalize())) >>> delayed = dset.coco_image(gid).imdelay(channels='B8|foo|bar|B1', space='video') >>> print('delayed = {!r}'.format(delayed)) >>> print('delayed.finalize() = {!r}'.format(delayed.finalize())) Example: >>> import kwcoco >>> dset = kwcoco.CocoDataset.demo() >>> coco_img = dset.coco_image(1) >>> # Test case where nothing is registered in the dataset >>> delayed = coco_img.imdelay() >>> final = delayed.finalize() >>> assert final.shape == (512, 512, 3) >>> delayed = coco_img.imdelay() >>> final = delayed.finalize() >>> print('final.shape = {}'.format(ub.urepr(final.shape, nl=1))) >>> assert final.shape == (512, 512, 3) Example: >>> # Test that delay works when imdata is stored in the image >>> # dictionary itself. >>> from kwcoco.coco_image import * # NOQA >>> import kwcoco >>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral') >>> coco_img = dset.coco_image(1) >>> imdata = np.random.rand(6, 6, 5) >>> imdata[:] = np.arange(5)[None, None, :] >>> channels = kwcoco.FusedChannelSpec.coerce('Aux:5') >>> coco_img.add_auxiliary_item(imdata=imdata, channels=channels) >>> delayed = coco_img.imdelay(channels='B1|Aux:2:4') >>> final = delayed.finalize() Example: >>> # Test delay when loading in asset space >>> from kwcoco.coco_image import * # NOQA >>> import kwcoco >>> dset = kwcoco.CocoDataset.demo('vidshapes8-msi-multisensor') >>> coco_img = dset.coco_image(1) >>> stream1 = coco_img.channels.streams()[0] >>> stream2 = coco_img.channels.streams()[1] >>> asset_delayed = coco_img.imdelay(stream1, space='asset') >>> img_delayed = coco_img.imdelay(stream1, space='image') >>> vid_delayed = coco_img.imdelay(stream1, space='video') >>> # >>> aux_imdata = asset_delayed.as_xarray().finalize() >>> img_imdata = img_delayed.as_xarray().finalize() >>> assert aux_imdata.shape != img_imdata.shape >>> # Cannot load multiple asset items at the same time in >>> # asset space >>> import pytest >>> fused_channels = stream1 | stream2 >>> from delayed_image.delayed_nodes import CoordinateCompatibilityError >>> with pytest.raises(CoordinateCompatibilityError): >>> aux_delayed2 = coco_img.imdelay(fused_channels, space='asset') Example: >>> # Test loading at a specific resolution. >>> from kwcoco.coco_image import * # NOQA >>> import kwcoco >>> dset = kwcoco.CocoDataset.demo('vidshapes8-msi-multisensor') >>> coco_img = dset.coco_image(1) >>> coco_img.img['resolution'] = '1 meter' >>> img_delayed1 = coco_img.imdelay(space='image') >>> vid_delayed1 = coco_img.imdelay(space='video') >>> # test with unitless request >>> img_delayed2 = coco_img.imdelay(space='image', resolution=3.1) >>> vid_delayed2 = coco_img.imdelay(space='video', resolution='3.1 meter') >>> np.ceil(img_delayed1.shape[0] / 3.1) == img_delayed2.shape[0] >>> np.ceil(vid_delayed1.shape[0] / 3.1) == vid_delayed2.shape[0] >>> # test with unitless data >>> coco_img.img['resolution'] = 1 >>> img_delayed2 = coco_img.imdelay(space='image', resolution=3.1) >>> vid_delayed2 = coco_img.imdelay(space='video', resolution='3.1 meter') >>> np.ceil(img_delayed1.shape[0] / 3.1) == img_delayed2.shape[0] >>> np.ceil(vid_delayed1.shape[0] / 3.1) == vid_delayed2.shape[0] """ from kwimage.transform import Affine from kwcoco.channel_spec import FusedChannelSpec if bundle_dpath is None: bundle_dpath = self.bundle_dpath img = self.img requested = channels if requested is not None: requested = FusedChannelSpec.coerce(requested).normalize() # Get info about the primary image and check if its channels are # requested (if it even has any) img_info = _delay_load_imglike(bundle_dpath, img, nodata_method=nodata_method) obj_info_list = [(img_info, img)] asset_list = img.get('auxiliary', img.get('assets', [])) or [] for asset in asset_list: info = _delay_load_imglike(bundle_dpath, asset, nodata_method=nodata_method) obj_info_list.append((info, asset)) chan_list = [] for info, obj in obj_info_list: if info.get('chan_construct', None) is not None: include_flag = requested is None if not include_flag: if requested.intersection(info['channels']): include_flag = True if include_flag: chncls, chnkw = info['chan_construct'] chan = chncls(**chnkw) quant = info.get('quantization', None) if quant is not None: chan = chan.dequantize(quant) if space not in {'auxiliary', 'asset'}: aux_to_img = Affine.coerce(obj.get('warp_aux_to_img', None)) chan = chan.warp( aux_to_img, dsize=img_info['dsize'], interpolation=interpolation, antialias=antialias, ) chan_list.append(chan) if space == 'video': video = self.video or {} width = video.get('width', img.get('width', None)) height = video.get('height', img.get('height', None)) elif space in {'asset', 'auxiliary'}: if len(chan_list) == 0: width = img.get('width', None) height = img.get('height', None) else: # TODO: should check these are all in the same space width, height = chan_list[0].dsize elif space == 'image': width = img.get('width', None) height = img.get('height', None) else: raise KeyError(space) dsize = (width, height) if len(chan_list) == 0: if requested is not None: # Handle case where the image doesnt have the requested # channels. # TODO: We should use a NoData node instead that # can switch between nans and masked images from delayed_image import DelayedNans from delayed_image import DelayedChannelConcat # delayed = DelayedNodata(dsize=dsize, channels=requested, nodata_method=nodata_method) delayed = DelayedNans(dsize=dsize, channels=requested) delayed = DelayedChannelConcat([delayed]) else: raise ValueError('no data registered in kwcoco image') else: from delayed_image import DelayedChannelConcat delayed = DelayedChannelConcat(chan_list) # Reorder channels in the requested order if requested is not None: delayed = delayed.take_channels(requested) if hasattr(delayed, 'components'): if len(delayed.components) == 1: delayed = delayed.components[0] if space in {'image', 'auxiliary', 'asset'}: pass elif space == 'video': img_to_vid = self.warp_vid_from_img delayed = delayed.warp(img_to_vid, dsize=dsize, interpolation=interpolation, antialias=antialias) else: raise KeyError('space = {}'.format(space)) if resolution is not None: # Adjust to the requested resolution factor = self._scalefactor_for_resolution( space=space, resolution=resolution, RESOLUTION_KEY=RESOLUTION_KEY) delayed = delayed.scale( factor, antialias=antialias, interpolation=interpolation) return delayed
[docs] @ub.memoize_method def valid_region(self, space='image'): """ If this image has a valid polygon, return it in image, or video space Returns: None | kwimage.MultiPolygon """ import kwimage valid_coco_poly = self.img.get('valid_region', None) if valid_coco_poly is None: valid_poly = None else: kw_poly_img = kwimage.MultiPolygon.coerce(valid_coco_poly) if kw_poly_img is None: valid_poly = None else: if space == 'image': valid_poly = kw_poly_img elif space == 'video': warp_vid_from_img = self.warp_vid_from_img valid_poly = kw_poly_img.warp(warp_vid_from_img) else: # To warp it into an auxiliary space we need to know which one raise NotImplementedError(space) return valid_poly
@ub.memoize_property def warp_vid_from_img(self): """ Affine transformation that warps image space -> video space. Returns: kwimage.Affine: The transformation matrix """ import kwimage warp_img_to_vid = kwimage.Affine.coerce(self.img.get('warp_img_to_vid', None)) if warp_img_to_vid.matrix is None: # Hack to ensure the matrix property always is an array warp_img_to_vid.matrix = np.asarray(warp_img_to_vid) return warp_img_to_vid @ub.memoize_property def warp_img_from_vid(self): """ Affine transformation that warps video space -> image space. Returns: kwimage.Affine: The transformation matrix """ return self.warp_vid_from_img.inv()
[docs] def _warp_for_resolution(self, space, resolution=None): """ Compute a transform from image-space to the requested space at a target resolution. """ import kwimage if space == 'image': warp_space_from_img = kwimage.Affine(None) elif space == 'video': warp_space_from_img = self.warp_vid_from_img else: raise NotImplementedError(space) # auxiliary/asset space if resolution is None: warp_final_from_img = warp_space_from_img else: # Requested the annotation at a resolution, so we need to apply a # scale factor scale = self._scalefactor_for_resolution(space=space, resolution=resolution) warp_final_from_space = kwimage.Affine.scale(scale) warp_final_from_img = warp_final_from_space @ warp_space_from_img return warp_final_from_img
[docs] def _annot_segmentation(self, ann, space='video', resolution=None): """" Load annotation segmentations in a requested space at a target resolution. Example: >>> from kwcoco.coco_image import * # NOQA >>> import kwcoco >>> dset = kwcoco.CocoDataset.demo('vidshapes8-msi-multisensor') >>> coco_img = dset.coco_image(1) >>> coco_img.img['resolution'] = '1 meter' >>> ann = coco_img.annots().objs[0] >>> img_sseg = coco_img._annot_segmentation(ann, space='image') >>> vid_sseg = coco_img._annot_segmentation(ann, space='video') >>> img_sseg_2m = coco_img._annot_segmentation(ann, space='image', resolution='2 meter') >>> vid_sseg_2m = coco_img._annot_segmentation(ann, space='video', resolution='2 meter') >>> print(f'img_sseg.area = {img_sseg.area}') >>> print(f'vid_sseg.area = {vid_sseg.area}') >>> print(f'img_sseg_2m.area = {img_sseg_2m.area}') >>> print(f'vid_sseg_2m.area = {vid_sseg_2m.area}') """ import kwimage warp_final_from_img = self._warp_for_resolution(space=space, resolution=resolution) img_sseg = kwimage.MultiPolygon.coerce(ann['segmentation']) warped_sseg = img_sseg.warp(warp_final_from_img) return warped_sseg
[docs] def _annot_segmentations(self, anns, space='video', resolution=None): """" Load multiple annotation segmentations in a requested space at a target resolution. Example: >>> from kwcoco.coco_image import * # NOQA >>> import kwcoco >>> dset = kwcoco.CocoDataset.demo('vidshapes8-msi-multisensor') >>> coco_img = dset.coco_image(1) >>> coco_img.img['resolution'] = '1 meter' >>> ann = coco_img.annots().objs[0] >>> img_sseg = coco_img._annot_segmentations([ann], space='image') >>> vid_sseg = coco_img._annot_segmentations([ann], space='video') >>> img_sseg_2m = coco_img._annot_segmentations([ann], space='image', resolution='2 meter') >>> vid_sseg_2m = coco_img._annot_segmentations([ann], space='video', resolution='2 meter') >>> print(f'img_sseg.area = {img_sseg[0].area}') >>> print(f'vid_sseg.area = {vid_sseg[0].area}') >>> print(f'img_sseg_2m.area = {img_sseg_2m[0].area}') >>> print(f'vid_sseg_2m.area = {vid_sseg_2m[0].area}') """ import kwimage warp_final_from_img = self._warp_for_resolution(space=space, resolution=resolution) warped_ssegs = [] for ann in anns: img_sseg = kwimage.MultiPolygon.coerce(ann['segmentation']) warped_sseg = img_sseg.warp(warp_final_from_img) warped_ssegs.append(warped_sseg) return warped_ssegs
[docs] def resolution(self, space='image', channel=None, RESOLUTION_KEY=None): """ Returns the resolution of this CocoImage in the requested space if known. Errors if this information is not registered. Args: space (str): the space to the resolution of. Can be either "image", "video", or "asset". channel (str | kwcoco.FusedChannelSpec | None): a channel that identifies a single asset, only relevant if asking for asset space Returns: Dict: has items mag (with the magnitude of the resolution) and unit, which is a convinience and only loosely enforced. Example: >>> import kwcoco >>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral') >>> self = dset.coco_image(1) >>> self.img['resolution'] = 1 >>> self.resolution() >>> self.img['resolution'] = '1 meter' >>> self.resolution(space='video') {'mag': (1.0, 1.0), 'unit': 'meter'} >>> self.resolution(space='asset', channel='B11') >>> self.resolution(space='asset', channel='B1') """ import kwimage # Compute the offset transform from the requested space # Handle the cases where resolution is specified at the image or at the # video level. if RESOLUTION_KEY is None: RESOLUTION_KEY = DEFAULT_RESOLUTION_KEYS def aliased_get(d, keys, default=None): if not ub.iterable(keys): return d.get(keys, default) else: found = 0 for key in keys: if key in d: found = 1 val = d[key] break if not found: val = default return val if space == 'video': vid_resolution_expr = aliased_get(self.video, RESOLUTION_KEY, None) if vid_resolution_expr is None: # Do we have an image level resolution? img_resolution_expr = aliased_get(self.img, RESOLUTION_KEY, None) assert img_resolution_expr is not None img_resolution_info = coerce_resolution(img_resolution_expr) img_resolution_mat = kwimage.Affine.scale(img_resolution_info['mag']) vid_resolution = (self.warp_vid_from_img @ img_resolution_mat.inv()).inv() vid_resolution_info = { 'mag': vid_resolution.decompose()['scale'], 'unit': img_resolution_info['unit'] } else: vid_resolution_info = coerce_resolution(vid_resolution_expr) space_resolution_info = vid_resolution_info elif space == 'image': img_resolution_expr = aliased_get(self.img, RESOLUTION_KEY, None) if img_resolution_expr is None: # Do we have an image level resolution? vid_resolution_expr = aliased_get(self.video, RESOLUTION_KEY, None) assert vid_resolution_expr is not None vid_resolution_info = coerce_resolution(vid_resolution_expr) vid_resolution_mat = kwimage.Affine.scale(vid_resolution_info['mag']) img_resolution = (self.warp_img_from_vid @ vid_resolution_mat.inv()).inv() img_resolution_info = { 'mag': img_resolution.decompose()['scale'], 'unit': vid_resolution_info['unit'] } else: img_resolution_info = coerce_resolution(img_resolution_expr) space_resolution_info = img_resolution_info elif space in {'asset', 'auxiliary'}: if channel is None: raise ValueError('must specify a channel to ask for the asset resolution') # Use existing code to get the resolution of the image (could be more efficient) space_resolution_info = self.resolution('image', RESOLUTION_KEY=RESOLUTION_KEY).copy() # Adjust the image resolution based on the asset scale factor warp_img_from_aux = kwimage.Affine.coerce(self.find_asset_obj(channel).get('warp_aux_to_img', None)) img_res_mat = kwimage.Affine.scale(space_resolution_info['mag']) aux_res_mat = img_res_mat @ warp_img_from_aux space_resolution_info['mag'] = np.array(aux_res_mat.decompose()['scale']) else: raise KeyError(space) return space_resolution_info
[docs] def _scalefactor_for_resolution(self, space, resolution, channel=None, RESOLUTION_KEY=None): """ Given image or video space, compute the scale factor needed to achieve the target resolution. # Use this to implement scale_resolution_from_img scale_resolution_from_vid Args: space (str): the space to the resolution of. Can be either "image", "video", or "asset". resolution (str | float | int): the resolution (ideally with units) you want. channel (str | kwcoco.FusedChannelSpec | None): a channel that identifies a single asset, only relevant if asking for asset space Returns: Tuple[float, float]: the x and y scale factor that can be used to scale the underlying "space" to acheive the requested resolution. Ignore: >>> import kwcoco >>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral') >>> self = dset.coco_image(1) >>> self.img['resolution'] = "3 puppies" >>> scale_factor = self._scalefactor_for_resolution(space='asset', channel='B11', resolution="7 puppies") >>> print('scale_factor = {}'.format(ub.urepr(scale_factor, precision=4, nl=0))) scale_factor = (1.2857, 1.2857) """ if resolution is None: return (1., 1.) space_resolution_info = self.resolution(space=space, channel=channel, RESOLUTION_KEY=RESOLUTION_KEY) request_resolution_info = coerce_resolution(resolution) # If units are unspecified, assume they are compatible if space_resolution_info['unit'] is not None: if request_resolution_info['unit'] is not None: assert space_resolution_info['unit'] == request_resolution_info['unit'] x1, y1 = request_resolution_info['mag'] x2, y2 = space_resolution_info['mag'] scale_factor = (x2 / x1, y2 / y1) return scale_factor
[docs] def _detections_for_resolution(coco_img, space='video', resolution=None, aids=None, RESOLUTION_KEY=None): """ This is slightly less than ideal in terms of API, but it will work for now. """ import kwimage assert space == 'video', 'other cases are not handled' # Build transform from image to requested space warp_vid_from_img = coco_img.warp_vid_from_img scale = coco_img._scalefactor_for_resolution(space='video', resolution=resolution, RESOLUTION_KEY=RESOLUTION_KEY) warp_req_from_vid = kwimage.Affine.scale(scale) warp_req_from_img = warp_req_from_vid @ warp_vid_from_img # Get annotations in "Image Space" if aids is None: annots = coco_img.dset.annots(image_id=coco_img.img['id']) else: # We already know the annots we want (assume the user # specified them correctly and they belong to this image). annots = coco_img.dset.annots(aids) imgspace_dets = annots.detections # Warp them into the requested space reqspace_dets = imgspace_dets.warp(warp_req_from_img) reqspace_dets.data['aids'] = np.array(list(annots)) return reqspace_dets
# Deprecated aliases add_auxiliary_item = deprecated_function_alias( 'kwcoco', 'add_auxiliary_item', deprecate='now', new_func=add_asset) delay = deprecated_function_alias( 'kwcoco', 'delay', new_func=imdelay, deprecate='now')
[docs] def show(self, **kwargs): """ Show the image with matplotlib if possible SeeAlso: :func:`kwcoco.CocoDataset.show_image` Example: >>> # xdoctest: +REQUIRES(module:kwplot) >>> import kwcoco >>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral') >>> self = dset.coco_image(1) >>> # xdoctest: +REQUIRES(--show) >>> import kwplot >>> kwplot.autoplt() >>> self.show() """ if self.dset is None: raise Exception('Currently requires a connected dataset. ' 'This may be relaxed in the future') return self.dset.show_image(self['id'], **kwargs)
[docs] def draw(self, **kwargs): """ Draw the image on an ndarray using opencv SeeAlso: :func:`kwcoco.CocoDataset.draw_image` Example: >>> import kwcoco >>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral') >>> self = dset.coco_image(1) >>> canvas = self.draw() >>> # xdoctest: +REQUIRES(--show) >>> import kwplot >>> kwplot.autompl() >>> kwplot.imshow(canvas) """ if self.dset is None: raise Exception('Currently requires a connected dataset. ' 'This may be relaxed in the future') return self.dset.draw_image(self['id'], **kwargs)
[docs] class CocoAsset(_CocoObject): """ A Coco Asset / Auxiliary Item Represents one 2D image file relative to a parent img. Could be a single asset, or an image with sub-assets, but sub-assets are ignored here. Initially we called these "auxiliary" items, but I think we should change their name to "assets", which better maps with STAC terminology. Example: >>> from kwcoco.coco_image import * # NOQA >>> self = CocoAsset({'warp_aux_to_img': 'foo'}) >>> assert 'warp_aux_to_img' in self >>> assert 'warp_img_from_asset' in self >>> assert 'warp_wld_from_asset' not in self >>> assert 'warp_to_wld' not in self >>> self['warp_aux_to_img'] = 'bar' >>> assert self._proxy == {'warp_aux_to_img': 'bar'} """ # To maintain backwards compatibility we register aliases of properties # The main key should be the primary property. __alias_to_primary__ = { 'warp_img_from_asset': 'warp_aux_to_img', 'warp_wld_from_asset': 'warp_to_wld', } def __init__(self, asset, bundle_dpath=None): super().__init__(asset, bundle_dpath=bundle_dpath) def __nice__(self): return repr(self.__json__())
[docs] def image_filepath(self): if self._bundle_dpath is None: raise Exception('Bundle dpath must be populated to use this method') # return self['file_name'] else: return ub.Path(self._bundle_dpath) / self['file_name']
[docs] class CocoVideo(_CocoObject): """ Object representing a single video. Example: >>> from kwcoco.coco_image import * # NOQA >>> import kwcoco >>> dset = kwcoco.CocoDataset.demo('vidshapes1') >>> obj = dset.videos().objs[0] >>> self = CocoVideo(obj, dset) >>> print(f'self={self}') """ __alias_to_primary__ = {} def __nice__(self): return repr(self.__json__())
[docs] class CocoAnnotation(_CocoObject): """ Object representing a single annotation. Example: >>> from kwcoco.coco_image import * # NOQA >>> import kwcoco >>> dset = kwcoco.CocoDataset.demo('vidshapes1') >>> obj = dset.annots().objs[0] >>> self = CocoAnnotation(obj, dset) >>> print(f'self={self}') """ __alias_to_primary__ = {} def __nice__(self): return repr(self.__json__())
[docs] class CocoCategory(_CocoObject): """ Object representing a single category. Example: >>> from kwcoco.coco_image import * # NOQA >>> import kwcoco >>> dset = kwcoco.CocoDataset.demo('vidshapes1') >>> obj = dset.categories().objs[0] >>> self = CocoCategory(obj, dset) >>> print(f'self={self}') """ __alias_to_primary__ = {} def __nice__(self): return repr(self.__json__())
[docs] class CocoTrack(_CocoObject): """ Object representing a single track. Example: >>> from kwcoco.coco_image import * # NOQA >>> import kwcoco >>> dset = kwcoco.CocoDataset.demo('vidshapes1') >>> obj = dset.tracks().objs[0] >>> self = CocoTrack(obj, dset) >>> print(f'self={self}') """ __alias_to_primary__ = {} def __nice__(self): return repr(self.__json__())
[docs] def annots(self): assert self.dset is not None return self.dset.annots(track_id=self['id'])
[docs] def _delay_load_imglike(bundle_dpath, obj, nodata_method=None): # from os.path import join from kwcoco.channel_spec import FusedChannelSpec from delayed_image import DelayedLoad, DelayedIdentity info = {} fname = obj.get('file_name', None) imdata = obj.get('imdata', None) channels_ = obj.get('channels', None) if channels_ is not None: channels_ = FusedChannelSpec.coerce(channels_) channels_ = channels_.normalize() info['channels'] = channels_ width = obj.get('width', None) height = obj.get('height', None) if height is not None and width is not None: info['dsize'] = dsize = (width, height) else: info['dsize'] = dsize = (None, None) quantization = obj.get('quantization', None) if imdata is not None: info['chan_construct'] = (DelayedIdentity, dict( data=imdata, channels=channels_, dsize=dsize)) elif fname is not None: info['fpath'] = fpath = join(bundle_dpath, fname) info['chan_construct'] = ( DelayedLoad, dict(fpath=fpath, channels=channels_, dsize=dsize, nodata_method=nodata_method)) info['quantization'] = quantization return info
[docs] def parse_quantity(expr): import re expr_pat = re.compile( r'^(?P<magnitude>[-+]?[0-9]*\.?[0-9]+([eE][-+]?[0-9]+)?)' '(?P<spaces> *)' '(?P<unit>.*)$') match = expr_pat.match(expr.strip()) if match is None: raise ValueError(f'Unable to parse {expr!r}') return match.groupdict()
[docs] def coerce_resolution(expr): if isinstance(expr, str): result = parse_quantity(expr) unit = result['unit'] x = y = float(result['magnitude']) else: x = y = float(expr) unit = None parsed = { 'mag': (x, y), 'unit': unit, } return parsed