kwcoco.util.delayed_ops package

Submodules

Module contents

A rewrite of the delayed operations

Note

The classes in this submodule will have their names changed when the old POC delayed operations are deprecated.

Todo

The optimize logic could likely be better expressed as some sort of AST transformer.

Example

>>> # xdoctest: +REQUIRES(module:osgeo)
>>> from kwcoco.util.delayed_ops import *  # NOQA
>>> import kwimage
>>> fpath = kwimage.grab_test_image_fpath(overviews=3)
>>> dimg = DelayedLoad(fpath, channels='r|g|b').prepare()
>>> quantization = {'quant_max': 255, 'nodata': 0}
>>> #
>>> # Make a complex chain of operations
>>> dimg = dimg.dequantize(quantization)
>>> dimg = dimg.warp({'scale': 1.1})
>>> dimg = dimg.warp({'scale': 1.1})
>>> dimg = dimg[0:400, 1:400]
>>> dimg = dimg.warp({'scale': 0.5})
>>> dimg = dimg[0:800, 1:800]
>>> dimg = dimg.warp({'scale': 0.5})
>>> dimg = dimg[0:800, 1:800]
>>> dimg = dimg.warp({'scale': 0.5})
>>> dimg = dimg.warp({'scale': 1.1})
>>> dimg = dimg.warp({'scale': 1.1})
>>> dimg = dimg.warp({'scale': 2.1})
>>> dimg = dimg[0:200, 1:200]
>>> dimg = dimg[1:200, 2:200]
>>> dimg.write_network_text()
╙── Crop dsize=(128,130),space_slice=(slice(1,131,None),slice(2,130,None))
    └─╼ Crop dsize=(130,131),space_slice=(slice(0,131,None),slice(1,131,None))
        └─╼ Warp dsize=(131,131),transform={scale=2.1000}
            └─╼ Warp dsize=(62,62),transform={scale=1.1000}
                └─╼ Warp dsize=(56,56),transform={scale=1.1000}
                    └─╼ Warp dsize=(50,50),transform={scale=0.5000}
                        └─╼ Crop dsize=(99,100),space_slice=(slice(0,100,None),slice(1,100,None))
                            └─╼ Warp dsize=(100,100),transform={scale=0.5000}
                                └─╼ Crop dsize=(199,200),space_slice=(slice(0,200,None),slice(1,200,None))
                                    └─╼ Warp dsize=(200,200),transform={scale=0.5000}
                                        └─╼ Crop dsize=(399,400),space_slice=(slice(0,400,None),slice(1,400,None))
                                            └─╼ Warp dsize=(621,621),transform={scale=1.1000}
                                                └─╼ Warp dsize=(564,564),transform={scale=1.1000}
                                                    └─╼ Dequantize dsize=(512,512),quantization={quant_max=255,nodata=0}
                                                        └─╼ Load channels=r|g|b,dsize=(512,512),num_overviews=3,fname=astro_overviews=3.tif
>>> # Optimize the chain
>>> dopt = dimg.optimize()
>>> dopt.write_network_text()
╙── Warp dsize=(128,130),transform={offset=(-0.6...,-1.0...),scale=1.5373}
    └─╼ Dequantize dsize=(80,83),quantization={quant_max=255,nodata=0}
        └─╼ Crop dsize=(80,83),space_slice=(slice(0,83,None),slice(3,83,None))
            └─╼ Overview dsize=(128,128),overview=2
                └─╼ Load channels=r|g|b,dsize=(512,512),num_overviews=3,fname=astro_overviews=3.tif
>>> final0 = dimg.finalize(optimize=False)
>>> final1 = dopt.finalize()
>>> assert final0.shape == final1.shape
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> kwplot.imshow(final0, pnum=(1, 2, 1), fnum=1, title='raw')
>>> kwplot.imshow(final1, pnum=(1, 2, 2), fnum=1, title='optimized')
_images/fig_kwcoco_util_delayed_ops_002.jpeg

Example

>>> # xdoctest: +REQUIRES(module:osgeo)
>>> from kwcoco.util.delayed_ops import *  # NOQA
>>> import ubelt as ub
>>> import kwimage
>>> # Sometimes we want to manipulate data in a space, but then remove all
>>> # warps in order to get a sample without any data artifacts.  This is
>>> # handled by adding a new transform that inverts everything and optimizing
>>> # it, which results in all warps canceling each other out.
>>> fpath = kwimage.grab_test_image_fpath()
>>> base = DelayedLoad(fpath, channels='r|g|b').prepare()
>>> warp = kwimage.Affine.random(rng=321, offset=0)
>>> warp = kwimage.Affine.scale(0.5)
>>> orig = base.get_overview(1).warp(warp)[16:96, 24:128]
>>> delayed = orig.optimize()
>>> print('Orig')
>>> orig.write_network_text()
>>> print('Delayed')
>>> delayed.write_network_text()
>>> # Get the transform that would bring us back to the leaf
>>> tf_root_from_leaf = delayed.get_transform_from_leaf()
>>> print('tf_root_from_leaf =\n{}'.format(ub.repr2(tf_root_from_leaf, nl=1)))
>>> undo_all = tf_root_from_leaf.inv()
>>> print('undo_all =\n{}'.format(ub.repr2(undo_all, nl=1)))
>>> undo_scale = kwimage.Affine.coerce(ub.dict_diff(undo_all.concise(), ['offset']))
>>> print('undo_scale =\n{}'.format(ub.repr2(undo_scale, nl=1)))
>>> print('Undone All')
>>> undone_all = delayed.warp(undo_all).optimize()
>>> undone_all.write_network_text()
>>> # Discard translation components
>>> print('Undone Scale')
>>> undone_scale = delayed.warp(undo_scale).optimize()
>>> undone_scale.write_network_text()
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> to_stack = []
>>> to_stack.append(base.finalize(optimize=False))
>>> to_stack.append(orig.finalize(optimize=False))
>>> to_stack.append(delayed.finalize(optimize=False))
>>> to_stack.append(undone_all.finalize(optimize=False))
>>> to_stack.append(undone_scale.finalize(optimize=False))
>>> kwplot.autompl()
>>> stack = kwimage.stack_images(to_stack, axis=1, bg_value=(5, 100, 10), pad=10)
>>> kwplot.imshow(stack)
_images/fig_kwcoco_util_delayed_ops_003.jpeg

CommandLine

xdoctest -m /home/joncrall/code/kwcoco/kwcoco/util/delayed_ops/__init__.py __doc__:2

Example

>>> # xdoctest: +REQUIRES(module:osgeo)
>>> from kwcoco.util.delayed_ops import *  # NOQA
>>> import ubelt as ub
>>> import kwimage
>>> import kwarray
>>> import numpy as np
>>> # Demo case where we have different channels at different resolutions
>>> base = DelayedLoad.demo(channels='r|g|b').prepare().dequantize({'quant_max': 255})
>>> bandR = base[:, :, 0].scale(100 / 512)[:, :-50].evaluate()
>>> bandG = base[:, :, 1].scale(300 / 512).warp({'theta': np.pi / 8, 'about': (150, 150)}).evaluate()
>>> bandB = base[:, :, 2].scale(600 / 512)[:150, :].evaluate()
>>> # Align the bands in "video" space
>>> delayed_vidspace = DelayedChannelConcat([
>>>     bandR.scale(6, dsize=(600, 600)).optimize(),
>>>     bandG.warp({'theta': -np.pi / 8, 'about': (150, 150)}).scale(2, dsize=(600, 600)).optimize(),
>>>     bandB.scale(1, dsize=(600, 600)).optimize(),
>>> ]).warp(
>>>   #{'scale': 0.35, 'theta': 0.3, 'about': (30, 50), 'offset': (-10, -80)}
>>>   {'scale': 0.7}
>>> )
>>> #delayed_vidspace._set_nested_params(border_value=0)
>>> vidspace_box = kwimage.Boxes([[100, 10, 270, 160]], 'ltrb')
>>> vidspace_poly = vidspace_box.to_polygons()[0]
>>> vidspace_slice = vidspace_box.to_slices()[0]
>>> crop_vidspace = delayed_vidspace[vidspace_slice]
>>> crop_vidspace._set_nested_params(interpolation='lanczos')
>>> # Note: this only works because the graph is lazilly optimized
>>> crop_vidspace_box = vidspace_box.warp(crop_vidspace._transform_from_subdata())
>>> crop_vidspace_poly = vidspace_poly.warp(crop_vidspace._transform_from_subdata())
>>> opt_crop_vidspace = crop_vidspace.optimize()
>>> print('Original: Video Space')
>>> delayed_vidspace.write_network_text()
>>> print('Original Crop: Video Space')
>>> crop_vidspace.write_network_text()
>>> print('Optimized Crop: Video Space')
>>> opt_crop_vidspace.write_network_text()
>>> tostack_grid = []
>>> # Drop boxes in asset space
>>> tostack_grid.append([]); row = tostack_grid[-1]
>>> row.append(kwimage.draw_text_on_image(None, text='Underlying asset bands (imagine these are on disk)'))
>>> tostack_grid.append([]); row = tostack_grid[-1]
>>> delayed_vidspace_opt = delayed_vidspace.optimize()
>>> tf_vidspace_to_rband = delayed_vidspace_opt.parts[0].get_transform_from_leaf().inv()
>>> tf_vidspace_to_gband = delayed_vidspace_opt.parts[1].get_transform_from_leaf().inv()
>>> tf_vidspace_to_bband = delayed_vidspace_opt.parts[2].get_transform_from_leaf().inv()
>>> rband_box = vidspace_box.warp(tf_vidspace_to_rband)
>>> gband_box = vidspace_box.warp(tf_vidspace_to_gband)
>>> bband_box = vidspace_box.warp(tf_vidspace_to_bband)
>>> rband_poly = vidspace_poly.warp(tf_vidspace_to_rband)
>>> gband_poly = vidspace_poly.warp(tf_vidspace_to_gband)
>>> bband_poly = vidspace_poly.warp(tf_vidspace_to_bband)
>>> row.append(kwimage.draw_header_text(rband_poly.draw_on(rband_box.draw_on(bandR.finalize()), edgecolor='b', fill=0), 'R'))
>>> row.append(kwimage.draw_header_text(gband_poly.draw_on(gband_box.draw_on(bandG.finalize()), edgecolor='b', fill=0), 'asset G-band'))
>>> row.append(kwimage.draw_header_text(bband_poly.draw_on(bband_box.draw_on(bandB.finalize()), edgecolor='b', fill=0), 'asset B-band'))
>>> # Draw the box in image space
>>> tostack_grid.append([]); row = tostack_grid[-1]
>>> row.append(kwimage.draw_text_on_image(None, text='A Box in Virtual Video Space (This space is conceptually easy to work in)'))
>>> tostack_grid.append([]); row = tostack_grid[-1]
>>> def _tocanvas(img):
...     if img.dtype.kind == 'u':
...         return img
...     return kwimage.ensure_uint255(kwimage.fill_nans_with_checkers(img))
>>> row.append(kwimage.draw_header_text(vidspace_box.draw_on(_tocanvas(delayed_vidspace.finalize())), 'vidspace'))
>>> # Draw finalized aligned crops
>>> tostack_grid.append([]); row = tostack_grid[-1]
>>> row.append(kwimage.draw_text_on_image(None, text='Finalized delayed warp/crop. Left-to-Right: Original, Optimized, Difference'))
>>> tostack_grid.append([]); row = tostack_grid[-1]
>>> crop_opt_final = opt_crop_vidspace.finalize()
>>> crop_raw_final = crop_vidspace.finalize(optimize=False)
>>> row.append(crop_raw_final)
>>> row.append(crop_opt_final)
>>> row.append(kwimage.ensure_uint255(kwarray.normalize(np.linalg.norm(kwimage.ensure_float01(crop_opt_final) - kwimage.ensure_float01(crop_raw_final), axis=2))))
>>> # Get the transform that would bring us back to the leaf
>>> tostack_grid.append([]); row = tostack_grid[-1]
>>> row.append(kwimage.draw_text_on_image(None, text='The "Unwarped" / "Unscaled" cropped regions'))
>>> tostack_grid.append([]); row = tostack_grid[-1]
>>> for chosen_band in opt_crop_vidspace.parts:
>>>     spec = chosen_band.channels.spec
>>>     lut = {c[0]: c for c in ['red', 'green', 'blue']}
>>>     color = lut[spec]
>>>     print(ub.color_text('============', color))
>>>     print(ub.color_text(spec, color))
>>>     print(ub.color_text('============', color))
>>>     chosen_band.write_network_text()
>>>     tf_root_from_leaf = chosen_band.get_transform_from_leaf()
>>>     tf_leaf_from_root = tf_root_from_leaf.inv()
>>>     undo_all = tf_leaf_from_root
>>>     undo_scale = kwimage.Affine.coerce(ub.dict_diff(undo_all.concise(), ['offset', 'theta']))
>>>     print('tf_root_from_leaf = {}'.format(ub.repr2(tf_root_from_leaf.concise(), nl=1)))
>>>     print('undo_all = {}'.format(ub.repr2(undo_all.concise(), nl=1)))
>>>     print('undo_scale = {}'.format(ub.repr2(undo_scale.concise(), nl=1)))
>>>     print('Undone All')
>>>     undone_all = chosen_band.warp(undo_all, interpolation='lanczos').optimize()
>>>     undone_all.write_network_text()
>>>     # Discard translation components
>>>     print('Undone Scale')
>>>     undone_scale = chosen_band.warp(undo_scale).optimize()
>>>     undone_scale.write_network_text()
>>>     undone_all_canvas = undone_all.finalize()
>>>     undone_scale_canvas = undone_scale.finalize()
>>>     undone_all_canvas = crop_vidspace_box.warp(undo_all).draw_on(undone_all_canvas)
>>>     undone_scale_canvas = crop_vidspace_box.warp(undo_scale).draw_on(undone_scale_canvas)
>>>     undone_all_canvas = crop_vidspace_poly.warp(undo_all).draw_on(undone_all_canvas, edgecolor='b', fill=0)
>>>     undone_scale_canvas = crop_vidspace_poly.warp(undo_scale).draw_on(undone_scale_canvas, edgecolor='b', fill=0)
>>>     #row.append(kwimage.stack_images([undone_all_canvas, undone_scale_canvas], axis=0, bg_value=(5, 100, 10), pad=10))
>>>     row.append(undone_all_canvas)
>>>     row.append(undone_scale_canvas)
>>>     print(ub.color_text('============', color))
>>> #
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> tostack_grid = [[_tocanvas(c) for c in cols] for cols in tostack_grid]
>>> tostack_rows  = [kwimage.stack_images(cols, axis=1, bg_value=(5, 100, 10), pad=10) for cols in tostack_grid if cols]
>>> stack = kwimage.stack_images(tostack_rows, axis=0, bg_value=(5, 100, 10), pad=10)
>>> kwplot.imshow(stack, title='notice how the "undone all" crops are shifted to the right such that they align with the original image')
>>> kwplot.show_if_requested()
_images/fig_kwcoco_util_delayed_ops_004.jpeg
class kwcoco.util.delayed_ops.DelayedArray(subdata=None)[source]

Bases: DelayedUnaryOperation

A generic NDArray.

property shape

Returns: None | Tuple[int | None, …]

kwcoco.util.delayed_ops.DelayedArray2

alias of DelayedArray

class kwcoco.util.delayed_ops.DelayedAsXarray(subdata=None, dsize=None, channels=None)[source]

Bases: DelayedImage

Casts the data to an xarray object in the finalize step

Example
>>> from kwcoco.util.delayed_ops.delayed_nodes import *  # NOQA
>>> from kwcoco.util.delayed_ops import DelayedLoad
>>> # without channels
>>> base = DelayedLoad.demo(dsize=(16, 16)).prepare()
>>> self = base.as_xarray()
>>> final = self._validate().finalize()
>>> assert len(final.coords) == 0
>>> assert final.dims == ('y', 'x', 'c')
>>> # with channels
>>> base = DelayedLoad.demo(dsize=(16, 16), channels='r|g|b').prepare()
>>> self = base.as_xarray()
>>> final = self._validate().finalize()
>>> assert final.coords.indexes['c'].tolist() == ['r', 'g', 'b']
>>> assert final.dims == ('y', 'x', 'c')
optimize()[source]
Returns

DelayedImage

kwcoco.util.delayed_ops.DelayedAsXarray2

alias of DelayedAsXarray

class kwcoco.util.delayed_ops.DelayedChannelConcat(parts, dsize=None)[source]

Bases: ImageOpsMixin, DelayedConcat

Stacks multiple arrays together.

CommandLine

xdoctest -m /home/joncrall/code/kwcoco/kwcoco/util/delayed_ops/delayed_nodes.py DelayedChannelConcat:1

Example

>>> from kwcoco.util.delayed_ops import *  # NOQA
>>> from kwcoco.util.delayed_ops.delayed_leafs import DelayedLoad
>>> import kwcoco
>>> dsize = (307, 311)
>>> c1 = DelayedNans(dsize=dsize, channels='foo')
>>> c2 = DelayedLoad.demo('astro', dsize=dsize, channels='R|G|B').prepare()
>>> cat = DelayedChannelConcat([c1, c2])
>>> warped_cat = cat.warp({'scale': 1.07}, dsize=(328, 332))
>>> warped_cat._validate()
>>> warped_cat.finalize()

Example

>>> # Test case that failed in initial implementation
>>> # Due to incorrectly pushing channel selection under the concat
>>> from kwcoco.util.delayed_ops import *  # NOQA
>>> import kwimage
>>> fpath = kwimage.grab_test_image_fpath()
>>> base1 = DelayedLoad(fpath, channels='r|g|b').prepare()
>>> base2 = DelayedLoad(fpath, channels='x|y|z').prepare().scale(2)
>>> base3 = DelayedLoad(fpath, channels='i|j|k').prepare().scale(2)
>>> bands = [base2, base1[:, :, 0].scale(2).evaluate(),
>>>          base1[:, :, 1].evaluate().scale(2),
>>>          base1[:, :, 2].evaluate().scale(2), base3]
>>> delayed = DelayedChannelConcat(bands)
>>> delayed = delayed.warp({'scale': 2})
>>> delayed = delayed[0:100, 0:55, [0, 2, 4]]
>>> delayed.write_network_text()
>>> delayed.optimize()
property channels

Returns: None | kwcoco.FusedChannelSpec

property shape

Returns: Tuple[int | None, int | None, int | None]

optimize()[source]
Returns

DelayedImage

take_channels(channels)[source]

This method returns a subset of the vision data with only the specified bands / channels.

Parameters

channels (List[int] | slice | channel_spec.FusedChannelSpec) – List of integer indexes, a slice, or a channel spec, which is typically a pipe (|) delimited list of channel codes. See kwcoco.ChannelSpec for more details.

Returns

a delayed vision operation that only operates on the following channels.

Return type

DelayedArray

Example

>>> from kwcoco.util.delayed_ops.delayed_nodes import *  # NOQA
>>> import kwcoco
>>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral')
>>> self = delayed = dset.coco_image(1).delay(mode=1)
>>> channels = 'B11|B8|B1|B10'
>>> new = self.take_channels(channels)

Example

>>> # Complex case
>>> import kwcoco
>>> from kwcoco.util.delayed_ops.delayed_nodes import *  # NOQA
>>> from kwcoco.util.delayed_ops.delayed_leafs import DelayedLoad
>>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral')
>>> delayed = dset.coco_image(1).delay(mode=1)
>>> astro = DelayedLoad.demo('astro', channels='r|g|b').prepare()
>>> aligned = astro.warp(kwimage.Affine.scale(600 / 512), dsize='auto')
>>> self = combo = DelayedChannelConcat(delayed.parts + [aligned])
>>> channels = 'B1|r|B8|g'
>>> new = self.take_channels(channels)
>>> new_cropped = new.crop((slice(10, 200), slice(12, 350)))
>>> new_opt = new_cropped.optimize()
>>> datas = new_opt.finalize()
>>> if 1:
>>>     new_cropped.write_network_text(with_labels='name')
>>>     new_opt.write_network_text(with_labels='name')
>>> vizable = kwimage.normalize_intensity(datas, axis=2)
>>> self._validate()
>>> new._validate()
>>> new_cropped._validate()
>>> new_opt._validate()
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> stacked = kwimage.stack_images(vizable.transpose(2, 0, 1))
>>> kwplot.imshow(stacked)
_images/fig_kwcoco_util_delayed_ops_DelayedChannelConcat_take_channels_002.jpeg

Example

>>> # Test case where requested channel does not exist
>>> import kwcoco
>>> from kwcoco.util.delayed_ops.delayed_nodes import *  # NOQA
>>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral', use_cache=1, verbose=100)
>>> self = delayed = dset.coco_image(1).delay(mode=1)
>>> channels = 'B1|foobar|bazbiz|B8'
>>> new = self.take_channels(channels)
>>> new_cropped = new.crop((slice(10, 200), slice(12, 350)))
>>> fused = new_cropped.finalize()
>>> assert fused.shape == (190, 338, 4)
>>> assert np.all(np.isnan(fused[..., 1:3]))
>>> assert not np.any(np.isnan(fused[..., 0]))
>>> assert not np.any(np.isnan(fused[..., 3]))
property num_overviews

Returns: int

as_xarray()[source]
Returns

DelayedAsXarray

undo_warps(remove=None, retain=None, squash_nans=False, return_warps=False)[source]

Attempts to “undo” warping for each concatenated channel and returns a list of delayed operations that are cropped to the right regions.

Typically you will retain offset, theta, and shear to remove scale. This ensures the data is spatially aligned up to a scale factor.

Parameters
  • remove (List[str]) – if specified, list components of the warping to remove. Can include: “offset”, “scale”, “shearx”, “theta”. Typically set this to [“scale”].

  • retain (List[str]) – if specified, list components of the warping to retain. Can include: “offset”, “scale”, “shearx”, “theta”. Mutually exclusive with “remove”. If neither remove nor retain is specified, retain is set to [].

  • squash_nans (bool) – if True, pure nan channels are squashed into a 1x1 array as they do not correspond to a real source.

  • return_warps (bool) – if True, return the transforms we applied. This is useful when you need to warp objects in the original space into the jagged space.

Example

>>> from kwcoco.util.delayed_ops.delayed_nodes import *  # NOQA
>>> from kwcoco.util.delayed_ops.delayed_leafs import DelayedLoad
>>> from kwcoco.util.delayed_ops.delayed_leafs import DelayedNans
>>> import ubelt as ub
>>> import kwimage
>>> import kwarray
>>> import numpy as np
>>> # Demo case where we have different channels at different resolutions
>>> base = DelayedLoad.demo(channels='r|g|b').prepare().dequantize({'quant_max': 255})
>>> bandR = base[:, :, 0].scale(100 / 512)[:, :-50].evaluate()
>>> bandG = base[:, :, 1].scale(300 / 512).warp({'theta': np.pi / 8, 'about': (150, 150)}).evaluate()
>>> bandB = base[:, :, 2].scale(600 / 512)[:150, :].evaluate()
>>> bandN = DelayedNans((600, 600), channels='N')
>>> # Make a concatenation of images of different underlying native resolutions
>>> delayed_vidspace = DelayedChannelConcat([
>>>     bandR.scale(6, dsize=(600, 600)).optimize(),
>>>     bandG.warp({'theta': -np.pi / 8, 'about': (150, 150)}).scale(2, dsize=(600, 600)).optimize(),
>>>     bandB.scale(1, dsize=(600, 600)).optimize(),
>>>     bandN,
>>> ]).warp({'scale': 0.7}).optimize()
>>> vidspace_box = kwimage.Boxes([[100, 10, 270, 160]], 'ltrb')
>>> vidspace_poly = vidspace_box.to_polygons()[0]
>>> vidspace_slice = vidspace_box.to_slices()[0]
>>> self = delayed_vidspace[vidspace_slice].optimize()
>>> print('--- Aligned --- ')
>>> self.write_network_text()
>>> squash_nans = True
>>> undone_all_parts, tfs1 = self.undo_warps(squash_nans=squash_nans, return_warps=True)
>>> undone_scale_parts, tfs2 = self.undo_warps(remove=['scale'], squash_nans=squash_nans, return_warps=True)
>>> stackable_aligned = self.finalize().transpose(2, 0, 1)
>>> stackable_undone_all = []
>>> stackable_undone_scale = []
>>> print('--- Undone All --- ')
>>> for undone in undone_all_parts:
...     undone.write_network_text()
...     stackable_undone_all.append(undone.finalize())
>>> print('--- Undone Scale --- ')
>>> for undone in undone_scale_parts:
...     undone.write_network_text()
...     stackable_undone_scale.append(undone.finalize())
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> canvas0 = kwimage.stack_images(stackable_aligned, axis=1)
>>> canvas1 = kwimage.stack_images(stackable_undone_all, axis=1)
>>> canvas2 = kwimage.stack_images(stackable_undone_scale, axis=1)
>>> canvas0 = kwimage.draw_header_text(canvas0, 'Rescaled Aligned Channels')
>>> canvas1 = kwimage.draw_header_text(canvas1, 'Unwarped Channels')
>>> canvas2 = kwimage.draw_header_text(canvas2, 'Unscaled Channels')
>>> canvas = kwimage.stack_images([canvas0, canvas1, canvas2], axis=0)
>>> canvas = kwimage.fill_nans_with_checkers(canvas)
>>> kwplot.imshow(canvas)
_images/fig_kwcoco_util_delayed_ops_DelayedChannelConcat_undo_warps_002.jpeg
kwcoco.util.delayed_ops.DelayedChannelConcat2

alias of DelayedChannelConcat

class kwcoco.util.delayed_ops.DelayedConcat(parts, axis)[source]

Bases: DelayedNaryOperation

Stacks multiple arrays together.

property shape

Returns: None | Tuple[int | None, …]

kwcoco.util.delayed_ops.DelayedConcat2

alias of DelayedConcat

class kwcoco.util.delayed_ops.DelayedCrop(subdata, space_slice=None, chan_idxs=None)[source]

Bases: DelayedImage

Crops an image along integer pixel coordinates.

Example

>>> from kwcoco.util.delayed_ops.delayed_nodes import *  # NOQA
>>> from kwcoco.util.delayed_ops import DelayedLoad
>>> base = DelayedLoad.demo(dsize=(16, 16)).prepare()
>>> # Test Fuse Crops Space Only
>>> crop1 = base[4:12, 0:16]
>>> self = crop1[2:6, 0:8]
>>> opt = self._opt_fuse_crops()
>>> self.write_network_text()
>>> opt.write_network_text()
>>> #
>>> # Test Channel Select Via Index
>>> self = base[:, :, [0]]
>>> self.write_network_text()
>>> final = self._finalize()
>>> assert final.shape == (16, 16, 1)
>>> assert base[:, :, [0, 1]].finalize().shape == (16, 16, 2)
>>> assert base[:, :, [2, 0, 1]].finalize().shape == (16, 16, 3)
optimize()[source]
Returns

DelayedImage

Example

>>> # Test optimize nans
>>> from kwcoco.util.delayed_ops import DelayedNans
>>> import kwimage
>>> base = DelayedNans(dsize=(100, 100), channels='a|b|c')
>>> self = base[0:10, 0:5]
>>> # Should simply return a new nan generator
>>> new = self.optimize()
>>> self.write_network_text()
>>> new.write_network_text()
>>> assert len(new.as_graph().nodes) == 1
kwcoco.util.delayed_ops.DelayedCrop2

alias of DelayedCrop

class kwcoco.util.delayed_ops.DelayedDequantize(subdata, quantization)[source]

Bases: DelayedImage

Rescales image intensities from int to floats.

The output is usually between 0 and 1. This also handles transforming nodata into nan values.

optimize()[source]
Returns

DelayedImage

Example

>>> # Test a case that caused an error in development
>>> from kwcoco.util.delayed_ops.delayed_nodes import *  # NOQA
>>> from kwcoco.util.delayed_ops import DelayedLoad
>>> fpath = kwimage.grab_test_image_fpath()
>>> base = DelayedLoad(fpath, channels='r|g|b').prepare()
>>> quantization = {'quant_max': 255, 'nodata': 0}
>>> self = base.get_overview(1).dequantize(quantization)
>>> self.write_network_text()
>>> opt = self.optimize()
kwcoco.util.delayed_ops.DelayedDequantize2

alias of DelayedDequantize

class kwcoco.util.delayed_ops.DelayedFrameStack(parts)[source]

Bases: DelayedStack

Stacks multiple arrays together.

kwcoco.util.delayed_ops.DelayedFrameStack2

alias of DelayedFrameStack

class kwcoco.util.delayed_ops.DelayedIdentity(data, channels=None, dsize=None)[source]

Bases: DelayedImageLeaf

Returns an ndarray as-is

Example

>>> self = DelayedNans((10, 10), channel_spec.FusedChannelSpec.coerce('rgb'))
>>> region_slices = (slice(5, 10), slice(1, 12))
>>> delayed = self.crop(region_slices)

Example

>>> from kwcoco.util.delayed_ops import *  # NOQA
>>> import kwcoco
>>> arr = kwimage.checkerboard()
>>> self = DelayedIdentity(arr, channels='gray')
>>> warp = self.warp({'scale': 1.07})
>>> warp.optimize().finalize()
kwcoco.util.delayed_ops.DelayedIdentity2

alias of DelayedIdentity

class kwcoco.util.delayed_ops.DelayedImage(subdata=None, dsize=None, channels=None)[source]

Bases: ImageOpsMixin, DelayedArray

For the case where an array represents a 2D image with multiple channels

property shape

Returns: None | Tuple[int | None, int | None, int | None]

property num_channels

Returns: None | int

property dsize

Returns: None | Tuple[int | None, int | None]

property channels

Returns: None | kwcoco.FusedChannelSpec

property num_overviews

Returns: int

take_channels(channels)[source]

This method returns a subset of the vision data with only the specified bands / channels.

Parameters

channels (List[int] | slice | channel_spec.FusedChannelSpec) – List of integer indexes, a slice, or a channel spec, which is typically a pipe (|) delimited list of channel codes. See kwcoco.ChannelSpec for more details.

Returns

a new delayed load with a fused take channel operation

Return type

DelayedCrop

Note

The channel subset must exist here or it will raise an error. A better implementation (via pymbolic) might be able to do better

Example

>>> #
>>> # Test Channel Select Via Code
>>> from kwcoco.util.delayed_ops.delayed_nodes import *  # NOQA
>>> from kwcoco.util.delayed_ops import DelayedLoad
>>> self = DelayedLoad.demo(dsize=(16, 16), channels='r|g|b').prepare()
>>> channels = 'r|b'
>>> new = self.take_channels(channels)._validate()
>>> new2 = new[:, :, [1, 0]]._validate()
>>> new3 = new2[:, :, [1]]._validate()

Example

>>> from kwcoco.util.delayed_ops.delayed_nodes import *  # NOQA
>>> from kwcoco.util.delayed_ops import DelayedLoad
>>> import kwcoco
>>> self = DelayedLoad.demo('astro').prepare()
>>> channels = [2, 0]
>>> new = self.take_channels(channels)
>>> new3 = new.take_channels([1, 0])
>>> new._validate()
>>> new3._validate()
>>> final1 = self.finalize()
>>> final2 = new.finalize()
>>> final3 = new3.finalize()
>>> assert np.all(final1[..., 2] == final2[..., 0])
>>> assert np.all(final1[..., 0] == final2[..., 1])
>>> assert final2.shape[2] == 2
>>> assert np.all(final1[..., 2] == final3[..., 1])
>>> assert np.all(final1[..., 0] == final3[..., 0])
>>> assert final3.shape[2] == 2
get_transform_from_leaf()[source]

Returns the transformation that would align data with the leaf

evaluate()[source]

Evaluate this node and return the data as an identity.

Returns

DelayedIdentity

undo_warp(remove=None, retain=None, squash_nans=False, return_warp=False)[source]

Attempts to “undo” warping for each concatenated channel and returns a list of delayed operations that are cropped to the right regions.

Typically you will retain offset, theta, and shear to remove scale. This ensures the data is spatially aligned up to a scale factor.

Parameters
  • remove (List[str]) – if specified, list components of the warping to remove. Can include: “offset”, “scale”, “shearx”, “theta”. Typically set this to [“scale”].

  • retain (List[str]) – if specified, list components of the warping to retain. Can include: “offset”, “scale”, “shearx”, “theta”. Mutually exclusive with “remove”. If neither remove nor retain is specified, retain is set to [].

  • squash_nans (bool) – if True, pure nan channels are squashed into a 1x1 array as they do not correspond to a real source.

  • return_warp (bool) – if True, return the transform we applied. This is useful when you need to warp objects in the original space into the jagged space.

See also:

DelayedChannelConcat.undo_warps

Example

>>> # Test similar to undo_warps, but on each channel separately
>>> from kwcoco.util.delayed_ops.delayed_nodes import *  # NOQA
>>> from kwcoco.util.delayed_ops.delayed_leafs import DelayedLoad
>>> from kwcoco.util.delayed_ops.delayed_leafs import DelayedNans
>>> import ubelt as ub
>>> import kwimage
>>> import kwarray
>>> import numpy as np
>>> # Demo case where we have different channels at different resolutions
>>> base = DelayedLoad.demo(channels='r|g|b').prepare().dequantize({'quant_max': 255})
>>> bandR = base[:, :, 0].scale(100 / 512)[:, :-50].evaluate()
>>> bandG = base[:, :, 1].scale(300 / 512).warp({'theta': np.pi / 8, 'about': (150, 150)}).evaluate()
>>> bandB = base[:, :, 2].scale(600 / 512)[:150, :].evaluate()
>>> bandN = DelayedNans((600, 600), channels='N')
>>> B0 = bandR.scale(6, dsize=(600, 600)).optimize()
>>> B1 = bandG.warp({'theta': -np.pi / 8, 'about': (150, 150)}).scale(2, dsize=(600, 600)).optimize()
>>> B2 = bandB.scale(1, dsize=(600, 600)).optimize()
>>> vidspace_box = kwimage.Boxes([[-10, -10, 270, 160]], 'ltrb').scale(1 / .7).quantize()
>>> vidspace_poly = vidspace_box.to_polygons()[0]
>>> vidspace_slice = vidspace_box.to_slices()[0]
>>> # Test with the padded crop
>>> self0 = B0.crop(vidspace_slice, wrap=0, clip=0, pad=10).optimize()
>>> self1 = B1.crop(vidspace_slice, wrap=0, clip=0, pad=10).optimize()
>>> self2 = B2.crop(vidspace_slice, wrap=0, clip=0, pad=10).optimize()
>>> parts = [self0, self1, self2]
>>> # Run the undo on each channel
>>> undone_scale_parts = [d.undo_warp(remove=['scale']) for d in parts]
>>> print('--- Aligned --- ')
>>> stackable_aligned = []
>>> for d in parts:
>>>     d.write_network_text()
>>>     stackable_aligned.append(d.finalize())
>>> print('--- Undone Scale --- ')
>>> stackable_undone_scale = []
>>> for undone in undone_scale_parts:
...     undone.write_network_text()
...     stackable_undone_scale.append(undone.finalize())
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> canvas0 = kwimage.stack_images(stackable_aligned, axis=1, pad=5, bg_value='kw_darkgray')
>>> canvas2 = kwimage.stack_images(stackable_undone_scale, axis=1, pad=5, bg_value='kw_darkgray')
>>> canvas0 = kwimage.draw_header_text(canvas0, 'Rescaled Channels')
>>> canvas2 = kwimage.draw_header_text(canvas2, 'Native Scale Channels')
>>> canvas = kwimage.stack_images([canvas0, canvas2], axis=0, bg_value='kw_darkgray')
>>> canvas = kwimage.fill_nans_with_checkers(canvas)
>>> kwplot.imshow(canvas)
_images/fig_kwcoco_util_delayed_ops_DelayedImage_undo_warp_002.jpeg
kwcoco.util.delayed_ops.DelayedImage2

alias of DelayedImage

class kwcoco.util.delayed_ops.DelayedImageLeaf(subdata=None, dsize=None, channels=None)[source]

Bases: DelayedImage

get_transform_from_leaf()[source]

Returns the transformation that would align data with the leaf

Returns

kwimage.Affine

optimize()[source]
kwcoco.util.delayed_ops.DelayedImageLeaf2

alias of DelayedImageLeaf

class kwcoco.util.delayed_ops.DelayedLoad(fpath, channels=None, dsize=None, nodata_method=None)[source]

Bases: DelayedImageLeaf

Reads an image from disk.

If a gdal backend is available, and the underlying image is in the appropriate format (e.g. a COG), this will return a lazy reference that enables fast overviews and crops.

Example

>>> from kwcoco.util.delayed_ops import *  # NOQA
>>> self = DelayedLoad.demo(dsize=(16, 16)).prepare()
>>> data1 = self.finalize()

Example

>>> # xdoctest: +REQUIRES(module:osgeo)
>>> # Demo code to develop support for overviews
>>> from kwcoco.util.delayed_ops import *  # NOQA
>>> import kwimage
>>> import ubelt as ub
>>> fpath = kwimage.grab_test_image_fpath(overviews=3)
>>> self = DelayedLoad(fpath, channels='r|g|b').prepare()
>>> print(f'self={self}')
>>> print('self.meta = {}'.format(ub.repr2(self.meta, nl=1)))
>>> quantization = {
>>>     'quant_max': 255,
>>>     'nodata': 0,
>>> }
>>> node0 = self
>>> node1 = node0.get_overview(2)
>>> node2 = node1[13:900, 11:700]
>>> node3 = node2.dequantize(quantization)
>>> node4 = node3.warp({'scale': 0.05})
>>> #
>>> data0 = node0._validate().finalize()
>>> data1 = node1._validate().finalize()
>>> data2 = node2._validate().finalize()
>>> data3 = node3._validate().finalize()
>>> data4 = node4._validate().finalize()
>>> node4.write_network_text()

Example

>>> # xdoctest: +REQUIRES(module:osgeo)
>>> # Test delayed ops with int16 and nodata values
>>> from kwcoco.util.delayed_ops import *  # NOQA
>>> import kwimage
>>> from kwcoco.util.delayed_ops.helpers import quantize_float01
>>> import ubelt as ub
>>> dpath = ub.Path.appdir('kwcoco/tests/test_delay_nodata').ensuredir()
>>> fpath = dpath / 'data.tif'
>>> data = kwimage.ensure_float01(kwimage.grab_test_image())
>>> poly = kwimage.Polygon.random(rng=321032).scale(data.shape[0])
>>> poly.fill(data, np.nan)
>>> data_uint16, quantization = quantize_float01(data)
>>> nodata = quantization['nodata']
>>> kwimage.imwrite(fpath, data_uint16, nodata=nodata, backend='gdal', overviews=3)
>>> # Test loading the data
>>> self = DelayedLoad(fpath, channels='r|g|b', nodata_method='float').prepare()
>>> node0 = self
>>> node1 = node0.dequantize(quantization)
>>> node2 = node1.warp({'scale': 0.51}, interpolation='lanczos')
>>> node3 = node2[13:900, 11:700]
>>> node4 = node3.warp({'scale': 0.9}, interpolation='lanczos')
>>> node4.write_network_text()
>>> node5 = node4.optimize()
>>> node5.write_network_text()
>>> node6 = node5.warp({'scale': 8}, interpolation='lanczos').optimize()
>>> node6.write_network_text()
>>> #
>>> data0 = node0._validate().finalize()
>>> data1 = node1._validate().finalize()
>>> data2 = node2._validate().finalize()
>>> data3 = node3._validate().finalize()
>>> data4 = node4._validate().finalize()
>>> data5 = node5._validate().finalize()
>>> data6 = node6._validate().finalize()
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> stack1 = kwimage.stack_images([data1, data2, data3, data4, data5])
>>> stack2 = kwimage.stack_images([stack1, data6], axis=1)
>>> kwplot.imshow(stack2)
_images/fig_kwcoco_util_delayed_ops_DelayedLoad_002.jpeg
property fpath
classmethod demo(key='astro', dsize=None, channels=None)[source]
prepare()[source]

If metadata is missing, perform minimal IO operations in order to prepopulate metadata that could help us better optimize the operation tree.

Returns

DelayedLoad

kwcoco.util.delayed_ops.DelayedLoad2

alias of DelayedLoad

class kwcoco.util.delayed_ops.DelayedNans(dsize=None, channels=None)[source]

Bases: DelayedImageLeaf

Constructs nan channels as needed

Example

self = DelayedNans((10, 10), channel_spec.FusedChannelSpec.coerce('rgb'))
region_slices = (slice(5, 10), slice(1, 12))
delayed = self.crop(region_slices)

Example

>>> from kwcoco.util.delayed_ops import *  # NOQA
>>> import kwcoco
>>> dsize = (307, 311)
>>> c1 = DelayedNans(dsize=dsize, channels='foo')
>>> c2 = DelayedLoad.demo('astro', dsize=dsize, channels='R|G|B').prepare()
>>> cat = DelayedChannelConcat([c1, c2])
>>> warped_cat = cat.warp({'scale': 1.07}, dsize=(328, 332))._validate()
>>> warped_cat._validate().optimize().finalize()
kwcoco.util.delayed_ops.DelayedNans2

alias of DelayedNans

class kwcoco.util.delayed_ops.DelayedNaryOperation(parts)[source]

Bases: DelayedOperation

For operations that have multiple input arrays

children()[source]
Yields

Any

kwcoco.util.delayed_ops.DelayedNaryOperation2

alias of DelayedNaryOperation

class kwcoco.util.delayed_ops.DelayedOperation[source]

Bases: NiceRepr

nesting()[source]
Returns

Dict[str, dict]

as_graph()[source]
Returns

networkx.DiGraph

write_network_text(with_labels=True)[source]
property shape

Returns: None | Tuple[int | None, …]

children()[source]
Yields

Any

prepare()[source]

If metadata is missing, perform minimal IO operations in order to prepopulate metadata that could help us better optimize the operation tree.

Returns

DelayedOperation2

finalize(prepare=True, optimize=True, **kwargs)[source]

Evaluate the operation tree in full.

Parameters
  • prepare (bool) – ensure prepare is called to ensure metadata exists if possible before optimizing. Defaults to True.

  • optimize (bool) – ensure the graph is optimized before loading. Defaults to True.

  • **kwargs – for backwards compatibility, these will allow for in-place modification of select nested parameters. In general these should not be used, and may be deprecated.

Returns

ArrayLike

Notes

Do not overload this method. Overload DelayedOperation2._finalize() instead.

optimize()[source]
Returns

DelayedOperation2

kwcoco.util.delayed_ops.DelayedOperation2

alias of DelayedOperation

class kwcoco.util.delayed_ops.DelayedOverview(subdata, overview)[source]

Bases: DelayedImage

Downsamples an image by a factor of two.

If the underlying image being loaded has precomputed overviews it simply loads these instead of downsampling the original image, which is more efficient.

Example

>>> # xdoctest: +REQUIRES(module:osgeo)
>>> # Make a complex chain of operations and optimize it
>>> from kwcoco.util.delayed_ops import *  # NOQA
>>> import kwimage
>>> fpath = kwimage.grab_test_image_fpath(overviews=3)
>>> dimg = DelayedLoad(fpath, channels='r|g|b').prepare()
>>> dimg = dimg.get_overview(1)
>>> dimg = dimg.get_overview(1)
>>> dimg = dimg.get_overview(1)
>>> dopt = dimg.optimize()
>>> if 1:
>>>     import networkx as nx
>>>     dimg.write_network_text()
>>>     dopt.write_network_text()
>>> print(ub.repr2(dopt.nesting(), nl=-1, sort=0))
>>> final0 = dimg._finalize()[:]
>>> final1 = dopt._finalize()[:]
>>> assert final0.shape == final1.shape
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> kwplot.imshow(final0, pnum=(1, 2, 1), fnum=1, title='raw')
>>> kwplot.imshow(final1, pnum=(1, 2, 2), fnum=1, title='optimized')
_images/fig_kwcoco_util_delayed_ops_DelayedOverview_002.jpeg
property num_overviews

Returns: int

optimize()[source]
Returns

DelayedImage

kwcoco.util.delayed_ops.DelayedOverview2

alias of DelayedOverview

class kwcoco.util.delayed_ops.DelayedStack(parts, axis)[source]

Bases: DelayedNaryOperation

Stacks multiple arrays together.

property shape

Returns: None | Tuple[int | None, …]

kwcoco.util.delayed_ops.DelayedStack2

alias of DelayedStack

class kwcoco.util.delayed_ops.DelayedUnaryOperation(subdata)[source]

Bases: DelayedOperation

For operations that have a single input array

children()[source]
Yields

Any

kwcoco.util.delayed_ops.DelayedUnaryOperation2

alias of DelayedUnaryOperation

class kwcoco.util.delayed_ops.DelayedWarp(subdata, transform, dsize='auto', antialias=True, interpolation='linear', border_value='auto')[source]

Bases: DelayedImage

Applies an affine transform to an image.

Example

>>> from kwcoco.util.delayed_ops.delayed_nodes import *  # NOQA
>>> from kwcoco.util.delayed_ops import DelayedLoad
>>> self = DelayedLoad.demo(dsize=(16, 16)).prepare()
>>> warp1 = self.warp({'scale': 3})
>>> warp2 = warp1.warp({'theta': 0.1})
>>> warp3 = warp2._opt_fuse_warps()
>>> warp3._validate()
>>> print(ub.repr2(warp2.nesting(), nl=-1, sort=0))
>>> print(ub.repr2(warp3.nesting(), nl=-1, sort=0))
property transform

Returns: kwimage.Affine

optimize()[source]
Returns

DelayedImage

Example

>>> # Demo optimization that removes a noop warp
>>> from kwcoco.util.delayed_ops import DelayedLoad
>>> import kwimage
>>> base = DelayedLoad.demo(channels='r|g|b').prepare()
>>> self = base.warp(kwimage.Affine.eye())
>>> new = self.optimize()
>>> assert len(self.as_graph().nodes) == 2
>>> assert len(new.as_graph().nodes) == 1

Example

>>> # Test optimize nans
>>> from kwcoco.util.delayed_ops import DelayedNans
>>> import kwimage
>>> base = DelayedNans(dsize=(100, 100), channels='a|b|c')
>>> self = base.warp(kwimage.Affine.scale(0.1))
>>> # Should simply return a new nan generator
>>> new = self.optimize()
>>> assert len(new.as_graph().nodes) == 1
kwcoco.util.delayed_ops.DelayedWarp2

alias of DelayedWarp

class kwcoco.util.delayed_ops.ImageOpsMixin[source]

Bases: object

crop(space_slice=None, chan_idxs=None, clip=True, wrap=True, pad=0)[source]

Crops an image along integer pixel coordinates.

Parameters
  • space_slice (Tuple[slice, slice]) – y-slice and x-slice.

  • chan_idxs (List[int]) – indexes of bands to take

  • clip (bool) – if True, the slice is interpreted normally, where it won’t go past the image extent, otherwise slicing into negative regions or past the image bounds will result in padding. Defaults to True.

  • wrap (bool) – if True, negative indexes “wrap around”, otherwise they are treated as is. Defaults to True.

  • pad (int | List[Tuple[int, int]]) – if specified, applies extra padding

Returns

DelayedImage

Example

>>> from kwcoco.util.delayed_ops import DelayedLoad
>>> import kwimage
>>> self = DelayedLoad.demo().prepare()
>>> self = self.dequantize({'quant_max': 255})
>>> self = self.warp({'scale': 1 / 2})
>>> pad = 0
>>> h, w = space_dims = self.dsize[::-1]
>>> grid = list(ub.named_product({
>>>     'left': [0, -64], 'right': [0, 64],
>>>     'top': [0, -64], 'bot': [0, 64],}))
>>> grid += [
>>>     {'left': 64, 'right': -64, 'top': 0, 'bot': 0},
>>>     {'left': 64, 'right': 64, 'top': 0, 'bot': 0},
>>>     {'left': 0, 'right': 0, 'top': 64, 'bot': -64},
>>>     {'left': 64, 'right': -64, 'top': 64, 'bot': -64},
>>> ]
>>> crops = []
>>> for pads in grid:
>>>     space_slice = (slice(pads['top'], h + pads['bot']),
>>>                    slice(pads['left'], w + pads['right']))
>>>     delayed = self.crop(space_slice)
>>>     crop = delayed.finalize()
>>>     yyxx = kwimage.Boxes.from_slice(space_slice, wrap=False, clip=0).toformat('_yyxx').data[0]
>>>     title = '[{}:{}, {}:{}]'.format(*yyxx)
>>>     crop_canvas = kwimage.draw_header_text(crop, title, fit=True, bg_color='kw_darkgray')
>>>     crops.append(crop_canvas)
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> canvas = kwimage.stack_images_grid(crops, pad=16, bg_value='kw_darkgreen')
>>> canvas = kwimage.fill_nans_with_checkers(canvas)
>>> kwplot.imshow(canvas, title='Normal Slicing: Cropped Images With Wrap+Clipped Slices', doclf=1, fnum=1)
>>> kwplot.show_if_requested()
_images/fig_kwcoco_util_delayed_ops_ImageOpsMixin_crop_002.jpeg

Example

>>> # Demo the case with pads / no-clips / no-wraps
>>> from kwcoco.util.delayed_ops import DelayedLoad
>>> import kwimage
>>> self = DelayedLoad.demo().prepare()
>>> self = self.dequantize({'quant_max': 255})
>>> self = self.warp({'scale': 1 / 2})
>>> pad = [(64, 128), (32, 96)]
>>> pad = [(0, 20), (0, 0)]
>>> pad = 0
>>> pad = 8
>>> h, w = space_dims = self.dsize[::-1]
>>> grid = list(ub.named_product({
>>>     'left': [0, -64], 'right': [0, 64],
>>>     'top': [0, -64], 'bot': [0, 64],}))
>>> grid += [
>>>     {'left': 64, 'right': -64, 'top': 0, 'bot': 0},
>>>     {'left': 64, 'right': 64, 'top': 0, 'bot': 0},
>>>     {'left': 0, 'right': 0, 'top': 64, 'bot': -64},
>>>     {'left': 64, 'right': -64, 'top': 64, 'bot': -64},
>>> ]
>>> crops = []
>>> for pads in grid:
>>>     space_slice = (slice(pads['top'], h + pads['bot']),
>>>                    slice(pads['left'], w + pads['right']))
>>>     delayed = self._padded_crop(space_slice, pad=pad)
>>>     crop = delayed.finalize(optimize=1)
>>>     yyxx = kwimage.Boxes.from_slice(space_slice, wrap=False, clip=0).toformat('_yyxx').data[0]
>>>     title = '[{}:{}, {}:{}]'.format(*yyxx)
>>>     if pad:
>>>         title += f'{chr(10)}pad={pad}'
>>>     crop_canvas = kwimage.draw_header_text(crop, title, fit=True, bg_color='kw_darkgray')
>>>     crops.append(crop_canvas)
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> canvas = kwimage.stack_images_grid(crops, pad=16, bg_value='kw_darkgreen', resize='smaller')
>>> canvas = kwimage.fill_nans_with_checkers(canvas)
>>> kwplot.imshow(canvas, title='Negative Slicing: Cropped Images With clip=False wrap=False', doclf=1, fnum=2)
>>> kwplot.show_if_requested()
_images/fig_kwcoco_util_delayed_ops_ImageOpsMixin_crop_003.jpeg
warp(transform, dsize='auto', antialias=True, interpolation='linear', border_value='auto')[source]

Applies an affine transformation to the image.

Parameters
  • transform (ndarray | dict | kwimage.Affine) – a coercible affine matrix. See kwimage.Affine for details on what can be coerced.

  • dsize (Tuple[int, int] | str) – The width / height of the output canvas. If ‘auto’, dsize is computed such that the positive coordinates of the warped image will fit in the new canvas. In this case, any pixel that maps to a negative coordinate will be clipped. This has the property that the input transformation is not modified.

  • antialias (bool) – if True, determines whether the transform is downsampling and, if so, applies antialiasing via a gaussian blur. Defaults to True.

  • interpolation (str) – interpolation code or cv2 integer flag. Interpolation codes are linear, nearest, cubic, lanczos, and area. Defaults to “linear”.

  • border_value (int | float | str) – if ‘auto’, the border value will be nan for float data and 0 for int data.

Returns

DelayedImage

scale(scale, dsize='auto', antialias=True, interpolation='linear', border_value='auto')[source]

An alias for self.warp({“scale”: scale}, …)

dequantize(quantization)[source]

Rescales image intensities from int to floats.

Parameters

quantization (Dict[str, Any]) – see kwcoco.util.delayed_ops.helpers.dequantize()

Returns

DelayedDequantize

get_overview(overview)[source]

Downsamples an image by a factor of two.

Parameters

overview (int) – the overview to use (assuming it exists)

Returns

DelayedOverview

as_xarray()[source]
Returns

DelayedAsXarray