"""
Intermediate operations
"""
import kwarray
import kwimage
import copy
import numpy as np
import ubelt as ub
import warnings
from kwcoco import exceptions
from kwcoco import channel_spec
from kwcoco.util.delayed_ops.delayed_base import DelayedNaryOperation, DelayedUnaryOperation
from kwcoco.util.delayed_ops import delayed_leafs
try:
from xdev import profile
except Exception:
from ubelt import identity as profile
# --------
# Stacking
# --------
[docs]class DelayedStack(DelayedNaryOperation):
"""
Stacks multiple arrays together.
"""
def __init__(self, parts, axis):
"""
Args:
parts (List[DelayedArray]): data to stack
axis (int): axes to stack on
"""
raise NotImplementedError
super().__init__(parts=parts)
self.meta['axis'] = axis
def __nice__(self):
"""
Returns:
str
"""
return '{}'.format(self.shape)
@property
def shape(self):
"""
Returns:
None | Tuple[int | None, ...]
"""
shape = self.subdata.shape
return shape
[docs]class DelayedConcat(DelayedNaryOperation):
"""
Stacks multiple arrays together.
"""
def __init__(self, parts, axis):
"""
Args:
parts (List[DelayedArray]): data to concat
axis (int): axes to concat on
"""
super().__init__(parts=parts)
self.meta['axis'] = axis
def __nice__(self):
return '{}'.format(self.shape)
@property
def shape(self):
"""
Returns:
None | Tuple[int | None, ...]
"""
shape = self.subdata.shape
return shape
[docs]class DelayedFrameStack(DelayedStack):
"""
Stacks multiple arrays together.
"""
def __init__(self, parts):
"""
Args:
parts (List[DelayedArray]): data to stack
"""
raise NotImplementedError
super().__init__(parts=parts, axis=0)
# ------
# Images
# ------
[docs]class ImageOpsMixin:
[docs] def crop(self, space_slice=None, chan_idxs=None, clip=True, wrap=True, pad=0):
"""
Crops an image along integer pixel coordinates.
Args:
space_slice (Tuple[slice, slice]):
y-slice and x-slice.
chan_idxs (List[int]):
indexes of bands to take
clip (bool):
if True, the slice is interpreted normally, where it won't go
past the image extent, otherwise slicing into negative regions
or past the image bounds will result in padding. Defaults to
True.
wrap (bool):
if True, negative indexes "wrap around", otherwise they are
treated as is. Defaults to True.
pad (int | List[Tuple[int, int]]):
if specified, applies extra padding
Returns:
DelayedImage
Example:
>>> from kwcoco.util.delayed_ops import DelayedLoad
>>> import kwimage
>>> self = DelayedLoad.demo().prepare()
>>> self = self.dequantize({'quant_max': 255})
>>> self = self.warp({'scale': 1 / 2})
>>> pad = 0
>>> h, w = space_dims = self.dsize[::-1]
>>> grid = list(ub.named_product({
>>> 'left': [0, -64], 'right': [0, 64],
>>> 'top': [0, -64], 'bot': [0, 64],}))
>>> grid += [
>>> {'left': 64, 'right': -64, 'top': 0, 'bot': 0},
>>> {'left': 64, 'right': 64, 'top': 0, 'bot': 0},
>>> {'left': 0, 'right': 0, 'top': 64, 'bot': -64},
>>> {'left': 64, 'right': -64, 'top': 64, 'bot': -64},
>>> ]
>>> crops = []
>>> for pads in grid:
>>> space_slice = (slice(pads['top'], h + pads['bot']),
>>> slice(pads['left'], w + pads['right']))
>>> delayed = self.crop(space_slice)
>>> crop = delayed.finalize()
>>> yyxx = kwimage.Boxes.from_slice(space_slice, wrap=False, clip=0).toformat('_yyxx').data[0]
>>> title = '[{}:{}, {}:{}]'.format(*yyxx)
>>> crop_canvas = kwimage.draw_header_text(crop, title, fit=True, bg_color='kw_darkgray')
>>> crops.append(crop_canvas)
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> canvas = kwimage.stack_images_grid(crops, pad=16, bg_value='kw_darkgreen')
>>> canvas = kwimage.fill_nans_with_checkers(canvas)
>>> kwplot.imshow(canvas, title='Normal Slicing: Cropped Images With Wrap+Clipped Slices', doclf=1, fnum=1)
>>> kwplot.show_if_requested()
Example:
>>> # Demo the case with pads / no-clips / no-wraps
>>> from kwcoco.util.delayed_ops import DelayedLoad
>>> import kwimage
>>> self = DelayedLoad.demo().prepare()
>>> self = self.dequantize({'quant_max': 255})
>>> self = self.warp({'scale': 1 / 2})
>>> pad = [(64, 128), (32, 96)]
>>> pad = [(0, 20), (0, 0)]
>>> pad = 0
>>> pad = 8
>>> h, w = space_dims = self.dsize[::-1]
>>> grid = list(ub.named_product({
>>> 'left': [0, -64], 'right': [0, 64],
>>> 'top': [0, -64], 'bot': [0, 64],}))
>>> grid += [
>>> {'left': 64, 'right': -64, 'top': 0, 'bot': 0},
>>> {'left': 64, 'right': 64, 'top': 0, 'bot': 0},
>>> {'left': 0, 'right': 0, 'top': 64, 'bot': -64},
>>> {'left': 64, 'right': -64, 'top': 64, 'bot': -64},
>>> ]
>>> crops = []
>>> for pads in grid:
>>> space_slice = (slice(pads['top'], h + pads['bot']),
>>> slice(pads['left'], w + pads['right']))
>>> delayed = self._padded_crop(space_slice, pad=pad)
>>> crop = delayed.finalize(optimize=1)
>>> yyxx = kwimage.Boxes.from_slice(space_slice, wrap=False, clip=0).toformat('_yyxx').data[0]
>>> title = '[{}:{}, {}:{}]'.format(*yyxx)
>>> if pad:
>>> title += f'{chr(10)}pad={pad}'
>>> crop_canvas = kwimage.draw_header_text(crop, title, fit=True, bg_color='kw_darkgray')
>>> crops.append(crop_canvas)
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> canvas = kwimage.stack_images_grid(crops, pad=16, bg_value='kw_darkgreen', resize='smaller')
>>> canvas = kwimage.fill_nans_with_checkers(canvas)
>>> kwplot.imshow(canvas, title='Negative Slicing: Cropped Images With clip=False wrap=False', doclf=1, fnum=2)
>>> kwplot.show_if_requested()
"""
if not clip or not wrap or pad:
if clip or wrap:
raise NotImplementedError(
ub.paragraph(
'''
Currently, in "negative slice mode" both clip and wrap
params must be set to False if padding is given or
either of clip or wrap is False.
'''))
# Currently padding doesn't really work with crops, so its not
# efficient, but we can hack it to work with warps.
new = self._padded_crop(space_slice, pad=pad)
else:
# Normal efficient case
new = DelayedCrop(self, space_slice, chan_idxs)
return new
def _padded_crop(self, space_slice, pad=0):
"""
Does the type of padded crop we want, but inefficiently using a warp.
Reimplementing would be good, but this is good enough for now.
"""
if self.dsize is None:
raise Exception('dsize must be populated to do a padded crop')
data_dims = self.dsize[::-1]
_data_slice, _extra_padding = kwarray.embed_slice(
space_slice, data_dims, pad)
offset_d0, extra_d0 = _extra_padding[0]
offset_d1, extra_d1 = _extra_padding[1]
pad_warp = {'offset': (offset_d1, offset_d0)}
data_crop_box = kwimage.Boxes.from_slice(
_data_slice, clip=False, wrap=False)
dsize = (data_crop_box.width.ravel()[0] + offset_d1 + extra_d1,
data_crop_box.height.ravel()[0] + offset_d0 + extra_d0)
new = self.crop(_data_slice)
if any([offset_d0, extra_d0, offset_d1, extra_d1]):
# Use a warp to accomplish padding.
# Having an explicit padding node would be better.
new = new.warp(pad_warp, dsize=dsize)
return new
[docs] def warp(self, transform, dsize='auto', antialias=True,
interpolation='linear', border_value='auto'):
"""
Applys an affine transformation to the image
Args:
transform (ndarray | dict | kwimage.Affine):
a coercable affine matrix. See :class:`kwimage.Affine` for
details on what can be coerced.
dsize (Tuple[int, int] | str):
The width / height of the output canvas. If 'auto', dsize is
computed such that the positive coordinates of the warped image
will fit in the new canvas. In this case, any pixel that maps
to a negative coordinate will be clipped. This has the
property that the input transformation is not modified.
antialias (bool):
if True determines if the transform is downsampling and applies
antialiasing via gaussian a blur. Defaults to False
interpolation (str):
interpolation code or cv2 integer. Interpolation codes are linear,
nearest, cubic, lancsoz, and area. Defaults to "linear".
border_value (int | float | str):
if auto will be nan for float and 0 for int.
Returns:
DelayedImage
"""
new = DelayedWarp(self, transform, dsize=dsize, antialias=antialias,
interpolation=interpolation)
return new
[docs] def scale(self, scale, dsize='auto', antialias=True,
interpolation='linear', border_value='auto'):
"""
An alias for self.warp({"scale": scale}, ...)
"""
transform = {'scale': scale}
return self.warp(transform, dsize=dsize, antialias=antialias,
interpolation=interpolation,
border_value=border_value)
[docs] def dequantize(self, quantization):
"""
Rescales image intensities from int to floats.
Args:
quantization (Dict[str, Any]):
see :func:`kwcoco.util.delayed_ops.helpers.dequantize`
Returns:
DelayedDequantize
"""
new = DelayedDequantize(self, quantization)
return new
[docs] def get_overview(self, overview):
"""
Downsamples an image by a factor of two.
Args:
overview (int): the overview to use (assuming it exists)
Returns:
DelayedOverview
"""
new = DelayedOverview(self, overview)
return new
[docs] def as_xarray(self):
"""
Returns:
DelayedAsXarray
"""
return DelayedAsXarray(self)
[docs]class DelayedChannelConcat(ImageOpsMixin, DelayedConcat):
"""
Stacks multiple arrays together.
CommandLine:
xdoctest -m /home/joncrall/code/kwcoco/kwcoco/util/delayed_ops/delayed_nodes.py DelayedChannelConcat:1
Example:
>>> from kwcoco.util.delayed_ops import * # NOQA
>>> from kwcoco.util.delayed_ops.delayed_leafs import DelayedLoad
>>> import kwcoco
>>> dsize = (307, 311)
>>> c1 = DelayedNans(dsize=dsize, channels='foo')
>>> c2 = DelayedLoad.demo('astro', dsize=dsize, channels='R|G|B').prepare()
>>> cat = DelayedChannelConcat([c1, c2])
>>> warped_cat = cat.warp({'scale': 1.07}, dsize=(328, 332))
>>> warped_cat._validate()
>>> warped_cat.finalize()
Example:
>>> # Test case that failed in initial implementation
>>> # Due to incorrectly pushing channel selection under the concat
>>> from kwcoco.util.delayed_ops import * # NOQA
>>> import kwimage
>>> fpath = kwimage.grab_test_image_fpath()
>>> base1 = DelayedLoad(fpath, channels='r|g|b').prepare()
>>> base2 = DelayedLoad(fpath, channels='x|y|z').prepare().scale(2)
>>> base3 = DelayedLoad(fpath, channels='i|j|k').prepare().scale(2)
>>> bands = [base2, base1[:, :, 0].scale(2).evaluate(),
>>> base1[:, :, 1].evaluate().scale(2),
>>> base1[:, :, 2].evaluate().scale(2), base3]
>>> delayed = DelayedChannelConcat(bands)
>>> delayed = delayed.warp({'scale': 2})
>>> delayed = delayed[0:100, 0:55, [0, 2, 4]]
>>> delayed.write_network_text()
>>> delayed.optimize()
"""
def __init__(self, parts, dsize=None):
"""
Args:
parts (List[DelayedArray]): data to concat
dsize (Tuple[int, int] | None): size if known a-priori
"""
super().__init__(parts=parts, axis=2)
if dsize is None:
dsize_cands = [comp.dsize for comp in self.parts]
if not ub.allsame(dsize_cands):
raise exceptions.CoordinateCompatibilityError(
# 'parts must all have the same delayed size')
'parts must all have the same delayed size: got {}'.format(dsize_cands))
if len(dsize_cands) == 0:
dsize = None
else:
dsize = dsize_cands[0]
self.dsize = dsize
try:
self.num_channels = sum(comp.num_channels for comp in self.parts)
except TypeError:
if any(comp.num_channels is None for comp in self.parts):
self.num_channels = None
else:
raise
def __nice__(self):
"""
Returns:
str
"""
if self.channels is None:
return '{}'.format(self.shape)
else:
return '{}, {}'.format(self.shape, self.channels)
@property
def channels(self):
"""
Returns:
None | kwcoco.FusedChannelSpec
"""
import kwcoco
sub_channs = []
for comp in self.parts:
comp_channels = comp.channels
if comp_channels is None:
return None
sub_channs.append(comp_channels)
channs = kwcoco.FusedChannelSpec.concat(sub_channs)
return channs
@property
def shape(self):
"""
Returns:
Tuple[int | None, int | None, int | None]
"""
w, h = self.dsize
return (h, w, self.num_channels)
def _finalize(self):
"""
Returns:
ArrayLike
"""
stack = [comp._finalize() for comp in self.parts]
if len(stack) == 1:
final = stack[0]
else:
stack = [kwarray.atleast_nd(s, 3) for s in stack]
final = np.concatenate(stack, axis=2)
return final
[docs] def optimize(self):
"""
Returns:
DelayedImage
"""
new_parts = [part.optimize() for part in self.parts]
kw = ub.dict_isect(self.meta, ['dsize'])
new = self.__class__(new_parts, **kw)
return new
[docs] @profile
def take_channels(self, channels):
"""
This method returns a subset of the vision data with only the
specified bands / channels.
Args:
channels (List[int] | slice | channel_spec.FusedChannelSpec):
List of integers indexes, a slice, or a channel spec, which is
typically a pipe (`|`) delimited list of channel codes. See
:class:`kwcoco.ChannelSpec` for more detials.
Returns:
DelayedArray:
a delayed vision operation that only operates on the following
channels.
Example:
>>> from kwcoco.util.delayed_ops.delayed_nodes import * # NOQA
>>> import kwcoco
>>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral')
>>> self = delayed = dset.coco_image(1).delay(mode=1)
>>> channels = 'B11|B8|B1|B10'
>>> new = self.take_channels(channels)
Example:
>>> # Complex case
>>> import kwcoco
>>> from kwcoco.util.delayed_ops.delayed_nodes import * # NOQA
>>> from kwcoco.util.delayed_ops.delayed_leafs import DelayedLoad
>>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral')
>>> delayed = dset.coco_image(1).delay(mode=1)
>>> astro = DelayedLoad.demo('astro', channels='r|g|b').prepare()
>>> aligned = astro.warp(kwimage.Affine.scale(600 / 512), dsize='auto')
>>> self = combo = DelayedChannelConcat(delayed.parts + [aligned])
>>> channels = 'B1|r|B8|g'
>>> new = self.take_channels(channels)
>>> new_cropped = new.crop((slice(10, 200), slice(12, 350)))
>>> new_opt = new_cropped.optimize()
>>> datas = new_opt.finalize()
>>> if 1:
>>> new_cropped.write_network_text(with_labels='name')
>>> new_opt.write_network_text(with_labels='name')
>>> vizable = kwimage.normalize_intensity(datas, axis=2)
>>> self._validate()
>>> new._validate()
>>> new_cropped._validate()
>>> new_opt._validate()
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> stacked = kwimage.stack_images(vizable.transpose(2, 0, 1))
>>> kwplot.imshow(stacked)
Example:
>>> # Test case where requested channel does not exist
>>> import kwcoco
>>> from kwcoco.util.delayed_ops.delayed_nodes import * # NOQA
>>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral', use_cache=1, verbose=100)
>>> self = delayed = dset.coco_image(1).delay(mode=1)
>>> channels = 'B1|foobar|bazbiz|B8'
>>> new = self.take_channels(channels)
>>> new_cropped = new.crop((slice(10, 200), slice(12, 350)))
>>> fused = new_cropped.finalize()
>>> assert fused.shape == (190, 338, 4)
>>> assert np.all(np.isnan(fused[..., 1:3]))
>>> assert not np.any(np.isnan(fused[..., 0]))
>>> assert not np.any(np.isnan(fused[..., 3]))
"""
if channels is None:
return self
from kwcoco.util.delayed_ops.delayed_leafs import DelayedNans
current_channels = self.channels
if isinstance(channels, list):
top_idx_mapping = channels
top_codes = self.channels.as_list()
request_codes = None
else:
channels = channel_spec.FusedChannelSpec.coerce(channels)
# Computer subindex integer mapping
request_codes = channels.as_list()
top_codes = current_channels.as_oset()
top_idx_mapping = []
for code in request_codes:
try:
top_idx_mapping.append(top_codes.index(code))
except KeyError:
top_idx_mapping.append(None)
# Rearange subcomponents into the specified channel representation
# I am not confident that this logic is the best way to do this.
# This may be a bottleneck
subindexer = kwarray.FlatIndexer([
comp.num_channels for comp in self.parts])
accum = []
class _ContiguousSegment(object):
def __init__(self, comp, start):
self.comp = comp
self.start = start
self.stop = start + 1
self.codes = []
curr = None
for request_idx, idx in enumerate(top_idx_mapping):
if idx is None:
# Requested channel does not exist in our data stack
comp = None
inner = 0
if curr is not None and curr.comp is None:
inner = curr.stop
else:
# Requested channel exists in our data stack
outer, inner = subindexer.unravel(idx)
comp = self.parts[outer]
if curr is None:
curr = _ContiguousSegment(comp, inner)
else:
is_contiguous = curr.comp is comp and (inner == curr.stop)
if is_contiguous:
# extend the previous contiguous segment
curr.stop = inner + 1
else:
# accept previous segment and start a new one
accum.append(curr)
curr = _ContiguousSegment(comp, inner)
# Hack for nans
if request_codes is not None:
curr.codes.append(request_codes[request_idx])
# Accumulate final segment
if curr is not None:
accum.append(curr)
# Execute the delayed operation
new_components = []
for curr in accum:
comp = curr.comp
if comp is None:
# Requested component did not exist, return nans
if request_codes is not None:
nan_chan = channel_spec.FusedChannelSpec(curr.codes)
else:
nan_chan = None
comp = DelayedNans(self.dsize, channels=nan_chan)
new_components.append(comp)
else:
if curr.start == 0 and curr.stop == comp.num_channels:
# Entire component is valid, no need for sub-operation
new_components.append(comp)
else:
# Only part of the component is taken, need to sub-operate
# It would be nice if we only loaded the file once if we need
# multiple parts discontiguously.
sub_idxs = list(range(curr.start, curr.stop))
sub_comp = comp.take_channels(sub_idxs)
new_components.append(sub_comp)
new = DelayedChannelConcat(new_components)
return new
def __getitem__(self, sl):
if not isinstance(sl, tuple):
raise TypeError('slice must be given as tuple')
if len(sl) == 2:
sl_y, sl_x = sl
chan_idxs = None
elif len(sl) == 3:
sl_y, sl_x, chan_idxs = sl
else:
raise ValueError('Slice must have 2 or 3 dims')
space_slice = (sl_y, sl_x)
return self.crop(space_slice, chan_idxs)
@property
def num_overviews(self):
"""
Returns:
int
"""
num_overviews = self.meta.get('num_overviews', None)
if num_overviews is None and self.parts is not None:
cand = [p.num_overviews for p in self.parts]
if ub.allsame(cand):
num_overviews = cand[0]
else:
import warnings
warnings.warn('inconsistent overviews')
num_overviews = None
return num_overviews
[docs] def as_xarray(self):
"""
Returns:
DelayedAsXarray
"""
return DelayedAsXarray(self)
def _push_operation_under(self, op, kwargs):
# Note: we can't do this with a crop that has band selection
# But spatial operations should be ok.
return self.__class__([op(p, **kwargs) for p in self.parts])
def _validate(self):
"""
Check that the delayed metadata corresponds with the finalized data
"""
final = self._finalize()
# meta_dsize = self.dsize
meta_shape = self.shape
final_shape = final.shape
correspondences = {
'shape': (final_shape, meta_shape)
}
for k, tup in correspondences.items():
v1, v2 = tup
if v1 != v2:
raise AssertionError(
f'final and meta {k} does not agree {v1!r} != {v2!r}')
return self
[docs] def undo_warps(self, remove=None, retain=None, squash_nans=False, return_warps=False):
"""
Attempts to "undo" warping for each concatenated channel and returns a
list of delayed operations that are cropped to the right regions.
Typically you will retrain offset, theta, and shear to remove scale.
This ensures the data is spatially aligned up to a scale factor.
Args:
remove (List[str]): if specified, list components of the warping to
remove. Can include: "offset", "scale", "shearx", "theta".
Typically set this to ["scale"].
retain (List[str]): if specified, list components of the warping to
retain. Can include: "offset", "scale", "shearx", "theta".
Mutually exclusive with "remove". If neither remove or retain
is specified, retain is set to ``[]``.
squash_nans (bool):
if True, pure nan channels are squashed into a 1x1 array as
they do not correspond to a real source.
return_warps (bool):
if True, return the transforms we applied.
This is useful when you need to warp objects in the original
space into the jagged space.
Example:
>>> from kwcoco.util.delayed_ops.delayed_nodes import * # NOQA
>>> from kwcoco.util.delayed_ops.delayed_leafs import DelayedLoad
>>> from kwcoco.util.delayed_ops.delayed_leafs import DelayedNans
>>> import ubelt as ub
>>> import kwimage
>>> import kwarray
>>> import numpy as np
>>> # Demo case where we have different channels at different resolutions
>>> base = DelayedLoad.demo(channels='r|g|b').prepare().dequantize({'quant_max': 255})
>>> bandR = base[:, :, 0].scale(100 / 512)[:, :-50].evaluate()
>>> bandG = base[:, :, 1].scale(300 / 512).warp({'theta': np.pi / 8, 'about': (150, 150)}).evaluate()
>>> bandB = base[:, :, 2].scale(600 / 512)[:150, :].evaluate()
>>> bandN = DelayedNans((600, 600), channels='N')
>>> # Make a concatenation of images of different underlying native resolutions
>>> delayed_vidspace = DelayedChannelConcat([
>>> bandR.scale(6, dsize=(600, 600)).optimize(),
>>> bandG.warp({'theta': -np.pi / 8, 'about': (150, 150)}).scale(2, dsize=(600, 600)).optimize(),
>>> bandB.scale(1, dsize=(600, 600)).optimize(),
>>> bandN,
>>> ]).warp({'scale': 0.7}).optimize()
>>> vidspace_box = kwimage.Boxes([[100, 10, 270, 160]], 'ltrb')
>>> vidspace_poly = vidspace_box.to_polygons()[0]
>>> vidspace_slice = vidspace_box.to_slices()[0]
>>> self = delayed_vidspace[vidspace_slice].optimize()
>>> print('--- Aligned --- ')
>>> self.write_network_text()
>>> squash_nans = True
>>> undone_all_parts, tfs1 = self.undo_warps(squash_nans=squash_nans, return_warps=True)
>>> undone_scale_parts, tfs2 = self.undo_warps(remove=['scale'], squash_nans=squash_nans, return_warps=True)
>>> stackable_aligned = self.finalize().transpose(2, 0, 1)
>>> stackable_undone_all = []
>>> stackable_undone_scale = []
>>> print('--- Undone All --- ')
>>> for undone in undone_all_parts:
... undone.write_network_text()
... stackable_undone_all.append(undone.finalize())
>>> print('--- Undone Scale --- ')
>>> for undone in undone_scale_parts:
... undone.write_network_text()
... stackable_undone_scale.append(undone.finalize())
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> canvas0 = kwimage.stack_images(stackable_aligned, axis=1)
>>> canvas1 = kwimage.stack_images(stackable_undone_all, axis=1)
>>> canvas2 = kwimage.stack_images(stackable_undone_scale, axis=1)
>>> canvas0 = kwimage.draw_header_text(canvas0, 'Rescaled Aligned Channels')
>>> canvas1 = kwimage.draw_header_text(canvas1, 'Unwarped Channels')
>>> canvas2 = kwimage.draw_header_text(canvas2, 'Unscaled Channels')
>>> canvas = kwimage.stack_images([canvas0, canvas1, canvas2], axis=0)
>>> canvas = kwimage.fill_nans_with_checkers(canvas)
>>> kwplot.imshow(canvas)
"""
retain = _rectify_retain(remove, retain)
unwarped_parts = []
if return_warps:
jagged_warps = []
for part in self.parts:
undone_part, undo_warp = part.undo_warp(
retain=retain, squash_nans=squash_nans,
return_warp=return_warps)
unwarped_parts.append(undone_part)
jagged_warps.append(undo_warp)
return unwarped_parts, jagged_warps
else:
for part in self.parts:
undone_part = part.undo_warp(
retain=retain, squash_nans=squash_nans)
unwarped_parts.append(undone_part)
return unwarped_parts
[docs]class DelayedArray(DelayedUnaryOperation):
"""
A generic NDArray.
"""
def __init__(self, subdata=None):
"""
Args:
subdata (DelayedArray):
"""
super().__init__(subdata=subdata)
def __nice__(self):
"""
Returns:
str
"""
return '{}'.format(self.shape)
@property
def shape(self):
"""
Returns:
None | Tuple[int | None, ...]
"""
shape = self.subdata.shape
return shape
[docs]class DelayedImage(ImageOpsMixin, DelayedArray):
"""
For the case where an array represents a 2D image with multiple channels
"""
def __init__(self, subdata=None, dsize=None, channels=None):
"""
Args:
subdata (DelayedArray):
dsize (None | Tuple[int | None, int | None]): overrides subdata dsize
channels (None | int | kwcoco.FusedChannelSpec): overrides subdata channels
"""
super().__init__(subdata)
self.channels = channels
self.meta['dsize'] = dsize
def __nice__(self):
"""
Returns:
str
"""
if self.channels is None:
return '{}'.format(self.shape)
else:
return '{}, {}'.format(self.shape, self.channels)
@property
def shape(self):
"""
Returns:
None | Tuple[int | None, int | None, int | None]
"""
dsize = self.dsize
if dsize is None:
dsize = (None, None)
w, h = dsize
c = self.num_channels
return (h, w, c)
@property
def num_channels(self):
"""
Returns:
None | int
"""
num_channels = self.meta.get('num_channels', None)
if num_channels is None and self.subdata is not None:
num_channels = self.subdata.num_channels
return num_channels
@property
def dsize(self):
"""
Returns:
None | Tuple[int | None, int | None]
"""
# return self.meta.get('dsize', None)
dsize = self.meta.get('dsize', None)
if dsize is None and self.subdata is not None:
dsize = self.subdata.dsize
return dsize
@property
def channels(self):
"""
Returns:
None | kwcoco.FusedChannelSpec
"""
channels = self.meta.get('channels', None)
if channels is None and self.subdata is not None:
channels = self.subdata.channels
return channels
@channels.setter
def channels(self, channels):
if channels is None:
num_channels = None
else:
if isinstance(channels, int):
num_channels = channels
channels = None
else:
import kwcoco
channels = kwcoco.FusedChannelSpec.coerce(channels)
num_channels = channels.normalize().numel()
self.meta['channels'] = channels
self.meta['num_channels'] = num_channels
@property
def num_overviews(self):
"""
Returns:
int
"""
num_overviews = self.meta.get('num_overviews', None)
if num_overviews is None and self.subdata is not None:
num_overviews = self.subdata.num_overviews
return num_overviews
def __getitem__(self, sl):
if not isinstance(sl, tuple):
raise TypeError('slice must be given as tuple')
if len(sl) == 2:
sl_y, sl_x = sl
chan_idxs = None
elif len(sl) == 3:
sl_y, sl_x, chan_idxs = sl
else:
raise ValueError('Slice must have 2 or 3 dims')
space_slice = (sl_y, sl_x)
return self.crop(space_slice, chan_idxs)
[docs] def take_channels(self, channels):
"""
This method returns a subset of the vision data with only the
specified bands / channels.
Args:
channels (List[int] | slice | channel_spec.FusedChannelSpec):
List of integers indexes, a slice, or a channel spec, which is
typically a pipe (`|`) delimited list of channel codes. See
kwcoco.ChannelSpec for more detials.
Returns:
DelayedCrop:
a new delayed load with a fused take channel operation
Note:
The channel subset must exist here or it will raise an error.
A better implementation (via pymbolic) might be able to do better
Example:
>>> #
>>> # Test Channel Select Via Code
>>> from kwcoco.util.delayed_ops.delayed_nodes import * # NOQA
>>> from kwcoco.util.delayed_ops import DelayedLoad
>>> self = DelayedLoad.demo(dsize=(16, 16), channels='r|g|b').prepare()
>>> channels = 'r|b'
>>> new = self.take_channels(channels)._validate()
>>> new2 = new[:, :, [1, 0]]._validate()
>>> new3 = new2[:, :, [1]]._validate()
Example:
>>> from kwcoco.util.delayed_ops.delayed_nodes import * # NOQA
>>> from kwcoco.util.delayed_ops import DelayedLoad
>>> import kwcoco
>>> self = DelayedLoad.demo('astro').prepare()
>>> channels = [2, 0]
>>> new = self.take_channels(channels)
>>> new3 = new.take_channels([1, 0])
>>> new._validate()
>>> new3._validate()
>>> final1 = self.finalize()
>>> final2 = new.finalize()
>>> final3 = new3.finalize()
>>> assert np.all(final1[..., 2] == final2[..., 0])
>>> assert np.all(final1[..., 0] == final2[..., 1])
>>> assert final2.shape[2] == 2
>>> assert np.all(final1[..., 2] == final3[..., 1])
>>> assert np.all(final1[..., 0] == final3[..., 0])
>>> assert final3.shape[2] == 2
"""
if isinstance(channels, list):
top_idx_mapping = channels
else:
current_channels = self.channels
if current_channels is None:
raise ValueError(
'The channel spec for this node are unknown. '
'Cannot use a spec to select them'
)
channels = channel_spec.FusedChannelSpec.coerce(channels)
# Computer subindex integer mapping
request_codes = channels.as_list()
top_codes = current_channels.as_oset()
top_idx_mapping = [
top_codes.index(code)
for code in request_codes
]
new_chan_ixs = top_idx_mapping
new = self.crop(None, new_chan_ixs)
return new
def _validate(self):
"""
Check that the delayed metadata corresponds with the finalized data
"""
opt = self.optimize()
opt_shape = opt.shape
final = self._finalize()
# meta_dsize = self.dsize
meta_shape = self.shape
final_shape = final.shape
correspondences = {
'opt_chans': (self.channels, opt.channels),
'opt_nbands': (self.num_channels, opt.num_channels),
'final_shape': (final_shape, meta_shape),
'opt_shape': (opt_shape, meta_shape),
}
for k, tup in correspondences.items():
v1, v2 = tup
if v1 != v2:
raise AssertionError(
f'final and meta {k} does not agree {v1!r} != {v2!r}')
return self
def _transform_from_subdata(self):
raise NotImplementedError
[docs] def evaluate(self):
"""
Evaluate this node and return the data as an identity.
Returns:
DelayedIdentity
"""
from kwcoco.util.delayed_ops.delayed_leafs import DelayedIdentity
final = self.finalize()
new = DelayedIdentity(final, dsize=self.dsize, channels=self.channels)
return new
def _opt_push_under_concat(self):
assert isinstance(self.subdata, DelayedChannelConcat)
kwargs = ub.compatible(self.meta, self.__class__.__init__)
new = self.subdata._push_operation_under(self.__class__, kwargs)
return new
[docs] def undo_warp(self, remove=None, retain=None, squash_nans=False, return_warp=False):
"""
Attempts to "undo" warping for each concatenated channel and returns a
list of delayed operations that are cropped to the right regions.
Typically you will retrain offset, theta, and shear to remove scale.
This ensures the data is spatially aligned up to a scale factor.
Args:
remove (List[str]): if specified, list components of the warping to
remove. Can include: "offset", "scale", "shearx", "theta".
Typically set this to ["scale"].
retain (List[str]): if specified, list components of the warping to
retain. Can include: "offset", "scale", "shearx", "theta".
Mutually exclusive with "remove". If neither remove or retain
is specified, retain is set to ``[]``.
squash_nans (bool):
if True, pure nan channels are squashed into a 1x1 array as
they do not correspond to a real source.
return_warp (bool):
if True, return the transform we applied.
This is useful when you need to warp objects in the original
space into the jagged space.
SeeAlso:
DelayedChannelConcat.undo_warps
Example:
>>> # Test similar to undo_warps, but on each channel separately
>>> from kwcoco.util.delayed_ops.delayed_nodes import * # NOQA
>>> from kwcoco.util.delayed_ops.delayed_leafs import DelayedLoad
>>> from kwcoco.util.delayed_ops.delayed_leafs import DelayedNans
>>> import ubelt as ub
>>> import kwimage
>>> import kwarray
>>> import numpy as np
>>> # Demo case where we have different channels at different resolutions
>>> base = DelayedLoad.demo(channels='r|g|b').prepare().dequantize({'quant_max': 255})
>>> bandR = base[:, :, 0].scale(100 / 512)[:, :-50].evaluate()
>>> bandG = base[:, :, 1].scale(300 / 512).warp({'theta': np.pi / 8, 'about': (150, 150)}).evaluate()
>>> bandB = base[:, :, 2].scale(600 / 512)[:150, :].evaluate()
>>> bandN = DelayedNans((600, 600), channels='N')
>>> B0 = bandR.scale(6, dsize=(600, 600)).optimize()
>>> B1 = bandG.warp({'theta': -np.pi / 8, 'about': (150, 150)}).scale(2, dsize=(600, 600)).optimize()
>>> B2 = bandB.scale(1, dsize=(600, 600)).optimize()
>>> vidspace_box = kwimage.Boxes([[-10, -10, 270, 160]], 'ltrb').scale(1 / .7).quantize()
>>> vidspace_poly = vidspace_box.to_polygons()[0]
>>> vidspace_slice = vidspace_box.to_slices()[0]
>>> # Test with the padded crop
>>> self0 = B0.crop(vidspace_slice, wrap=0, clip=0, pad=10).optimize()
>>> self1 = B1.crop(vidspace_slice, wrap=0, clip=0, pad=10).optimize()
>>> self2 = B2.crop(vidspace_slice, wrap=0, clip=0, pad=10).optimize()
>>> parts = [self0, self1, self2]
>>> # Run the undo on each channel
>>> undone_scale_parts = [d.undo_warp(remove=['scale']) for d in parts]
>>> print('--- Aligned --- ')
>>> stackable_aligned = []
>>> for d in parts:
>>> d.write_network_text()
>>> stackable_aligned.append(d.finalize())
>>> print('--- Undone Scale --- ')
>>> stackable_undone_scale = []
>>> for undone in undone_scale_parts:
... undone.write_network_text()
... stackable_undone_scale.append(undone.finalize())
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> canvas0 = kwimage.stack_images(stackable_aligned, axis=1, pad=5, bg_value='kw_darkgray')
>>> canvas2 = kwimage.stack_images(stackable_undone_scale, axis=1, pad=5, bg_value='kw_darkgray')
>>> canvas0 = kwimage.draw_header_text(canvas0, 'Rescaled Channels')
>>> canvas2 = kwimage.draw_header_text(canvas2, 'Native Scale Channels')
>>> canvas = kwimage.stack_images([canvas0, canvas2], axis=0, bg_value='kw_darkgray')
>>> canvas = kwimage.fill_nans_with_checkers(canvas)
>>> kwplot.imshow(canvas)
"""
retain = _rectify_retain(remove, retain)
part = self
tf_root_from_leaf = part.get_transform_from_leaf()
tf_leaf_from_root = tf_root_from_leaf.inv()
undo_all = tf_leaf_from_root
all_undo_components = undo_all.concise()
undo_components = ub.dict_diff(all_undo_components, retain)
undo_warp = kwimage.Affine.coerce(undo_components)
undone_part = part.warp(undo_warp).optimize()
if squash_nans:
if return_warp:
# hack the return undo_warp
w, h = undone_part.dsize
undo_warp = kwimage.Affine.scale((1 / w, 1 / h)) @ undo_warp
if isinstance(undone_part, delayed_leafs.DelayedNans):
undone_part = undone_part[0:1, 0:1].optimize()
if return_warp:
return undone_part, undo_warp
else:
return undone_part
[docs]class DelayedAsXarray(DelayedImage):
"""
Casts the data to an xarray object in the finalize step
Example;
>>> from kwcoco.util.delayed_ops.delayed_nodes import * # NOQA
>>> from kwcoco.util.delayed_ops import DelayedLoad
>>> # without channels
>>> base = DelayedLoad.demo(dsize=(16, 16)).prepare()
>>> self = base.as_xarray()
>>> final = self._validate().finalize()
>>> assert len(final.coords) == 0
>>> assert final.dims == ('y', 'x', 'c')
>>> # with channels
>>> base = DelayedLoad.demo(dsize=(16, 16), channels='r|g|b').prepare()
>>> self = base.as_xarray()
>>> final = self._validate().finalize()
>>> assert final.coords.indexes['c'].tolist() == ['r', 'g', 'b']
>>> assert final.dims == ('y', 'x', 'c')
"""
def _finalize(self):
"""
Returns:
ArrayLike
"""
import xarray as xr
subfinal = np.asarray(self.subdata._finalize())
channels = self.subdata.channels
coords = {}
if channels is not None:
coords['c'] = channels.code_list()
final = xr.DataArray(subfinal, dims=('y', 'x', 'c'), coords=coords)
return final
[docs] def optimize(self):
"""
Returns:
DelayedImage
"""
return self.subdata.optimize().as_xarray()
[docs]class DelayedWarp(DelayedImage):
"""
Applies an affine transform to an image.
Example:
>>> from kwcoco.util.delayed_ops.delayed_nodes import * # NOQA
>>> from kwcoco.util.delayed_ops import DelayedLoad
>>> self = DelayedLoad.demo(dsize=(16, 16)).prepare()
>>> warp1 = self.warp({'scale': 3})
>>> warp2 = warp1.warp({'theta': 0.1})
>>> warp3 = warp2._opt_fuse_warps()
>>> warp3._validate()
>>> print(ub.repr2(warp2.nesting(), nl=-1, sort=0))
>>> print(ub.repr2(warp3.nesting(), nl=-1, sort=0))
"""
def __init__(self, subdata, transform, dsize='auto', antialias=True,
interpolation='linear', border_value='auto'):
"""
Args:
subdata (DelayedArray): data to operate on
transform (ndarray | dict | kwimage.Affine):
a coercable affine matrix. See :class:`kwimage.Affine` for
details on what can be coerced.
dsize (Tuple[int, int] | str):
The width / height of the output canvas. If 'auto', dsize is
computed such that the positive coordinates of the warped image
will fit in the new canvas. In this case, any pixel that maps
to a negative coordinate will be clipped. This has the
property that the input transformation is not modified.
antialias (bool):
if True determines if the transform is downsampling and applies
antialiasing via gaussian a blur. Defaults to False
interpolation (str):
interpolation code or cv2 integer. Interpolation codes are linear,
nearest, cubic, lancsoz, and area. Defaults to "linear".
"""
super().__init__(subdata)
transform = kwimage.Affine.coerce(transform)
if dsize == 'auto':
from kwcoco.util.delayed_ops.helpers import _auto_dsize
dsize = _auto_dsize(transform, self.subdata.dsize)
self.meta['transform'] = transform
self.meta['antialias'] = antialias
self.meta['interpolation'] = interpolation
self.meta['dsize'] = dsize
self.meta['border_value'] = border_value
@property
def transform(self):
"""
Returns:
kwimage.Affine
"""
return self.meta['transform']
def _finalize(self):
"""
Returns:
ArrayLike
"""
dsize = self.dsize
if dsize == (None, None):
dsize = None
antialias = self.meta['antialias']
transform = self.meta['transform']
interpolation = self.meta['interpolation']
prewarp = self.subdata._finalize()
prewarp = np.asanyarray(prewarp)
# TODO: we could configure this, but forcing nans on floats seems like
# a pretty nice default border behavior. It would be even nicer to have
# masked arrays for ints.
# The scalar / explicit functionality will be handled inside warp_affine
# in the future, so some of this can be removed.
num_chan = kwimage.num_channels(prewarp)
if self.meta['border_value'] == 'auto':
if prewarp.dtype.kind == 'f':
border_value = np.nan
else:
if isinstance(prewarp, np.ma.MaskedArray):
border_value = int(prewarp.fill_value)
else:
border_value = 0
else:
border_value = self.meta['border_value']
if not ub.iterable(border_value):
# Odd OpenCV behavior: https://github.com/opencv/opencv/issues/22283
# Can only have at most 4 components to border_value and
# then they start to wrap around. This is fine if we are only
# specifying a single number for all channels
border_value = (border_value,) * min(4, num_chan)
if len(border_value) > 4:
raise ValueError('borderValue cannot have more than 4 components. '
'OpenCV #22283 describes why')
# HACK:
# the border value only correctly applies to the first 4 channels for
# whatever reason.
border_value = border_value[0:4]
M = np.asarray(transform)
final = kwimage.warp_affine(prewarp, M, dsize=dsize,
interpolation=interpolation,
antialias=antialias,
border_value=border_value)
# final = cv2.warpPerspective(sub_data_, M, dsize=dsize, flags=flags)
# Ensure that the last dimension is channels
final = kwarray.atleast_nd(final, 3, front=False)
return final
[docs] @profile
def optimize(self):
"""
Returns:
DelayedImage
Example:
>>> # Demo optimization that removes a noop warp
>>> from kwcoco.util.delayed_ops import DelayedLoad
>>> import kwimage
>>> base = DelayedLoad.demo(channels='r|g|b').prepare()
>>> self = base.warp(kwimage.Affine.eye())
>>> new = self.optimize()
>>> assert len(self.as_graph().nodes) == 2
>>> assert len(new.as_graph().nodes) == 1
Example:
>>> # Test optimize nans
>>> from kwcoco.util.delayed_ops import DelayedNans
>>> import kwimage
>>> base = DelayedNans(dsize=(100, 100), channels='a|b|c')
>>> self = base.warp(kwimage.Affine.scale(0.1))
>>> # Should simply return a new nan generator
>>> new = self.optimize()
>>> assert len(new.as_graph().nodes) == 1
"""
new = copy.copy(self)
new.subdata = self.subdata.optimize()
if isinstance(new.subdata, DelayedWarp):
new = new._opt_fuse_warps()
### The tolerance should be very strict by default, but
### we also might want to be able to parameterize it
if new.transform.isclose_identity(rtol=0, atol=0) and new.dsize == new.subdata.dsize:
new = new.subdata
elif isinstance(new.subdata, DelayedChannelConcat):
new = new._opt_push_under_concat().optimize()
elif hasattr(new.subdata, '_optimized_warp'):
# The subdata knows how to optimize itself wrt a warp
warp_kwargs = ub.dict_isect(self.meta, {
'transform', 'dsize', 'antialias', 'interpolation',
'border_value'})
new = new.subdata._optimized_warp(**warp_kwargs).optimize()
else:
split = new._opt_split_warp_overview()
if new is not split:
new = split
new.subdata = new.subdata.optimize()
new = new.optimize()
else:
new = new._opt_absorb_overview()
return new
def _transform_from_subdata(self):
return self.transform
def _opt_fuse_warps(self):
"""
Combine two consecutive warps into a single operation.
"""
assert isinstance(self.subdata, DelayedWarp)
inner_data = self.subdata.subdata
tf1 = self.subdata.meta['transform']
tf2 = self.meta['transform']
# TODO: could ensure the metadata is compatable, for now just take the
# most recent
dsize = self.meta['dsize']
common_meta = ub.dict_isect(self.meta, {'antialias', 'interpolation', 'border_value'})
new_transform = tf2 @ tf1
new = self.__class__(inner_data, new_transform, dsize=dsize,
**common_meta)
return new
def _opt_absorb_overview(self):
"""
Remove the overview if we can get a higher resolution without it
Example:
>>> # xdoctest: +REQUIRES(module:osgeo)
>>> from kwcoco.util.delayed_ops.delayed_nodes import * # NOQA
>>> from kwcoco.util.delayed_ops import DelayedLoad
>>> import kwimage
>>> fpath = kwimage.grab_test_image_fpath(overviews=3)
>>> base = DelayedLoad(fpath, channels='r|g|b').prepare()
>>> # Case without any operations between the overview and warp
>>> self = base.get_overview(1).warp({'scale': 4})
>>> self.write_network_text()
>>> opt = self._opt_absorb_overview()._validate()
>>> opt.write_network_text()
>>> opt_data = [d for n, d in opt.as_graph().nodes(data=True)]
>>> assert 'DelayedOverview' not in [d['type'] for d in opt_data]
>>> # Case with a chain of operations between overview and warp
>>> self = base.get_overview(1)[0:101, 0:100].warp({'scale': 4})
>>> self.write_network_text()
>>> opt = self._opt_absorb_overview()._validate()
>>> opt.write_network_text()
>>> opt_data = [d for n, d in opt.as_graph().nodes(data=True)]
>>> assert opt_data[1]['meta']['space_slice'] == (slice(0, 202, None), slice(0, 200, None))
>>> # Any sort of complex chain does prevents this optimization
>>> # from running.
>>> self = base.get_overview(1)[0:101, 0:100][0:50, 0:50].warp({'scale': 4})
>>> opt = self._opt_absorb_overview()._validate()
>>> opt.write_network_text()
>>> opt_data = [d for n, d in opt.as_graph().nodes(data=True)]
>>> assert 'DelayedOverview' in [d['type'] for d in opt_data]
"""
# Check if there is a strict downsampling component
transform = self.meta['transform']
params = transform.decompose()
sx, sy = params['scale']
if sx < 2 and sy < 2:
return self
# Lookahead to see if there is a nearby overview operation that can be
# absorbed, remember the chain of operations between the warp and the
# overview, as it will need to be modified.
parent = self
subdata = None
chain = []
num_dc = 0
for i in range(4):
subdata = parent.subdata
if subdata is None:
break
elif isinstance(subdata, DelayedWarp):
subdata = None
break
elif isinstance(subdata, DelayedOverview):
# We found an overview node
overview = subdata
break
elif isinstance(subdata, DelayedDequantize):
pass
elif isinstance(subdata, DelayedCrop):
num_dc += 1
else:
subdata = None
break
chain.append(subdata)
parent = subdata
else:
subdata = None
if subdata is None:
return self
if num_dc > 1:
return self
# Replace the overview node with a warp node that mimics it.
# This has no impact on the function of the operation stack.
mimic_overview = overview._opt_overview_as_warp()
tf1 = mimic_overview.meta['transform']
# Handle any nodes between the warp and the overview.
# This should be at most one quantization and one crop operation,
# but we may generalize that in the future.
if not chain:
# The overview is directly after this warp
new_head = mimic_overview.subdata
else:
# Copy the chain so this does not mutate the input
chain = [copy.copy(n) for n in chain]
for u, v in ub.iter_window(chain, 2):
u.subdata = v
tail = chain[-1]
tail.subdata = mimic_overview
# Check if the tail of the chain is a crop.
if hasattr(tail, '_opt_warp_after_crop'):
# This modifies the tail so it is now a warp followed by a
# crop. Note that the warp may be different than the mimiced
# overview, so use this new transform instead.
# (Actually, I think this can't make the new crop non integral,
# so it probably wont matter)
modified_tail = tail._opt_warp_after_crop()
new_chain_dsize = modified_tail.meta['dsize']
tf1 = modified_tail.meta['transform']
# Remove the modified warp
tail_parent = chain[-2] if len(chain) > 1 else self
new_tail = modified_tail.subdata
tail_parent.subdata = new_tail
chain[-1] = new_tail
for notcrop in chain[:-1]:
notcrop.meta['dsize'] = new_chain_dsize
else:
# The chain does not contain a crop operation, we can safely
# remove it.
# Finally remove the overview transform entirely
tail.subdata = mimic_overview.subdata
new_chain_dsize = mimic_overview.subdata.meta['dsize']
# The dsize within the chain might be wrong due to our
# modification. I **think** its ok to just directly set it to the
# new dsize as it should only be operations that do not change the
# dsize, but it would be nice to find a more ellegant
# implementation.
for notcrop in chain[:-1]:
notcrop.meta['dsize'] = new_chain_dsize
new_head = chain[0]
warp_meta = ub.dict_isect(self.meta, {'antialias', 'interpolation', 'border_value'})
tf2 = self.meta['transform']
dsize = self.meta['dsize']
new_transform = tf2 @ tf1
new = self.__class__(new_head, new_transform, dsize=dsize, **warp_meta)
return new
def _opt_split_warp_overview(self):
"""
Split this node into a warp and an overview if possible
Example:
>>> # xdoctest: +REQUIRES(module:osgeo)
>>> from kwcoco.util.delayed_ops.delayed_nodes import * # NOQA
>>> from kwcoco.util.delayed_ops import DelayedLoad
>>> import kwimage
>>> fpath = kwimage.grab_test_image_fpath(overviews=3)
>>> self = DelayedLoad(fpath, channels='r|g|b').prepare()
>>> print(f'self={self}')
>>> print('self.meta = {}'.format(ub.repr2(self.meta, nl=1)))
>>> warp0 = self.warp({'scale': 0.2})
>>> warp1 = warp0._opt_split_warp_overview()
>>> warp2 = self.warp({'scale': 0.25})._opt_split_warp_overview()
>>> print(ub.repr2(warp0.nesting(), nl=-1, sort=0))
>>> print(ub.repr2(warp1.nesting(), nl=-1, sort=0))
>>> print(ub.repr2(warp2.nesting(), nl=-1, sort=0))
Example:
>>> # xdoctest: +REQUIRES(module:osgeo)
>>> from kwcoco.util.delayed_ops.delayed_nodes import * # NOQA
>>> from kwcoco.util.delayed_ops import DelayedLoad
>>> import kwimage
>>> fpath = kwimage.grab_test_image_fpath(overviews=3)
>>> self = DelayedLoad(fpath, channels='r|g|b').prepare()
>>> warp0 = self.warp({'scale': 1 / 2 ** 6})
>>> opt = warp0.optimize()
>>> print(ub.repr2(warp0.nesting(), nl=-1, sort=0))
>>> print(ub.repr2(opt.nesting(), nl=-1, sort=0))
"""
inner_data = self.subdata
num_overviews = inner_data.num_overviews
if not num_overviews:
return self
# Check if there is a strict downsampling component
transform = self.meta['transform']
params = transform.decompose()
sx, sy = params['scale']
if sx > 0.5 or sy > 0.5:
return self
# Check how many pyramid downs we could replace downsampling with
from kwimage.im_cv2 import _prepare_scale_residual
num_downs_possible, _, _ = _prepare_scale_residual(sx, sy, fudge=0)
# But only use as many downs as we have overviews
num_downs = min(num_overviews, num_downs_possible)
if num_downs == 0:
return self
# Given the overview, find the residual to reconstruct the original
overview_transform = kwimage.Affine.scale(1 / (2 ** num_downs))
# Let T=original, O=overview, R=residual
# T = R @ O
# T @ O.inv = R @ O @ O.inv
# T @ O.inv = R
residual_transform = transform @ overview_transform.inv()
new_transform = residual_transform
dsize = self.meta['dsize']
overview = inner_data.get_overview(num_downs)
if new_transform.isclose_identity():
new = overview
else:
common_meta = ub.dict_isect(self.meta, {
'antialias', 'interpolation', 'border_value'})
new = overview.warp(new_transform, dsize=dsize, **common_meta)
return new
[docs]class DelayedDequantize(DelayedImage):
"""
Rescales image intensities from int to floats.
The output is usually between 0 and 1. This also handles transforming
nodata into nan values.
"""
def __init__(self, subdata, quantization):
"""
Args:
subdata (DelayedArray): data to operate on
quantization (Dict):
see :func:`kwcoco.util.delayed_ops.helpers.dequantize`
"""
super().__init__(subdata)
self.meta['quantization'] = quantization
self.meta['dsize'] = subdata.dsize
def _finalize(self):
"""
Returns:
ArrayLike
"""
from kwcoco.util.delayed_ops.helpers import dequantize
quantization = self.meta['quantization']
final = self.subdata._finalize()
final = kwarray.atleast_nd(final, 3, front=False)
if quantization is not None:
final = dequantize(final, quantization)
return final
[docs] @profile
def optimize(self):
"""
Returns:
DelayedImage
Example:
>>> # Test a case that caused an error in development
>>> from kwcoco.util.delayed_ops.delayed_nodes import * # NOQA
>>> from kwcoco.util.delayed_ops import DelayedLoad
>>> fpath = kwimage.grab_test_image_fpath()
>>> base = DelayedLoad(fpath, channels='r|g|b').prepare()
>>> quantization = {'quant_max': 255, 'nodata': 0}
>>> self = base.get_overview(1).dequantize(quantization)
>>> self.write_network_text()
>>> opt = self.optimize()
"""
new = copy.copy(self)
new.subdata = self.subdata.optimize()
if isinstance(new.subdata, DelayedDequantize):
raise AssertionError('Dequantization is only allowed once')
if isinstance(new.subdata, DelayedWarp):
# Swap order so quantize is before the warp
new = new._opt_dequant_before_other()
new = new.optimize()
if isinstance(new.subdata, DelayedChannelConcat):
new = new._opt_push_under_concat().optimize()
return new
def _opt_dequant_before_other(self):
quantization = self.meta['quantization']
new = copy.copy(self.subdata)
new.subdata = new.subdata.dequantize(quantization)
return new
def _transform_from_subdata(self):
return kwimage.Affine.eye()
[docs]class DelayedCrop(DelayedImage):
"""
Crops an image along integer pixel coordinates.
Example:
>>> from kwcoco.util.delayed_ops.delayed_nodes import * # NOQA
>>> from kwcoco.util.delayed_ops import DelayedLoad
>>> base = DelayedLoad.demo(dsize=(16, 16)).prepare()
>>> # Test Fuse Crops Space Only
>>> crop1 = base[4:12, 0:16]
>>> self = crop1[2:6, 0:8]
>>> opt = self._opt_fuse_crops()
>>> self.write_network_text()
>>> opt.write_network_text()
>>> #
>>> # Test Channel Select Via Index
>>> self = base[:, :, [0]]
>>> self.write_network_text()
>>> final = self._finalize()
>>> assert final.shape == (16, 16, 1)
>>> assert base[:, :, [0, 1]].finalize().shape == (16, 16, 2)
>>> assert base[:, :, [2, 0, 1]].finalize().shape == (16, 16, 3)
"""
def __init__(self, subdata, space_slice=None, chan_idxs=None):
"""
Args:
subdata (DelayedArray): data to operate on
space_slice (Tuple[slice, slice]):
if speficied, take this y-slice and x-slice.
chan_idxs (List[int] | None):
if specified, take these channels / bands
"""
super().__init__(subdata)
# TODO: are we doing infinite padding or clipping?
# This assumes clipping
in_w, in_h = subdata.dsize
if space_slice is not None:
space_dims = (in_h, in_w)
slice_box = kwimage.Boxes.from_slice(
space_slice, space_dims, wrap=True, clip=True)
space_slice = slice_box.to_slices()[0]
# width = slice_box.width.ravel()[0]
# height = slice_box.height.ravel()[0]
space_slice, _pad = kwarray.embed_slice(space_slice, space_dims)
sl_y, sl_x = space_slice[0:2]
width = sl_x.stop - sl_x.start
height = sl_y.stop - sl_y.start
self.meta['dsize'] = (width, height)
else:
space_slice = (slice(0, in_h), slice(0, in_w))
self.meta['dsize'] = (in_w, in_h)
if chan_idxs is not None:
current_channels = self.channels
if current_channels is not None:
new_channels = current_channels[chan_idxs]
else:
new_channels = len(chan_idxs)
self.channels = new_channels
self.meta['space_slice'] = space_slice
self.meta['chan_idxs'] = chan_idxs
def _finalize(self):
"""
Returns:
ArrayLike
"""
space_slice = self.meta['space_slice']
chan_idxs = self.meta['chan_idxs']
sub_final = self.subdata._finalize()
if chan_idxs is None:
full_slice = space_slice
else:
full_slice = space_slice + (chan_idxs,)
# final = sub_final[space_slice]
final = sub_final[full_slice]
final = kwarray.atleast_nd(final, 3)
return final
def _transform_from_subdata(self):
sl_y, sl_x = self.meta['space_slice']
offset = -sl_x.start, -sl_y.start
self_from_subdata = kwimage.Affine.translate(offset)
return self_from_subdata
[docs] @profile
def optimize(self):
"""
Returns:
DelayedImage
Example:
>>> # Test optimize nans
>>> from kwcoco.util.delayed_ops import DelayedNans
>>> import kwimage
>>> base = DelayedNans(dsize=(100, 100), channels='a|b|c')
>>> self = base[0:10, 0:5]
>>> # Should simply return a new nan generator
>>> new = self.optimize()
>>> self.write_network_text()
>>> new.write_network_text()
>>> assert len(new.as_graph().nodes) == 1
"""
new = copy.copy(self)
new.subdata = self.subdata.optimize()
if isinstance(new.subdata, DelayedCrop):
new = new._opt_fuse_crops()
if hasattr(new.subdata, '_optimized_crop'):
# The subdata knows how to optimize itself wrt this node
crop_kwargs = ub.dict_isect(self.meta, {'space_slice', 'chan_idxs'})
new = new.subdata._optimized_crop(**crop_kwargs).optimize()
if isinstance(new.subdata, DelayedWarp):
new = new._opt_warp_after_crop()
new = new.optimize()
elif isinstance(new.subdata, DelayedDequantize):
new = new._opt_dequant_after_crop()
new = new.optimize()
if isinstance(new.subdata, DelayedChannelConcat):
if isinstance(new, DelayedCrop):
# We have to be careful if there we have band selection
chan_idxs = new.meta.get('chan_idxs', None)
space_slice = new.meta.get('space_slice', None)
taken = new.subdata
if chan_idxs is not None:
taken = new.subdata.take_channels(chan_idxs).optimize()
if space_slice is not None:
taken = taken.crop(space_slice)._opt_push_under_concat().optimize()
new = taken
else:
new = new._opt_push_under_concat().optimize()
return new
def _opt_fuse_crops(self):
"""
Combine two consecutive crops into a single operation.
Example:
>>> from kwcoco.util.delayed_ops.delayed_nodes import * # NOQA
>>> from kwcoco.util.delayed_ops.delayed_leafs import DelayedLoad
>>> base = DelayedLoad.demo(dsize=(16, 16)).prepare()
>>> # Test Fuse Crops Space Only
>>> crop1 = base[4:12, 0:16]
>>> crop2 = self = crop1[2:6, 0:8]
>>> opt = crop2._opt_fuse_crops()
>>> self.write_network_text()
>>> opt.write_network_text()
>>> opt._validate()
>>> self._validate()
Example:
>>> # Test Fuse Crops Channels Only
>>> from kwcoco.util.delayed_ops.delayed_nodes import * # NOQA
>>> from kwcoco.util.delayed_ops.delayed_leafs import DelayedLoad
>>> base = DelayedLoad.demo(dsize=(16, 16)).prepare()
>>> crop1 = base.crop(chan_idxs=[0, 2, 1])
>>> crop2 = crop1.crop(chan_idxs=[1, 2])
>>> crop3 = self = crop2.crop(chan_idxs=[0, 1])
>>> opt = self._opt_fuse_crops()._opt_fuse_crops()
>>> self.write_network_text()
>>> opt.write_network_text()
>>> finalB = base._validate()._finalize()
>>> final1 = opt._validate()._finalize()
>>> final2 = self._validate()._finalize()
>>> assert np.all(final2[..., 0] == finalB[..., 2])
>>> assert np.all(final2[..., 1] == finalB[..., 1])
>>> assert np.all(final2[..., 0] == final1[..., 0])
>>> assert np.all(final2[..., 1] == final1[..., 1])
Example:
>>> # Test Fuse Crops Space And Channels
>>> from kwcoco.util.delayed_ops.delayed_nodes import * # NOQA
>>> from kwcoco.util.delayed_ops.delayed_leafs import DelayedLoad
>>> base = DelayedLoad.demo(dsize=(16, 16)).prepare()
>>> crop1 = base[4:12, 0:16, [1, 2]]
>>> self = crop1[2:6, 0:8, [1]]
>>> opt = self._opt_fuse_crops()
>>> self.write_network_text()
>>> opt.write_network_text()
>>> self._validate()
>>> crop1._validate()
"""
assert isinstance(self.subdata, DelayedCrop)
# Inner is the data closer to the leaf (disk), outer is the data closer
# to the user (output).
inner_data = self.subdata.subdata
inner_slices = self.subdata.meta['space_slice']
outer_slices = self.meta['space_slice']
outer_ysl, outer_xsl = outer_slices
inner_ysl, inner_xsl = inner_slices
# Apply the new relative slice to the current absolute slice
new_xstart = min(inner_xsl.start + outer_xsl.start, inner_xsl.stop)
new_xstop = min(inner_xsl.start + outer_xsl.stop, inner_xsl.stop)
new_ystart = min(inner_ysl.start + outer_ysl.start, inner_ysl.stop)
new_ystop = min(inner_ysl.start + outer_ysl.stop, inner_ysl.stop)
# Handle bands
inner_chan_idxs = self.subdata.meta['chan_idxs']
outer_chan_idxs = self.meta['chan_idxs']
if outer_chan_idxs is None and inner_chan_idxs is None:
new_chan_idxs = None
elif outer_chan_idxs is None:
new_chan_idxs = inner_chan_idxs
elif inner_chan_idxs is None:
new_chan_idxs = outer_chan_idxs
else:
new_chan_idxs = list(ub.take(inner_chan_idxs, outer_chan_idxs))
new_crop = (slice(new_ystart, new_ystop), slice(new_xstart, new_xstop))
new = self.__class__(inner_data, new_crop, new_chan_idxs)
return new
def _opt_warp_after_crop(self):
"""
If the child node is a warp, move it after the crop.
This is more efficient because:
1. The crop is closer to the load.
2. we are warping with less data.
Example:
>>> from kwcoco.util.delayed_ops.delayed_nodes import * # NOQA
>>> from kwcoco.util.delayed_ops.delayed_leafs import DelayedLoad
>>> fpath = kwimage.grab_test_image_fpath()
>>> node0 = DelayedLoad(fpath, channels='r|g|b').prepare()
>>> node1 = node0.warp({'scale': 0.432, 'theta': np.pi / 3, 'about': (80, 80), 'shearx': .3, 'offset': (-50, -50)})
>>> node2 = node1[10:50, 1:40]
>>> self = node2
>>> new_outer = node2._opt_warp_after_crop()
>>> print(ub.repr2(node2.nesting(), nl=-1, sort=0))
>>> print(ub.repr2(new_outer.nesting(), nl=-1, sort=0))
>>> final0 = self._finalize()
>>> final1 = new_outer._finalize()
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> kwplot.imshow(final0, pnum=(2, 2, 1), fnum=1, title='raw')
>>> kwplot.imshow(final1, pnum=(2, 2, 2), fnum=1, title='optimized')
Example:
>>> # xdoctest: +REQUIRES(module:osgeo)
>>> from kwcoco.util.delayed_ops import * # NOQA
>>> from kwcoco.util.delayed_ops.delayed_leafs import DelayedLoad
>>> fpath = kwimage.grab_test_image_fpath(overviews=3)
>>> node0 = DelayedLoad(fpath, channels='r|g|b').prepare()
>>> node1 = node0.warp({'scale': 1000 / 512})
>>> node2 = node1[250:750, 0:500]
>>> self = node2
>>> new_outer = node2._opt_warp_after_crop()
>>> print(ub.repr2(node2.nesting(), nl=-1, sort=0))
>>> print(ub.repr2(new_outer.nesting(), nl=-1, sort=0))
"""
assert isinstance(self.subdata, DelayedWarp)
# Inner is the data closer to the leaf (disk), outer is the data closer
# to the user (output).
outer_slices = self.meta['space_slice']
outer_chan_idxs = self.meta['chan_idxs']
inner_transform = self.subdata.meta['transform']
outer_region = kwimage.Boxes.from_slice(outer_slices)
outer_region = outer_region.to_polygons()[0]
from kwcoco.util.delayed_ops.helpers import _swap_warp_after_crop
inner_slice, outer_transform = _swap_warp_after_crop(
outer_region, inner_transform)
warp_meta = ub.dict_isect(self.meta, {'dsize'})
warp_meta.update(ub.dict_isect(
self.subdata.meta, {'antialias', 'interpolation', 'border_value'}))
new_inner = self.subdata.subdata.crop(inner_slice, outer_chan_idxs)
new_outer = new_inner.warp(outer_transform, **warp_meta)
return new_outer
def _opt_dequant_after_crop(self):
# Swap order so dequantize is after the crop
assert isinstance(self.subdata, DelayedDequantize)
quantization = self.subdata.meta['quantization']
new = copy.copy(self)
new.subdata = self.subdata.subdata # Remove the dequantization
new = new.dequantize(quantization) # Push it after the crop
return new
[docs]class DelayedOverview(DelayedImage):
"""
Downsamples an image by a factor of two.
If the underlying image being loaded has precomputed overviews it simply
loads these instead of downsampling the original image, which is more
efficient.
Example:
>>> # xdoctest: +REQUIRES(module:osgeo)
>>> # Make a complex chain of operations and optimize it
>>> from kwcoco.util.delayed_ops import * # NOQA
>>> import kwimage
>>> fpath = kwimage.grab_test_image_fpath(overviews=3)
>>> dimg = DelayedLoad(fpath, channels='r|g|b').prepare()
>>> dimg = dimg.get_overview(1)
>>> dimg = dimg.get_overview(1)
>>> dimg = dimg.get_overview(1)
>>> dopt = dimg.optimize()
>>> if 1:
>>> import networkx as nx
>>> dimg.write_network_text()
>>> dopt.write_network_text()
>>> print(ub.repr2(dopt.nesting(), nl=-1, sort=0))
>>> final0 = dimg._finalize()[:]
>>> final1 = dopt._finalize()[:]
>>> assert final0.shape == final1.shape
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> kwplot.imshow(final0, pnum=(1, 2, 1), fnum=1, title='raw')
>>> kwplot.imshow(final1, pnum=(1, 2, 2), fnum=1, title='optimized')
"""
def __init__(self, subdata, overview):
"""
Args:
subdata (DelayedArray): data to operate on
overview (int): the overview to use (assuming it exists)
"""
super().__init__(subdata)
self.meta['overview'] = overview
w, h = subdata.dsize
sf = 1 / (2 ** overview)
"""
Ignore:
# Check how gdal handles overviews for odd sized images.
imdata = np.random.rand(31, 29)
kwimage.imwrite('foo.tif', imdata, backend='gdal', overviews=3)
ub.cmd('gdalinfo foo.tif', verbose=3)
"""
# The rounding operation for gdal overviews is ceiling
def iceil(x):
return int(np.ceil(x))
w = iceil(sf * w)
h = iceil(sf * h)
self.meta['dsize'] = (w, h)
@property
def num_overviews(self):
"""
Returns:
int
"""
# This operation reduces the number of available overviews
num_remain = self.subdata.num_overviews - self.meta['overview']
return num_remain
def _finalize(self):
"""
Returns:
ArrayLike
"""
sub_final = self.subdata._finalize()
overview = self.meta['overview']
if hasattr(sub_final, 'get_overview'):
# This should be a lazy gdal frame
if sub_final.num_overviews >= overview:
final = sub_final.get_overview(overview)
return final
warnings.warn(ub.paragraph(
'''
The underlying data does not have overviews.
Simulating the overview using a imresize operation.
'''
))
sub_final = np.asarray(sub_final)
final = kwimage.imresize(
sub_final,
scale=1 / 2 ** overview,
interpolation='nearest',
# antialias=True
)
return final
[docs] @profile
def optimize(self):
"""
Returns:
DelayedImage
"""
new = copy.copy(self)
new.subdata = self.subdata.optimize()
if isinstance(new.subdata, DelayedOverview):
new = new._opt_fuse_overview()
if new.meta['overview'] == 0:
new = new.subdata
elif isinstance(new.subdata, DelayedCrop):
new = new._opt_crop_after_overview()
new = new.optimize()
elif isinstance(new.subdata, DelayedWarp):
new = new._opt_warp_after_overview()
new = new.optimize()
elif isinstance(new.subdata, DelayedDequantize):
new = new._opt_dequant_after_overview()
new = new.optimize()
if isinstance(new.subdata, DelayedChannelConcat):
new = new._opt_push_under_concat().optimize()
return new
def _transform_from_subdata(self):
scale = 1 / 2 ** self.meta['overview']
return kwimage.Affine.scale(scale)
def _opt_overview_as_warp(self):
"""
Sometimes it is beneficial to replace an overview with a warp as an
intermediate optimization step.
"""
transform = self._transform_from_subdata()
dsize = self.meta['dsize']
new = self.subdata.warp(transform, dsize=dsize)
return new
def _opt_crop_after_overview(self):
"""
Given an outer overview and an inner crop, switch places. We want the
overview to be as close to the load as possible.
Example:
>>> # xdoctest: +REQUIRES(module:osgeo)
>>> from kwcoco.util.delayed_ops import * # NOQA
>>> fpath = kwimage.grab_test_image_fpath(overviews=3)
>>> node0 = DelayedLoad(fpath, channels='r|g|b').prepare()
>>> node1 = node0[100:400, 120:450]
>>> node2 = node1.get_overview(2)
>>> self = node2
>>> new_outer = node2.optimize()
>>> print(ub.repr2(node2.nesting(), nl=-1, sort=0))
>>> print(ub.repr2(new_outer.nesting(), nl=-1, sort=0))
>>> final0 = self._finalize()
>>> final1 = new_outer._finalize()
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> kwplot.imshow(final0, pnum=(1, 2, 1), fnum=1, title='raw')
>>> kwplot.imshow(final1, pnum=(1, 2, 2), fnum=1, title='optimized')
"""
from kwcoco.util.delayed_ops.helpers import _swap_crop_after_warp
assert isinstance(self.subdata, DelayedCrop)
# Inner is the data closer to the leaf (disk), outer is the data closer
# to the user (output).
outer_overview = self.meta['overview']
inner_slices = self.subdata.meta['space_slice']
sf = 1 / 2 ** outer_overview
outer_transform = kwimage.Affine.scale(sf)
inner_region = kwimage.Boxes.from_slice(inner_slices)
inner_region = inner_region.to_polygons()[0]
new_inner_warp, outer_crop, new_outer_warp = _swap_crop_after_warp(
inner_region, outer_transform)
# Move the overview to the inside, it should be unchanged
new = self.subdata.subdata.get_overview(outer_overview)
# Move the crop to the outside
new = new.crop(outer_crop)
if not np.all(np.isclose(np.eye(3), new_outer_warp)):
# we might have to apply an additional warp at the end.
new = new.warp(new_outer_warp)
return new
def _opt_fuse_overview(self):
assert isinstance(self.subdata, DelayedOverview)
outer_overview = self.meta['overview']
inner_overrview = self.subdata.meta['overview']
new_overview = inner_overrview + outer_overview
new = self.subdata.subdata.get_overview(new_overview)
return new
def _opt_dequant_after_overview(self):
# Swap order so dequantize is after the crop
assert isinstance(self.subdata, DelayedDequantize)
quantization = self.subdata.meta['quantization']
new = copy.copy(self)
new.subdata = self.subdata.subdata # Remove the dequantization
new = new.dequantize(quantization) # Push it after the crop
return new
def _opt_warp_after_overview(self):
"""
Given an warp followed by an overview, move the warp to the outer scope
such that the overview is first.
Example:
>>> # xdoctest: +REQUIRES(module:osgeo)
>>> from kwcoco.util.delayed_ops import * # NOQA
>>> fpath = kwimage.grab_test_image_fpath(overviews=3)
>>> node0 = DelayedLoad(fpath, channels='r|g|b').prepare()
>>> node1 = node0.warp({'scale': (2.1, .7), 'offset': (20, 40)})
>>> node2 = node1.get_overview(2)
>>> self = node2
>>> new_outer = node2.optimize()
>>> print(ub.repr2(node2.nesting(), nl=-1, sort=0))
>>> print(ub.repr2(new_outer.nesting(), nl=-1, sort=0))
>>> final0 = self._finalize()
>>> final1 = new_outer._finalize()
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> kwplot.imshow(final0, pnum=(1, 2, 1), fnum=1, title='raw')
>>> kwplot.imshow(final1, pnum=(1, 2, 2), fnum=1, title='optimized')
"""
assert isinstance(self.subdata, DelayedWarp)
outer_overview = self.meta['overview']
inner_transform = self.subdata.meta['transform']
outer_transform = self._transform_from_subdata()
A = outer_transform
B = inner_transform
# We have: A @ B, and we want that to equal C @ A
# where the overview A left as-is and moved inside, we modify the new
# outer transform C to accomodate this.
# So C = A @ B @ A.inv()
C = A @ B @ A.inv()
new_outer = C
new_inner_overview = outer_overview
new = self.subdata.subdata.get_overview(new_inner_overview)
new = new.warp(new_outer)
return new
def _rectify_retain(remove, retain):
if remove is not None or retain is None:
valid_keys = {"offset", "scale", "shearx", "theta"}
if remove is not None and retain is not None:
raise ValueError('retain and remove are mutually exclusive')
if remove is not None:
retain = valid_keys - set(remove)
else:
if retain is None:
retain = set()
return retain
DelayedOverview2 = DelayedOverview
DelayedCrop2 = DelayedCrop
DelayedDequantize2 = DelayedDequantize
DelayedWarp2 = DelayedWarp
DelayedAsXarray2 = DelayedAsXarray
DelayedImage2 = DelayedImage
DelayedArray2 = DelayedArray
DelayedChannelConcat2 = DelayedChannelConcat
DelayedFrameStack2 = DelayedFrameStack
DelayedConcat2 = DelayedConcat
DelayedStack2 = DelayedStack