import ubelt as ub
import numpy as np
from os.path import join
try:
from xdev import profile
except Exception:
[docs]class CocoImage(ub.NiceRepr):
"""
An object-oriented representation of a coco image.
It provides helper methods that are specific to a single image.
This operates directly on a single coco image dictionary, but it can
optionally be connected to a parent dataset, which allows it to use
CocoDataset methods to query about relationships and resolve pointers.
This is different than the Images class in coco_object1d, which is just a
vectorized interface to multiple objects.
Example:
>>> import kwcoco
>>> dset1 = kwcoco.CocoDataset.demo('shapes8')
>>> dset2 = kwcoco.CocoDataset.demo('vidshapes8-multispectral')
>>> self = CocoImage(dset1.imgs[1], dset1)
>>> print('self = {!r}'.format(self))
>>> print('self.channels = {}'.format(ub.repr2(self.channels, nl=1)))
>>> self = CocoImage(dset2.imgs[1], dset2)
>>> print('self.channels = {}'.format(ub.repr2(self.channels, nl=1)))
>>> self.primary_asset()
"""
def __init__(self, img, dset=None):
self.img = img
self.dset = dset
self._bundle_dpath = None
self._video = None
@classmethod
[docs] def from_gid(cls, dset, gid):
img = dset.index.imgs[gid]
self = cls(img, dset=dset)
return self
@property
[docs] def bundle_dpath(self):
if self.dset is not None:
return self.dset.bundle_dpath
else:
return self._bundle_dpath
@bundle_dpath.setter
def bundle_dpath(self, value):
self._bundle_dpath = value
@property
[docs] def video(self):
"""
Helper to grab the video for this image if it exists
"""
if self._video is None and self.dset is not None:
vidid = self.img.get('video_id', None)
if vidid is None:
video = None
else:
video = self.dset.index.videos[vidid]
else:
video = self._video
return video
@video.setter
def video(self, value):
self._video = value
[docs] def detach(self):
"""
Removes references to the underlying coco dataset, but keeps special
information such that it wont be needed.
"""
self._bundle_dpath = self.bundle_dpath
self._video = self.video
self.dset = None
return self
[docs] def __nice__(self):
"""
Example:
>>> from kwcoco.coco_image import * # NOQA
>>> import kwcoco
>>> with ub.CaptureStdout() as cap:
... dset = kwcoco.CocoDataset.demo('shapes8')
>>> self = CocoImage(dset.dataset['images'][0], dset)
>>> print('self = {!r}'.format(self))
>>> dset = kwcoco.CocoDataset.demo()
>>> self = CocoImage(dset.dataset['images'][0], dset)
>>> print('self = {!r}'.format(self))
"""
from kwcoco.util.util_truncate import smart_truncate
from functools import partial
stats = self.stats()
stats = ub.map_vals(str, stats)
stats = ub.map_vals(
partial(smart_truncate, max_length=32, trunc_loc=0.5),
stats)
return ub.repr2(stats, compact=1, nl=0, sort=0)
[docs] def stats(self):
"""
"""
key_attrname = [
('wh', 'dsize'),
('n_chan', 'num_channels'),
('channels', 'channels'),
]
stats = {}
for key, attrname in key_attrname:
try:
stats[key] = getattr(self, attrname)
except Exception as ex:
stats[key] = repr(ex)
return stats
[docs] def __getitem__(self, key):
"""
Proxy getter attribute for underlying `self.img` dictionary
"""
if 'extra' in self.img:
# SQL compatibility
try:
return self.img[key]
except KeyError:
return self.img['extra'][key]
else:
return self.img[key]
[docs] def keys(self):
"""
Proxy getter attribute for underlying `self.img` dictionary
"""
if 'extra' in self.img:
# SQL compatibility
return iter([*self.img.keys(), *self.img['extra'].keys()])
else:
return self.img.keys()
[docs] def get(self, key, default=ub.NoParam):
"""
Proxy getter attribute for underlying `self.img` dictionary
"""
# Workaround for sql-view
if 'extra' in self.img:
if key in self.img:
return self.img[key]
elif key in self.img['extra']:
return self.img['extra'][key]
elif default is ub.NoParam:
raise KeyError(key)
else:
return default
else:
if default is ub.NoParam:
return self.img.get(key)
else:
return self.img.get(key, default)
@property
[docs] def channels(self):
from kwcoco.channel_spec import FusedChannelSpec
from kwcoco.channel_spec import ChannelSpec
img_parts = []
for obj in self.iter_asset_objs():
obj_parts = obj.get('channels', None)
# obj_chan = FusedChannelSpec.coerce(obj_parts).normalize()
obj_chan = FusedChannelSpec.coerce(obj_parts)
img_parts.append(obj_chan.spec)
spec = ChannelSpec(','.join(img_parts))
return spec
@property
[docs] def num_channels(self):
return self.channels.numel()
# return sum(map(len, self.channels.streams()))
@property
[docs] def dsize(self):
width = self.img.get('width', None)
height = self.img.get('height', None)
return width, height
[docs] def primary_image_filepath(self, requires=None):
dpath = self.bundle_dpath
fname = self.primary_asset()['file_name']
fpath = join(dpath, fname)
return fpath
[docs] def primary_asset(self, requires=None):
"""
Compute a "main" image asset.
Notes:
Uses a heuristic.
* First, try to find the auxiliary image that has with the smallest
distortion to the base image (if known via warp_aux_to_img)
* Second, break ties by using the largest image if w / h is known
* Last, if previous information not available use the first
auxiliary image.
Args:
requires (List[str]):
list of attribute that must be non-None to consider an object
as the primary one.
TODO:
- [ ] Add in primary heuristics
Example:
>>> import kwarray
>>> from kwcoco.coco_image import * # NOQA
>>> rng = kwarray.ensure_rng(0)
>>> def random_auxiliary(name, w=None, h=None):
>>> return {'file_name': name, 'width': w, 'height': h}
>>> self = CocoImage({
>>> 'auxiliary': [
>>> random_auxiliary('1'),
>>> random_auxiliary('2'),
>>> random_auxiliary('3'),
>>> ]
>>> })
>>> assert self.primary_asset()['file_name'] == '1'
>>> self = CocoImage({
>>> 'auxiliary': [
>>> random_auxiliary('1'),
>>> random_auxiliary('2', 3, 3),
>>> random_auxiliary('3'),
>>> ]
>>> })
>>> assert self.primary_asset()['file_name'] == '2'
"""
import kwimage
if requires is None:
requires = []
img = self.img
has_base_image = img.get('file_name', None) is not None
candidates = []
if has_base_image:
obj = img
if all(k in obj for k in requires):
# Return the base image if we can
return obj
# Choose "best" auxiliary image based on a hueristic.
eye = kwimage.Affine.eye().matrix
for idx, obj in enumerate(img.get('auxiliary', [])):
# Take frobenius norm to get "distance" between transform and
# the identity. We want to find the auxiliary closest to the
# identity transform.
warp_aux_to_img = kwimage.Affine.coerce(obj.get('warp_aux_to_img', None))
fro_dist = np.linalg.norm(warp_aux_to_img - eye, ord='fro')
w = obj.get('width', None) or 0
h = obj.get('height', None) or 0
if all(k in obj for k in requires):
candidates.append({
'idx': idx,
'area': w * h,
'fro_dist': fro_dist,
'obj': obj,
})
if len(candidates) == 0:
return None
idx = ub.argmin(
candidates, key=lambda val: (
val['fro_dist'], -val['area'], val['idx'])
)
obj = candidates[idx]['obj']
return obj
[docs] def iter_image_filepaths(self):
"""
"""
dpath = self.bundle_dpath
for obj in self.iter_asset_objs():
fname = obj.get('file_name')
fpath = join(dpath, fname)
yield fpath
[docs] def iter_asset_objs(self):
"""
Iterate through base + auxiliary dicts that have file paths
Yields:
dict: an image or auxiliary dictionary
"""
img = self.img
has_base_image = img.get('file_name', None) is not None
if has_base_image:
obj = img
# cant remove auxiliary otherwise inplace modification doesnt work
# obj = ub.dict_diff(img, {'auxiliary'})
yield obj
for obj in img.get('auxiliary', []):
yield obj
[docs] def find_asset_obj(self, channels):
"""
Find the asset dictionary with the specified channels
"""
from kwcoco.channel_spec import FusedChannelSpec
channels = FusedChannelSpec.coerce(channels)
found = None
for obj in self.iter_asset_objs():
obj_channels = FusedChannelSpec.coerce(obj['channels'])
if (obj_channels & channels).numel():
found = obj
break
return found
[docs] def add_auxiliary_item(self, file_name=None, channels=None,
imdata=None, warp_aux_to_img=None, width=None,
height=None, imwrite=False):
"""
Adds an auxiliary item to the image dictionary.
This operation can be done purely in-memory (the default), or the image
data can be written to a file on disk (via the imwrite=True flag).
Args:
file_name (str | None):
The name of the file relative to the bundle directory. If
unspecified, imdata must be given.
channels (str | kwcoco.FusedChannelSpec):
The channel code indicating what each of the bands represents.
These channels should be disjoint wrt to the existing data in
this image (this is not checked).
imdata (ndarray | None):
The underlying image data this auxiliary item represents. If
unspecified, it is assumed file_name points to a path on disk
that will eventually exist. If imdata, file_name, and the
special imwrite=True flag are specified, this function will
write the data to disk.
warp_aux_to_img (kwimage.Affine):
The transformation from this auxiliary space to image space.
If unspecified, assumes this item is related to image space by
only a scale factor.
width (int):
Width of the data in auxiliary space (inferred if unspecified)
height (int):
Height of the data in auxiliary space (inferred if unspecified)
imwrite (bool):
If specified, both imdata and file_name must be specified, and
this will write the data to disk. Note: it it recommended that
you simply call imwrite yourself before or after calling this
function. This lets you better control imwrite parameters.
TODO:
- [ ] Allow imwrite to specify an executor that is used to
return a Future so the imwrite call does not block.
Example:
>>> from kwcoco.coco_image import * # NOQA
>>> import kwcoco
>>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral')
>>> coco_img = dset.coco_image(1)
>>> imdata = np.random.rand(32, 32, 5)
>>> channels = kwcoco.FusedChannelSpec.coerce('Aux:5')
>>> coco_img.add_auxiliary_item(imdata=imdata, channels=channels)
"""
from os.path import isabs, join # NOQA
import kwimage
from kwcoco.channel_spec import FusedChannelSpec
img = self.img
if imdata is None and file_name is None:
raise ValueError('must specify file_name or imdata')
if width is None and imdata is not None:
width = imdata.shape[1]
if height is None and imdata is not None:
height = imdata.shape[0]
if warp_aux_to_img is None:
if width is None or height is None:
raise ValueError('unable to infer aux_to_img without width')
# Assume we can just scale up the auxiliary data to match the image
# space unless the user says otherwise
warp_aux_to_img = kwimage.Affine.scale((
img['width'] / width, img['height'] / height))
if channels is not None:
channels = FusedChannelSpec.coerce(channels).spec
# Make the aux info dict
aux = {
'file_name': file_name,
'height': height,
'width': width,
'channels': channels,
'warp_aux_to_img': warp_aux_to_img.concise(),
}
if imdata is not None:
if imwrite:
if __debug__ and file_name is None:
raise ValueError(
'file_name must be given if imwrite is True')
# if self.dset is None:
# fpath = file_name
# if not isabs(fpath):
# raise ValueError(ub.paragraph(
# '''
# Got relative file_name, but no dataset is attached
# to this coco image. Attatch a dataset or use an
# absolute path.
# '''))
# else:
fpath = join(self.bundle_dpath, file_name)
kwimage.imwrite(fpath, imdata)
else:
aux['imdata'] = imdata
auxiliary = img.get('auxiliary', None)
if auxiliary is None:
auxiliary = img['auxiliary'] = []
auxiliary.append(aux)
if self.dset is not None:
self.dset._invalidate_hashid()
@profile
[docs] def delay(self, channels=None, space='image', bundle_dpath=None):
"""
Perform a delayed load on the data in this image.
The delayed load can load a subset of channels, and perform lazy
warping operations. If the underlying data is in a tiled format this
can reduce the amount of disk IO needed to read the data if only a
small crop or lower resolution view of the data is needed.
Note:
This method is experimental and relies on the delayed load
proof-of-concept.
Args:
gid (int): image id to load
channels (FusedChannelSpec): specific channels to load.
if unspecified, all channels are loaded.
space (str):
can either be "image" for loading in image space, or
"video" for loading in video space.
TODO:
- [X] Currently can only take all or none of the channels from each
base-image / auxiliary dict. For instance if the main image is
r|g|b you can't just select g|b at the moment.
- [X] The order of the channels in the delayed load should
match the requested channel order.
- [X] TODO: add nans to bands that don't exist or throw an error
- [ ] This function could stand to have a better name. Maybe imread
with a delayed=True flag? Or maybe just delayed_load?
Example:
>>> from kwcoco.coco_image import * # NOQA
>>> import kwcoco
>>> gid = 1
>>> #
>>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral')
>>> self = CocoImage(dset.imgs[gid], dset)
>>> delayed = self.delay()
>>> print('delayed = {!r}'.format(delayed))
>>> print('delayed.finalize() = {!r}'.format(delayed.finalize()))
>>> print('delayed.finalize() = {!r}'.format(delayed.finalize(as_xarray=True)))
>>> #
>>> dset = kwcoco.CocoDataset.demo('shapes8')
>>> delayed = dset.delayed_load(gid)
>>> print('delayed = {!r}'.format(delayed))
>>> print('delayed.finalize() = {!r}'.format(delayed.finalize()))
>>> print('delayed.finalize() = {!r}'.format(delayed.finalize(as_xarray=True)))
>>> crop = delayed.delayed_crop((slice(0, 3), slice(0, 3)))
>>> crop.finalize()
>>> crop.finalize(as_xarray=True)
>>> # TODO: should only select the "red" channel
>>> dset = kwcoco.CocoDataset.demo('shapes8')
>>> delayed = CocoImage(dset.imgs[gid], dset).delay(channels='r')
>>> import kwcoco
>>> gid = 1
>>> #
>>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral')
>>> delayed = dset.delayed_load(gid, channels='B1|B2', space='image')
>>> print('delayed = {!r}'.format(delayed))
>>> print('delayed.finalize() = {!r}'.format(delayed.finalize(as_xarray=True)))
>>> delayed = dset.delayed_load(gid, channels='B1|B2|B11', space='image')
>>> print('delayed = {!r}'.format(delayed))
>>> print('delayed.finalize() = {!r}'.format(delayed.finalize(as_xarray=True)))
>>> delayed = dset.delayed_load(gid, channels='B8|B1', space='video')
>>> print('delayed = {!r}'.format(delayed))
>>> print('delayed.finalize() = {!r}'.format(delayed.finalize(as_xarray=True)))
>>> delayed = dset.delayed_load(gid, channels='B8|foo|bar|B1', space='video')
>>> print('delayed = {!r}'.format(delayed))
>>> print('delayed.finalize() = {!r}'.format(delayed.finalize(as_xarray=True)))
Example:
>>> import kwcoco
>>> dset = kwcoco.CocoDataset.demo()
>>> coco_img = dset.coco_image(1)
>>> # Test case where nothing is registered in the dataset
>>> delayed = coco_img.delay()
>>> final = delayed.finalize()
>>> assert final.shape == (512, 512, 3)
Example:
>>> # Test that delay works when imdata is stored in the image
>>> # dictionary itself.
>>> from kwcoco.coco_image import * # NOQA
>>> import kwcoco
>>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral')
>>> coco_img = dset.coco_image(1)
>>> imdata = np.random.rand(6, 6, 5)
>>> imdata[:] = np.arange(5)[None, None, :]
>>> channels = kwcoco.FusedChannelSpec.coerce('Aux:5')
>>> coco_img.add_auxiliary_item(imdata=imdata, channels=channels)
>>> delayed = coco_img.delay(channels='B1|Aux:2:4')
>>> final = delayed.finalize()
Example:
>>> # Test delay when loading in auxiliary space
>>> from kwcoco.coco_image import * # NOQA
>>> import kwcoco
>>> dset = kwcoco.CocoDataset.demo('vidshapes8-msi-multisensor')
>>> coco_img = dset.coco_image(1)
>>> stream1 = coco_img.channels.streams()[0]
>>> stream2 = coco_img.channels.streams()[1]
>>> aux_delayed = coco_img.delay(stream1, space='auxiliary')
>>> img_delayed = coco_img.delay(stream1, space='image')
>>> vid_delayed = coco_img.delay(stream1, space='video')
>>> #
>>> aux_imdata = aux_delayed.finalize()
>>> img_imdata = img_delayed.finalize()
>>> assert aux_imdata.shape != img_imdata.shape
>>> # Cannot load multiple auxiliary items at the same time in
>>> # auxiliary space
>>> import pytest
>>> fused_channels = stream1 | stream2
>>> with pytest.raises(kwcoco.exceptions.CoordinateCompatibilityError):
>>> aux_delayed2 = coco_img.delay(fused_channels, space='auxiliary')
"""
from kwcoco.util.util_delayed_poc import DelayedChannelConcat
from kwcoco.util.util_delayed_poc import DelayedNans
from kwimage.transform import Affine
from kwcoco.channel_spec import FusedChannelSpec
if bundle_dpath is None:
bundle_dpath = self.bundle_dpath
img = self.img
requested = channels
if requested is not None:
requested = FusedChannelSpec.coerce(requested).normalize()
# Get info about the primary image and check if its channels are
# requested (if it even has any)
img_info = _delay_load_imglike(bundle_dpath, img)
obj_info_list = [(img_info, img)]
for aux in img.get('auxiliary', []):
info = _delay_load_imglike(bundle_dpath, aux)
obj_info_list.append((info, aux))
chan_list = []
for info, obj in obj_info_list:
if info.get('chan_construct', None) is not None:
include_flag = requested is None
if not include_flag:
if requested.intersection(info['channels']):
include_flag = True
if include_flag:
chncls, chnkw = info['chan_construct']
chan = chncls(**chnkw)
if space != 'auxiliary':
aux_to_img = Affine.coerce(obj.get('warp_aux_to_img', None))
chan = chan.delayed_warp(
aux_to_img, dsize=img_info['dsize'])
chan_list.append(chan)
# TODO: allow load in auxiliary space
if space == 'video':
video = self.video
width = video.get('width', img.get('width', None))
height = video.get('height', img.get('height', None))
else:
width = img.get('width', None)
height = img.get('height', None)
dsize = (width, height)
if len(chan_list) == 0:
if requested is not None:
# Handle case where the image doesnt have the requested
# channels.
delayed = DelayedNans(dsize=dsize, channels=requested)
return delayed
else:
raise ValueError('no data registered in kwcoco image')
else:
delayed = DelayedChannelConcat(chan_list)
# Reorder channels in the requested order
if requested is not None:
delayed = delayed.take_channels(requested)
if hasattr(delayed, 'components'):
if len(delayed.components) == 1:
delayed = delayed.components[0]
if space in {'image', 'auxiliary'}:
pass
elif space == 'video':
img_to_vid = Affine.coerce(img.get('warp_img_to_vid', None))
delayed = delayed.delayed_warp(img_to_vid, dsize=dsize)
else:
raise KeyError('space = {}'.format(space))
return delayed
@ub.memoize_method
[docs] def valid_region(self, space='image'):
"""
If this image has a valid polygon, return it in image, or video space
"""
import kwimage
valid_coco_poly = self.img.get('valid_region', None)
if valid_coco_poly is None:
valid_poly = None
else:
kw_poly_img = kwimage.MultiPolygon.coerce(valid_coco_poly)
if kw_poly_img is None:
valid_poly = None
else:
if space == 'image':
valid_poly = kw_poly_img
elif space == 'video':
warp_vid_from_img = self.warp_vid_from_img
valid_poly = kw_poly_img.warp(warp_vid_from_img)
else:
# To warp it into an auxiliary space we need to know which one
raise NotImplementedError(space)
return valid_poly
# def warp_vid_from_img(self):
# pass
# def warp_vid_from_img(self):
# pass
@ub.memoize_property
[docs] def warp_vid_from_img(self):
import kwimage
warp_img_to_vid = kwimage.Affine.coerce(self.img.get('warp_img_to_vid', None))
return warp_img_to_vid
@ub.memoize_property
[docs] def warp_img_from_vid(self):
return self.warp_vid_from_img.inv()
[docs] def _annot_segmentation(self, ann, space='video'):
import kwimage
warp_vid_from_img = self.warp_vid_from_img
img_sseg = kwimage.MultiPolygon.coerce(ann['segmentation'])
if space == 'image':
warped_sseg = img_sseg
pass
elif space == 'video':
vid_sseg = img_sseg.warp(warp_vid_from_img)
warped_sseg = vid_sseg
else:
raise NotImplementedError(space) # auxiliary/asset space
return warped_sseg
[docs]class CocoAsset(object):
"""
Represents one 2D image file relative to a parent img.
Could be a single asset, or an image with sub-assets, but sub-assets are
ignored here.
Initially we called these "auxiliary" items, but I think we should
change their name to "assets", which better maps with STAC terminology.
"""
[docs] def __getitem__(self, key):
"""
Proxy getter attribute for underlying `self.obj` dictionary
"""
return self.obj[key]
[docs] def keys(self):
"""
Proxy getter attribute for underlying `self.obj` dictionary
"""
return self.obj.keys()
[docs] def get(self, key, default=ub.NoParam):
"""
Proxy getter attribute for underlying `self.obj` dictionary
"""
if default is ub.NoParam:
return self.obj.get(key)
else:
return self.obj.get(key, default)
[docs]def _delay_load_imglike(bundle_dpath, obj):
from kwcoco.util.util_delayed_poc import DelayedLoad, DelayedIdentity
from os.path import join
from kwcoco.channel_spec import FusedChannelSpec
info = {}
fname = obj.get('file_name', None)
imdata = obj.get('imdata', None)
channels_ = obj.get('channels', None)
if channels_ is not None:
channels_ = FusedChannelSpec.coerce(channels_)
channels_ = channels_.normalize()
info['channels'] = channels_
width = obj.get('width', None)
height = obj.get('height', None)
if height is not None and width is not None:
info['dsize'] = dsize = (width, height)
else:
info['dsize'] = dsize = (None, None)
quantization = obj.get('quantization', None)
if imdata is not None:
info['chan_construct'] = (DelayedIdentity, dict(
sub_data=imdata, channels=channels_, dsize=dsize,
quantization=quantization))
elif fname is not None:
info['fpath'] = fpath = join(bundle_dpath, fname)
# Delaying this gives us a small speed boost
info['chan_construct'] = (DelayedLoad, dict(
fpath=fpath, channels=channels_, dsize=dsize,
quantization=quantization))
return info