
"""
Intermediate operations
"""
import kwarray
import kwimage
import copy
import numpy as np
import ubelt as ub
import warnings
from delayed_image import channel_spec
from delayed_image.delayed_base import DelayedNaryOperation, DelayedUnaryOperation
from delayed_image import delayed_leafs


try:
    from xdev import profile
except Exception:
    from ubelt import identity as profile

# --------
# Stacking
# --------

__docstubs__ = """
from delayed_image.channel_spec import FusedChannelSpec
from delayed_image.delayed_leafs import DelayedIdentity
from delayed_image.delayed_base import DelayedOperation
"""

TRACE_OPTIMIZE = 0  # TODO: make this a local setting


class DelayedArray(DelayedUnaryOperation):
    """
    A generic NDArray.
    """
    def __init__(self, subdata=None):
        """
        Args:
            subdata (DelayedArray):
        """
        super().__init__(subdata=subdata)

    def __nice__(self):
        """
        Returns:
            str
        """
        return '{}'.format(self.shape)

    @property
    def shape(self):
        """
        Returns:
            None | Tuple[int | None, ...]
        """
        shape = self.subdata.shape
        return shape

class DelayedStack(DelayedNaryOperation):
    """
    Stacks multiple arrays together.
    """
    def __init__(self, parts, axis):
        """
        Args:
            parts (List[DelayedArray]): data to stack
            axis (int): axes to stack on
        """
        raise NotImplementedError
        super().__init__(parts=parts)
        self.meta['axis'] = axis

    def __nice__(self):
        """
        Returns:
            str
        """
        return '{}'.format(self.shape)

    @property
    def shape(self):
        """
        Returns:
            None | Tuple[int | None, ...]
        """
        shape = self.subdata.shape
        return shape

class DelayedConcat(DelayedNaryOperation):
    """
    Concatenates multiple arrays together.
    """
    def __init__(self, parts, axis):
        """
        Args:
            parts (List[DelayedArray]): data to concat
            axis (int): axes to concat on
        """
        super().__init__(parts=parts)
        self.meta['axis'] = axis

    def __nice__(self):
        return '{}'.format(self.shape)

    @property
    def shape(self):
        """
        Returns:
            None | Tuple[int | None, ...]
        """
        shape = self.subdata.shape
        return shape

class DelayedFrameStack(DelayedStack):
    """
    Stacks multiple arrays together.
    """
    def __init__(self, parts):
        """
        Args:
            parts (List[DelayedArray]): data to stack
        """
        raise NotImplementedError
        super().__init__(parts=parts, axis=0)

# ------
# Images
# ------

class ImageOpsMixin:
[docs] @profile def crop(self, space_slice=None, chan_idxs=None, clip=True, wrap=True, pad=0): """ Crops an image along integer pixel coordinates. Args: space_slice (Tuple[slice, slice]): y-slice and x-slice. chan_idxs (List[int]): indexes of bands to take clip (bool): if True, the slice is interpreted normally, where it won't go past the image extent, otherwise slicing into negative regions or past the image bounds will result in padding. Defaults to True. wrap (bool): if True, negative indexes "wrap around", otherwise they are treated as is. Defaults to True. pad (int | List[Tuple[int, int]]): if specified, applies extra padding Returns: DelayedImage Example: >>> from delayed_image import DelayedLoad >>> import kwimage >>> self = DelayedLoad.demo().prepare() >>> self = self.dequantize({'quant_max': 255}) >>> self = self.warp({'scale': 1 / 2}) >>> pad = 0 >>> h, w = space_dims = self.dsize[::-1] >>> grid = list(ub.named_product({ >>> 'left': [0, -64], 'right': [0, 64], >>> 'top': [0, -64], 'bot': [0, 64],})) >>> grid += [ >>> {'left': 64, 'right': -64, 'top': 0, 'bot': 0}, >>> {'left': 64, 'right': 64, 'top': 0, 'bot': 0}, >>> {'left': 0, 'right': 0, 'top': 64, 'bot': -64}, >>> {'left': 64, 'right': -64, 'top': 64, 'bot': -64}, >>> ] >>> crops = [] >>> for pads in grid: >>> space_slice = (slice(pads['top'], h + pads['bot']), >>> slice(pads['left'], w + pads['right'])) >>> delayed = self.crop(space_slice) >>> crop = delayed.finalize() >>> yyxx = kwimage.Boxes.from_slice(space_slice, wrap=False, clip=0).toformat('_yyxx').data[0] >>> title = '[{}:{}, {}:{}]'.format(*yyxx) >>> crop_canvas = kwimage.draw_header_text(crop, title, fit=True, bg_color='kw_darkgray') >>> crops.append(crop_canvas) >>> # xdoctest: +REQUIRES(--show) >>> import kwplot >>> kwplot.autompl() >>> canvas = kwimage.stack_images_grid(crops, pad=16, bg_value='kw_darkgreen') >>> canvas = kwimage.fill_nans_with_checkers(canvas) >>> kwplot.imshow(canvas, title='Normal Slicing: Cropped Images With Wrap+Clipped Slices', doclf=1, fnum=1) >>> kwplot.show_if_requested() Example: >>> # Demo the case with pads / no-clips / no-wraps >>> from delayed_image import DelayedLoad >>> import kwimage >>> self = DelayedLoad.demo().prepare() >>> self = self.dequantize({'quant_max': 255}) >>> self = self.warp({'scale': 1 / 2}) >>> pad = [(64, 128), (32, 96)] >>> pad = [(0, 20), (0, 0)] >>> pad = 0 >>> pad = 8 >>> h, w = space_dims = self.dsize[::-1] >>> grid = list(ub.named_product({ >>> 'left': [0, -64], 'right': [0, 64], >>> 'top': [0, -64], 'bot': [0, 64],})) >>> grid += [ >>> {'left': 64, 'right': -64, 'top': 0, 'bot': 0}, >>> {'left': 64, 'right': 64, 'top': 0, 'bot': 0}, >>> {'left': 0, 'right': 0, 'top': 64, 'bot': -64}, >>> {'left': 64, 'right': -64, 'top': 64, 'bot': -64}, >>> ] >>> crops = [] >>> for pads in grid: >>> space_slice = (slice(pads['top'], h + pads['bot']), >>> slice(pads['left'], w + pads['right'])) >>> delayed = self._padded_crop(space_slice, pad=pad) >>> crop = delayed.finalize(optimize=1) >>> yyxx = kwimage.Boxes.from_slice(space_slice, wrap=False, clip=0).toformat('_yyxx').data[0] >>> title = '[{}:{}, {}:{}]'.format(*yyxx) >>> if pad: >>> title += f'{chr(10)}pad={pad}' >>> crop_canvas = kwimage.draw_header_text(crop, title, fit=True, bg_color='kw_darkgray') >>> crops.append(crop_canvas) >>> # xdoctest: +REQUIRES(--show) >>> import kwplot >>> kwplot.autompl() >>> canvas = kwimage.stack_images_grid(crops, pad=16, bg_value='kw_darkgreen', resize='smaller') >>> canvas = kwimage.fill_nans_with_checkers(canvas) >>> 
kwplot.imshow(canvas, title='Negative Slicing: Cropped Images With clip=False wrap=False', doclf=1, fnum=2) >>> kwplot.show_if_requested() """ if not clip or not wrap or pad: if clip or wrap: raise NotImplementedError( ub.paragraph( ''' Currently, in "negative slice mode" both clip and wrap params must be set to False if padding is given or either of clip or wrap is False. ''')) # Currently padding doesn't really work with crops, so its not # efficient, but we can hack it to work with warps. new = self._padded_crop(space_slice, pad=pad) else: # Normal efficient case new = DelayedCrop(self, space_slice, chan_idxs) return new
    @profile
    def _padded_crop(self, space_slice, pad=0):
        """
        Does the type of padded crop we want, but inefficiently using a warp.
        Reimplementing would be good, but this is good enough for now.
        """
        if self.dsize is None:
            raise Exception('dsize must be populated to do a padded crop')
        data_dims = self.dsize[::-1]
        _data_slice, _extra_padding = kwarray.embed_slice(
            space_slice, data_dims, pad)
        offset_d0, extra_d0 = _extra_padding[0]
        offset_d1, extra_d1 = _extra_padding[1]
        pad_warp = {'offset': (offset_d1, offset_d0)}
        data_crop_box = kwimage.Boxes.from_slice(
            _data_slice, clip=False, wrap=False)
        dsize = (int(data_crop_box.width.ravel()[0] + offset_d1 + extra_d1),
                 int(data_crop_box.height.ravel()[0] + offset_d0 + extra_d0))
        new = self.crop(_data_slice)
        if any([offset_d0, extra_d0, offset_d1, extra_d1]):
            # Use a warp to accomplish padding.
            # Having an explicit padding node would be better.
            new = new.warp(pad_warp, dsize=dsize)
        return new

    def warp(self, transform, dsize='auto', **warp_kwargs):
        """
        Applies an affine transformation to the image. See :class:`DelayedWarp`.

        Args:
            transform (ndarray | dict | kwimage.Affine):
                a coercible affine matrix. See :class:`kwimage.Affine` for
                details on what can be coerced.

            dsize (Tuple[int, int] | str):
                The width / height of the output canvas. If 'auto', dsize is
                computed such that the positive coordinates of the warped
                image will fit in the new canvas. In this case, any pixel that
                maps to a negative coordinate will be clipped. This has the
                property that the input transformation is not modified.

            antialias (bool):
                if True, checks whether the transform is downsampling and, if
                so, applies antialiasing via a gaussian blur. Defaults to True.

            interpolation (str):
                interpolation code or cv2 integer. Interpolation codes are
                linear, nearest, cubic, lanczos, and area. Defaults to
                "linear".

            border_value (int | float | str):
                if auto, will be nan for float and 0 for int.

            noop_eps (float):
                This is the tolerance for optimizing a warp away. If the
                transform has all of its decomposed parameters (i.e. scale,
                rotation, translation, shear) less than this value, the warp
                node can be optimized away. Defaults to 0.

        Returns:
            DelayedImage
        """
        new = DelayedWarp(self, transform, dsize=dsize, **warp_kwargs)
        return new

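    # Illustrative usage sketch (not part of the original module): chain a
    # delayed warp onto a demo image and realize it. This assumes the same
    # ``DelayedLoad.demo()`` asset used by the doctests above.
    #
    #   >>> from delayed_image import DelayedLoad
    #   >>> self = DelayedLoad.demo().prepare()
    #   >>> warped = self.warp({'scale': 0.5, 'offset': (10, 20)})
    #   >>> data = warped.optimize().finalize()
    #   >>> assert data.shape[2] >= 1
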
    def scale(self, scale, dsize='auto', **warp_kwargs):
        """
        An alias for self.warp({"scale": scale}, ...)
        """
        transform = {'scale': scale}
        return self.warp(transform, dsize=dsize, **warp_kwargs)

    def resize(self, dsize, **warp_kwargs):
        """
        Resize an image to a specific width/height by scaling it.
        """
        old_dsize = np.array(self.dsize)
        new_dsize = np.array(dsize)
        scale = new_dsize / old_dsize
        transform = {'scale': scale}
        return self.warp(transform, dsize=dsize, **warp_kwargs)

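    # Illustrative sketch (not from the original source): ``resize`` is just a
    # ``warp`` whose per-axis scale is computed from the current dsize, so the
    # two calls below should agree. This assumes the demo image is 512 x 512,
    # as the doctests elsewhere in this module imply.
    #
    #   >>> from delayed_image import DelayedLoad
    #   >>> self = DelayedLoad.demo().prepare()
    #   >>> a = self.resize((256, 128)).finalize()
    #   >>> b = self.warp({'scale': (256 / 512, 128 / 512)}, dsize=(256, 128)).finalize()
    #   >>> assert a.shape[0:2] == b.shape[0:2] == (128, 256)
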
    def dequantize(self, quantization):
        """
        Rescales image intensities from int to floats.

        Args:
            quantization (Dict[str, Any]):
                quantization information dictionary to undo.
                see :func:`delayed_image.helpers.dequantize`
                Expected keys are:
                    orig_dtype (str)
                    orig_min (float)
                    orig_max (float)
                    quant_min (float)
                    quant_max (float)
                    nodata (None | int)

        Returns:
            DelayedDequantize

        Example:
            >>> from delayed_image.delayed_leafs import DelayedLoad
            >>> self = DelayedLoad.demo().prepare()
            >>> quantization = {
            >>>     'orig_dtype': 'float32',
            >>>     'orig_min': 0,
            >>>     'orig_max': 1,
            >>>     'quant_min': 0,
            >>>     'quant_max': 255,
            >>>     'nodata': None,
            >>> }
            >>> new = self.dequantize(quantization)
            >>> assert self.finalize().max() > 1
            >>> assert new.finalize().max() <= 1
        """
        new = DelayedDequantize(self, quantization)
        return new

    def get_overview(self, overview):
        """
        Downsamples an image by a factor of two for each overview level.

        Args:
            overview (int): the overview to use (assuming it exists)

        Returns:
            DelayedOverview
        """
        new = DelayedOverview(self, overview)
        return new

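    # Illustrative sketch (not from the original source): overviews are only
    # available when the backing file has them (e.g. a COG opened through
    # GDAL), so this assumes a test image with precomputed overviews, as in
    # the ``_opt_split_warp_overview`` doctests later in this module.
    #
    #   >>> # xdoctest: +REQUIRES(module:osgeo)
    #   >>> from delayed_image import DelayedLoad
    #   >>> import kwimage
    #   >>> fpath = kwimage.grab_test_image_fpath(overviews=3)
    #   >>> self = DelayedLoad(fpath, channels='r|g|b').prepare()
    #   >>> small = self.get_overview(2)  # roughly a 4x downsample (2 ** 2)
    #   >>> print(self.dsize, small.dsize)
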
    def as_xarray(self):
        """
        Returns:
            DelayedAsXarray
        """
        return DelayedAsXarray(self)

[docs] def get_transform_from(self, src): """ Find a transform from a given node (src) to this node (self / dst). Given two delayed images src and dst that share a common leaf, find the transform from src to dst. Args: src (DelayedOperation): the other view to get a transform to. This must share a leaf with self (which is the dst). Returns: kwimage.Affine: The transform that warps the space of src to the space of self. Example: >>> from delayed_image import * # NOQA >>> from delayed_image.delayed_leafs import DelayedLoad >>> base = DelayedLoad.demo().prepare() >>> src = base.scale(2) >>> dst = src.warp({'scale': 4, 'offset': (3, 5)}) >>> transform = dst.get_transform_from(src) >>> tf = transform.decompose() >>> assert tf['scale'] == (4, 4) >>> assert tf['offset'] == (3, 5) Example: >>> from delayed_image import demo >>> self = demo.non_aligned_leafs() >>> leaf = list(self._leaf_paths())[0][0] >>> tf1 = self.get_transform_from(leaf) >>> tf2 = leaf.get_transform_from(self) >>> np.allclose(np.linalg.inv(tf2), tf1) """ dst = self try: # Case where there is one known leaf src_from_leaf = src.get_transform_from_leaf() dst_from_leaf = dst.get_transform_from_leaf() except AttributeError: # This seems more robust src_leaf_paths = ub.udict({id(k): v for k, v in src._leaf_paths()}) dst_leaf_paths = ub.udict({id(k): v for k, v in dst._leaf_paths()}) common_leaf_ids = dst_leaf_paths & src_leaf_paths common_leaf_id = common_leaf_ids.peek_key() src_part = src_leaf_paths[common_leaf_id] dst_part = dst_leaf_paths[common_leaf_id] if 0: # In the case where we have a concatenated set of delayed images we # have to consider a single path from the root to a shared leaf. # This will work for now, but there may be a more efficient / # elegant way to implement it. src_channels = src.channels.to_oset() dst_channels = dst.channels.to_oset() common_channel = ub.peek(src_channels & dst_channels) src_chan = src.take_channels(common_channel) dst_chan = dst.take_channels(common_channel) src_chan = src_chan.optimize() dst_chan = dst_chan.optimize() src_part = src_chan.parts[0] dst_part = dst_chan.parts[0] src_from_leaf = src_part.get_transform_from_leaf() dst_from_leaf = dst_part.get_transform_from_leaf() dst_from_src = dst_from_leaf @ src_from_leaf.inv() return dst_from_src
[docs] class DelayedChannelConcat(ImageOpsMixin, DelayedConcat): """ Stacks multiple arrays together. Example: >>> from delayed_image import * # NOQA >>> from delayed_image.delayed_leafs import DelayedLoad >>> dsize = (307, 311) >>> c1 = DelayedNans(dsize=dsize, channels='foo') >>> c2 = DelayedLoad.demo('astro', dsize=dsize, channels='R|G|B').prepare() >>> cat = DelayedChannelConcat([c1, c2]) >>> warped_cat = cat.warp({'scale': 1.07}, dsize=(328, 332)) >>> warped_cat._validate() >>> warped_cat.finalize() Example: >>> # Test case that failed in initial implementation >>> # Due to incorrectly pushing channel selection under the concat >>> from delayed_image import * # NOQA >>> import kwimage >>> fpath = kwimage.grab_test_image_fpath() >>> base1 = DelayedLoad(fpath, channels='r|g|b').prepare() >>> base2 = DelayedLoad(fpath, channels='x|y|z').prepare().scale(2) >>> base3 = DelayedLoad(fpath, channels='i|j|k').prepare().scale(2) >>> bands = [base2, base1[:, :, 0].scale(2).evaluate(), >>> base1[:, :, 1].evaluate().scale(2), >>> base1[:, :, 2].evaluate().scale(2), base3] >>> delayed = DelayedChannelConcat(bands) >>> delayed = delayed.warp({'scale': 2}) >>> delayed = delayed[0:100, 0:55, [0, 2, 4]] >>> delayed.write_network_text() >>> delayed.optimize() """ def __init__(self, parts, dsize=None): """ Args: parts (List[DelayedArray]): data to concat dsize (Tuple[int, int] | None): size if known a-priori """ super().__init__(parts=parts, axis=2) if dsize is None: dsize_cands = [comp.dsize for comp in self.parts] if not ub.allsame(dsize_cands): raise CoordinateCompatibilityError( # 'parts must all have the same delayed size') 'parts must all have the same delayed size: got {}'.format(dsize_cands)) if len(dsize_cands) == 0: dsize = None else: dsize = dsize_cands[0] self.dsize = dsize try: self.num_channels = sum(comp.num_channels for comp in self.parts) except TypeError: if any(comp.num_channels is None for comp in self.parts): self.num_channels = None else: raise def __nice__(self): """ Returns: str """ if self.channels is None: return '{}'.format(self.shape) else: return '{}, {}'.format(self.shape, self.channels) @property def channels(self): """ Returns: None | FusedChannelSpec """ import delayed_image sub_channs = [] for comp in self.parts: comp_channels = comp.channels if comp_channels is None: return None sub_channs.append(comp_channels) channs = delayed_image.FusedChannelSpec.concat(sub_channs) return channs @property def shape(self): """ Returns: Tuple[int | None, int | None, int | None] """ w, h = self.dsize return (h, w, self.num_channels)
    def _finalize(self):
        """
        Returns:
            ArrayLike
        """
        stack = [comp._finalize() for comp in self.parts]
        if len(stack) == 1:
            final = stack[0]
        else:
            stack = [kwarray.atleast_nd(s, 3) for s in stack]
            final = np.concatenate(stack, axis=2)
        return final

    @profile
    def optimize(self):
        """
        Returns:
            DelayedImage
        """
        new_parts = [part.optimize() for part in self.parts]
        kw = ub.dict_isect(self.meta, ['dsize'])
        new = self.__class__(new_parts, **kw)
        return new

[docs] @profile def take_channels(self, channels): """ This method returns a subset of the vision data with only the specified bands / channels. Args: channels (List[int] | slice | channel_spec.FusedChannelSpec): List of integers indexes, a slice, or a channel spec, which is typically a pipe (`|`) delimited list of channel codes. See :class:`ChannelSpec` for more detials. Returns: DelayedArray: a delayed vision operation that only operates on the following channels. Example: >>> # xdoctest: +REQUIRES(module:kwcoco) >>> from delayed_image.delayed_nodes import * # NOQA >>> import kwcoco >>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral') >>> self = delayed = dset.coco_image(1).delay() >>> channels = 'B11|B8|B1|B10' >>> new = self.take_channels(channels) Example: >>> # xdoctest: +REQUIRES(module:kwcoco) >>> # Complex case >>> import kwcoco >>> from delayed_image.delayed_nodes import * # NOQA >>> from delayed_image.delayed_leafs import DelayedLoad >>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral') >>> delayed = dset.coco_image(1).delay() >>> astro = DelayedLoad.demo('astro', channels='r|g|b').prepare() >>> aligned = astro.warp(kwimage.Affine.scale(600 / 512), dsize='auto') >>> self = combo = DelayedChannelConcat(delayed.parts + [aligned]) >>> channels = 'B1|r|B8|g' >>> new = self.take_channels(channels) >>> new_cropped = new.crop((slice(10, 200), slice(12, 350))) >>> new_opt = new_cropped.optimize() >>> datas = new_opt.finalize() >>> if 1: >>> new_cropped.write_network_text(with_labels='name') >>> new_opt.write_network_text(with_labels='name') >>> vizable = kwimage.normalize_intensity(datas, axis=2) >>> self._validate() >>> new._validate() >>> new_cropped._validate() >>> new_opt._validate() >>> # xdoctest: +REQUIRES(--show) >>> import kwplot >>> kwplot.autompl() >>> stacked = kwimage.stack_images(vizable.transpose(2, 0, 1)) >>> kwplot.imshow(stacked) Example: >>> # xdoctest: +REQUIRES(module:kwcoco) >>> # Test case where requested channel does not exist >>> import kwcoco >>> from delayed_image.delayed_nodes import * # NOQA >>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral', use_cache=1, verbose=100) >>> self = delayed = dset.coco_image(1).delay() >>> channels = 'B1|foobar|bazbiz|B8' >>> new = self.take_channels(channels) >>> new_cropped = new.crop((slice(10, 200), slice(12, 350))) >>> fused = new_cropped.finalize() >>> assert fused.shape == (190, 338, 4) >>> assert np.all(np.isnan(fused[..., 1:3])) >>> assert not np.any(np.isnan(fused[..., 0])) >>> assert not np.any(np.isnan(fused[..., 3])) """ if channels is None: return self current_channels = self.channels if isinstance(channels, list): top_idx_mapping = channels top_codes = self.channels.as_list() request_codes = None else: channels = channel_spec.FusedChannelSpec.coerce(channels) # Computer subindex integer mapping request_codes = channels.as_list() top_codes = current_channels.as_oset() top_idx_mapping = [] for code in request_codes: try: top_idx_mapping.append(top_codes.index(code)) except KeyError: top_idx_mapping.append(None) # Rearange subcomponents into the specified channel representation # I am not confident that this logic is the best way to do this. 
# This may be a bottleneck subindexer = kwarray.FlatIndexer([ comp.num_channels for comp in self.parts]) curr = None outer_accum = [] for request_idx, idx in enumerate(top_idx_mapping): # If possible, keep track of the channel name this index represents code = None if request_codes is None else request_codes[request_idx] if idx is None: # Requested channel does not exist in our data stack comp = None inner = 0 if curr is not None and curr.comp is None: inner = curr.stop else: # Requested channel exists in our data stack outer, inner = subindexer.unravel(idx) comp = self.parts[outer] # Do we extend the current inner accumulator or start a new one? if curr is None: # Start the first segment curr = _InnerAccumSegment(comp) elif curr.comp is not comp: # accept previous segment and start a new one outer_accum.append(curr) curr = _InnerAccumSegment(comp) # extend this inner segment curr.add_inner(inner, code) # Accumulate final segment if curr is not None: outer_accum.append(curr) # Gather the subcomponents into a new delayed concat new_components = [curr.get_subcomponent(self.dsize) for curr in outer_accum] new = DelayedChannelConcat(new_components) return new
def __getitem__(self, sl): if not isinstance(sl, tuple): raise TypeError('slice must be given as tuple') if len(sl) == 2: sl_y, sl_x = sl chan_idxs = None elif len(sl) == 3: sl_y, sl_x, chan_idxs = sl else: raise ValueError('Slice must have 2 or 3 dims') space_slice = (sl_y, sl_x) return self.crop(space_slice, chan_idxs) @property def num_overviews(self): """ Returns: int """ num_overviews = self.meta.get('num_overviews', None) if num_overviews is None and self.parts is not None: cand = [p.num_overviews for p in self.parts] if ub.allsame(cand): num_overviews = cand[0] else: import warnings warnings.warn('inconsistent overviews') num_overviews = None return num_overviews
    def as_xarray(self):
        """
        Returns:
            DelayedAsXarray
        """
        return DelayedAsXarray(self)

    def _push_operation_under(self, op, kwargs):
        # Note: we can't do this with a crop that has band selection,
        # but spatial operations should be ok.
        return self.__class__([op(p, **kwargs) for p in self.parts])

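    # Illustrative sketch (not from the original source): spatial operations
    # applied on top of a concatenation can be pushed underneath it so that
    # each band is transformed independently; ``optimize`` relies on
    # ``_push_operation_under`` for this.
    #
    #   >>> from delayed_image import DelayedLoad
    #   >>> from delayed_image.delayed_nodes import DelayedChannelConcat
    #   >>> r = DelayedLoad.demo(channels='r|g|b').prepare()[:, :, [0]]
    #   >>> g = DelayedLoad.demo(channels='r|g|b').prepare()[:, :, [1]]
    #   >>> cat = DelayedChannelConcat([r, g]).warp({'scale': 0.5})
    #   >>> opt = cat.optimize()
    #   >>> opt.write_network_text()  # the warp now lives under the concat
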
[docs] def _validate(self): """ Check that the delayed metadata corresponds with the finalized data """ final = self._finalize() # meta_dsize = self.dsize meta_shape = self.shape final_shape = final.shape correspondences = { 'shape': (final_shape, meta_shape) } for k, tup in correspondences.items(): v1, v2 = tup if v1 != v2: raise AssertionError( f'final and meta {k} does not agree {v1!r} != {v2!r}') return self
[docs] def undo_warps(self, remove=None, retain=None, squash_nans=False, return_warps=False): """ Attempts to "undo" warping for each concatenated channel and returns a list of delayed operations that are cropped to the right regions. Typically you will retrain offset, theta, and shear to remove scale. This ensures the data is spatially aligned up to a scale factor. Args: remove (List[str]): if specified, list components of the warping to remove. Can include: "offset", "scale", "shearx", "theta". Typically set this to ["scale"]. retain (List[str]): if specified, list components of the warping to retain. Can include: "offset", "scale", "shearx", "theta". Mutually exclusive with "remove". If neither remove or retain is specified, retain is set to ``[]``. squash_nans (bool): if True, pure nan channels are squashed into a 1x1 array as they do not correspond to a real source. return_warps (bool): if True, return the transforms we applied. I.e. the transform from the ``self`` to the returned ``parts``. This is useful when you need to warp objects in the original space into the jagged space. Returns: List[DelayedImage] | Tuple[List[DelayedImage] | List[kwimage.Affine]]: The List[DelayedImage] are the ``parts`` i.e. the new images with the warping undone. The List[kwimage.Affine]: is the transforms from ``self`` to each item in ``parts`` Example: >>> from delayed_image.delayed_nodes import * # NOQA >>> from delayed_image.delayed_leafs import DelayedLoad >>> from delayed_image.delayed_leafs import DelayedNans >>> import ubelt as ub >>> import kwimage >>> import kwarray >>> import numpy as np >>> # Demo case where we have different channels at different resolutions >>> base = DelayedLoad.demo(channels='r|g|b').prepare().dequantize({'quant_max': 255}) >>> bandR = base[:, :, 0].scale(100 / 512)[:, :-50].evaluate() >>> bandG = base[:, :, 1].scale(300 / 512).warp({'theta': np.pi / 8, 'about': (150, 150)}).evaluate() >>> bandB = base[:, :, 2].scale(600 / 512)[:150, :].evaluate() >>> bandN = DelayedNans((600, 600), channels='N') >>> # Make a concatenation of images of different underlying native resolutions >>> delayed_vidspace = DelayedChannelConcat([ >>> bandR.scale(6, dsize=(600, 600)).optimize(), >>> bandG.warp({'theta': -np.pi / 8, 'about': (150, 150)}).scale(2, dsize=(600, 600)).optimize(), >>> bandB.scale(1, dsize=(600, 600)).optimize(), >>> bandN, >>> ]).warp({'scale': 0.7}).optimize() >>> vidspace_box = kwimage.Boxes([[100, 10, 270, 160]], 'ltrb') >>> vidspace_poly = vidspace_box.to_polygons()[0] >>> vidspace_slice = vidspace_box.to_slices()[0] >>> self = delayed_vidspace[vidspace_slice].optimize() >>> print('--- Aligned --- ') >>> self.write_network_text() >>> squash_nans = True >>> undone_all_parts, tfs1 = self.undo_warps(squash_nans=squash_nans, return_warps=True) >>> undone_scale_parts, tfs2 = self.undo_warps(remove=['scale'], squash_nans=squash_nans, return_warps=True) >>> stackable_aligned = self.finalize().transpose(2, 0, 1) >>> stackable_undone_all = [] >>> stackable_undone_scale = [] >>> print('--- Undone All --- ') >>> for undone in undone_all_parts: ... undone.write_network_text() ... stackable_undone_all.append(undone.finalize()) >>> print('--- Undone Scale --- ') >>> for undone in undone_scale_parts: ... undone.write_network_text() ... 
stackable_undone_scale.append(undone.finalize()) >>> # xdoctest: +REQUIRES(--show) >>> import kwplot >>> kwplot.autompl() >>> canvas0 = kwimage.stack_images(stackable_aligned, axis=1) >>> canvas1 = kwimage.stack_images(stackable_undone_all, axis=1) >>> canvas2 = kwimage.stack_images(stackable_undone_scale, axis=1) >>> canvas0 = kwimage.draw_header_text(canvas0, 'Rescaled Aligned Channels') >>> canvas1 = kwimage.draw_header_text(canvas1, 'Unwarped Channels') >>> canvas2 = kwimage.draw_header_text(canvas2, 'Unscaled Channels') >>> canvas = kwimage.stack_images([canvas0, canvas1, canvas2], axis=0) >>> canvas = kwimage.fill_nans_with_checkers(canvas) >>> kwplot.imshow(canvas) """ retain = _rectify_retain(remove, retain) unwarped_parts = [] if return_warps: jagged_warps = [] for part in self.parts: undone_part, undo_warp = part.undo_warp( retain=retain, squash_nans=squash_nans, return_warp=return_warps) unwarped_parts.append(undone_part) jagged_warps.append(undo_warp) return unwarped_parts, jagged_warps else: for part in self.parts: undone_part = part.undo_warp( retain=retain, squash_nans=squash_nans) unwarped_parts.append(undone_part) return unwarped_parts
[docs] class DelayedImage(ImageOpsMixin, DelayedArray): """ For the case where an array represents a 2D image with multiple channels """ def __init__(self, subdata=None, dsize=None, channels=None): """ Args: subdata (DelayedArray): dsize (None | Tuple[int | None, int | None]): overrides subdata dsize channels (None | int | FusedChannelSpec): overrides subdata channels """ super().__init__(subdata) self.channels = channels self.meta['dsize'] = dsize def __nice__(self): """ Returns: str """ if self.channels is None: return '{}'.format(self.shape) else: return '{}, {}'.format(self.shape, self.channels) @property def shape(self): """ Returns: None | Tuple[int | None, int | None, int | None] """ dsize = self.dsize if dsize is None: dsize = (None, None) w, h = dsize c = self.num_channels return (h, w, c) @property def num_channels(self): """ Returns: None | int """ num_channels = self.meta.get('num_channels', None) if num_channels is None and self.subdata is not None: num_channels = self.subdata.num_channels return num_channels @property def dsize(self): """ Returns: None | Tuple[int | None, int | None] """ # return self.meta.get('dsize', None) dsize = self.meta.get('dsize', None) if dsize is None and self.subdata is not None: dsize = self.subdata.dsize return dsize @property def channels(self): """ Returns: None | FusedChannelSpec """ channels = self.meta.get('channels', None) if channels is None and self.subdata is not None: channels = self.subdata.channels return channels @channels.setter def channels(self, channels): if channels is None: num_channels = None else: if isinstance(channels, int): num_channels = channels channels = None else: import delayed_image channels = delayed_image.FusedChannelSpec.coerce(channels) num_channels = channels.normalize().numel() self.meta['channels'] = channels self.meta['num_channels'] = num_channels @property def num_overviews(self): """ Returns: int """ num_overviews = self.meta.get('num_overviews', None) if num_overviews is None and self.subdata is not None: num_overviews = self.subdata.num_overviews return num_overviews def __getitem__(self, sl): if not isinstance(sl, tuple): raise TypeError('slice must be given as tuple') if len(sl) == 2: sl_y, sl_x = sl chan_idxs = None elif len(sl) == 3: sl_y, sl_x, chan_idxs = sl else: raise ValueError('Slice must have 2 or 3 dims') space_slice = (sl_y, sl_x) return self.crop(space_slice, chan_idxs)
[docs] def take_channels(self, channels): """ This method returns a subset of the vision data with only the specified bands / channels. Args: channels (List[int] | slice | channel_spec.FusedChannelSpec): List of integers indexes, a slice, or a channel spec, which is typically a pipe (`|`) delimited list of channel codes. See ChannelSpec for more detials. Returns: DelayedCrop: a new delayed load with a fused take channel operation Note: The channel subset must exist here or it will raise an error. A better implementation (via pymbolic) might be able to do better Example: >>> # >>> # Test Channel Select Via Code >>> from delayed_image.delayed_nodes import * # NOQA >>> from delayed_image import DelayedLoad >>> self = DelayedLoad.demo(dsize=(16, 16), channels='r|g|b').prepare() >>> channels = 'r|b' >>> new = self.take_channels(channels)._validate() >>> new2 = new[:, :, [1, 0]]._validate() >>> new3 = new2[:, :, [1]]._validate() Example: >>> from delayed_image.delayed_nodes import * # NOQA >>> from delayed_image import DelayedLoad >>> self = DelayedLoad.demo('astro').prepare() >>> channels = [2, 0] >>> new = self.take_channels(channels) >>> new3 = new.take_channels([1, 0]) >>> new._validate() >>> new3._validate() >>> final1 = self.finalize() >>> final2 = new.finalize() >>> final3 = new3.finalize() >>> assert np.all(final1[..., 2] == final2[..., 0]) >>> assert np.all(final1[..., 0] == final2[..., 1]) >>> assert final2.shape[2] == 2 >>> assert np.all(final1[..., 2] == final3[..., 1]) >>> assert np.all(final1[..., 0] == final3[..., 0]) >>> assert final3.shape[2] == 2 Example: >>> from delayed_image.delayed_nodes import * # NOQA >>> from delayed_image import DelayedLoad >>> self = DelayedLoad.demo(dsize=(16, 16), channels='r|g|b').prepare() >>> # Case where a channel doesn't exist >>> channels = 'r|b|magic' >>> new = self.take_channels(channels) >>> assert len(new.parts) == 2 >>> new._validate() """ if isinstance(channels, list): top_idx_mapping = channels else: current_channels = self.channels if current_channels is None: raise ValueError( 'The channel spec for this node are unknown. ' 'Cannot use a spec to select them' ) channels = channel_spec.FusedChannelSpec.coerce(channels) # Computer subindex integer mapping request_codes = channels.as_list() top_codes = current_channels.as_oset() try: top_idx_mapping = [ top_codes.index(code) for code in request_codes ] except KeyError: # If a requested channel doesn't exist we have to break this # node up into a concat node with nans wrp = DelayedChannelConcat([self], dsize=self.dsize) new = wrp.take_channels(channels) return new new_chan_ixs = top_idx_mapping new = self.crop(None, new_chan_ixs) return new
[docs] def _validate(self): """ Check that the delayed metadata corresponds with the finalized data """ opt = self.optimize() opt_shape = opt.shape final = self._finalize() # meta_dsize = self.dsize meta_shape = self.shape final_shape = final.shape correspondences = { 'opt_chans': (self.channels, opt.channels), 'opt_nbands': (self.num_channels, opt.num_channels), 'final_shape': (final_shape, meta_shape), 'opt_shape': (opt_shape, meta_shape), } for k, tup in correspondences.items(): v1, v2 = tup if v1 != v2: raise AssertionError( f'final and meta {k} does not agree {v1!r} != {v2!r}') return self
    def _transform_from_subdata(self):
        raise NotImplementedError

    def get_transform_from_leaf(self):
        """
        Returns the transformation that would align data with the leaf
        """
        subdata_from_leaf = self.subdata.get_transform_from_leaf()
        self_from_subdata = self._transform_from_subdata()
        self_from_leaf = self_from_subdata @ subdata_from_leaf
        return self_from_leaf

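    # Illustrative sketch (not from the original source): for a single warp on
    # top of a leaf, the composed transform from the leaf is just that warp.
    #
    #   >>> from delayed_image import DelayedLoad
    #   >>> leaf = DelayedLoad.demo().prepare()
    #   >>> node = leaf.warp({'scale': 2, 'offset': (3, 5)})
    #   >>> tf = node.get_transform_from_leaf()
    #   >>> assert tf.decompose()['scale'] == (2, 2)
    #   >>> assert tf.decompose()['offset'] == (3, 5)
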
    def evaluate(self):
        """
        Evaluate this node and return the data as an identity.

        Returns:
            DelayedIdentity
        """
        final = self.finalize()
        new = delayed_leafs.DelayedIdentity(final, dsize=self.dsize,
                                            channels=self.channels)
        return new

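    # Illustrative sketch (not from the original source): ``evaluate`` bakes
    # the current result into an in-memory ``DelayedIdentity`` so that further
    # delayed operations start from cached pixels rather than re-reading and
    # re-warping the leaf.
    #
    #   >>> from delayed_image import DelayedLoad
    #   >>> self = DelayedLoad.demo().prepare().scale(0.5)
    #   >>> cached = self.evaluate()
    #   >>> out = cached.warp({'theta': 0.1}).finalize()
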
    @profile
    def _opt_push_under_concat(self):
        """
        Push this node under its child node if it is a concatenation operation
        """
        assert isinstance2(self.subdata, DelayedChannelConcat)
        kwargs = ub.compatible(self.meta, self.__class__.__init__)
        new = self.subdata._push_operation_under(self.__class__, kwargs)
        return new

[docs] def undo_warp(self, remove=None, retain=None, squash_nans=False, return_warp=False): """ Attempts to "undo" warping for each concatenated channel and returns a list of delayed operations that are cropped to the right regions. Typically you will retrain offset, theta, and shear to remove scale. This ensures the data is spatially aligned up to a scale factor. Args: remove (List[str]): if specified, list components of the warping to remove. Can include: "offset", "scale", "shearx", "theta". Typically set this to ["scale"]. retain (List[str]): if specified, list components of the warping to retain. Can include: "offset", "scale", "shearx", "theta". Mutually exclusive with "remove". If neither remove or retain is specified, retain is set to ``[]``. squash_nans (bool): if True, pure nan channels are squashed into a 1x1 array as they do not correspond to a real source. return_warp (bool): if True, return the transform we applied. This is useful when you need to warp objects in the original space into the jagged space. SeeAlso: DelayedChannelConcat.undo_warps Example: >>> # Test similar to undo_warps, but on each channel separately >>> from delayed_image.delayed_nodes import * # NOQA >>> from delayed_image.delayed_leafs import DelayedLoad >>> from delayed_image.delayed_leafs import DelayedNans >>> import ubelt as ub >>> import kwimage >>> import kwarray >>> import numpy as np >>> # Demo case where we have different channels at different resolutions >>> base = DelayedLoad.demo(channels='r|g|b').prepare().dequantize({'quant_max': 255}) >>> bandR = base[:, :, 0].scale(100 / 512)[:, :-50].evaluate() >>> bandG = base[:, :, 1].scale(300 / 512).warp({'theta': np.pi / 8, 'about': (150, 150)}).evaluate() >>> bandB = base[:, :, 2].scale(600 / 512)[:150, :].evaluate() >>> bandN = DelayedNans((600, 600), channels='N') >>> B0 = bandR.scale(6, dsize=(600, 600)).optimize() >>> B1 = bandG.warp({'theta': -np.pi / 8, 'about': (150, 150)}).scale(2, dsize=(600, 600)).optimize() >>> B2 = bandB.scale(1, dsize=(600, 600)).optimize() >>> vidspace_box = kwimage.Boxes([[-10, -10, 270, 160]], 'ltrb').scale(1 / .7).quantize() >>> vidspace_poly = vidspace_box.to_polygons()[0] >>> vidspace_slice = vidspace_box.to_slices()[0] >>> # Test with the padded crop >>> self0 = B0.crop(vidspace_slice, wrap=0, clip=0, pad=10).optimize() >>> self1 = B1.crop(vidspace_slice, wrap=0, clip=0, pad=10).optimize() >>> self2 = B2.crop(vidspace_slice, wrap=0, clip=0, pad=10).optimize() >>> parts = [self0, self1, self2] >>> # Run the undo on each channel >>> undone_scale_parts = [d.undo_warp(remove=['scale']) for d in parts] >>> print('--- Aligned --- ') >>> stackable_aligned = [] >>> for d in parts: >>> d.write_network_text() >>> stackable_aligned.append(d.finalize()) >>> print('--- Undone Scale --- ') >>> stackable_undone_scale = [] >>> for undone in undone_scale_parts: ... undone.write_network_text() ... 
stackable_undone_scale.append(undone.finalize()) >>> # xdoctest: +REQUIRES(--show) >>> import kwplot >>> kwplot.autompl() >>> canvas0 = kwimage.stack_images(stackable_aligned, axis=1, pad=5, bg_value='kw_darkgray') >>> canvas2 = kwimage.stack_images(stackable_undone_scale, axis=1, pad=5, bg_value='kw_darkgray') >>> canvas0 = kwimage.draw_header_text(canvas0, 'Rescaled Channels') >>> canvas2 = kwimage.draw_header_text(canvas2, 'Native Scale Channels') >>> canvas = kwimage.stack_images([canvas0, canvas2], axis=0, bg_value='kw_darkgray') >>> canvas = kwimage.fill_nans_with_checkers(canvas) >>> kwplot.imshow(canvas) """ retain = _rectify_retain(remove, retain) part = self tf_root_from_leaf = part.get_transform_from_leaf() tf_leaf_from_root = tf_root_from_leaf.inv() undo_all = tf_leaf_from_root all_undo_components = undo_all.concise() undo_components = ub.dict_diff(all_undo_components, retain) undo_warp = kwimage.Affine.coerce(undo_components) undone_part = part.warp(undo_warp).optimize() if squash_nans: if return_warp: # hack the return undo_warp w, h = undone_part.dsize undo_warp = kwimage.Affine.scale((1 / w, 1 / h)) @ undo_warp if isinstance2(undone_part, delayed_leafs.DelayedNans): undone_part = undone_part[0:1, 0:1].optimize() if return_warp: return undone_part, undo_warp else: return undone_part
[docs] class DelayedAsXarray(DelayedImage): """ Casts the data to an xarray object in the finalize step Example; >>> # xdoctest: +REQUIRES(module:xarray) >>> from delayed_image.delayed_nodes import * # NOQA >>> from delayed_image import DelayedLoad >>> # without channels >>> base = DelayedLoad.demo(dsize=(16, 16)).prepare() >>> self = base.as_xarray() >>> final = self._validate().finalize() >>> assert len(final.coords) == 0 >>> assert final.dims == ('y', 'x', 'c') >>> # with channels >>> base = DelayedLoad.demo(dsize=(16, 16), channels='r|g|b').prepare() >>> self = base.as_xarray() >>> final = self._validate().finalize() >>> assert final.coords.indexes['c'].tolist() == ['r', 'g', 'b'] >>> assert final.dims == ('y', 'x', 'c') """
    def _finalize(self):
        """
        Returns:
            ArrayLike
        """
        import xarray as xr
        subfinal = np.asarray(self.subdata._finalize())
        channels = self.subdata.channels
        coords = {}
        if channels is not None:
            coords['c'] = channels.code_list()
        final = xr.DataArray(subfinal, dims=('y', 'x', 'c'), coords=coords)
        return final

    @profile
    def optimize(self):
        """
        Returns:
            DelayedImage
        """
        return self.subdata.optimize().as_xarray()

[docs] class DelayedWarp(DelayedImage): """ Applies an affine transform to an image. Example: >>> from delayed_image.delayed_nodes import * # NOQA >>> from delayed_image import DelayedLoad >>> self = DelayedLoad.demo(dsize=(16, 16)).prepare() >>> warp1 = self.warp({'scale': 3}) >>> warp2 = warp1.warp({'theta': 0.1}) >>> warp3 = warp2._opt_fuse_warps() >>> warp3._validate() >>> print(ub.urepr(warp2.nesting(), nl=-1, sort=0)) >>> print(ub.urepr(warp3.nesting(), nl=-1, sort=0)) """ def __init__(self, subdata, transform, dsize='auto', antialias=True, interpolation='linear', border_value='auto', noop_eps=0): """ Args: subdata (DelayedArray): data to operate on transform (ndarray | dict | kwimage.Affine): a coercable affine matrix. See :class:`kwimage.Affine` for details on what can be coerced. dsize (Tuple[int, int] | str): The width / height of the output canvas. If 'auto', dsize is computed such that the positive coordinates of the warped image will fit in the new canvas. In this case, any pixel that maps to a negative coordinate will be clipped. This has the property that the input transformation is not modified. antialias (bool): if True determines if the transform is downsampling and applies antialiasing via gaussian a blur. Defaults to False interpolation (str): interpolation code or cv2 integer. Interpolation codes are linear, nearest, cubic, lancsoz, and area. Defaults to "linear". noop_eps (float): This is the tolerance for optimizing a warp away. If the transform has all of its decomposed parameters (i.e. scale, rotation, translation, shear) less than this value, the warp node can be optimized away. Defaults to 0. """ super().__init__(subdata) transform = kwimage.Affine.coerce(transform) if dsize == 'auto': from delayed_image.helpers import _auto_dsize dsize = _auto_dsize(transform, self.subdata.dsize) self.meta['transform'] = transform self.meta['dsize'] = dsize self.meta['antialias'] = antialias self.meta['interpolation'] = interpolation self.meta['border_value'] = border_value self.meta['noop_eps'] = noop_eps # Mark which keys need to be passed around and for what reason self._data_keys = ['transform', 'dsize'] self._algo_keys = [ 'interpolation', 'antialias', 'border_value', 'noop_eps'] @property def transform(self): """ Returns: kwimage.Affine """ return self.meta['transform']
[docs] def _finalize(self): """ Returns: ArrayLike """ dsize = self.dsize if dsize == (None, None): dsize = None antialias = self.meta['antialias'] transform = self.meta['transform'] interpolation = self.meta['interpolation'] prewarp = self.subdata._finalize() prewarp = np.asanyarray(prewarp) # TODO: we could configure this, but forcing nans on floats seems like # a pretty nice default border behavior. It would be even nicer to have # masked arrays for ints. # The scalar / explicit functionality will be handled inside warp_affine # in the future, so some of this can be removed. num_chan = kwimage.num_channels(prewarp) if self.meta['border_value'] == 'auto': if prewarp.dtype.kind == 'f': border_value = np.nan else: if isinstance(prewarp, np.ma.MaskedArray): border_value = int(prewarp.fill_value) else: border_value = 0 else: border_value = self.meta['border_value'] if not ub.iterable(border_value): # Odd OpenCV behavior: https://github.com/opencv/opencv/issues/22283 # Can only have at most 4 components to border_value and # then they start to wrap around. This is fine if we are only # specifying a single number for all channels border_value = (border_value,) * min(4, num_chan) if len(border_value) > 4: raise ValueError('borderValue cannot have more than 4 components. ' 'OpenCV #22283 describes why') # HACK: # the border value only correctly applies to the first 4 channels for # whatever reason. border_value = border_value[0:4] from delayed_image.helpers import _ensure_valid_dsize dsize = _ensure_valid_dsize(dsize) M = np.asarray(transform) final = kwimage.warp_affine(prewarp, M, dsize=dsize, interpolation=interpolation, antialias=antialias, border_value=border_value) # final = kwimage.warp_projective(sub_data_, M, dsize=dsize, flags=flags) # Ensure that the last dimension is channels final = kwarray.atleast_nd(final, 3, front=False) return final
[docs] @profile def optimize(self): """ Returns: DelayedImage Example: >>> # Demo optimization that removes a noop warp >>> from delayed_image import DelayedLoad >>> import kwimage >>> base = DelayedLoad.demo(channels='r|g|b').prepare() >>> self = base.warp(kwimage.Affine.eye()) >>> new = self.optimize() >>> assert len(self.as_graph().nodes) == 2 >>> assert len(new.as_graph().nodes) == 1 Example: >>> # Test optimize nans >>> from delayed_image import DelayedNans >>> import kwimage >>> base = DelayedNans(dsize=(100, 100), channels='a|b|c') >>> self = base.warp(kwimage.Affine.scale(0.1)) >>> # Should simply return a new nan generator >>> new = self.optimize() >>> assert len(new.as_graph().nodes) == 1 Example: >>> # Test optimize nans >>> from delayed_image import DelayedLoad >>> import kwimage >>> base = DelayedLoad.demo(channels='r|g|b').prepare() >>> transform = kwimage.Affine.scale(1.0 + 1e-7) >>> self = base.warp(transform, dsize=base.dsize) >>> # An optimize will not remove a warp if there is any >>> # doubt if it is the identity. >>> new = self.optimize() >>> assert len(self.as_graph().nodes) == 2 >>> assert len(new.as_graph().nodes) == 2 >>> # But we can specify a threshold where it will >>> self._set_nested_params(noop_eps=1e-6) >>> new = self.optimize() >>> assert len(self.as_graph().nodes) == 2 >>> assert len(new.as_graph().nodes) == 1 """ new = copy.copy(self) new.subdata = self.subdata.optimize() if isinstance2(new.subdata, DelayedWarp): new = new._opt_fuse_warps() # Check if the transform is close enough to identity to be considered # negligable. noop_eps = new.meta['noop_eps'] is_negligable = ( new.dsize == new.subdata.dsize and new.transform.isclose_identity(rtol=noop_eps, atol=noop_eps) ) if is_negligable: new = new.subdata if TRACE_OPTIMIZE: new._opt_logs.append('Contract identity warp') elif isinstance2(new.subdata, DelayedChannelConcat): new = new._opt_push_under_concat().optimize() elif hasattr(new.subdata, '_optimized_warp'): # The subdata knows how to optimize itself wrt a warp warp_kwargs = ub.dict_isect( self.meta, self._data_keys + self._algo_keys) new = new.subdata._optimized_warp(**warp_kwargs).optimize() else: split = new._opt_split_warp_overview() if new is not split: new = split new.subdata = new.subdata.optimize() new = new.optimize() else: new = new._opt_absorb_overview() return new
    def _transform_from_subdata(self):
        return self.transform

[docs] @profile def _opt_fuse_warps(self): """ Combine two consecutive warps into a single operation. """ assert isinstance2(self.subdata, DelayedWarp) inner_data = self.subdata.subdata tf1 = self.subdata.meta['transform'] tf2 = self.meta['transform'] # TODO: could ensure the metadata is compatable, for now just take the # most recent dsize = self.meta['dsize'] common_meta = ub.dict_isect(self.meta, self._algo_keys) new_transform = tf2 @ tf1 new = self.__class__(inner_data, new_transform, dsize=dsize, **common_meta) if TRACE_OPTIMIZE: new._opt_logs.append('Fused warps') return new
[docs] @profile def _opt_absorb_overview(self): """ Remove any deeper overviews that would be undone by this warp. Given this warp node, if it has a scale component could undo an overview (i.e. the scale factor is greater than 2), we want to: 1. determine if there is an overview deeper in the tree. 2. remove that overview and that scale factor from this warp 3. modify any intermediate nodes that will be changed by having the deeper overview removed. NOTE: This optimization is currently the most dubious one in the code, and is likely where some of the bugs are coming from. Help wanted. CommandLine: xdoctest -m delayed_image.delayed_nodes DelayedWarp._opt_absorb_overview Example: >>> # xdoctest: +REQUIRES(module:osgeo) >>> from delayed_image.delayed_nodes import * # NOQA >>> from delayed_image import DelayedLoad >>> import kwimage >>> fpath = kwimage.grab_test_image_fpath(overviews=3) >>> base = DelayedLoad(fpath, channels='r|g|b').prepare() >>> # Case without any operations between the overview and warp >>> self = base.get_overview(1).warp({'scale': 4}) >>> self.write_network_text() >>> opt = self._opt_absorb_overview()._validate() >>> opt.write_network_text() >>> opt_data = [d for n, d in opt.as_graph().nodes(data=True)] >>> assert 'DelayedOverview' not in [d['type'] for d in opt_data] >>> # Case with a chain of operations between overview and warp >>> self = base.get_overview(1)[0:101, 0:100].warp({'scale': 4}) >>> self.write_network_text() >>> opt = self._opt_absorb_overview()._validate() >>> opt.write_network_text() >>> opt_data = [d for n, d in opt.as_graph().nodes(data=True)] >>> #assert opt_data[1]['meta']['space_slice'] == (slice(0, 202, None), slice(0, 200, None)) >>> assert opt_data[1]['meta']['space_slice'] == (slice(0, 204, None), slice(0, 202, None)) >>> # Any sort of complex chain does prevents this optimization >>> # from running. >>> self = base.get_overview(1)[0:101, 0:100][0:50, 0:50].warp({'scale': 4}) >>> opt = self._opt_absorb_overview()._validate() >>> opt.write_network_text() >>> opt_data = [d for n, d in opt.as_graph().nodes(data=True)] >>> assert 'DelayedOverview' in [d['type'] for d in opt_data] """ DEBUG = 0 # Check if there is a strict downsampling component transform = self.meta['transform'] params = transform.decompose() sx, sy = params['scale'] eps = 1e-8 twoish = (2 - eps) if sx < twoish and sy < twoish: return self # Lookahead to see if there is a nearby overview operation that can be # absorbed, remember the chain of operations between the warp and the # overview, as it will need to be modified. # !!! FIXME !!! # The number of nodes we lookahead is hard coded based on reasonable # cases we expect in the real world. A correct implementation would not # depend on a hard coded number like this. However, this optimization # may be able to be completely refactored to avoid this approach all # together. LOOKAHEAD_NUMBER = 4 parent = self subdata = None chain = [] num_dcrops = 0 for i in range(LOOKAHEAD_NUMBER): subdata = parent.subdata if subdata is None: break elif isinstance2(subdata, DelayedWarp): subdata = None break elif isinstance2(subdata, DelayedOverview): # We found an overview node overview = subdata break elif isinstance2(subdata, DelayedDequantize): pass elif isinstance2(subdata, DelayedCrop): num_dcrops += 1 else: subdata = None break chain.append(subdata) parent = subdata else: subdata = None if subdata is None: return self if num_dcrops > 1: # The following logic does not work there is more than a single # crop between the warp and the overview. 
Punt and just return. # Might be better to fuse the crops. return self if DEBUG: print('---------') print('ORIG:') self.print_graph() # At this point we have some chain: # [Warp, Something, ..., Overview] # The chain is the [Something, ...] part # Replace the overview node with a warp node that mimics it. # This has no impact on the function of the operation stack. mimic_overview = overview._opt_overview_as_warp() tf1 = mimic_overview.meta['transform'] # Handle any nodes between the warp and the overview. # This should be at most one quantization and one crop operation, # but we may generalize that in the future. if DEBUG: print('chain = {}'.format(ub.urepr(chain, nl=1))) if not chain: # The overview is directly after this warp new_head = mimic_overview.subdata if TRACE_OPTIMIZE: new_head._opt_logs.extend(mimic_overview._opt_logs) new_head._opt_logs.append('_opt_absorb_overview:absorb_neighbor') else: # Copy the chain so this does not mutate the input chain = [copy.copy(n) for n in chain] for u, v in ub.iter_window(chain, 2): u.subdata = v tail = chain[-1] if TRACE_OPTIMIZE: mimic_overview._opt_logs.extend(tail.subdata._opt_logs) tail.subdata = mimic_overview # Check if the tail of the chain is a crop. if hasattr(tail, '_opt_warp_after_crop'): # This modifies the tail so it is now a warp followed by a # crop. Note that the warp may be different than the mimicked # overview, so use this new transform instead. # (Actually, I think this can't make the new crop non integral, # so it probably wont matter) if DEBUG: print('HAS CROP OP') print('Tail') tail.print_graph() modified_tail = tail._opt_warp_after_crop() if DEBUG: print('Modified Tail') modified_tail.print_graph() print('modified_tail = {}'.format(ub.urepr(modified_tail, nl=1))) # Previous code used the "modified tail" to grab the new dsize # that will be applied to the rest of the chain. However, the # modified tail corresponds to the warp we are going to remove. # We need to grab it from the modified crop instead, which is # going to be the child of the modified tail. # Thus we comment out this line: # new_chain_dsize = modified_tail.meta['dsize'] # And use this as the fixed way to grab the dsize. Tests pass, # but I'm leaving this note in here in case this change causes # unforeseen issues. new_chain_dsize = modified_tail.subdata.meta['dsize'] tf1 = modified_tail.meta['transform'] # Remove the modified warp tail_parent = chain[-2] if len(chain) > 1 else self new_tail = modified_tail.subdata if DEBUG: print('tail_parent = {}'.format(ub.urepr(tail_parent, nl=1))) print('new_tail = {}'.format(ub.urepr(new_tail, nl=1))) if TRACE_OPTIMIZE: new_tail._opt_logs.extend(modified_tail._opt_logs) new_tail._opt_logs.extend(tail_parent.subdata._opt_logs) new_tail._opt_logs.append('_opt_absorb_overview:modify-tail') tail_parent.subdata = new_tail chain[-1] = new_tail for notcrop in chain[:-1]: notcrop.meta['dsize'] = new_chain_dsize else: # The chain does not contain a crop operation, we can safely # remove it. Finally remove the overview transform entirely if DEBUG: print('NO CROP OP') tail.subdata = mimic_overview.subdata new_chain_dsize = mimic_overview.subdata.meta['dsize'] for notcrop in chain: notcrop.meta['dsize'] = new_chain_dsize if TRACE_OPTIMIZE: tail._opt_logs.extend(mimic_overview._opt_logs) tail._opt_logs.append('_opt_absorb_overview:safe-to-remove') # The dsize within the chain might be wrong due to our # modification. 
I **think** its ok to just directly set it to the # new dsize as it should only be operations that do not change the # dsize, but it would be nice to find a more elegant # implementation. if DEBUG: print(f'new_chain_dsize={new_chain_dsize}') for notcrop in chain[:-1]: notcrop.meta['dsize'] = new_chain_dsize new_head = chain[0] if TRACE_OPTIMIZE: new_head._opt_logs.append('_opt_absorb_overview:absorb_chain') warp_meta = ub.dict_isect(self.meta, self._algo_keys) tf2 = self.meta['transform'] dsize = self.meta['dsize'] new_transform = tf2 @ tf1 new = self.__class__(new_head, new_transform, dsize=dsize, **warp_meta) if TRACE_OPTIMIZE: new._opt_logs.append('_opt_absorb_overview:finish') if DEBUG: print('NEW:') new.print_graph() print('---------') return new
[docs] @profile def _opt_split_warp_overview(self): """ Split this node into a warp and an overview if possible Example: >>> # xdoctest: +REQUIRES(module:osgeo) >>> from delayed_image.delayed_nodes import * # NOQA >>> from delayed_image import DelayedLoad >>> import kwimage >>> fpath = kwimage.grab_test_image_fpath(overviews=3) >>> self = DelayedLoad(fpath, channels='r|g|b').prepare() >>> print(f'self={self}') >>> print('self.meta = {}'.format(ub.urepr(self.meta, nl=1))) >>> warp0 = self.warp({'scale': 0.2}) >>> warp1 = warp0._opt_split_warp_overview() >>> warp2 = self.warp({'scale': 0.25})._opt_split_warp_overview() >>> print(ub.urepr(warp0.nesting(), nl=-1, sort=0)) >>> print(ub.urepr(warp1.nesting(), nl=-1, sort=0)) >>> print(ub.urepr(warp2.nesting(), nl=-1, sort=0)) >>> warp0_nodes = [d['type'] for d in warp0.as_graph().nodes.values()] >>> warp1_nodes = [d['type'] for d in warp1.as_graph().nodes.values()] >>> warp2_nodes = [d['type'] for d in warp2.as_graph().nodes.values()] >>> assert warp0_nodes == ['DelayedWarp', 'DelayedLoad'] >>> assert warp1_nodes == ['DelayedWarp', 'DelayedOverview', 'DelayedLoad'] >>> assert warp2_nodes == ['DelayedOverview', 'DelayedLoad'] Example: >>> # xdoctest: +REQUIRES(module:osgeo) >>> from delayed_image.delayed_nodes import * # NOQA >>> from delayed_image import DelayedLoad >>> import kwimage >>> fpath = kwimage.grab_test_image_fpath(overviews=3) >>> self = DelayedLoad(fpath, channels='r|g|b').prepare() >>> warp0 = self.warp({'scale': 1 / 2 ** 6}) >>> opt = warp0.optimize() >>> print(ub.urepr(warp0.nesting(), nl=-1, sort=0)) >>> print(ub.urepr(opt.nesting(), nl=-1, sort=0)) >>> warp0_nodes = [d['type'] for d in warp0.as_graph().nodes.values()] >>> opt_nodes = [d['type'] for d in opt.as_graph().nodes.values()] >>> assert warp0_nodes == ['DelayedWarp', 'DelayedLoad'] >>> assert opt_nodes == ['DelayedWarp', 'DelayedOverview', 'DelayedLoad'] """ inner_data = self.subdata num_overviews = inner_data.num_overviews if not num_overviews: return self # Check if there is a strict downsampling component transform = self.meta['transform'] params = transform.decompose() sx, sy = params['scale'] if sx > 0.5 or sy > 0.5: return self # Check how many pyramid downs we could replace downsampling with from kwimage.im_cv2 import _prepare_scale_residual num_downs_possible, _, _ = _prepare_scale_residual(sx, sy, fudge=0) # But only use as many downs as we have overviews num_downs = min(num_overviews, num_downs_possible) if num_downs == 0: return self # Given the overview, find the residual to reconstruct the original overview_transform = kwimage.Affine.scale(1 / (2 ** num_downs)) # Let T=original, O=overview, R=residual # T = R @ O # T @ O.inv = R @ O @ O.inv # T @ O.inv = R residual_transform = transform @ overview_transform.inv() new_transform = residual_transform dsize = self.meta['dsize'] overview = inner_data.get_overview(num_downs) if new_transform.isclose_identity(): new = overview else: common_meta = ub.dict_isect(self.meta, self._algo_keys) new = overview.warp(new_transform, dsize=dsize, **common_meta) if TRACE_OPTIMIZE: new._opt_logs.append('Split overviews') return new
[docs] class DelayedDequantize(DelayedImage): """ Rescales image intensities from int to floats. The output is usually between 0 and 1. This also handles transforming nodata into nan values. """ def __init__(self, subdata, quantization): """ Args: subdata (DelayedArray): data to operate on quantization (Dict): see :func:`delayed_image.helpers.dequantize` """ super().__init__(subdata) self.meta['quantization'] = quantization self.meta['dsize'] = subdata.dsize
    def _finalize(self):
        """
        Returns:
            ArrayLike
        """
        from delayed_image.helpers import dequantize
        quantization = self.meta['quantization']
        final = self.subdata._finalize()
        final = kwarray.atleast_nd(final, 3, front=False)
        if quantization is not None:
            final = dequantize(final, quantization)
        return final

[docs] @profile def optimize(self): """ Returns: DelayedImage Example: >>> # Test a case that caused an error in development >>> from delayed_image.delayed_nodes import * # NOQA >>> from delayed_image import DelayedLoad >>> fpath = kwimage.grab_test_image_fpath() >>> base = DelayedLoad(fpath, channels='r|g|b').prepare() >>> quantization = {'quant_max': 255, 'nodata': 0} >>> self = base.get_overview(1).dequantize(quantization) >>> self.write_network_text() >>> opt = self.optimize() """ new = copy.copy(self) new.subdata = self.subdata.optimize() if isinstance2(new.subdata, DelayedDequantize): raise AssertionError('Dequantization is only allowed once') if isinstance2(new.subdata, DelayedWarp): # Swap order so quantize is before the warp new = new._opt_dequant_before_other() new = new.optimize() if isinstance2(new.subdata, DelayedChannelConcat): new = new._opt_push_under_concat().optimize() return new
[docs] @profile def _opt_dequant_before_other(self): quantization = self.meta['quantization'] new = copy.copy(self.subdata) new.subdata = new.subdata.dequantize(quantization) if TRACE_OPTIMIZE: new._opt_logs.append('_opt_dequant_before_other') return new
[docs] def _transform_from_subdata(self): return kwimage.Affine.eye()
[docs] class DelayedCrop(DelayedImage):
    """
    Crops an image along integer pixel coordinates.

    Example:
        >>> from delayed_image.delayed_nodes import * # NOQA
        >>> from delayed_image import DelayedLoad
        >>> base = DelayedLoad.demo(dsize=(16, 16)).prepare()
        >>> # Test Fuse Crops Space Only
        >>> crop1 = base[4:12, 0:16]
        >>> self = crop1[2:6, 0:8]
        >>> opt = self._opt_fuse_crops()
        >>> self.write_network_text()
        >>> opt.write_network_text()
        >>> #
        >>> # Test Channel Select Via Index
        >>> self = base[:, :, [0]]
        >>> self.write_network_text()
        >>> final = self._finalize()
        >>> assert final.shape == (16, 16, 1)
        >>> assert base[:, :, [0, 1]].finalize().shape == (16, 16, 2)
        >>> assert base[:, :, [2, 0, 1]].finalize().shape == (16, 16, 3)

    Example:
        >>> from delayed_image.delayed_nodes import * # NOQA
        >>> from delayed_image import DelayedLoad
        >>> base = DelayedLoad.demo(dsize=(16, 16)).prepare()
        >>> # Test Discontiguous Channel Select Via Index
        >>> self = base[:, :, [0, 2]]
        >>> self.write_network_text()
        >>> final = self._finalize()
        >>> assert final.shape == (16, 16, 2)
    """
    @profile
    def __init__(self, subdata, space_slice=None, chan_idxs=None):
        """
        Args:
            subdata (DelayedArray): data to operate on

            space_slice (Tuple[slice, slice]):
                if specified, take this y-slice and x-slice.

            chan_idxs (List[int] | None):
                if specified, take these channels / bands
        """
        super().__init__(subdata)
        # TODO: are we doing infinite padding or clipping?
        # This assumes clipping
        in_w, in_h = subdata.dsize
        if space_slice is not None:
            space_dims = (in_h, in_w)
            slice_box = kwimage.Boxes.from_slice(
                space_slice, space_dims, wrap=True, clip=True)
            space_slice = slice_box.to_slices()[0]
            space_slice, _pad = kwarray.embed_slice(space_slice, space_dims)
            sl_y, sl_x = space_slice[0:2]
            width = sl_x.stop - sl_x.start
            height = sl_y.stop - sl_y.start
            self.meta['dsize'] = (width, height)
        else:
            space_slice = (slice(0, in_h), slice(0, in_w))
            self.meta['dsize'] = (in_w, in_h)

        if chan_idxs is not None:
            current_channels = self.channels
            if current_channels is not None:
                new_channels = current_channels[chan_idxs]
            else:
                new_channels = len(chan_idxs)
            self.channels = new_channels
        self.meta['space_slice'] = space_slice
        self.meta['chan_idxs'] = chan_idxs
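# Illustrative sketch (not part of this module): how the constructor above
# normalizes a requested slice against the image extent. Negative starts wrap
# and out-of-bounds stops are clipped, mirroring numpy-style slicing; the
# helper name and the example numbers are only for illustration.
def _sketch_normalize_space_slice():
    import kwimage
    space_dims = (16, 16)                          # (height, width)
    space_slice = (slice(-4, 100), slice(0, 8))    # wraps and overshoots
    slice_box = kwimage.Boxes.from_slice(
        space_slice, space_dims, wrap=True, clip=True)
    # The y-slice becomes slice(12, 16) and the x-slice stays slice(0, 8)
    return slice_box.to_slices()[0]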
[docs] def _finalize(self): """ Returns: ArrayLike """ space_slice = self.meta['space_slice'] chan_idxs = self.meta['chan_idxs'] sub_final = self.subdata._finalize() if chan_idxs is None: full_slice = space_slice else: full_slice = space_slice + (chan_idxs,) # final = sub_final[space_slice] final = sub_final[full_slice] final = kwarray.atleast_nd(final, 3) return final
[docs] def _transform_from_subdata(self): sl_y, sl_x = self.meta['space_slice'] offset = -sl_x.start, -sl_y.start self_from_subdata = kwimage.Affine.translate(offset) return self_from_subdata
[docs]    @profile
    def optimize(self):
        """
        Returns:
            DelayedImage

        Example:
            >>> # Test optimize nans
            >>> from delayed_image import DelayedNans
            >>> import kwimage
            >>> base = DelayedNans(dsize=(100, 100), channels='a|b|c')
            >>> self = base[0:10, 0:5]
            >>> # Should simply return a new nan generator
            >>> new = self.optimize()
            >>> self.write_network_text()
            >>> new.write_network_text()
            >>> assert len(new.as_graph().nodes) == 1
        """
        new = copy.copy(self)
        new.subdata = self.subdata.optimize()
        if isinstance2(new.subdata, DelayedCrop):
            new = new._opt_fuse_crops()

        if hasattr(new.subdata, '_optimized_crop'):
            # The subdata knows how to optimize itself wrt this node
            crop_kwargs = ub.dict_isect(self.meta, {'space_slice', 'chan_idxs'})
            new = new.subdata._optimized_crop(**crop_kwargs).optimize()
        if isinstance2(new.subdata, DelayedWarp):
            new = new._opt_warp_after_crop()
            new = new.optimize()
        elif isinstance2(new.subdata, DelayedDequantize):
            new = new._opt_dequant_after_crop()
            new = new.optimize()

        if isinstance2(new.subdata, DelayedChannelConcat):
            if isinstance2(new, DelayedCrop):
                # We have to be careful if we have band selection
                chan_idxs = new.meta.get('chan_idxs', None)
                space_slice = new.meta.get('space_slice', None)
                taken = new.subdata
                if TRACE_OPTIMIZE:
                    _new_logs = []
                if chan_idxs is not None:
                    if TRACE_OPTIMIZE:
                        _new_logs.extend(new.subdata._opt_logs)
                        _new_logs.extend(new._opt_logs)
                        _new_logs.append('concat-chan-crop-interact')
                    taken = new.subdata.take_channels(chan_idxs).optimize()
                if space_slice is not None:
                    if TRACE_OPTIMIZE:
                        _new_logs.append('concat-space-crop-interact')
                    taken = taken.crop(space_slice)._opt_push_under_concat().optimize()
                new = taken
                if TRACE_OPTIMIZE:
                    new._opt_logs.extend(_new_logs)
            else:
                new = new._opt_push_under_concat().optimize()
        return new
[docs] @profile def _opt_fuse_crops(self): """ Combine two consecutive crops into a single operation. Example: >>> from delayed_image.delayed_nodes import * # NOQA >>> from delayed_image.delayed_leafs import DelayedLoad >>> base = DelayedLoad.demo(dsize=(16, 16)).prepare() >>> # Test Fuse Crops Space Only >>> crop1 = base[4:12, 0:16] >>> crop2 = self = crop1[2:6, 0:8] >>> opt = crop2._opt_fuse_crops() >>> self.write_network_text() >>> opt.write_network_text() >>> opt._validate() >>> self._validate() Example: >>> # Test Fuse Crops Channels Only >>> from delayed_image.delayed_nodes import * # NOQA >>> from delayed_image.delayed_leafs import DelayedLoad >>> base = DelayedLoad.demo(dsize=(16, 16)).prepare() >>> crop1 = base.crop(chan_idxs=[0, 2, 1]) >>> crop2 = crop1.crop(chan_idxs=[1, 2]) >>> crop3 = self = crop2.crop(chan_idxs=[0, 1]) >>> opt = self._opt_fuse_crops()._opt_fuse_crops() >>> self.write_network_text() >>> opt.write_network_text() >>> finalB = base._validate()._finalize() >>> final1 = opt._validate()._finalize() >>> final2 = self._validate()._finalize() >>> assert np.all(final2[..., 0] == finalB[..., 2]) >>> assert np.all(final2[..., 1] == finalB[..., 1]) >>> assert np.all(final2[..., 0] == final1[..., 0]) >>> assert np.all(final2[..., 1] == final1[..., 1]) Example: >>> # Test Fuse Crops Space And Channels >>> from delayed_image.delayed_nodes import * # NOQA >>> from delayed_image.delayed_leafs import DelayedLoad >>> base = DelayedLoad.demo(dsize=(16, 16)).prepare() >>> crop1 = base[4:12, 0:16, [1, 2]] >>> self = crop1[2:6, 0:8, [1]] >>> opt = self._opt_fuse_crops() >>> self.write_network_text() >>> opt.write_network_text() >>> self._validate() >>> crop1._validate() """ assert isinstance2(self.subdata, DelayedCrop) # Inner is the data closer to the leaf (disk), outer is the data closer # to the user (output). inner_data = self.subdata.subdata inner_slices = self.subdata.meta['space_slice'] outer_slices = self.meta['space_slice'] outer_ysl, outer_xsl = outer_slices inner_ysl, inner_xsl = inner_slices # Apply the new relative slice to the current absolute slice new_xstart = min(inner_xsl.start + outer_xsl.start, inner_xsl.stop) new_xstop = min(inner_xsl.start + outer_xsl.stop, inner_xsl.stop) new_ystart = min(inner_ysl.start + outer_ysl.start, inner_ysl.stop) new_ystop = min(inner_ysl.start + outer_ysl.stop, inner_ysl.stop) # Handle bands inner_chan_idxs = self.subdata.meta['chan_idxs'] outer_chan_idxs = self.meta['chan_idxs'] if outer_chan_idxs is None and inner_chan_idxs is None: new_chan_idxs = None elif outer_chan_idxs is None: new_chan_idxs = inner_chan_idxs elif inner_chan_idxs is None: new_chan_idxs = outer_chan_idxs else: new_chan_idxs = list(ub.take(inner_chan_idxs, outer_chan_idxs)) new_crop = (slice(new_ystart, new_ystop), slice(new_xstart, new_xstop)) new = self.__class__(inner_data, new_crop, new_chan_idxs) if TRACE_OPTIMIZE: new._opt_logs.append('fuse crops') return new
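# Illustrative sketch (not part of this module): the pure-Python arithmetic
# behind _opt_fuse_crops. The numbers come from the doctests above; the helper
# name is only for illustration.
def _sketch_fuse_crop_arithmetic():
    # The outer slice is expressed relative to the inner crop, so its offsets
    # are added to the inner start and clamped at the inner stop.
    inner_ysl, outer_ysl = slice(4, 12), slice(2, 6)
    fused_ysl = slice(min(inner_ysl.start + outer_ysl.start, inner_ysl.stop),
                      min(inner_ysl.start + outer_ysl.stop, inner_ysl.stop))
    assert (fused_ysl.start, fused_ysl.stop) == (6, 10)
    # Channel indexes compose by indexing the inner selection with the outer one
    inner_chan_idxs, outer_chan_idxs = [0, 2, 1], [1, 2]
    fused_chan_idxs = [inner_chan_idxs[i] for i in outer_chan_idxs]
    assert fused_chan_idxs == [2, 1]
    return fused_ysl, fused_chan_idxs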
[docs] @profile def _opt_warp_after_crop(self): """ If the child node is a warp, move it after the crop. This is more efficient because: 1. The crop is closer to the load. 2. we are warping with less data. Example: >>> from delayed_image.delayed_nodes import * # NOQA >>> from delayed_image.delayed_leafs import DelayedLoad >>> fpath = kwimage.grab_test_image_fpath() >>> node0 = DelayedLoad(fpath, channels='r|g|b').prepare() >>> node1 = node0.warp({'scale': 0.432, >>> 'theta': np.pi / 3, >>> 'about': (80, 80), >>> 'shearx': .3, >>> 'offset': (-50, -50)}) >>> node2 = node1[10:50, 1:40] >>> self = node2 >>> new_outer = node2._opt_warp_after_crop() >>> print(ub.urepr(node2.nesting(), nl=-1, sort=0)) >>> print(ub.urepr(new_outer.nesting(), nl=-1, sort=0)) >>> final0 = self._finalize() >>> final1 = new_outer._finalize() >>> # xdoctest: +REQUIRES(--show) >>> import kwplot >>> kwplot.autompl() >>> kwplot.imshow(final0, pnum=(2, 2, 1), fnum=1, title='raw') >>> kwplot.imshow(final1, pnum=(2, 2, 2), fnum=1, title='optimized') Example: >>> # xdoctest: +REQUIRES(module:osgeo) >>> from delayed_image import * # NOQA >>> from delayed_image.delayed_leafs import DelayedLoad >>> fpath = kwimage.grab_test_image_fpath(overviews=3) >>> node0 = DelayedLoad(fpath, channels='r|g|b').prepare() >>> node1 = node0.warp({'scale': 1000 / 512}) >>> node2 = node1[250:750, 0:500] >>> self = node2 >>> new_outer = node2._opt_warp_after_crop() >>> print(ub.urepr(node2.nesting(), nl=-1, sort=0)) >>> print(ub.urepr(new_outer.nesting(), nl=-1, sort=0)) """ assert isinstance2(self.subdata, DelayedWarp) # Inner is the data closer to the leaf (disk), outer is the data closer # to the user (output). outer_slices = self.meta['space_slice'] outer_chan_idxs = self.meta['chan_idxs'] inner_transform = self.subdata.meta['transform'] outer_region = kwimage.Boxes.from_slice(outer_slices) outer_region = outer_region.to_polygons()[0] from delayed_image.helpers import _swap_warp_after_crop inner_slice, outer_transform = _swap_warp_after_crop( outer_region, inner_transform) warp_meta = ub.dict_isect(self.meta, {'dsize'}) warp_meta.update(ub.dict_isect( self.subdata.meta, self.subdata._algo_keys)) new_inner = self.subdata.subdata.crop(inner_slice, outer_chan_idxs) new_outer = new_inner.warp(outer_transform, **warp_meta) if TRACE_OPTIMIZE: new_outer._opt_logs.extend(self.subdata.subdata._opt_logs) new_outer._opt_logs.extend(self.subdata._opt_logs) new_outer._opt_logs.append('_opt_warp_after_crop') return new_outer
[docs] @profile def _opt_dequant_after_crop(self): # Swap order so dequantize is after the crop assert isinstance2(self.subdata, DelayedDequantize) quantization = self.subdata.meta['quantization'] new = copy.copy(self) new.subdata = self.subdata.subdata # Remove the dequantization new = new.dequantize(quantization) # Push it after the crop if TRACE_OPTIMIZE: new._opt_logs.append('_opt_dequant_after_crop') return new
[docs] class DelayedOverview(DelayedImage): """ Downsamples an image by a factor of two. If the underlying image being loaded has precomputed overviews it simply loads these instead of downsampling the original image, which is more efficient. Example: >>> # xdoctest: +REQUIRES(module:osgeo) >>> # Make a complex chain of operations and optimize it >>> from delayed_image import * # NOQA >>> import kwimage >>> fpath = kwimage.grab_test_image_fpath(overviews=3) >>> dimg = DelayedLoad(fpath, channels='r|g|b').prepare() >>> dimg = dimg.get_overview(1) >>> dimg = dimg.get_overview(1) >>> dimg = dimg.get_overview(1) >>> dopt = dimg.optimize() >>> if 1: >>> import networkx as nx >>> dimg.write_network_text() >>> dopt.write_network_text() >>> print(ub.urepr(dopt.nesting(), nl=-1, sort=0)) >>> final0 = dimg._finalize()[:] >>> final1 = dopt._finalize()[:] >>> assert final0.shape == final1.shape >>> # xdoctest: +REQUIRES(--show) >>> import kwplot >>> kwplot.autompl() >>> kwplot.imshow(final0, pnum=(1, 2, 1), fnum=1, title='raw') >>> kwplot.imshow(final1, pnum=(1, 2, 2), fnum=1, title='optimized') """ def __init__(self, subdata, overview): """ Args: subdata (DelayedArray): data to operate on overview (int): the overview to use (assuming it exists) """ super().__init__(subdata) self.meta['overview'] = overview w, h = subdata.dsize sf = 1 / (2 ** overview) """ Ignore: # Check how gdal handles overviews for odd sized images. imdata = np.random.rand(31, 29) kwimage.imwrite('foo.tif', imdata, backend='gdal', overviews=3) ub.cmd('gdalinfo foo.tif', verbose=3) """ # The rounding operation for gdal overviews is ceiling def iceil(x): return int(np.ceil(x)) w = iceil(sf * w) h = iceil(sf * h) self.meta['dsize'] = (w, h) @property def num_overviews(self): """ Returns: int """ # This operation reduces the number of available overviews num_remain = self.subdata.num_overviews - self.meta['overview'] return num_remain
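# Illustrative sketch (not part of this module): the ceiling-based size rule
# used in the constructor above, evaluated for an odd-sized image. The helper
# name and the example sizes are only for illustration.
def _sketch_overview_dsize(w=29, h=31):
    import numpy as np
    sizes = {}
    for overview in [1, 2, 3]:
        sf = 1 / (2 ** overview)
        sizes[overview] = (int(np.ceil(sf * w)), int(np.ceil(sf * h)))
    # -> {1: (15, 16), 2: (8, 8), 3: (4, 4)} for a 29 x 31 (w x h) image
    return sizes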
[docs]    def _finalize(self):
        """
        Returns:
            ArrayLike
        """
        sub_final = self.subdata._finalize()
        overview = self.meta['overview']
        if hasattr(sub_final, 'get_overview'):
            # This should be a lazy gdal frame
            if sub_final.num_overviews >= overview:
                final = sub_final.get_overview(overview)
                return final
        warnings.warn(ub.paragraph(
            '''
            The underlying data does not have overviews.
            Simulating the overview using an imresize operation.
            '''
        ))
        sub_final = np.asarray(sub_final)
        final = kwimage.imresize(
            sub_final,
            scale=1 / 2 ** overview,
            interpolation='nearest',
            # antialias=True
        )
        return final
[docs] @profile def optimize(self): """ Returns: DelayedImage """ new = copy.copy(self) new.subdata = self.subdata.optimize() if isinstance2(new.subdata, DelayedOverview): new = new._opt_fuse_overview() if new.meta['overview'] == 0: new = new.subdata elif isinstance2(new.subdata, DelayedCrop): new = new._opt_crop_after_overview() new = new.optimize() elif isinstance2(new.subdata, DelayedWarp): new = new._opt_warp_after_overview() new = new.optimize() elif isinstance2(new.subdata, DelayedDequantize): new = new._opt_dequant_after_overview() new = new.optimize() if isinstance2(new.subdata, DelayedChannelConcat): new = new._opt_push_under_concat().optimize() return new
[docs] def _transform_from_subdata(self): scale = 1 / 2 ** self.meta['overview'] return kwimage.Affine.scale(scale)
[docs]    @profile
    def _opt_overview_as_warp(self):
        """
        Sometimes it is beneficial to replace an overview with a warp as an
        intermediate optimization step.
        """
        transform = self._transform_from_subdata()
        dsize = self.meta['dsize']
        new = self.subdata.warp(transform, dsize=dsize)
        if TRACE_OPTIMIZE:
            new._opt_logs.extend(self._opt_logs)
            new._opt_logs.append('_opt_overview_as_warp')
        return new
[docs] @profile def _opt_crop_after_overview(self): """ Given an outer overview and an inner crop, switch places. We want the overview to be as close to the load as possible. Example: >>> # xdoctest: +REQUIRES(module:osgeo) >>> from delayed_image import * # NOQA >>> fpath = kwimage.grab_test_image_fpath(overviews=3) >>> node0 = DelayedLoad(fpath, channels='r|g|b').prepare() >>> node1 = node0[100:400, 120:450] >>> node2 = node1.get_overview(2) >>> self = node2 >>> new_outer = node2.optimize() >>> print(ub.urepr(node2.nesting(), nl=-1, sort=0)) >>> print(ub.urepr(new_outer.nesting(), nl=-1, sort=0)) >>> final0 = self._finalize() >>> final1 = new_outer._finalize() >>> # xdoctest: +REQUIRES(--show) >>> import kwplot >>> kwplot.autompl() >>> kwplot.imshow(final0, pnum=(1, 2, 1), fnum=1, title='raw') >>> kwplot.imshow(final1, pnum=(1, 2, 2), fnum=1, title='optimized') Example: >>> # xdoctest: +REQUIRES(module:osgeo) >>> from delayed_image import * # NOQA >>> fpath = kwimage.grab_test_image_fpath(overviews=3) >>> node0 = DelayedLoad(fpath, channels='r|g|b').prepare() >>> node1 = node0[:, :, 0:2] >>> node2 = node1.get_overview(2) >>> self = node2 >>> new_outer = node2.optimize() >>> node2.write_network_text() >>> new_outer.write_network_text() >>> assert node2.shape[2] == 2 >>> assert new_outer.shape[2] == 2 """ from delayed_image.helpers import _swap_crop_after_warp assert isinstance2(self.subdata, DelayedCrop) # Inner is the data closer to the leaf (disk), outer is the data closer # to the user (output). outer_overview = self.meta['overview'] inner_slices = self.subdata.meta['space_slice'] chan_idxs = self.subdata.meta['chan_idxs'] sf = 1 / 2 ** outer_overview outer_transform = kwimage.Affine.scale(sf) inner_region = kwimage.Boxes.from_slice(inner_slices) inner_region = inner_region.to_polygons()[0] new_inner_warp, outer_crop, new_outer_warp = _swap_crop_after_warp( inner_region, outer_transform) # Move the overview to the inside, it should be unchanged new = self.subdata.subdata.get_overview(outer_overview) # Move the crop to the outside new = new.crop(outer_crop, chan_idxs=chan_idxs) if not np.all(np.isclose(np.eye(3), new_outer_warp)): # we might have to apply an additional warp at the end. new = new.warp(new_outer_warp) if TRACE_OPTIMIZE: new._opt_logs.append('_opt_crop_after_overview') return new
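# Illustrative sketch (not part of this module): a simplified view of why the
# crop window shrinks when the overview moves inside it. Any sub-pixel
# remainder is what the small residual warp appended above has to absorb. The
# helper name and the 100:400 window are only for illustration.
def _sketch_rescale_crop_for_overview(start=100, stop=400, overview=2):
    sf = 1 / (2 ** overview)
    lo, hi = start * sf, stop * sf
    # Integer part of the window at the overview resolution
    outer_rows = (int(lo), int(hi))
    # Fractional remainder that a residual warp would need to absorb
    residual = (lo - int(lo), hi - int(hi))
    return outer_rows, residual   # ((25, 100), (0.0, 0.0)) for these inputs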
[docs]    @profile
    def _opt_fuse_overview(self):
        assert isinstance2(self.subdata, DelayedOverview)
        outer_overview = self.meta['overview']
        inner_overview = self.subdata.meta['overview']
        new_overview = inner_overview + outer_overview
        new = self.subdata.subdata.get_overview(new_overview)
        if TRACE_OPTIMIZE:
            new._opt_logs.append('_opt_fuse_overview')
        return new
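# Illustrative sketch (not part of this module): fusing overviews is valid
# because per-level scale factors multiply, so the levels themselves add. The
# helper name is only for illustration.
def _sketch_fuse_overview_levels(inner_overview=1, outer_overview=2):
    fused = inner_overview + outer_overview
    assert (1 / 2 ** inner_overview) * (1 / 2 ** outer_overview) == 1 / 2 ** fused
    return fused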
[docs]    @profile
    def _opt_dequant_after_overview(self):
        # Swap order so dequantize is after the overview
        assert isinstance2(self.subdata, DelayedDequantize)
        quantization = self.subdata.meta['quantization']
        new = copy.copy(self)
        new.subdata = self.subdata.subdata  # Remove the dequantization
        new = new.dequantize(quantization)  # Push it after the overview
        if TRACE_OPTIMIZE:
            new._opt_logs.append('_opt_dequant_after_overview')
        return new
[docs]    @profile
    def _opt_warp_after_overview(self):
        """
        Given a warp followed by an overview, move the warp to the outer scope
        such that the overview is applied first.

        Example:
            >>> # xdoctest: +REQUIRES(module:osgeo)
            >>> from delayed_image import * # NOQA
            >>> fpath = kwimage.grab_test_image_fpath(overviews=3)
            >>> node0 = DelayedLoad(fpath, channels='r|g|b').prepare()
            >>> node1 = node0.warp({'scale': (2.1, .7), 'offset': (20, 40)})
            >>> node2 = node1.get_overview(2)
            >>> self = node2
            >>> new_outer = node2.optimize()
            >>> print(ub.urepr(node2.nesting(), nl=-1, sort=0))
            >>> print(ub.urepr(new_outer.nesting(), nl=-1, sort=0))
            >>> final0 = self._finalize()
            >>> final1 = new_outer._finalize()
            >>> # xdoctest: +REQUIRES(--show)
            >>> import kwplot
            >>> kwplot.autompl()
            >>> kwplot.imshow(final0, pnum=(1, 2, 1), fnum=1, title='raw')
            >>> kwplot.imshow(final1, pnum=(1, 2, 2), fnum=1, title='optimized')
        """
        assert isinstance2(self.subdata, DelayedWarp)
        outer_overview = self.meta['overview']
        inner_transform = self.subdata.meta['transform']
        outer_transform = self._transform_from_subdata()
        A = outer_transform
        B = inner_transform
        # We have: A @ B, and we want that to equal C @ A
        # where the overview A is left as-is and moved inside; we modify the
        # new outer transform C to accommodate this.
        # So C = A @ B @ A.inv()
        C = A @ B @ A.inv()
        new_outer = C
        new_inner_overview = outer_overview
        new = self.subdata.subdata.get_overview(new_inner_overview)
        new = new.warp(new_outer)
        if TRACE_OPTIMIZE:
            new._opt_logs.extend(self.subdata._opt_logs)
            new._opt_logs.append('_opt_warp_after_overview')
        return new
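# Illustrative sketch (not part of this module): a numeric check of the
# conjugation used by _opt_warp_after_overview. A is the overview expressed as
# a scale, B is the inner warp; C is chosen so that C @ A == A @ B. The helper
# name and example parameters are only for illustration.
def _sketch_warp_overview_conjugation():
    import numpy as np
    import kwimage
    A = kwimage.Affine.scale(1 / 2 ** 2)
    B = kwimage.Affine.coerce({'scale': (2.1, .7), 'offset': (20, 40)})
    C = A @ B @ A.inv()
    assert np.allclose((C @ A).matrix, (A @ B).matrix)
    return C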
def _rectify_retain(remove, retain):
    valid_keys = {'offset', 'scale', 'shearx', 'theta'}
    if remove is not None and retain is not None:
        raise ValueError('retain and remove are mutually exclusive')
    if remove is not None:
        retain = valid_keys - set(remove)
    else:
        if retain is None:
            retain = set()
    return retain


class CoordinateCompatibilityError(ValueError):
    """
    Error when trying to perform operations on data in different coordinate
    systems.
    """
    pass


class _InnerAccumSegment:  # (ub.NiceRepr):
    """
    Gather the indexes we need to take from an inner component
    """
    def __init__(curr, comp):
        curr.comp = comp
        curr.start = None
        curr.stop = None
        curr.codes = None
        curr.indexes = []

    # def __nice__(curr):
    #     return f'{curr.start}, {curr.stop}, {curr.indexes}, {curr.codes}'

    def add_inner(curr, inner, code):
        if curr.start is None:
            curr.start = inner
            curr.stop = inner + 1
            if code is not None:
                curr.codes = []
        else:
            if code is None:
                curr.codes = None
            # Can we take a contiguous slice?
            if curr.stop is not None and curr.stop == inner:
                curr.stop = inner + 1
            else:
                # Contiguous input is broken, fall back to indexes only.
                curr.stop = None
        # Accumulate the codes if we can
        if curr.codes is not None:
            curr.codes.append(code)
        # cast to int to prevent numpy types
        curr.indexes.append(int(inner))

    def get_indexer(curr):
        """
        Return a list of indexes into the subcomponent that will form the new
        contiguous out-component
        """
        if curr.stop is not None:
            if curr.start == 0 and curr.stop == curr.comp.num_channels:
                # We can take the entire component
                sub_idxs = Ellipsis
                assert curr.indexes[0] == 0
                assert curr.indexes[-1] == curr.stop - 1
            else:
                # In this case we could take the parts as a contiguous
                # slice, but just return indexes for now
                sub_idxs = list(range(curr.start, curr.stop))
                assert sub_idxs == curr.indexes
        else:
            sub_idxs = curr.indexes
        return sub_idxs

    def get_subcomponent(curr, dsize):
        """
        Finalize the subcomponent
        """
        from delayed_image.delayed_leafs import DelayedNans
        comp = curr.comp
        if comp is None:
            # There is no component to get a subcomponent from.
            # Instead, return nans that correspond to the requested codes.
            if curr.codes is None:
                nan_chan = None
            else:
                nan_chan = channel_spec.FusedChannelSpec(curr.codes)
            sub_comp = DelayedNans(dsize, channels=nan_chan)
        else:
            sub_idxs = curr.get_indexer()
            if sub_idxs is Ellipsis:
                # Entire component is valid, no need for sub-operation
                sub_comp = comp
            else:
                sub_comp = comp.take_channels(sub_idxs)
        return sub_comp


def isinstance2(inst, cls):
    """
    In production regular isinstance works fine, but when debugging in
    IPython, reloading classes will cause it to break, so we special case it
    here.

    Args:
        inst (object): instance to check
        cls (type): class to check against

    Returns:
        bool

    Ignore:
        from delayed_image.delayed_nodes import * # NOQA
        from delayed_image.delayed_leafs import DelayedNans
        inst = DelayedNans()
        cls = DelayedImage
        isinstance2(inst, cls)
        isinstance2(inst, DelayedWarp)
    """
    import sys
    USE_REAL_ISINSTANCE = 'IPython' not in sys.modules
    if USE_REAL_ISINSTANCE:
        return isinstance(inst, cls)
    else:
        return any(inst_cls.__name__ == cls.__name__
                   for inst_cls in inst.__class__.__mro__)
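# Illustrative sketch (not part of this module): how _InnerAccumSegment
# collapses consecutive inner indexes into a contiguous run and falls back to
# raw indexes when a gap appears. The dummy component and helper name are only
# for illustration.
def _sketch_inner_accum_segment():
    class _DemoComp:
        num_channels = 4   # stand-in for a delayed image with four bands
    contiguous = _InnerAccumSegment(_DemoComp())
    for inner in [1, 2, 3]:
        contiguous.add_inner(inner, code=None)
    assert contiguous.get_indexer() == [1, 2, 3]   # contiguous run
    gappy = _InnerAccumSegment(_DemoComp())
    gappy.add_inner(0, code=None)
    gappy.add_inner(2, code=None)                  # gap breaks contiguity
    assert gappy.get_indexer() == [0, 2]           # falls back to raw indexes
    return contiguous, gappy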