Source code for kwcoco.coco_image

"""
Defines the CocoImage class which is an object oriented way of manipulating
data pointed to by a COCO image dictionary.

Notably this provides the ``.delay`` method for delayed image loading ( which
enables things like fast loading of subimage-regions / coarser scales in images
that contain tiles / overviews - e.g. Cloud Optimized Geotiffs or COGs (Medical
image formats may be supported in the future).
"""
import ubelt as ub
import numpy as np
from os.path import join
# from kwcoco.util.dict_like import DictLike

try:
    from xdev import profile
except Exception:
    profile = ub.identity


DEFAULT_RESOLUTION_KEYS = {
    'resolution',
    'target_gsd',  # only exists as a convinience for other projects. Remove in the future.
}


[docs]class CocoImage(ub.NiceRepr): """ An object-oriented representation of a coco image. It provides helper methods that are specific to a single image. This operates directly on a single coco image dictionary, but it can optionally be connected to a parent dataset, which allows it to use CocoDataset methods to query about relationships and resolve pointers. This is different than the Images class in coco_object1d, which is just a vectorized interface to multiple objects. Example: >>> import kwcoco >>> dset1 = kwcoco.CocoDataset.demo('shapes8') >>> dset2 = kwcoco.CocoDataset.demo('vidshapes8-multispectral') >>> self = CocoImage(dset1.imgs[1], dset1) >>> print('self = {!r}'.format(self)) >>> print('self.channels = {}'.format(ub.repr2(self.channels, nl=1))) >>> self = CocoImage(dset2.imgs[1], dset2) >>> print('self.channels = {}'.format(ub.repr2(self.channels, nl=1))) >>> self.primary_asset() """ def __init__(self, img, dset=None): self.img = img self.dset = dset self._bundle_dpath = None self._video = None
[docs] @classmethod def from_gid(cls, dset, gid): img = dset.index.imgs[gid] self = cls(img, dset=dset) return self
@property def bundle_dpath(self): if self.dset is not None: return self.dset.bundle_dpath else: return self._bundle_dpath @bundle_dpath.setter def bundle_dpath(self, value): self._bundle_dpath = value @property def video(self): """ Helper to grab the video for this image if it exists """ if self._video is None and self.dset is not None: vidid = self.img.get('video_id', None) if vidid is None: video = None else: video = self.dset.index.videos[vidid] else: video = self._video return video @video.setter def video(self, value): # TODO: ducktype with an object self._video = value
[docs] def detach(self): """ Removes references to the underlying coco dataset, but keeps special information such that it wont be needed. """ self._bundle_dpath = self.bundle_dpath self._video = self.video self.dset = None return self
@property def assets(self): assets = [] for obj in self.iter_asset_objs(): # TODO: ducktype with an object # asset = CocoAsset(obj) asset = obj assets.append(asset) return assets def __nice__(self): """ Example: >>> from kwcoco.coco_image import * # NOQA >>> import kwcoco >>> with ub.CaptureStdout() as cap: ... dset = kwcoco.CocoDataset.demo('shapes8') >>> self = CocoImage(dset.dataset['images'][0], dset) >>> print('self = {!r}'.format(self)) >>> dset = kwcoco.CocoDataset.demo() >>> self = CocoImage(dset.dataset['images'][0], dset) >>> print('self = {!r}'.format(self)) """ from kwcoco.util.util_truncate import smart_truncate from functools import partial stats = self.stats() stats = ub.map_vals(str, stats) stats = ub.map_vals( partial(smart_truncate, max_length=32, trunc_loc=0.5), stats) return ub.repr2(stats, compact=1, nl=0, sort=0)
[docs] def stats(self): """ """ key_attrname = [ ('wh', 'dsize'), ('n_chan', 'num_channels'), ('channels', 'channels'), ] stats = {} for key, attrname in key_attrname: try: stats[key] = getattr(self, attrname) except Exception as ex: stats[key] = repr(ex) return stats
def __contains__(self, key): return key in self.keys() def __getitem__(self, key): """ Proxy getter attribute for underlying `self.img` dictionary """ return self.get(key)
[docs] def keys(self): """ Proxy getter attribute for underlying `self.img` dictionary """ if 'extra' in self.img: # SQL compatibility _keys = ub.flatten([self.img.keys(), self.img['extra'].keys()]) return iter((k for k in _keys if k != 'extra')) else: return self.img.keys()
[docs] def get(self, key, default=ub.NoParam): """ Proxy getter attribute for underlying `self.img` dictionary Example: >>> import pytest >>> # without extra populated >>> import kwcoco >>> self = kwcoco.CocoImage({'foo': 1}) >>> assert self.get('foo') == 1 >>> assert self.get('foo', None) == 1 >>> # with extra populated >>> self = kwcoco.CocoImage({'extra': {'foo': 1}}) >>> assert self.get('foo') == 1 >>> assert self.get('foo', None) == 1 >>> # without extra empty >>> self = kwcoco.CocoImage({}) >>> with pytest.raises(KeyError): >>> self.get('foo') >>> assert self.get('foo', None) is None >>> # with extra empty >>> self = kwcoco.CocoImage({'extra': {'bar': 1}}) >>> with pytest.raises(KeyError): >>> self.get('foo') >>> assert self.get('foo', None) is None """ _img = self.img if default is ub.NoParam: if 'extra' in _img: # Workaround for sql-view if key in _img: return _img[key] else: _extra = _img['extra'] if key in _extra: return _extra[key] else: raise KeyError(key) else: return _img[key] else: if 'extra' in _img: # Workaround for sql-view if key in _img: return _img.get(key, default) else: return _img['extra'].get(key, default) else: return _img.get(key, default)
@property def channels(self): from kwcoco.channel_spec import FusedChannelSpec from kwcoco.channel_spec import ChannelSpec img_parts = [] for obj in self.iter_asset_objs(): obj_parts = obj.get('channels', None) # obj_chan = FusedChannelSpec.coerce(obj_parts).normalize() obj_chan = FusedChannelSpec.coerce(obj_parts) img_parts.append(obj_chan.spec) spec = ChannelSpec(','.join(img_parts)) return spec @property def num_channels(self): return self.channels.numel() # return sum(map(len, self.channels.streams())) @property def dsize(self): width = self.img.get('width', None) height = self.img.get('height', None) return width, height
[docs] def primary_image_filepath(self, requires=None): dpath = self.bundle_dpath fname = self.primary_asset()['file_name'] fpath = join(dpath, fname) return fpath
[docs] def primary_asset(self, requires=None): """ Compute a "main" image asset. Notes: Uses a heuristic. * First, try to find the auxiliary image that has with the smallest distortion to the base image (if known via warp_aux_to_img) * Second, break ties by using the largest image if w / h is known * Last, if previous information not available use the first auxiliary image. Args: requires (List[str] | None): list of attribute that must be non-None to consider an object as the primary one. Returns: None | dict : the asset dict or None if it is not found TODO: - [ ] Add in primary heuristics Example: >>> import kwarray >>> from kwcoco.coco_image import * # NOQA >>> rng = kwarray.ensure_rng(0) >>> def random_auxiliary(name, w=None, h=None): >>> return {'file_name': name, 'width': w, 'height': h} >>> self = CocoImage({ >>> 'auxiliary': [ >>> random_auxiliary('1'), >>> random_auxiliary('2'), >>> random_auxiliary('3'), >>> ] >>> }) >>> assert self.primary_asset()['file_name'] == '1' >>> self = CocoImage({ >>> 'auxiliary': [ >>> random_auxiliary('1'), >>> random_auxiliary('2', 3, 3), >>> random_auxiliary('3'), >>> ] >>> }) >>> assert self.primary_asset()['file_name'] == '2' """ import kwimage if requires is None: requires = [] img = self.img has_base_image = img.get('file_name', None) is not None candidates = [] if has_base_image: obj = img if all(k in obj for k in requires): # Return the base image if we can return obj # Choose "best" auxiliary image based on a hueristic. eye = kwimage.Affine.eye().matrix asset_objs = img.get('auxiliary', img.get('assets', [])) or [] for idx, obj in enumerate(asset_objs): # Take frobenius norm to get "distance" between transform and # the identity. We want to find the auxiliary closest to the # identity transform. warp_aux_to_img = kwimage.Affine.coerce(obj.get('warp_aux_to_img', None)) fro_dist = np.linalg.norm(warp_aux_to_img - eye, ord='fro') w = obj.get('width', None) or 0 h = obj.get('height', None) or 0 if all(k in obj for k in requires): candidates.append({ 'idx': idx, 'area': w * h, 'fro_dist': fro_dist, 'obj': obj, }) if len(candidates) == 0: return None idx = ub.argmin( candidates, key=lambda val: ( val['fro_dist'], -val['area'], val['idx']) ) obj = candidates[idx]['obj'] return obj
[docs] def iter_image_filepaths(self, with_bundle=True): """ Could rename to iter_asset_filepaths Args: with_bundle (bool): If True, prepends the bundle dpath to fully specify the path. Otherwise, just returns the registered string in the file_name attribute of each asset. Defaults to True. """ dpath = self.bundle_dpath for obj in self.iter_asset_objs(): fname = obj.get('file_name') fpath = join(dpath, fname) yield fpath
[docs] def iter_asset_objs(self): """ Iterate through base + auxiliary dicts that have file paths Yields: dict: an image or auxiliary dictionary """ img = self.img has_base_image = img.get('file_name', None) is not None if has_base_image: obj = img # cant remove auxiliary otherwise inplace modification doesnt work # obj = ub.dict_diff(img, {'auxiliary'}) yield obj for obj in img.get('auxiliary', None) or []: yield obj for obj in img.get('assets', None) or []: yield obj
[docs] def find_asset_obj(self, channels): """ Find the asset dictionary with the specified channels Example: >>> import kwcoco >>> coco_img = kwcoco.CocoImage({'width': 128, 'height': 128}) >>> coco_img.add_auxiliary_item( >>> 'rgb.png', channels='red|green|blue', width=32, height=32) >>> assert coco_img.find_asset_obj('red') is not None >>> assert coco_img.find_asset_obj('green') is not None >>> assert coco_img.find_asset_obj('blue') is not None >>> assert coco_img.find_asset_obj('red|blue') is not None >>> assert coco_img.find_asset_obj('red|green|blue') is not None >>> assert coco_img.find_asset_obj('red|green|blue') is not None >>> assert coco_img.find_asset_obj('black') is None >>> assert coco_img.find_asset_obj('r') is None Example: >>> # Test with concise channel code >>> import kwcoco >>> coco_img = kwcoco.CocoImage({'width': 128, 'height': 128}) >>> coco_img.add_auxiliary_item( >>> 'msi.png', channels='foo.0:128', width=32, height=32) >>> assert coco_img.find_asset_obj('foo') is None >>> assert coco_img.find_asset_obj('foo.3') is not None >>> assert coco_img.find_asset_obj('foo.3:5') is not None >>> assert coco_img.find_asset_obj('foo.3000') is None """ from kwcoco.channel_spec import FusedChannelSpec channels = FusedChannelSpec.coerce(channels) found = None for obj in self.iter_asset_objs(): obj_channels = FusedChannelSpec.coerce(obj['channels']) if (obj_channels & channels).numel(): found = obj break return found
def _assets_key(self): """ Internal helper for transition from auxiliary -> assets in the image spec """ if 'auxiliary' in self: return 'auxiliary' elif 'assets' in self: return 'assets' else: return 'auxiliary'
[docs] def add_auxiliary_item(self, file_name=None, channels=None, imdata=None, warp_aux_to_img=None, width=None, height=None, imwrite=False): """ Adds an auxiliary / asset item to the image dictionary. This operation can be done purely in-memory (the default), or the image data can be written to a file on disk (via the imwrite=True flag). Args: file_name (str | PathLike | None): The name of the file relative to the bundle directory. If unspecified, imdata must be given. channels (str | kwcoco.FusedChannelSpec | None): The channel code indicating what each of the bands represents. These channels should be disjoint wrt to the existing data in this image (this is not checked). imdata (ndarray | None): The underlying image data this auxiliary item represents. If unspecified, it is assumed file_name points to a path on disk that will eventually exist. If imdata, file_name, and the special imwrite=True flag are specified, this function will write the data to disk. warp_aux_to_img (kwimage.Affine | None): The transformation from this auxiliary space to image space. If unspecified, assumes this item is related to image space by only a scale factor. width (int | None): Width of the data in auxiliary space (inferred if unspecified) height (int | None): Height of the data in auxiliary space (inferred if unspecified) imwrite (bool): If specified, both imdata and file_name must be specified, and this will write the data to disk. Note: it it recommended that you simply call imwrite yourself before or after calling this function. This lets you better control imwrite parameters. TODO: - [ ] Allow imwrite to specify an executor that is used to return a Future so the imwrite call does not block. Example: >>> from kwcoco.coco_image import * # NOQA >>> import kwcoco >>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral') >>> coco_img = dset.coco_image(1) >>> imdata = np.random.rand(32, 32, 5) >>> channels = kwcoco.FusedChannelSpec.coerce('Aux:5') >>> coco_img.add_auxiliary_item(imdata=imdata, channels=channels) Example: >>> import kwcoco >>> dset = kwcoco.CocoDataset() >>> gid = dset.add_image(name='my_image_name', width=200, height=200) >>> coco_img = dset.coco_image(gid) >>> coco_img.add_auxiliary_item('path/img1_B0.tif', channels='B0', width=200, height=200) >>> coco_img.add_auxiliary_item('path/img1_B1.tif', channels='B1', width=200, height=200) >>> coco_img.add_auxiliary_item('path/img1_B2.tif', channels='B2', width=200, height=200) >>> coco_img.add_auxiliary_item('path/img1_TCI.tif', channels='r|g|b', width=200, height=200) """ from os.path import isabs, join # NOQA import kwimage from kwcoco.channel_spec import FusedChannelSpec img = self.img if imdata is None and file_name is None: raise ValueError('must specify file_name or imdata') # Check type of resolution inputs. if not isinstance(width, int) and width is not None: raise TypeError(f'"width" input is neither an int or None variable but type: "{type(width)}"') if not isinstance(height, int) and height is not None: raise TypeError(f'"height" input is neither an int or None variable but type: "{type(height)}"') # Infer resolution inputs from image data. if width is None and imdata is not None: width = imdata.shape[1] if height is None and imdata is not None: height = imdata.shape[0] if warp_aux_to_img is None: img_width = img.get('width', None) img_height = img.get('height', None) if img_width is None or img_height is None: raise ValueError('Parent image canvas has an unknown size. ' 'Need to set width/height') if width is None or height is None: raise ValueError('Unable to infer aux_to_img without width') # Assume we can just scale up the auxiliary data to match the image # space unless the user says otherwise warp_aux_to_img = kwimage.Affine.scale(( img_width / width, img_height / height)) else: warp_aux_to_img = kwimage.Affine.coerce(warp_aux_to_img) if channels is not None: channels = FusedChannelSpec.coerce(channels).spec # Make the aux info dict obj = { 'file_name': file_name, 'height': height, 'width': width, 'channels': channels, 'warp_aux_to_img': warp_aux_to_img.concise(), } if imdata is not None: if imwrite: if __debug__ and file_name is None: raise ValueError( 'file_name must be given if imwrite is True') # if self.dset is None: # fpath = file_name # if not isabs(fpath): # raise ValueError(ub.paragraph( # ''' # Got relative file_name, but no dataset is attached # to this coco image. Attatch a dataset or use an # absolute path. # ''')) # else: fpath = join(self.bundle_dpath, file_name) kwimage.imwrite(fpath, imdata) else: obj['imdata'] = imdata assets_key = self._assets_key() asset_list = img.get(assets_key, None) if asset_list is None: asset_list = img[assets_key] = [] asset_list.append(obj) if self.dset is not None: self.dset._invalidate_hashid()
# Alias for add_auxiliary_item (which will eventually be deprecated) add_asset = add_auxiliary_item
[docs] @profile def delay(self, channels=None, space='image', resolution=None, bundle_dpath=None, interpolation='linear', antialias=True, nodata_method=None, RESOLUTION_KEY=None): """ Perform a delayed load on the data in this image. The delayed load can load a subset of channels, and perform lazy warping operations. If the underlying data is in a tiled format this can reduce the amount of disk IO needed to read the data if only a small crop or lower resolution view of the data is needed. Note: This method is experimental and relies on the delayed load proof-of-concept. Args: gid (int): image id to load channels (kwcoco.FusedChannelSpec): specific channels to load. if unspecified, all channels are loaded. space (str): can either be "image" for loading in image space, or "video" for loading in video space. resolution (None | str | float): If specified, applies an additional scale factor to the result such that the data is loaded at this specified resolution. This requires that the image / video has a registered resolution attribute and that its units agree with this request. TODO: - [ ] This function could stand to have a better name. Maybe imread with a delayed=True flag? Or maybe just delayed_load? Example: >>> from kwcoco.coco_image import * # NOQA >>> import kwcoco >>> gid = 1 >>> # >>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral') >>> self = CocoImage(dset.imgs[gid], dset) >>> delayed = self.delay() >>> print('delayed = {!r}'.format(delayed)) >>> print('delayed.finalize() = {!r}'.format(delayed.finalize())) >>> print('delayed.finalize() = {!r}'.format(delayed.finalize())) >>> # >>> dset = kwcoco.CocoDataset.demo('shapes8') >>> delayed = dset.coco_image(gid).delay() >>> print('delayed = {!r}'.format(delayed)) >>> print('delayed.finalize() = {!r}'.format(delayed.finalize())) >>> print('delayed.finalize() = {!r}'.format(delayed.finalize())) >>> crop = delayed.crop((slice(0, 3), slice(0, 3))) >>> crop.finalize() >>> # TODO: should only select the "red" channel >>> dset = kwcoco.CocoDataset.demo('shapes8') >>> delayed = CocoImage(dset.imgs[gid], dset).delay(channels='r') >>> import kwcoco >>> gid = 1 >>> # >>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral') >>> delayed = dset.coco_image(gid).delay(channels='B1|B2', space='image') >>> print('delayed = {!r}'.format(delayed)) >>> print('delayed.finalize() = {!r}'.format(delayed.finalize())) >>> delayed = dset.coco_image(gid).delay(channels='B1|B2|B11', space='image') >>> print('delayed = {!r}'.format(delayed)) >>> print('delayed.finalize() = {!r}'.format(delayed.finalize())) >>> delayed = dset.coco_image(gid).delay(channels='B8|B1', space='video') >>> print('delayed = {!r}'.format(delayed)) >>> print('delayed.finalize() = {!r}'.format(delayed.finalize())) >>> delayed = dset.coco_image(gid).delay(channels='B8|foo|bar|B1', space='video') >>> print('delayed = {!r}'.format(delayed)) >>> print('delayed.finalize() = {!r}'.format(delayed.finalize())) Example: >>> import kwcoco >>> dset = kwcoco.CocoDataset.demo() >>> coco_img = dset.coco_image(1) >>> # Test case where nothing is registered in the dataset >>> delayed = coco_img.delay() >>> final = delayed.finalize() >>> assert final.shape == (512, 512, 3) >>> delayed = coco_img.delay() >>> final = delayed.finalize() >>> print('final.shape = {}'.format(ub.repr2(final.shape, nl=1))) >>> assert final.shape == (512, 512, 3) Example: >>> # Test that delay works when imdata is stored in the image >>> # dictionary itself. >>> from kwcoco.coco_image import * # NOQA >>> import kwcoco >>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral') >>> coco_img = dset.coco_image(1) >>> imdata = np.random.rand(6, 6, 5) >>> imdata[:] = np.arange(5)[None, None, :] >>> channels = kwcoco.FusedChannelSpec.coerce('Aux:5') >>> coco_img.add_auxiliary_item(imdata=imdata, channels=channels) >>> delayed = coco_img.delay(channels='B1|Aux:2:4') >>> final = delayed.finalize() Example: >>> # Test delay when loading in asset space >>> from kwcoco.coco_image import * # NOQA >>> import kwcoco >>> dset = kwcoco.CocoDataset.demo('vidshapes8-msi-multisensor') >>> coco_img = dset.coco_image(1) >>> stream1 = coco_img.channels.streams()[0] >>> stream2 = coco_img.channels.streams()[1] >>> aux_delayed = coco_img.delay(stream1, space='asset') >>> img_delayed = coco_img.delay(stream1, space='image') >>> vid_delayed = coco_img.delay(stream1, space='video') >>> # >>> aux_imdata = aux_delayed.as_xarray().finalize() >>> img_imdata = img_delayed.as_xarray().finalize() >>> assert aux_imdata.shape != img_imdata.shape >>> # Cannot load multiple asset items at the same time in >>> # asset space >>> import pytest >>> fused_channels = stream1 | stream2 >>> from delayed_image.delayed_nodes import CoordinateCompatibilityError >>> with pytest.raises(CoordinateCompatibilityError): >>> aux_delayed2 = coco_img.delay(fused_channels, space='asset') Example: >>> # Test loading at a specific resolution. >>> from kwcoco.coco_image import * # NOQA >>> import kwcoco >>> dset = kwcoco.CocoDataset.demo('vidshapes8-msi-multisensor') >>> coco_img = dset.coco_image(1) >>> coco_img.img['resolution'] = '1 meter' >>> img_delayed1 = coco_img.delay(space='image') >>> vid_delayed1 = coco_img.delay(space='video') >>> # test with unitless request >>> img_delayed2 = coco_img.delay(space='image', resolution=3.1) >>> vid_delayed2 = coco_img.delay(space='video', resolution='3.1 meter') >>> np.ceil(img_delayed1.shape[0] / 3.1) == img_delayed2.shape[0] >>> np.ceil(vid_delayed1.shape[0] / 3.1) == vid_delayed2.shape[0] >>> # test with unitless data >>> coco_img.img['resolution'] = 1 >>> img_delayed2 = coco_img.delay(space='image', resolution=3.1) >>> vid_delayed2 = coco_img.delay(space='video', resolution='3.1 meter') >>> np.ceil(img_delayed1.shape[0] / 3.1) == img_delayed2.shape[0] >>> np.ceil(vid_delayed1.shape[0] / 3.1) == vid_delayed2.shape[0] """ from kwimage.transform import Affine from kwcoco.channel_spec import FusedChannelSpec if bundle_dpath is None: bundle_dpath = self.bundle_dpath img = self.img requested = channels if requested is not None: requested = FusedChannelSpec.coerce(requested).normalize() # Get info about the primary image and check if its channels are # requested (if it even has any) img_info = _delay_load_imglike(bundle_dpath, img, nodata_method=nodata_method) obj_info_list = [(img_info, img)] auxlist = img.get('auxiliary', img.get('assets', [])) or [] for aux in auxlist: info = _delay_load_imglike(bundle_dpath, aux, nodata_method=nodata_method) obj_info_list.append((info, aux)) chan_list = [] for info, obj in obj_info_list: if info.get('chan_construct', None) is not None: include_flag = requested is None if not include_flag: if requested.intersection(info['channels']): include_flag = True if include_flag: chncls, chnkw = info['chan_construct'] chan = chncls(**chnkw) quant = info.get('quantization', None) if quant is not None: chan = chan.dequantize(quant) if space not in {'auxiliary', 'asset'}: aux_to_img = Affine.coerce(obj.get('warp_aux_to_img', None)) chan = chan.warp( aux_to_img, dsize=img_info['dsize']) chan_list.append(chan) # TODO: allow load in auxiliary/asset space if space == 'video': video = self.video width = video.get('width', img.get('width', None)) height = video.get('height', img.get('height', None)) else: width = img.get('width', None) height = img.get('height', None) dsize = (width, height) if len(chan_list) == 0: if requested is not None: # Handle case where the image doesnt have the requested # channels. from delayed_image import DelayedNans from delayed_image import DelayedChannelConcat delayed = DelayedNans(dsize=dsize, channels=requested) delayed = DelayedChannelConcat([delayed]) else: raise ValueError('no data registered in kwcoco image') else: from delayed_image import DelayedChannelConcat delayed = DelayedChannelConcat(chan_list) # Reorder channels in the requested order if requested is not None: delayed = delayed.take_channels(requested) if hasattr(delayed, 'components'): if len(delayed.components) == 1: delayed = delayed.components[0] if space in {'image', 'auxiliary', 'asset'}: pass elif space == 'video': img_to_vid = self.warp_vid_from_img delayed = delayed.warp(img_to_vid, dsize=dsize, interpolation=interpolation, antialias=antialias) else: raise KeyError('space = {}'.format(space)) if resolution is not None: # Adjust to the requested resolution factor = self._scalefactor_for_resolution( space=space, resolution=resolution, RESOLUTION_KEY=RESOLUTION_KEY) delayed = delayed.scale(factor) return delayed
[docs] @ub.memoize_method def valid_region(self, space='image'): """ If this image has a valid polygon, return it in image, or video space """ import kwimage valid_coco_poly = self.img.get('valid_region', None) if valid_coco_poly is None: valid_poly = None else: kw_poly_img = kwimage.MultiPolygon.coerce(valid_coco_poly) if kw_poly_img is None: valid_poly = None else: if space == 'image': valid_poly = kw_poly_img elif space == 'video': warp_vid_from_img = self.warp_vid_from_img valid_poly = kw_poly_img.warp(warp_vid_from_img) else: # To warp it into an auxiliary space we need to know which one raise NotImplementedError(space) return valid_poly
# def warp_vid_from_img(self): # pass # def warp_vid_from_img(self): # pass @ub.memoize_property def warp_vid_from_img(self): import kwimage warp_img_to_vid = kwimage.Affine.coerce(self.img.get('warp_img_to_vid', None)) if warp_img_to_vid.matrix is None: # Hack to ensure the matrix property always is an array warp_img_to_vid.matrix = np.asarray(warp_img_to_vid) return warp_img_to_vid @ub.memoize_property def warp_img_from_vid(self): return self.warp_vid_from_img.inv() def _annot_segmentation(self, ann, space='video'): import kwimage warp_vid_from_img = self.warp_vid_from_img img_sseg = kwimage.MultiPolygon.coerce(ann['segmentation']) if space == 'image': warped_sseg = img_sseg pass elif space == 'video': vid_sseg = img_sseg.warp(warp_vid_from_img) warped_sseg = vid_sseg else: raise NotImplementedError(space) # auxiliary/asset space return warped_sseg
[docs] def resolution(self, space='image', RESOLUTION_KEY=None): """ Returns the resolution of this CocoImage in the requested space if known. Errors if this information is not registered. Example: >>> import kwcoco >>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral') >>> self = dset.coco_image(1) >>> self.img['resolution'] = 1 >>> self.resolution() >>> self.img['resolution'] = '1 meter' >>> self.resolution(space='video') """ import kwimage # Compute the offset transform from the requested space # Handle the cases where resolution is specified at the image or at the # video level. if RESOLUTION_KEY is None: RESOLUTION_KEY = DEFAULT_RESOLUTION_KEYS def aliased_get(d, keys, default=None): if not ub.iterable(keys): return d.get(keys, default) else: found = 0 for key in keys: if key in d: found = 1 val = d[key] break if not found: val = default return val if space == 'video': vid_resolution_expr = aliased_get(self.video, RESOLUTION_KEY, None) if vid_resolution_expr is None: # Do we have an image level resolution? img_resolution_expr = aliased_get(self.img, RESOLUTION_KEY, None) assert img_resolution_expr is not None img_resolution_info = coerce_resolution(img_resolution_expr) img_resolution_mat = kwimage.Affine.scale(img_resolution_info['mag']) vid_resolution = (self.warp_vid_from_img @ img_resolution_mat.inv()).inv() vid_resolution_info = { 'mag': vid_resolution.decompose()['scale'], 'unit': img_resolution_info['unit'] } else: vid_resolution_info = coerce_resolution(vid_resolution_expr) space_resolution_info = vid_resolution_info elif space == 'image': img_resolution_expr = aliased_get(self.img, RESOLUTION_KEY, None) if img_resolution_expr is None: # Do we have an image level resolution? vid_resolution_expr = aliased_get(self.video, RESOLUTION_KEY, None) assert vid_resolution_expr is not None vid_resolution_info = coerce_resolution(vid_resolution_expr) vid_resolution_mat = kwimage.Affine.scale(vid_resolution_info['mag']) img_resolution = (self.warp_img_from_vid @ vid_resolution_mat.inv()).inv() img_resolution_info = { 'mag': img_resolution.decompose()['scale'], 'unit': vid_resolution_info['unit'] } else: img_resolution_info = coerce_resolution(img_resolution_expr) space_resolution_info = img_resolution_info elif space == 'asset': raise NotImplementedError(space) else: raise KeyError(space) return space_resolution_info
def _scalefactor_for_resolution(self, space, resolution, RESOLUTION_KEY=None): """ Given image or video space, compute the scale factor needed to achieve the target resolution. """ space_resolution_info = self.resolution(space=space, RESOLUTION_KEY=RESOLUTION_KEY) request_resolution_info = coerce_resolution(resolution) # If units are unspecified, assume they are compatible if space_resolution_info['unit'] is not None: if request_resolution_info['unit'] is not None: assert space_resolution_info['unit'] == request_resolution_info['unit'] x1, y1 = request_resolution_info['mag'] x2, y2 = space_resolution_info['mag'] scale_factor = (x2 / x1, y2 / y1) return scale_factor def _detections_for_resolution(coco_img, space='video', resolution=None, RESOLUTION_KEY=None): """ This is slightly less than ideal in terms of API, but it will work for now. """ import kwimage # Build transform from image to requested space warp_vid_from_img = coco_img.warp_vid_from_img scale = coco_img._scalefactor_for_resolution(space='video', resolution=resolution, RESOLUTION_KEY=RESOLUTION_KEY) warp_req_from_vid = kwimage.Affine.scale(scale) warp_req_from_img = warp_req_from_vid @ warp_vid_from_img # Get annotations in "Image Space" annots = coco_img.dset.annots(image_id=coco_img.img['id']) imgspace_dets = annots.detections # Warp them into the requested space reqspace_dets = imgspace_dets.warp(warp_req_from_img) reqspace_dets.data['aids'] = np.array(list(annots)) return reqspace_dets
# TODO: # class AliasedDictProxy(DictLike): # def __init__(self, _data): # self._data = _data # def resolve_key(self, key): # return self.__key_resolver__.get(key, key) # def getitem(self, key): # ... # def keys(self, key): # return map(self.resolve_key, self._data.keys()) # TODO:
[docs]class CocoAsset(object): """ A Coco Asset / Auxiliary Item Represents one 2D image file relative to a parent img. Could be a single asset, or an image with sub-assets, but sub-assets are ignored here. Initially we called these "auxiliary" items, but I think we should change their name to "assets", which better maps with STAC terminology. """ # To maintain backwards compatibility we register aliases of properties # The main key should be the primary property. __key_aliases__ = { 'warp_img_from_asset': ['warp_aux_to_img'], 'warp_wld_from_asset': ['warp_to_wld'], } __key_resolver__ = { v: k for k, vs in __key_aliases__.items() for v in vs } def __init__(self, obj): self.obj = obj def __getitem__(self, key): """ Proxy getter attribute for underlying `self.obj` dictionary """ return self.obj[key]
[docs] def keys(self): """ Proxy getter attribute for underlying `self.obj` dictionary """ return self.obj.keys()
[docs] def get(self, key, default=ub.NoParam): """ Proxy getter attribute for underlying `self.obj` dictionary """ if default is ub.NoParam: return self.obj.get(key) else: return self.obj.get(key, default)
def _delay_load_imglike(bundle_dpath, obj, nodata_method=None): from os.path import join from kwcoco.channel_spec import FusedChannelSpec info = {} fname = obj.get('file_name', None) imdata = obj.get('imdata', None) channels_ = obj.get('channels', None) if channels_ is not None: channels_ = FusedChannelSpec.coerce(channels_) channels_ = channels_.normalize() info['channels'] = channels_ width = obj.get('width', None) height = obj.get('height', None) if height is not None and width is not None: info['dsize'] = dsize = (width, height) else: info['dsize'] = dsize = (None, None) from delayed_image import DelayedLoad, DelayedIdentity quantization = obj.get('quantization', None) if imdata is not None: info['chan_construct'] = (DelayedIdentity, dict( data=imdata, channels=channels_, dsize=dsize)) elif fname is not None: info['fpath'] = fpath = join(bundle_dpath, fname) info['chan_construct'] = ( DelayedLoad, dict(fpath=fpath, channels=channels_, dsize=dsize, nodata_method=nodata_method)) info['quantization'] = quantization return info
[docs]def parse_quantity(expr): import re expr_pat = re.compile( r'^(?P<magnitude>[-+]?[0-9]*\.?[0-9]+([eE][-+]?[0-9]+)?)' '(?P<spaces> *)' '(?P<unit>.*)$') match = expr_pat.match(expr.strip()) if match is None: raise ValueError(f'Unable to parse {expr!r}') return match.groupdict()
[docs]def coerce_resolution(expr): if isinstance(expr, str): result = parse_quantity(expr) unit = result['unit'] x = y = float(result['magnitude']) else: x = y = float(expr) unit = None parsed = { 'mag': (x, y), 'unit': unit, } return parsed