Source code for kwcoco.coco_image

"""
Defines the CocoImage class which is an object oriented way of manipulating
data pointed to by a COCO image dictionary.

Notably this provides the ``.imdelay`` method for delayed image loading ( which
enables things like fast loading of subimage-regions / coarser scales in images
that contain tiles / overviews - e.g. Cloud Optimized Geotiffs or COGs (Medical
image formats may be supported in the future).
"""
import ubelt as ub
import os
import numpy as np
from os.path import join
from kwcoco.util.util_deprecate import deprecated_function_alias
from kwcoco.util.dict_proxy2 import AliasedDictProxy

try:
    from xdev import profile
except Exception:
    profile = ub.identity


__docstubs__ = """
from kwcoco.coco_objects1d import Annots
"""


DEFAULT_RESOLUTION_KEYS = {
    'resolution',
    'target_gsd',  # only exists as a convinience for other projects. Remove in the future.
}


[docs]class CocoImage(AliasedDictProxy, ub.NiceRepr): """ An object-oriented representation of a coco image. It provides helper methods that are specific to a single image. This operates directly on a single coco image dictionary, but it can optionally be connected to a parent dataset, which allows it to use CocoDataset methods to query about relationships and resolve pointers. This is different than the Images class in coco_object1d, which is just a vectorized interface to multiple objects. Example: >>> import kwcoco >>> dset1 = kwcoco.CocoDataset.demo('shapes8') >>> dset2 = kwcoco.CocoDataset.demo('vidshapes8-multispectral') >>> self = kwcoco.CocoImage(dset1.imgs[1], dset1) >>> print('self = {!r}'.format(self)) >>> print('self.channels = {}'.format(ub.urepr(self.channels, nl=1))) >>> self = kwcoco.CocoImage(dset2.imgs[1], dset2) >>> print('self.channels = {}'.format(ub.urepr(self.channels, nl=1))) >>> self.primary_asset() >>> assert 'auxiliary' in self """ __alias_to_primary__ = { # In the future we will switch assets to be primary. 'assets': 'auxiliary', } def __init__(self, img, dset=None): self.img = img self._proxy = img self.dset = dset self._bundle_dpath = None self._video = None
[docs] @classmethod def from_gid(cls, dset, gid): img = dset.index.imgs[gid] self = cls(img, dset=dset) return self
@property def bundle_dpath(self): if self.dset is not None: return self.dset.bundle_dpath else: return self._bundle_dpath @bundle_dpath.setter def bundle_dpath(self, value): self._bundle_dpath = value @property def video(self): """ Helper to grab the video for this image if it exists """ if self._video is None and self.dset is not None: vidid = self.img.get('video_id', None) if vidid is None: video = None else: video = self.dset.index.videos[vidid] else: video = self._video return video @video.setter def video(self, value): # TODO: ducktype with an object self._video = value
[docs] def detach(self): """ Removes references to the underlying coco dataset, but keeps special information such that it wont be needed. """ self._bundle_dpath = self.bundle_dpath self._video = self.video self.dset = None return self
@property def assets(self): assets = [] for obj in self.iter_asset_objs(): # TODO: ducktype with an object asset = CocoAsset(obj, bundle_dpath=self.bundle_dpath) # asset = obj assets.append(asset) return assets @property def datetime(self): """ Try to get datetime information for this image. Not always possible. """ img = self.img if 'timestamp' in img: img['timestamp'] if 'date_captured' in img: img['date_captured'] raise NotImplementedError
[docs] def annots(self): """ Returns: Annots: a 1d annotations object referencing annotations in this image """ return self.dset.annots(image_id=self.img['id'])
def __nice__(self): """ Example: >>> from kwcoco.coco_image import * # NOQA >>> import kwcoco >>> with ub.CaptureStdout() as cap: ... dset = kwcoco.CocoDataset.demo('shapes8') >>> self = CocoImage(dset.dataset['images'][0], dset) >>> print('self = {!r}'.format(self)) >>> dset = kwcoco.CocoDataset.demo() >>> self = CocoImage(dset.dataset['images'][0], dset) >>> print('self = {!r}'.format(self)) """ from kwcoco.util.util_truncate import smart_truncate from functools import partial stats = ub.udict(self.stats()) stats = stats.map_values(str) stats = stats.map_values( partial(smart_truncate, max_length=32, trunc_loc=0.5)) return ub.urepr(stats, compact=1, nl=0)
[docs] def stats(self): """ """ key_attrname = [ ('wh', 'dsize'), ('n_chan', 'num_channels'), ('channels', 'channels'), ] stats = {} for key, attrname in key_attrname: try: stats[key] = getattr(self, attrname) except Exception as ex: stats[key] = repr(ex) return stats
def __contains__(self, key): if '_unstructured' in self._proxy: if AliasedDictProxy.__contains__(self, key): return True return key in self._proxy['_unstructured'] else: return AliasedDictProxy.__contains__(self, key)
[docs] def get(self, key, default=ub.NoParam): try: return self[key] except KeyError: if default is ub.NoParam: raise else: return default
[docs] def keys(self): """ Proxy getter attribute for underlying `self.img` dictionary """ if '_unstructured' in self._proxy: # SQL compatibility _keys = ub.flatten([self._proxy.keys(), self._proxy['_unstructured'].keys()]) return iter((k for k in _keys if k != '_unstructured')) else: return self._proxy.keys()
def __getitem__(self, key): """ Proxy getter attribute for underlying `self.img` dictionary CommandLine: xdoctest -m kwcoco.coco_image CocoImage.__getitem__ Example: >>> import pytest >>> # without _unstructured populated >>> import kwcoco >>> self = kwcoco.CocoImage({'foo': 1}) >>> assert self.get('foo') == 1 >>> assert self.get('foo', None) == 1 >>> # with _unstructured populated >>> self = kwcoco.CocoImage({'_unstructured': {'foo': 1}}) >>> assert self.get('foo') == 1 >>> assert self.get('foo', None) == 1 >>> # without _unstructured empty >>> self = kwcoco.CocoImage({}) >>> print('----') >>> with pytest.raises(KeyError): >>> self.get('foo') >>> assert self.get('foo', None) is None >>> # with _unstructured empty >>> self = kwcoco.CocoImage({'_unstructured': {'bar': 1}}) >>> with pytest.raises(KeyError): >>> self.get('foo') >>> assert self.get('foo', None) is None """ if AliasedDictProxy.__contains__(self, key): return AliasedDictProxy.__getitem__(self, key) else: _img = self._proxy if '_unstructured' in _img: # Workaround for sql-view, treat items in "_unstructured" as # if they are in the top level image. _extra = _img['_unstructured'] if key in _extra: return _extra[key] else: raise KeyError(key) else: raise KeyError(key) @property def channels(self): from kwcoco.channel_spec import FusedChannelSpec from kwcoco.channel_spec import ChannelSpec img_parts = [] for obj in self.iter_asset_objs(): obj_parts = obj.get('channels', None) if obj_parts is not None: # obj_chan = FusedChannelSpec.coerce(obj_parts).normalize() obj_chan = FusedChannelSpec.coerce(obj_parts) img_parts.append(obj_chan.spec) if not img_parts: return None # return ChannelSpec.coerce('*') spec = ChannelSpec(','.join(img_parts)) return spec @property def num_channels(self): return self.channels.numel() # return sum(map(len, self.channels.streams())) @property def dsize(self): width = self.img.get('width', None) height = self.img.get('height', None) return width, height
[docs] def primary_image_filepath(self, requires=None): dpath = ub.Path(self.bundle_dpath) fname = self.primary_asset()['file_name'] fpath = dpath / fname return fpath
[docs] def primary_asset(self, requires=None): """ Compute a "main" image asset. Note: Uses a heuristic. * First, try to find the auxiliary image that has with the smallest distortion to the base image (if known via warp_aux_to_img) * Second, break ties by using the largest image if w / h is known * Last, if previous information not available use the first auxiliary image. Args: requires (List[str] | None): list of attribute that must be non-None to consider an object as the primary one. Returns: None | dict : the asset dict or None if it is not found TODO: - [ ] Add in primary heuristics Example: >>> import kwarray >>> from kwcoco.coco_image import * # NOQA >>> rng = kwarray.ensure_rng(0) >>> def random_auxiliary(name, w=None, h=None): >>> return {'file_name': name, 'width': w, 'height': h} >>> self = CocoImage({ >>> 'auxiliary': [ >>> random_auxiliary('1'), >>> random_auxiliary('2'), >>> random_auxiliary('3'), >>> ] >>> }) >>> assert self.primary_asset()['file_name'] == '1' >>> self = CocoImage({ >>> 'auxiliary': [ >>> random_auxiliary('1'), >>> random_auxiliary('2', 3, 3), >>> random_auxiliary('3'), >>> ] >>> }) >>> assert self.primary_asset()['file_name'] == '2' """ import kwimage if requires is None: requires = [] img = self.img has_base_image = img.get('file_name', None) is not None candidates = [] if has_base_image: obj = img if all(k in obj for k in requires): # Return the base image if we can return obj # Choose "best" auxiliary image based on a hueristic. eye = kwimage.Affine.eye().matrix asset_objs = img.get('auxiliary', img.get('assets', [])) or [] for idx, obj in enumerate(asset_objs): # Take frobenius norm to get "distance" between transform and # the identity. We want to find the auxiliary closest to the # identity transform. warp_aux_to_img = kwimage.Affine.coerce(obj.get('warp_aux_to_img', None)) fro_dist = np.linalg.norm(warp_aux_to_img - eye, ord='fro') w = obj.get('width', None) or 0 h = obj.get('height', None) or 0 if all(k in obj for k in requires): candidates.append({ 'idx': idx, 'area': w * h, 'fro_dist': fro_dist, 'obj': obj, }) if len(candidates) == 0: return None idx = ub.argmin( candidates, key=lambda val: ( val['fro_dist'], -val['area'], val['idx']) ) obj = candidates[idx]['obj'] return obj
[docs] def iter_image_filepaths(self, with_bundle=True): """ Could rename to iter_asset_filepaths Args: with_bundle (bool): If True, prepends the bundle dpath to fully specify the path. Otherwise, just returns the registered string in the file_name attribute of each asset. Defaults to True. Yields: ub.Path """ dpath = ub.Path(self.bundle_dpath) for obj in self.iter_asset_objs(): fname = obj.get('file_name') fpath = dpath / fname yield fpath
[docs] def iter_asset_objs(self): """ Iterate through base + auxiliary dicts that have file paths Yields: dict: an image or auxiliary dictionary """ img = self.img has_base_image = img.get('file_name', None) is not None if has_base_image: obj = img # cant remove auxiliary otherwise inplace modification doesnt work # obj = ub.dict_diff(img, {'auxiliary'}) yield obj for obj in img.get('auxiliary', None) or []: yield obj for obj in img.get('assets', None) or []: yield obj
[docs] def find_asset_obj(self, channels): """ Find the asset dictionary with the specified channels Example: >>> import kwcoco >>> coco_img = kwcoco.CocoImage({'width': 128, 'height': 128}) >>> coco_img.add_auxiliary_item( >>> 'rgb.png', channels='red|green|blue', width=32, height=32) >>> assert coco_img.find_asset_obj('red') is not None >>> assert coco_img.find_asset_obj('green') is not None >>> assert coco_img.find_asset_obj('blue') is not None >>> assert coco_img.find_asset_obj('red|blue') is not None >>> assert coco_img.find_asset_obj('red|green|blue') is not None >>> assert coco_img.find_asset_obj('red|green|blue') is not None >>> assert coco_img.find_asset_obj('black') is None >>> assert coco_img.find_asset_obj('r') is None Example: >>> # Test with concise channel code >>> import kwcoco >>> coco_img = kwcoco.CocoImage({'width': 128, 'height': 128}) >>> coco_img.add_auxiliary_item( >>> 'msi.png', channels='foo.0:128', width=32, height=32) >>> assert coco_img.find_asset_obj('foo') is None >>> assert coco_img.find_asset_obj('foo.3') is not None >>> assert coco_img.find_asset_obj('foo.3:5') is not None >>> assert coco_img.find_asset_obj('foo.3000') is None """ from kwcoco.channel_spec import FusedChannelSpec channels = FusedChannelSpec.coerce(channels) found = None for obj in self.iter_asset_objs(): obj_channels = FusedChannelSpec.coerce(obj['channels']) if (obj_channels & channels).numel(): found = obj break return found
[docs] def _assets_key(self): """ Internal helper for transition from auxiliary -> assets in the image spec """ if 'auxiliary' in self: return 'auxiliary' elif 'assets' in self: return 'assets' else: return 'auxiliary'
[docs] def add_annotation(self, **ann): """ Adds an annotation to this image. This is a convinience method, and requires that this CocoImage is still connected to a parent dataset. Args: **ann: annotation attributes (e.g. bbox, category_id) Returns: int: the new annotation id SeeAlso: :func:`kwcoco.CocoDataset.add_annotation` """ if self.dset is None: raise RuntimeError( 'Can only add an annotation through a CocoImage ' 'if it is connected to its parent CocoDataset') return self.dset.add_annotation(image_id=self.img['id'], **ann)
[docs] def add_asset(self, file_name=None, channels=None, imdata=None, warp_aux_to_img=None, width=None, height=None, imwrite=False, image_id=None, **kw): """ Adds an auxiliary / asset item to the image dictionary. This operation can be done purely in-memory (the default), or the image data can be written to a file on disk (via the imwrite=True flag). Args: file_name (str | PathLike | None): The name of the file relative to the bundle directory. If unspecified, imdata must be given. channels (str | kwcoco.FusedChannelSpec | None): The channel code indicating what each of the bands represents. These channels should be disjoint wrt to the existing data in this image (this is not checked). imdata (ndarray | None): The underlying image data this auxiliary item represents. If unspecified, it is assumed file_name points to a path on disk that will eventually exist. If imdata, file_name, and the special imwrite=True flag are specified, this function will write the data to disk. warp_aux_to_img (kwimage.Affine | None): The transformation from this auxiliary space to image space. If unspecified, assumes this item is related to image space by only a scale factor. width (int | None): Width of the data in auxiliary space (inferred if unspecified) height (int | None): Height of the data in auxiliary space (inferred if unspecified) imwrite (bool): If specified, both imdata and file_name must be specified, and this will write the data to disk. Note: it it recommended that you simply call imwrite yourself before or after calling this function. This lets you better control imwrite parameters. image_id (int | None): An asset dictionary contains an image-id, but it should *not* be specified here. If it is, then it *must* agree with this image's id. **kw : stores arbitrary key/value pairs in this new asset. TODO: - [ ] Allow imwrite to specify an executor that is used to return a Future so the imwrite call does not block. Example: >>> from kwcoco.coco_image import * # NOQA >>> import kwcoco >>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral') >>> coco_img = dset.coco_image(1) >>> imdata = np.random.rand(32, 32, 5) >>> channels = kwcoco.FusedChannelSpec.coerce('Aux:5') >>> coco_img.add_asset(imdata=imdata, channels=channels) Example: >>> import kwcoco >>> dset = kwcoco.CocoDataset() >>> gid = dset.add_image(name='my_image_name', width=200, height=200) >>> coco_img = dset.coco_image(gid) >>> coco_img.add_asset('path/img1_B0.tif', channels='B0', width=200, height=200) >>> coco_img.add_asset('path/img1_B1.tif', channels='B1', width=200, height=200) >>> coco_img.add_asset('path/img1_B2.tif', channels='B2', width=200, height=200) >>> coco_img.add_asset('path/img1_TCI.tif', channels='r|g|b', width=200, height=200) """ from os.path import isabs, join # NOQA import kwimage from kwcoco.channel_spec import FusedChannelSpec img = self.img if imdata is None and file_name is None: raise ValueError('must specify file_name or imdata') # Check type of resolution inputs. if not isinstance(width, int) and width is not None: raise TypeError(f'"width" input is neither an int or None variable but type: "{type(width)}"') if not isinstance(height, int) and height is not None: raise TypeError(f'"height" input is neither an int or None variable but type: "{type(height)}"') # Infer resolution inputs from image data. if width is None and imdata is not None: width = imdata.shape[1] if height is None and imdata is not None: height = imdata.shape[0] if warp_aux_to_img is None: img_width = img.get('width', None) img_height = img.get('height', None) if img_width is None or img_height is None: raise ValueError('Parent image canvas has an unknown size. ' 'Need to set width/height') if width is None or height is None: raise ValueError('Unable to infer warp_aux_to_img without width') # Assume we can just scale up the auxiliary data to match the image # space unless the user says otherwise warp_aux_to_img = kwimage.Affine.scale(( img_width / width, img_height / height)) else: warp_aux_to_img = kwimage.Affine.coerce(warp_aux_to_img) # Normalize for json serializability if channels is not None: channels = FusedChannelSpec.coerce(channels).spec if file_name is not None: file_name = os.fspath(file_name) # Make the asset info dict parent_image_id = img.get('id', None) if image_id is not None: if parent_image_id is not None: assert image_id == parent_image_id, ( f'The specified image_id ({image_id}) did not match the ' f'parent image id ({parent_image_id}) proprty.' ) else: image_id = parent_image_id obj = { 'image_id': image_id, # for when assets move to their own table 'file_name': file_name, 'height': height, 'width': width, 'channels': channels, 'warp_aux_to_img': warp_aux_to_img.concise(), } if imdata is not None: if imwrite: if __debug__ and file_name is None: raise ValueError( 'file_name must be given if imwrite is True') # if self.dset is None: # fpath = file_name # if not isabs(fpath): # raise ValueError(ub.paragraph( # ''' # Got relative file_name, but no dataset is attached # to this coco image. Attatch a dataset or use an # absolute path. # ''')) # else: fpath = join(self.bundle_dpath, file_name) kwimage.imwrite(fpath, imdata) else: obj['imdata'] = imdata obj.update(**kw) assets_key = self._assets_key() asset_list = img.get(assets_key, None) if asset_list is None: asset_list = img[assets_key] = [] asset_list.append(obj) if self.dset is not None: self.dset._invalidate_hashid()
[docs] @profile def imdelay(self, channels=None, space='image', resolution=None, bundle_dpath=None, interpolation='linear', antialias=True, nodata_method=None, RESOLUTION_KEY=None): """ Perform a delayed load on the data in this image. The delayed load can load a subset of channels, and perform lazy warping operations. If the underlying data is in a tiled format this can reduce the amount of disk IO needed to read the data if only a small crop or lower resolution view of the data is needed. Note: This method is experimental and relies on the delayed load proof-of-concept. Args: gid (int): image id to load channels (kwcoco.FusedChannelSpec): specific channels to load. if unspecified, all channels are loaded. space (str): can either be "image" for loading in image space, or "video" for loading in video space. resolution (None | str | float): If specified, applies an additional scale factor to the result such that the data is loaded at this specified resolution. This requires that the image / video has a registered resolution attribute and that its units agree with this request. TODO: - [ ] This function could stand to have a better name. Maybe imread with a delayed=True flag? Or maybe just delayed_load? Example: >>> from kwcoco.coco_image import * # NOQA >>> import kwcoco >>> gid = 1 >>> # >>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral') >>> self = CocoImage(dset.imgs[gid], dset) >>> delayed = self.imdelay() >>> print('delayed = {!r}'.format(delayed)) >>> print('delayed.finalize() = {!r}'.format(delayed.finalize())) >>> print('delayed.finalize() = {!r}'.format(delayed.finalize())) >>> # >>> dset = kwcoco.CocoDataset.demo('shapes8') >>> delayed = dset.coco_image(gid).imdelay() >>> print('delayed = {!r}'.format(delayed)) >>> print('delayed.finalize() = {!r}'.format(delayed.finalize())) >>> print('delayed.finalize() = {!r}'.format(delayed.finalize())) >>> crop = delayed.crop((slice(0, 3), slice(0, 3))) >>> crop.finalize() >>> # TODO: should only select the "red" channel >>> dset = kwcoco.CocoDataset.demo('shapes8') >>> delayed = CocoImage(dset.imgs[gid], dset).imdelay(channels='r') >>> import kwcoco >>> gid = 1 >>> # >>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral') >>> delayed = dset.coco_image(gid).imdelay(channels='B1|B2', space='image') >>> print('delayed = {!r}'.format(delayed)) >>> print('delayed.finalize() = {!r}'.format(delayed.finalize())) >>> delayed = dset.coco_image(gid).imdelay(channels='B1|B2|B11', space='image') >>> print('delayed = {!r}'.format(delayed)) >>> print('delayed.finalize() = {!r}'.format(delayed.finalize())) >>> delayed = dset.coco_image(gid).imdelay(channels='B8|B1', space='video') >>> print('delayed = {!r}'.format(delayed)) >>> print('delayed.finalize() = {!r}'.format(delayed.finalize())) >>> delayed = dset.coco_image(gid).imdelay(channels='B8|foo|bar|B1', space='video') >>> print('delayed = {!r}'.format(delayed)) >>> print('delayed.finalize() = {!r}'.format(delayed.finalize())) Example: >>> import kwcoco >>> dset = kwcoco.CocoDataset.demo() >>> coco_img = dset.coco_image(1) >>> # Test case where nothing is registered in the dataset >>> delayed = coco_img.imdelay() >>> final = delayed.finalize() >>> assert final.shape == (512, 512, 3) >>> delayed = coco_img.imdelay() >>> final = delayed.finalize() >>> print('final.shape = {}'.format(ub.urepr(final.shape, nl=1))) >>> assert final.shape == (512, 512, 3) Example: >>> # Test that delay works when imdata is stored in the image >>> # dictionary itself. >>> from kwcoco.coco_image import * # NOQA >>> import kwcoco >>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral') >>> coco_img = dset.coco_image(1) >>> imdata = np.random.rand(6, 6, 5) >>> imdata[:] = np.arange(5)[None, None, :] >>> channels = kwcoco.FusedChannelSpec.coerce('Aux:5') >>> coco_img.add_auxiliary_item(imdata=imdata, channels=channels) >>> delayed = coco_img.imdelay(channels='B1|Aux:2:4') >>> final = delayed.finalize() Example: >>> # Test delay when loading in asset space >>> from kwcoco.coco_image import * # NOQA >>> import kwcoco >>> dset = kwcoco.CocoDataset.demo('vidshapes8-msi-multisensor') >>> coco_img = dset.coco_image(1) >>> stream1 = coco_img.channels.streams()[0] >>> stream2 = coco_img.channels.streams()[1] >>> asset_delayed = coco_img.imdelay(stream1, space='asset') >>> img_delayed = coco_img.imdelay(stream1, space='image') >>> vid_delayed = coco_img.imdelay(stream1, space='video') >>> # >>> aux_imdata = asset_delayed.as_xarray().finalize() >>> img_imdata = img_delayed.as_xarray().finalize() >>> assert aux_imdata.shape != img_imdata.shape >>> # Cannot load multiple asset items at the same time in >>> # asset space >>> import pytest >>> fused_channels = stream1 | stream2 >>> from delayed_image.delayed_nodes import CoordinateCompatibilityError >>> with pytest.raises(CoordinateCompatibilityError): >>> aux_delayed2 = coco_img.imdelay(fused_channels, space='asset') Example: >>> # Test loading at a specific resolution. >>> from kwcoco.coco_image import * # NOQA >>> import kwcoco >>> dset = kwcoco.CocoDataset.demo('vidshapes8-msi-multisensor') >>> coco_img = dset.coco_image(1) >>> coco_img.img['resolution'] = '1 meter' >>> img_delayed1 = coco_img.imdelay(space='image') >>> vid_delayed1 = coco_img.imdelay(space='video') >>> # test with unitless request >>> img_delayed2 = coco_img.imdelay(space='image', resolution=3.1) >>> vid_delayed2 = coco_img.imdelay(space='video', resolution='3.1 meter') >>> np.ceil(img_delayed1.shape[0] / 3.1) == img_delayed2.shape[0] >>> np.ceil(vid_delayed1.shape[0] / 3.1) == vid_delayed2.shape[0] >>> # test with unitless data >>> coco_img.img['resolution'] = 1 >>> img_delayed2 = coco_img.imdelay(space='image', resolution=3.1) >>> vid_delayed2 = coco_img.imdelay(space='video', resolution='3.1 meter') >>> np.ceil(img_delayed1.shape[0] / 3.1) == img_delayed2.shape[0] >>> np.ceil(vid_delayed1.shape[0] / 3.1) == vid_delayed2.shape[0] """ from kwimage.transform import Affine from kwcoco.channel_spec import FusedChannelSpec if bundle_dpath is None: bundle_dpath = self.bundle_dpath img = self.img requested = channels if requested is not None: requested = FusedChannelSpec.coerce(requested).normalize() # Get info about the primary image and check if its channels are # requested (if it even has any) img_info = _delay_load_imglike(bundle_dpath, img, nodata_method=nodata_method) obj_info_list = [(img_info, img)] asset_list = img.get('auxiliary', img.get('assets', [])) or [] for asset in asset_list: info = _delay_load_imglike(bundle_dpath, asset, nodata_method=nodata_method) obj_info_list.append((info, asset)) chan_list = [] for info, obj in obj_info_list: if info.get('chan_construct', None) is not None: include_flag = requested is None if not include_flag: if requested.intersection(info['channels']): include_flag = True if include_flag: chncls, chnkw = info['chan_construct'] chan = chncls(**chnkw) quant = info.get('quantization', None) if quant is not None: chan = chan.dequantize(quant) if space not in {'auxiliary', 'asset'}: aux_to_img = Affine.coerce(obj.get('warp_aux_to_img', None)) chan = chan.warp( aux_to_img, dsize=img_info['dsize']) chan_list.append(chan) if space == 'video': video = self.video width = video.get('width', img.get('width', None)) height = video.get('height', img.get('height', None)) elif space in {'asset', 'auxiliary'}: if len(chan_list) == 0: width = img.get('width', None) height = img.get('height', None) else: # TODO: should check these are all in the same space width, height = chan_list[0].dsize elif space == 'image': width = img.get('width', None) height = img.get('height', None) else: raise KeyError(space) dsize = (width, height) if len(chan_list) == 0: if requested is not None: # Handle case where the image doesnt have the requested # channels. from delayed_image import DelayedNans from delayed_image import DelayedChannelConcat delayed = DelayedNans(dsize=dsize, channels=requested) delayed = DelayedChannelConcat([delayed]) else: raise ValueError('no data registered in kwcoco image') else: from delayed_image import DelayedChannelConcat delayed = DelayedChannelConcat(chan_list) # Reorder channels in the requested order if requested is not None: delayed = delayed.take_channels(requested) if hasattr(delayed, 'components'): if len(delayed.components) == 1: delayed = delayed.components[0] if space in {'image', 'auxiliary', 'asset'}: pass elif space == 'video': img_to_vid = self.warp_vid_from_img delayed = delayed.warp(img_to_vid, dsize=dsize, interpolation=interpolation, antialias=antialias) else: raise KeyError('space = {}'.format(space)) if resolution is not None: # Adjust to the requested resolution factor = self._scalefactor_for_resolution( space=space, resolution=resolution, RESOLUTION_KEY=RESOLUTION_KEY) delayed = delayed.scale( factor, antialias=antialias, interpolation=interpolation) return delayed
[docs] @ub.memoize_method def valid_region(self, space='image'): """ If this image has a valid polygon, return it in image, or video space Returns: None | kwimage.MultiPolygon """ import kwimage valid_coco_poly = self.img.get('valid_region', None) if valid_coco_poly is None: valid_poly = None else: kw_poly_img = kwimage.MultiPolygon.coerce(valid_coco_poly) if kw_poly_img is None: valid_poly = None else: if space == 'image': valid_poly = kw_poly_img elif space == 'video': warp_vid_from_img = self.warp_vid_from_img valid_poly = kw_poly_img.warp(warp_vid_from_img) else: # To warp it into an auxiliary space we need to know which one raise NotImplementedError(space) return valid_poly
@ub.memoize_property def warp_vid_from_img(self): """ Affine transformation that warps image space -> video space. Returns: kwimage.Affine: The transformation matrix """ import kwimage warp_img_to_vid = kwimage.Affine.coerce(self.img.get('warp_img_to_vid', None)) if warp_img_to_vid.matrix is None: # Hack to ensure the matrix property always is an array warp_img_to_vid.matrix = np.asarray(warp_img_to_vid) return warp_img_to_vid @ub.memoize_property def warp_img_from_vid(self): """ Affine transformation that warps video space -> image space. Returns: kwimage.Affine: The transformation matrix """ return self.warp_vid_from_img.inv()
[docs] def _annot_segmentation(self, ann, space='video'): import kwimage warp_vid_from_img = self.warp_vid_from_img img_sseg = kwimage.MultiPolygon.coerce(ann['segmentation']) if space == 'image': warped_sseg = img_sseg pass elif space == 'video': vid_sseg = img_sseg.warp(warp_vid_from_img) warped_sseg = vid_sseg else: raise NotImplementedError(space) # auxiliary/asset space return warped_sseg
[docs] def resolution(self, space='image', channel=None, RESOLUTION_KEY=None): """ Returns the resolution of this CocoImage in the requested space if known. Errors if this information is not registered. Args: space (str): the space to the resolution of. Can be either "image", "video", or "asset". channel (str | kwcoco.FusedChannelSpec | None): a channel that identifies a single asset, only relevant if asking for asset space Returns: Dict: has items mag (with the magnitude of the resolution) and unit, which is a convinience and only loosely enforced. Example: >>> import kwcoco >>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral') >>> self = dset.coco_image(1) >>> self.img['resolution'] = 1 >>> self.resolution() >>> self.img['resolution'] = '1 meter' >>> self.resolution(space='video') {'mag': (1.0, 1.0), 'unit': 'meter'} >>> self.resolution(space='asset', channel='B11') >>> self.resolution(space='asset', channel='B1') """ import kwimage # Compute the offset transform from the requested space # Handle the cases where resolution is specified at the image or at the # video level. if RESOLUTION_KEY is None: RESOLUTION_KEY = DEFAULT_RESOLUTION_KEYS def aliased_get(d, keys, default=None): if not ub.iterable(keys): return d.get(keys, default) else: found = 0 for key in keys: if key in d: found = 1 val = d[key] break if not found: val = default return val if space == 'video': vid_resolution_expr = aliased_get(self.video, RESOLUTION_KEY, None) if vid_resolution_expr is None: # Do we have an image level resolution? img_resolution_expr = aliased_get(self.img, RESOLUTION_KEY, None) assert img_resolution_expr is not None img_resolution_info = coerce_resolution(img_resolution_expr) img_resolution_mat = kwimage.Affine.scale(img_resolution_info['mag']) vid_resolution = (self.warp_vid_from_img @ img_resolution_mat.inv()).inv() vid_resolution_info = { 'mag': vid_resolution.decompose()['scale'], 'unit': img_resolution_info['unit'] } else: vid_resolution_info = coerce_resolution(vid_resolution_expr) space_resolution_info = vid_resolution_info elif space == 'image': img_resolution_expr = aliased_get(self.img, RESOLUTION_KEY, None) if img_resolution_expr is None: # Do we have an image level resolution? vid_resolution_expr = aliased_get(self.video, RESOLUTION_KEY, None) assert vid_resolution_expr is not None vid_resolution_info = coerce_resolution(vid_resolution_expr) vid_resolution_mat = kwimage.Affine.scale(vid_resolution_info['mag']) img_resolution = (self.warp_img_from_vid @ vid_resolution_mat.inv()).inv() img_resolution_info = { 'mag': img_resolution.decompose()['scale'], 'unit': vid_resolution_info['unit'] } else: img_resolution_info = coerce_resolution(img_resolution_expr) space_resolution_info = img_resolution_info elif space in {'asset', 'auxiliary'}: if channel is None: raise ValueError('must specify a channel to ask for the asset resolution') # Use existing code to get the resolution of the image (could be more efficient) space_resolution_info = self.resolution('image', RESOLUTION_KEY=RESOLUTION_KEY).copy() # Adjust the image resolution based on the asset scale factor warp_img_from_aux = kwimage.Affine.coerce(self.find_asset_obj(channel).get('warp_aux_to_img', None)) img_res_mat = kwimage.Affine.scale(space_resolution_info['mag']) aux_res_mat = img_res_mat @ warp_img_from_aux space_resolution_info['mag'] = np.array(aux_res_mat.decompose()['scale']) else: raise KeyError(space) return space_resolution_info
[docs] def _scalefactor_for_resolution(self, space, resolution, channel=None, RESOLUTION_KEY=None): """ Given image or video space, compute the scale factor needed to achieve the target resolution. # Use this to implement scale_resolution_from_img scale_resolution_from_vid Args: space (str): the space to the resolution of. Can be either "image", "video", or "asset". resolution (str | float | int): the resolution (ideally with units) you want. channel (str | kwcoco.FusedChannelSpec | None): a channel that identifies a single asset, only relevant if asking for asset space Returns: Tuple[float, float]: the x and y scale factor that can be used to scale the underlying "space" to acheive the requested resolution. Ignore: >>> import kwcoco >>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral') >>> self = dset.coco_image(1) >>> self.img['resolution'] = "3 puppies" >>> scale_factor = self._scalefactor_for_resolution(space='asset', channel='B11', resolution="7 puppies") >>> print('scale_factor = {}'.format(ub.urepr(scale_factor, precision=4, nl=0))) scale_factor = (1.2857, 1.2857) """ if resolution is None: return (1., 1.) space_resolution_info = self.resolution(space=space, channel=channel, RESOLUTION_KEY=RESOLUTION_KEY) request_resolution_info = coerce_resolution(resolution) # If units are unspecified, assume they are compatible if space_resolution_info['unit'] is not None: if request_resolution_info['unit'] is not None: assert space_resolution_info['unit'] == request_resolution_info['unit'] x1, y1 = request_resolution_info['mag'] x2, y2 = space_resolution_info['mag'] scale_factor = (x2 / x1, y2 / y1) return scale_factor
[docs] def _detections_for_resolution(coco_img, space='video', resolution=None, RESOLUTION_KEY=None): """ This is slightly less than ideal in terms of API, but it will work for now. """ import kwimage # Build transform from image to requested space warp_vid_from_img = coco_img.warp_vid_from_img scale = coco_img._scalefactor_for_resolution(space='video', resolution=resolution, RESOLUTION_KEY=RESOLUTION_KEY) warp_req_from_vid = kwimage.Affine.scale(scale) warp_req_from_img = warp_req_from_vid @ warp_vid_from_img # Get annotations in "Image Space" annots = coco_img.dset.annots(image_id=coco_img.img['id']) imgspace_dets = annots.detections # Warp them into the requested space reqspace_dets = imgspace_dets.warp(warp_req_from_img) reqspace_dets.data['aids'] = np.array(list(annots)) return reqspace_dets
# Deprecated aliases add_auxiliary_item = deprecated_function_alias( 'kwcoco', 'add_auxiliary_item', deprecate='now', new_func=add_asset) delay = deprecated_function_alias( 'kwcoco', 'delay', new_func=imdelay, deprecate='now')
[docs] def show(self, **kwargs): """ Show the image with matplotlib if possible SeeAlso: :func:`kwcoco.CocoDataset.show_image` Example: >>> # xdoctest: +REQUIRES(module:kwplot) >>> import kwcoco >>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral') >>> self = dset.coco_image(1) >>> # xdoctest: +REQUIRES(--show) >>> import kwplot >>> kwplot.autoplt() >>> self.show() """ if self.dset is None: raise Exception('Currently requires a connected dataset. ' 'This may be relaxed in the future') return self.dset.show_image(self['id'], **kwargs)
[docs] def draw(self, **kwargs): """ Draw the image on an ndarray using opencv SeeAlso: :func:`kwcoco.CocoDataset.draw_image` Example: >>> import kwcoco >>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral') >>> self = dset.coco_image(1) >>> canvas = self.draw() >>> # xdoctest: +REQUIRES(--show) >>> import kwplot >>> kwplot.autompl() >>> kwplot.imshow(canvas) """ if self.dset is None: raise Exception('Currently requires a connected dataset. ' 'This may be relaxed in the future') return self.dset.draw_image(self['id'], **kwargs)
[docs]class CocoAsset(AliasedDictProxy, ub.NiceRepr): """ A Coco Asset / Auxiliary Item Represents one 2D image file relative to a parent img. Could be a single asset, or an image with sub-assets, but sub-assets are ignored here. Initially we called these "auxiliary" items, but I think we should change their name to "assets", which better maps with STAC terminology. Example: >>> from kwcoco.coco_image import * # NOQA >>> self = CocoAsset({'warp_aux_to_img': 'foo'}) >>> assert 'warp_aux_to_img' in self >>> assert 'warp_img_from_asset' in self >>> assert 'warp_wld_from_asset' not in self >>> assert 'warp_to_wld' not in self >>> self['warp_aux_to_img'] = 'bar' >>> assert self._proxy == {'warp_aux_to_img': 'bar'} """ # To maintain backwards compatibility we register aliases of properties # The main key should be the primary property. __alias_to_primary__ = { 'warp_img_from_asset': 'warp_aux_to_img', 'warp_wld_from_asset': 'warp_to_wld', } def __init__(self, asset, bundle_dpath=None): self._proxy = asset self._bundle_dpath = bundle_dpath def __nice__(self): return repr(self.__json__())
[docs] def image_filepath(self): if self._bundle_dpath is None: raise Exception('Bundle dpath must be populated to use this method') # return self['file_name'] else: return ub.Path(self._bundle_dpath) / self['file_name'] ...
# TODO? # class _CocoObject(AliasedDictProxy, ub.NiceRepr): # """ # TODO: general coco scalar object # """ # __alias_to_primary__ = {} # def __init__(self, obj, /, dset=None): # self._proxy = obj # self.dset = dset # self._bundle_dpath = None # @property # def bundle_dpath(self): # if self.dset is not None: # return self.dset.bundle_dpath # else: # return self._bundle_dpath # @bundle_dpath.setter # def bundle_dpath(self, value): # self._bundle_dpath = value # def detach(self): # """ # Removes references to the underlying coco dataset, but keeps special # information such that it wont be needed. # """ # self._bundle_dpath = self.bundle_dpath # self.dset = None # return self # class CocoVideo(_CocoObject): # """ # TODO: general coco scalars # """ # __alias_to_primary__ = {} # class CocoAnnotation(_CocoObject): # """ # TODO: general coco scalars # """ # __alias_to_primary__ = {} # class CocoCategory(_CocoObject): # """ # TODO: general coco scalars # """ # __alias_to_primary__ = {} # class CocoTrack(_CocoObject): # """ # TODO: general coco scalars # """ # __alias_to_primary__ = {}
[docs]def _delay_load_imglike(bundle_dpath, obj, nodata_method=None): # from os.path import join from kwcoco.channel_spec import FusedChannelSpec from delayed_image import DelayedLoad, DelayedIdentity info = {} fname = obj.get('file_name', None) imdata = obj.get('imdata', None) channels_ = obj.get('channels', None) if channels_ is not None: channels_ = FusedChannelSpec.coerce(channels_) channels_ = channels_.normalize() info['channels'] = channels_ width = obj.get('width', None) height = obj.get('height', None) if height is not None and width is not None: info['dsize'] = dsize = (width, height) else: info['dsize'] = dsize = (None, None) quantization = obj.get('quantization', None) if imdata is not None: info['chan_construct'] = (DelayedIdentity, dict( data=imdata, channels=channels_, dsize=dsize)) elif fname is not None: info['fpath'] = fpath = join(bundle_dpath, fname) info['chan_construct'] = ( DelayedLoad, dict(fpath=fpath, channels=channels_, dsize=dsize, nodata_method=nodata_method)) info['quantization'] = quantization return info
[docs]def parse_quantity(expr): import re expr_pat = re.compile( r'^(?P<magnitude>[-+]?[0-9]*\.?[0-9]+([eE][-+]?[0-9]+)?)' '(?P<spaces> *)' '(?P<unit>.*)$') match = expr_pat.match(expr.strip()) if match is None: raise ValueError(f'Unable to parse {expr!r}') return match.groupdict()
[docs]def coerce_resolution(expr): if isinstance(expr, str): result = parse_quantity(expr) unit = result['unit'] x = y = float(result['magnitude']) else: x = y = float(expr) unit = None parsed = { 'mag': (x, y), 'unit': unit, } return parsed