import ubelt as ub
class CocoImage(ub.NiceRepr):
    """
    An object-oriented representation of a coco image.

    It provides helper methods that are specific to a single image.

    This operates directly on a single coco image dictionary, but it can
    optionally be connected to a parent dataset, which allows it to use
    CocoDataset methods to query about relationships and resolve pointers.

    This is different than the Images class in coco_object1d, which is just a
    vectorized interface to multiple objects.

    Example:
        >>> import kwcoco
        >>> dset1 = kwcoco.CocoDataset.demo('shapes8')
        >>> dset2 = kwcoco.CocoDataset.demo('vidshapes8-multispectral')
        >>> self = CocoImage(dset1.imgs[1], dset1)
        >>> print('self = {!r}'.format(self))
        >>> print('self.channels = {}'.format(ub.repr2(self.channels, nl=1)))
        >>> self = CocoImage(dset2.imgs[1], dset2)
        >>> print('self.channels = {}'.format(ub.repr2(self.channels, nl=1)))
        >>> self.primary_asset()
    """

    def __init__(self, img, dset=None):
        """
        Args:
            img (dict): the coco image dictionary this object wraps. It is
                stored by reference, not copied.
            dset (CocoDataset | None): optional parent dataset. It is needed
                by :func:`delay` when ``bundle_dpath`` is not given or when
                ``space='video'``.
        """
        self.img = img
        self.dset = dset

    @classmethod
    def from_gid(cls, dset, gid):
        """
        Construct a CocoImage by looking up an image id in a dataset.

        Args:
            dset (CocoDataset): dataset whose ``index.imgs`` contains ``gid``
            gid (int): image id to look up

        Returns:
            CocoImage: wrapper around ``dset.index.imgs[gid]`` that is
                connected to ``dset``.
        """
        img = dset.index.imgs[gid]
        self = cls(img, dset=dset)
        return self

    def __nice__(self):
        """
        Build the short summary used by the ``ub.NiceRepr`` representation.

        Example:
            >>> from kwcoco.coco_image import *  # NOQA
            >>> import kwcoco
            >>> with ub.CaptureStdout() as cap:
            ...     dset = kwcoco.CocoDataset.demo('shapes8')
            >>> self = CocoImage(dset.dataset['images'][0], dset)
            >>> print('self = {!r}'.format(self))
            >>> dset = kwcoco.CocoDataset.demo()
            >>> self = CocoImage(dset.dataset['images'][0], dset)
            >>> print('self = {!r}'.format(self))
        """
        from kwcoco.util.util_truncate import smart_truncate
        from functools import partial
        stats = self.stats()
        # Stringify every value and shorten long ones so the repr stays on
        # a single readable line.
        stats = ub.map_vals(str, stats)
        stats = ub.map_vals(
            partial(smart_truncate, max_length=32, trunc_loc=0.5),
            stats)
        return ub.repr2(stats, compact=1, nl=0, sort=0)

    def stats(self):
        """
        Collect a few summary attributes of this image.

        Returns:
            dict: maps ``'wh'``, ``'n_chan'`` and ``'channels'`` to the values
                of the ``dsize``, ``num_channels`` and ``channels``
                properties. If computing a property raises, the ``repr`` of
                the exception is stored in its place.
        """
        key_attrname = [
            ('wh', 'dsize'),
            ('n_chan', 'num_channels'),
            ('channels', 'channels'),
        ]
        stats = {}
        for key, attrname in key_attrname:
            try:
                stats[key] = getattr(self, attrname)
            except Exception as ex:
                # Deliberately broad: this feeds ``__nice__``, and a repr
                # should not crash because one property cannot be computed.
                stats[key] = repr(ex)
        return stats

    def __getitem__(self, key):
        """
        Proxy item access to the underlying image dictionary.
        """
        return self.img[key]

    def keys(self):
        """
        Proxy ``keys`` to the underlying image dictionary.
        """
        return self.img.keys()

    def get(self, key, default=ub.NoParam):
        """
        Duck type some of the dict interface

        Args:
            key: key to look up in the underlying image dictionary
            default: value returned when ``key`` is missing. When it is not
                given, a missing key yields ``None`` (``dict.get`` semantics).
        """
        if default is ub.NoParam:
            return self.img.get(key)
        else:
            return self.img.get(key, default)

    @property
    def channels(self):
        """
        The channel spec of every asset of this image.

        Returns:
            ChannelSpec: the normalized fused specs of each object produced
                by :func:`iter_asset_objs`, joined by commas.
        """
        from kwcoco.channel_spec import FusedChannelSpec
        from kwcoco.channel_spec import ChannelSpec
        img_parts = [
            FusedChannelSpec.coerce(obj.get('channels', None)).normalize().spec
            for obj in self.iter_asset_objs()
        ]
        spec = ChannelSpec(','.join(img_parts))
        return spec

    @property
    def num_channels(self):
        """
        The value of ``self.channels.numel()``.
        """
        return self.channels.numel()

    @property
    def dsize(self):
        """
        Returns:
            Tuple[int | None, int | None]: the ``(width, height)`` entries of
                the image dictionary, with ``None`` for a missing entry.
        """
        width = self.img.get('width', None)
        height = self.img.get('height', None)
        return width, height

    def primary_asset(self, requires=None):
        """
        Compute a "main" image asset.

        The base image dictionary is preferred when it has a file name.
        Otherwise the auxiliary item whose ``warp_aux_to_img`` transform is
        closest to the identity is chosen, and ties are broken in favor of
        the larger ``width * height``.

        Args:
            requires (List[str] | None):
                list of keys that must be present in an object for it to be
                considered as the primary one. Note that only key presence
                is checked, not whether the value is non-None. Defaults to
                no requirements.

        Returns:
            dict | None: the chosen asset dictionary, or None if no object
                satisfies ``requires``.

        TODO:
            - [ ] Add in primary heuristics
        """
        import kwimage
        import numpy as np

        if requires is None:
            requires = []

        def _satisfies_requires(obj):
            return all(k in obj for k in requires)

        img = self.img
        has_base_image = img.get('file_name', None) is not None
        if has_base_image and _satisfies_requires(img):
            # Return the base image if we can
            return img

        # Choose "best" auxiliary image based on a heuristic.
        eye = kwimage.Affine.eye().matrix
        candidates = []
        for obj in img.get('auxiliary', []):
            if not _satisfies_requires(obj):
                # Skip before doing any transform work for this object
                continue
            # Take frobenius norm to get "distance" between transform and
            # the identity. We want to find the auxiliary closest to the
            # identity transform.
            warp_aux_to_img = kwimage.Affine.coerce(
                obj.get('warp_aux_to_img', None))
            fro_dist = np.linalg.norm(warp_aux_to_img.matrix - eye, ord='fro')
            candidates.append({
                'area': obj['width'] * obj['height'],
                'fro_dist': fro_dist,
                'obj': obj,
            })

        if not candidates:
            return None

        # ``min`` returns the first minimal candidate, so ties keep the
        # original auxiliary order.
        best = min(
            candidates, key=lambda val: (val['fro_dist'], -val['area'])
        )
        return best['obj']

    def iter_asset_objs(self):
        """
        Iterate through base + auxiliary dicts that have file paths

        Yields:
            dict: the image dictionary itself when it has a non-None
                ``file_name``, followed by each item of its ``auxiliary``
                list (if any).
        """
        img = self.img
        has_base_image = img.get('file_name', None) is not None
        if has_base_image:
            # The image dict is yielded as-is (including its 'auxiliary'
            # key); yielding a copy would break inplace modification.
            yield img
        for obj in img.get('auxiliary', []):
            yield obj

    def delay(self, channels=None, space='image', bundle_dpath=None):
        """
        Experimental method

        Args:
            channels (FusedChannelSpec): specific channels to load.
                if unspecified, all channels are loaded.

            space (str):
                can either be "image" for loading in image space, or
                "video" for loading in video space.

            bundle_dpath (str | None):
                directory that asset file names are joined onto. If
                unspecified, ``self.dset.bundle_dpath`` is used.

        Returns:
            a delayed-load object. When channels are requested but none of
            them exist in this image, this is a ``DelayedNans`` with the
            requested channels.

        Raises:
            KeyError: if ``space`` is neither "image" nor "video"
            ValueError: if no channels are requested and the image has no
                data registered.

        TODO:
            - [X] Currently can only take all or none of the channels from each
                base-image / auxiliary dict. For instance if the main image is
                r|g|b you can't just select g|b at the moment.

            - [X] The order of the channels in the delayed load should
                match the requested channel order.

            - [ ] TODO: add nans to bands that don't exist or throw an error

            - [ ] This function could stand to have a better name. Maybe imread
                  with a delayed=True flag? Or maybe just delayed_load?

        Example:
            >>> from kwcoco.coco_image import *  # NOQA
            >>> import kwcoco
            >>> gid = 1
            >>> #
            >>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral')
            >>> self = CocoImage(dset.imgs[gid], dset)
            >>> delayed = self.delay()
            >>> print('delayed = {!r}'.format(delayed))
            >>> print('delayed.finalize() = {!r}'.format(delayed.finalize()))
            >>> print('delayed.finalize() = {!r}'.format(delayed.finalize(as_xarray=True)))
            >>> #
            >>> dset = kwcoco.CocoDataset.demo('shapes8')
            >>> delayed = dset.delayed_load(gid)
            >>> print('delayed = {!r}'.format(delayed))
            >>> print('delayed.finalize() = {!r}'.format(delayed.finalize()))
            >>> print('delayed.finalize() = {!r}'.format(delayed.finalize(as_xarray=True)))
            >>> crop = delayed.delayed_crop((slice(0, 3), slice(0, 3)))
            >>> crop.finalize()
            >>> crop.finalize(as_xarray=True)
            >>> # TODO: should only select the "red" channel
            >>> dset = kwcoco.CocoDataset.demo('shapes8')
            >>> delayed = CocoImage(dset.imgs[gid], dset).delay(channels='r')
            >>> import kwcoco
            >>> gid = 1
            >>> #
            >>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral')
            >>> delayed = dset.delayed_load(gid, channels='B1|B2', space='image')
            >>> print('delayed = {!r}'.format(delayed))
            >>> print('delayed.finalize() = {!r}'.format(delayed.finalize(as_xarray=True)))
            >>> delayed = dset.delayed_load(gid, channels='B1|B2|B11', space='image')
            >>> print('delayed = {!r}'.format(delayed))
            >>> print('delayed.finalize() = {!r}'.format(delayed.finalize(as_xarray=True)))
            >>> delayed = dset.delayed_load(gid, channels='B8|B1', space='video')
            >>> print('delayed = {!r}'.format(delayed))
            >>> print('delayed.finalize() = {!r}'.format(delayed.finalize(as_xarray=True)))
            >>> delayed = dset.delayed_load(gid, channels='B8|foo|bar|B1', space='video')
            >>> print('delayed = {!r}'.format(delayed))
            >>> print('delayed.finalize() = {!r}'.format(delayed.finalize(as_xarray=True)))

        Example:
            >>> import kwcoco
            >>> dset = kwcoco.CocoDataset.demo()
            >>> coco_img = dset.coco_image(1)
            >>> # Test case where nothing is registered in the dataset
            >>> delayed = coco_img.delay()
            >>> final = delayed.finalize()
            >>> assert final.shape == (512, 512, 3)
        """
        from kwcoco.util.util_delayed_poc import DelayedChannelConcat
        from kwcoco.util.util_delayed_poc import DelayedNans
        from kwimage.transform import Affine
        from kwcoco.channel_spec import FusedChannelSpec

        if space not in ('image', 'video'):
            # Fail fast so that an unknown space is rejected on every code
            # path, including the early DelayedNans return below.
            raise KeyError('space = {}'.format(space))

        if bundle_dpath is None:
            bundle_dpath = self.dset.bundle_dpath

        img = self.img
        requested = channels
        if requested is not None:
            requested = FusedChannelSpec.coerce(requested).normalize()

        chan_list = []

        # Get info about the primary image and check if its channels are
        # requested (if it even has any)
        info = img_info = _delay_load_imglike(bundle_dpath, img)
        if info.get('chan_construct', None) is not None:
            if requested is None or requested.intersection(info['channels']):
                chncls, chnkw = info['chan_construct']
                chan_list.append(chncls(**chnkw))

        for aux in img.get('auxiliary', []):
            info = _delay_load_imglike(bundle_dpath, aux)
            if requested is None or requested.intersection(info['channels']):
                aux_to_img = Affine.coerce(aux.get('warp_aux_to_img', None))
                chncls, chnkw = info['chan_construct']
                chan = chncls(**chnkw)
                # Align the auxiliary data with the base image
                chan = chan.delayed_warp(
                    aux_to_img, dsize=img_info['dsize'])
                chan_list.append(chan)

        if space == 'video':
            vidid = img['video_id']
            video = self.dset.index.videos[vidid]
            # Fall back to the image size when the video does not record one
            width = video.get('width', img.get('width', None))
            height = video.get('height', img.get('height', None))
        else:
            width = img.get('width', None)
            height = img.get('height', None)
        dsize = (width, height)

        if not chan_list:
            if requested is None:
                raise ValueError('no data registered in kwcoco image')
            # Handle case where the image doesn't have the requested
            # channels.
            return DelayedNans(dsize=dsize, channels=requested)

        delayed = DelayedChannelConcat(chan_list)

        # Reorder channels in the requested order
        if requested is not None:
            delayed = delayed.take_channels(requested)

        # Unwrap a concat that only holds a single component
        if hasattr(delayed, 'components'):
            if len(delayed.components) == 1:
                delayed = delayed.components[0]

        if space == 'video':
            img_to_vid = Affine.coerce(img.get('warp_img_to_vid', None))
            delayed = delayed.delayed_warp(img_to_vid, dsize=dsize)
        return delayed
[docs]def _delay_load_imglike(bundle_dpath, obj):
from kwcoco.util.util_delayed_poc import DelayedLoad
from os.path import join
from kwcoco.channel_spec import FusedChannelSpec
info = {}
fname = obj.get('file_name', None)
channels_ = obj.get('channels', None)
if channels_ is not None:
channels_ = FusedChannelSpec.coerce(channels_)
channels_ = channels_.normalize()
info['channels'] = channels_
width = obj.get('width', None)
height = obj.get('height', None)
if height is not None and width is not None:
info['dsize'] = dsize = (width, height)
else:
info['dsize'] = dsize = (None, None)
if fname is not None:
info['fpath'] = fpath = join(bundle_dpath, fname)
# Delaying this gives us a small speed boost
info['chan_construct'] = (DelayedLoad, dict(
fpath=fpath, channels=channels_, dsize=dsize))
return info