kwcoco.util.util_delayed_poc

This module is ported from ndsampler. It will likely move to kwimage eventually and be refactored using pymbolic.

The classes in this file represent a tree of delayed operations.

Proof of concept for delayed chainable transforms in Python.

There are several optimizations that could be applied.

This is similar to GDAL’s virtual raster table, but it works in memory and I think it is easier to chain operations.

SeeAlso:

../../dev/symbolic_delayed.py

Warning

As the name implies, this is a proof of concept, and the actual implementation was hacked together too quickly. Serious refactoring will be necessary.

Concepts:

Each class should be a layer that adds a new transformation on top of underlying nested layers. Adding new layers should be quick, and there should always be the option to “finalize” a stack of layers, chaining the transforms / operations and then applying one final efficient transform at the end.
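The "finalize" idea can be illustrated with plain NumPy: if every layer stores a 3x3 affine matrix, a stack of layers collapses into one matrix by multiplication, so the pixels only need to be resampled once. This is a minimal sketch under that assumption, not the module's implementation; `scale_matrix` and `compose_transforms` are hypothetical helpers.

```python
import numpy as np


def scale_matrix(sx, sy):
    """Return a 3x3 homogeneous matrix that scales x by sx and y by sy."""
    return np.array([[sx, 0.0, 0.0],
                     [0.0, sy, 0.0],
                     [0.0, 0.0, 1.0]])


def compose_transforms(transforms):
    """
    Collapse a stack of 3x3 transforms into one matrix.

    ``transforms`` is ordered from the innermost layer (applied first)
    to the outermost layer (applied last). Hypothetical helper.
    """
    total = np.eye(3)
    for tf in transforms:
        # A later layer acts on the output of the earlier ones, so it
        # multiplies on the left.
        total = tf @ total
    return total


# Channel -> image space (x3), then image -> video space (x0.5).
chan_to_img = scale_matrix(3.0, 3.0)
img_to_vid = scale_matrix(0.5, 0.5)
chan_to_vid = compose_transforms([chan_to_img, img_to_vid])

# A point in channel space lands in the same place either way.
pt = np.array([10.0, 20.0, 1.0])
step_by_step = img_to_vid @ (chan_to_img @ pt)
all_at_once = chan_to_vid @ pt
```

Because only the combined matrix is applied to pixel data, intermediate spaces (such as the "image" space in the first example of this page) never have to be materialized.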

Todo

  • [ ] Need to handle masks / nodata values when warping. Might need to rely more on gdal / rasterio for this.

Conventions:

  • dsize is always (width, height); no channel dimension is present

  • shape for images is always (height, width, channels)

  • channels are always the last dimension of each image; if no channel dimension is specified, finalize will add it.

  • Videos must be the last process in the stack, and add a leading time dimension to the shape. dsize is still (width, height), but shape is now (time, height, width, chan).
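The conventions above amount to a fixed mapping between dsize and shape. The sketch below spells that mapping out; `dsize_to_shape` and `shape_to_dsize` are hypothetical helpers written for illustration and are not part of this module.

```python
def dsize_to_shape(dsize, num_channels, num_frames=None):
    """Convert a (width, height) dsize into an array shape. Hypothetical helper."""
    width, height = dsize
    shape = (height, width, num_channels)
    if num_frames is not None:
        # Videos add a leading time dimension.
        shape = (num_frames,) + shape
    return shape


def shape_to_dsize(shape):
    """Recover (width, height) from an image or video shape. Hypothetical helper."""
    # Channels are last, so height and width are the two dims before them.
    height, width = shape[-3], shape[-2]
    return (width, height)


image_shape = dsize_to_shape((640, 480), num_channels=3)
video_shape = dsize_to_shape((640, 480), num_channels=3, num_frames=5)
```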

Example

>>> # Example demonstrating the motivating use case
>>> # We have multiple aligned frames for a video, but each of
>>> # those frames is in a different resolution. Furthermore,
>>> # each of the frames consists of channels in different resolutions.
>>> # Create raw channels in some "native" resolution for frame 1
>>> from kwcoco.util.util_delayed_poc import *  # NOQA
>>> f1_chan1 = DelayedIdentity.demo('astro', chan=0, dsize=(300, 300))
>>> f1_chan2 = DelayedIdentity.demo('astro', chan=1, dsize=(200, 200))
>>> f1_chan3 = DelayedIdentity.demo('astro', chan=2, dsize=(10, 10))
>>> # Create raw channels in some "native" resolution for frame 2
>>> f2_chan1 = DelayedIdentity.demo('carl', dsize=(64, 64), chan=0)
>>> f2_chan2 = DelayedIdentity.demo('carl', dsize=(260, 260), chan=1)
>>> f2_chan3 = DelayedIdentity.demo('carl', dsize=(10, 10), chan=2)
>>> #
>>> # Delayed warp each channel into its "image" space
>>> # Note: the images never actually enter this space; we transform through it
>>> f1_dsize = np.array((3, 3))
>>> f2_dsize = np.array((2, 2))
>>> f1_img = DelayedChannelConcat([
>>>     f1_chan1.delayed_warp(kwimage.Affine.scale(f1_dsize / f1_chan1.dsize), dsize=f1_dsize),
>>>     f1_chan2.delayed_warp(kwimage.Affine.scale(f1_dsize / f1_chan2.dsize), dsize=f1_dsize),
>>>     f1_chan3.delayed_warp(kwimage.Affine.scale(f1_dsize / f1_chan3.dsize), dsize=f1_dsize),
>>> ])
>>> f2_img = DelayedChannelConcat([
>>>     f2_chan1.delayed_warp(kwimage.Affine.scale(f2_dsize / f2_chan1.dsize), dsize=f2_dsize),
>>>     f2_chan2.delayed_warp(kwimage.Affine.scale(f2_dsize / f2_chan2.dsize), dsize=f2_dsize),
>>>     f2_chan3.delayed_warp(kwimage.Affine.scale(f2_dsize / f2_chan3.dsize), dsize=f2_dsize),
>>> ])
>>> # Combine frames into a video
>>> vid_dsize = np.array((280, 280))
>>> vid = DelayedFrameConcat([
>>>     f1_img.delayed_warp(kwimage.Affine.scale(vid_dsize / f1_img.dsize), dsize=vid_dsize),
>>>     f2_img.delayed_warp(kwimage.Affine.scale(vid_dsize / f2_img.dsize), dsize=vid_dsize),
>>> ])
>>> vid.nesting()
>>> print('vid.nesting = {}'.format(ub.repr2(vid.__json__(), nl=-2)))
>>> final = vid.finalize(interpolation='nearest')
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> kwplot.imshow(final[0], pnum=(1, 2, 1), fnum=1)
>>> kwplot.imshow(final[1], pnum=(1, 2, 2), fnum=1)

Example

>>> import kwcoco
>>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral')
>>> delayed = dset.delayed_load(1)
>>> from kwcoco.util.util_delayed_poc import *  # NOQA
>>> astro = DelayedLoad.demo('astro')
>>> print('MSI = ' + ub.repr2(delayed.__json__(), nl=-3, sort=0))
>>> print('ASTRO = ' + ub.repr2(astro.__json__(), nl=2, sort=0))
>>> subchan = delayed.take_channels('B1|B8')
>>> subcrop = subchan.delayed_crop((slice(10, 80), slice(30, 50)))
>>> #
>>> subcrop.nesting()
>>> subchan.nesting()
>>> subchan.finalize()
>>> subcrop.finalize()
>>> #
>>> msi_crop = delayed.delayed_crop((slice(10, 80), slice(30, 50)))
>>> msi_warp = msi_crop.delayed_warp(kwimage.Affine.scale(3), dsize='auto')
>>> subdata = msi_warp.take_channels('B11|B1')
>>> final = subdata.finalize()
>>> assert final.shape == (210, 60, 2)

Example

>>> # test case where an auxiliary image does not map entirely on the image.
>>> from kwcoco.util.util_delayed_poc import *  # NOQA
>>> import kwimage
>>> from os.path import join
>>> dpath = ub.ensure_app_cache_dir('kwcoco/tests/delayed_poc')
>>> chan1_fpath = join(dpath, 'chan1.tiff')
>>> chan2_fpath = join(dpath, 'chan2.tiff')
>>> chan3_fpath = join(dpath, 'chan3.tiff')
>>> chan1_raw = np.random.rand(128, 128, 1)
>>> chan2_raw = np.random.rand(64, 64, 1)
>>> chan3_raw = np.random.rand(256, 256, 1)
>>> kwimage.imwrite(chan1_fpath, chan1_raw)
>>> kwimage.imwrite(chan2_fpath, chan2_raw)
>>> kwimage.imwrite(chan3_fpath, chan3_raw)
>>> #
>>> c1 = channel_spec.FusedChannelSpec.coerce('c1')
>>> c2 = channel_spec.FusedChannelSpec.coerce('c2')
>>> c3 = channel_spec.FusedChannelSpec.coerce('c3')
>>> aux1 = DelayedLoad(chan1_fpath, dsize=chan1_raw.shape[0:2][::-1], channels=c1, num_bands=1)
>>> aux2 = DelayedLoad(chan2_fpath, dsize=chan2_raw.shape[0:2][::-1], channels=c2, num_bands=1)
>>> aux3 = DelayedLoad(chan3_fpath, dsize=chan3_raw.shape[0:2][::-1], channels=c3, num_bands=1)
>>> #
>>> img_dsize = (128, 128)
>>> transform1 = kwimage.Affine.coerce(scale=0.5)
>>> transform2 = kwimage.Affine.coerce(theta=0.5, shearx=0.01, offset=(-20, -40))
>>> transform3 = kwimage.Affine.coerce(offset=(64, 0)) @ kwimage.Affine.random(rng=10)
>>> part1 = aux1.delayed_warp(np.eye(3), dsize=img_dsize)
>>> part2 = aux2.delayed_warp(transform2, dsize=img_dsize)
>>> part3 = aux3.delayed_warp(transform3, dsize=img_dsize)
>>> delayed = DelayedChannelConcat([part1, part2, part3])
>>> #
>>> delayed_crop = delayed.crop((slice(0, 10), slice(0, 10)))
>>> delayed_final = delayed_crop.finalize()
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> final = delayed.finalize()
>>> kwplot.imshow(final, fnum=1, pnum=(1, 2, 1))
>>> kwplot.imshow(delayed_final, fnum=1, pnum=(1, 2, 2))

comp = delayed_crop.components[2]

comp.sub_data.finalize()

data = np.array([[0]]).astype(np.float32)
kwimage.warp_affine(data, np.eye(3), dsize=(32, 32))
kwimage.warp_affine(data, np.eye(3))

kwimage.warp_affine(data[0:0], np.eye(3))

transform = kwimage.Affine.coerce(scale=0.1)
data = np.array([[0]]).astype(np.float32)

data = np.array([[]]).astype(np.float32)
kwimage.warp_affine(data, transform, dsize=(0, 2), antialias=True)

data = np.array([[]]).astype(np.float32)
kwimage.warp_affine(data, transform, dsize=(10, 10))

data = np.array([[0]]).astype(np.float32)
kwimage.warp_affine(data, transform, dsize=(0, 2), antialias=True)

data = np.array([[0]]).astype(np.float32)
kwimage.warp_affine(data, transform, dsize=(10, 10))

cv2.warpAffine(
    kwimage.grab_test_image(dsize=(1, 1)),
    kwimage.Affine.coerce(scale=0.1).matrix[0:2],
    dsize=(0, 1),
)

Module Contents

Classes

DelayedVisionOperation

Base class for nodes in a tree of delayed computer-vision operations

DelayedVideoOperation

Base class for nodes in a tree of delayed computer-vision operations

DelayedImageOperation

Operations that pertain only to images

DelayedIdentity

Noop leaf that does nothing. Can be used to hold raw data.

DelayedNans

Constructs nan channels as needed

DelayedLoad

A load operation for a specific sub-region and sub-bands in a specified image.

DelayedFrameConcat

Represents multiple frames in a video

DelayedChannelConcat

Represents multiple channels in an image that could be concatenated

DelayedWarp

POC for chainable transforms

DelayedCrop

Represent a delayed crop operation

Functions

dequantize(quant_data, quantization)

Helper for dequantization

_compute_leaf_subcrop(root_region_bounds, tf_leaf_to_root)

Given a region in a "root" image and a transform between that "root" and

_largest_shape(shapes)

Finds maximum over all shapes

_devcheck_corner()

_auto_dsize(transform, sub_dsize)

Attributes

xr

profile

kwcoco.util.util_delayed_poc.xr[source]
kwcoco.util.util_delayed_poc.profile[source]
class kwcoco.util.util_delayed_poc.DelayedVisionOperation[source]

Bases: ubelt.NiceRepr

Base class for nodes in a tree of delayed computer-vision operations

__nice__(self)[source]
abstract finalize(self, **kwargs)[source]
abstract children(self)[source]

Abstract method, which should generate all of the direct children of a node in the operation tree.

_optimize_paths(self, **kwargs)[source]

Iterate through the leaf nodes, which are virtually transformed into the root space.

This returns some sort of heuristically optimized leaf representation with respect to warps.

__json__(self)[source]
nesting(self)[source]
warp(self, *args, **kwargs)[source]

Alias for delayed_warp; the API might change to this name in the future.

crop(self, *args, **kwargs)[source]

Alias for delayed_crop; the API might change to this name in the future.

class kwcoco.util.util_delayed_poc.DelayedVideoOperation[source]

Bases: DelayedVisionOperation

Base class for nodes in a tree of delayed computer-vision operations

class kwcoco.util.util_delayed_poc.DelayedImageOperation[source]

Bases: DelayedVisionOperation

Operations that pertain only to images

delayed_crop(self, region_slices)[source]

Create a new delayed image that performs a crop in the transformed “self” space.

Parameters

region_slices (Tuple[slice, slice]) – y-slice and x-slice.

Note

Returns a heuristically “simplified” tree. In the current implementation there are only 3 operations: cat, warp, and crop. All cats go at the top, all crops go at the bottom, and all warps are in the middle.

Returns

lazily executed delayed transform

Return type

DelayedWarp
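A crop is requested in the transformed "self" space, but the pixels ultimately come from the underlying data. One way to picture the bookkeeping is to push the corners of the requested region through the inverse transform and take their bounding box. The sketch below does that with NumPy; `region_in_sub_space` is a hypothetical helper, and the module's own `_compute_leaf_subcrop` may quantize or pad differently.

```python
import math

import numpy as np


def region_in_sub_space(region_slices, tf_sub_to_self):
    """
    Map a (y-slice, x-slice) crop in "self" space back to the bounding
    box it covers in the underlying "sub" space.
    Hypothetical helper; rounds outward to whole pixels.
    """
    y_sl, x_sl = region_slices
    # Corners of the crop as homogeneous (x, y, 1) columns.
    corners = np.array([
        [x_sl.start, x_sl.stop, x_sl.stop, x_sl.start],
        [y_sl.start, y_sl.start, y_sl.stop, y_sl.stop],
        [1.0, 1.0, 1.0, 1.0],
    ])
    tf_self_to_sub = np.linalg.inv(tf_sub_to_self)
    sub = tf_self_to_sub @ corners
    sub_xy = sub[0:2] / sub[2]  # back from homogeneous coordinates
    x0, y0 = sub_xy.min(axis=1)
    x1, y1 = sub_xy.max(axis=1)
    return (slice(math.floor(y0), math.ceil(y1)),
            slice(math.floor(x0), math.ceil(x1)))


# The underlying data is upscaled by 2 to produce the "self" space.
tf = np.array([[2.0, 0.0, 0.0],
               [0.0, 2.0, 0.0],
               [0.0, 0.0, 1.0]])
sub_region = region_in_sub_space((slice(5, 10), slice(1, 12)), tf)
```

Only the pixels inside `sub_region` would need to be read before warping, which is the motivation for pushing crops toward the leaves of the tree.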

Example

>>> dsize = (100, 100)
>>> tf2 = kwimage.Affine.affine(scale=3).matrix
>>> self = DelayedWarp(np.random.rand(33, 33), tf2, dsize)
>>> region_slices = (slice(5, 10), slice(1, 12))
>>> delayed_crop = self.delayed_crop(region_slices)
>>> print(ub.repr2(delayed_crop.nesting(), nl=-1, sort=0))
>>> delayed_crop.finalize()

Example

>>> chan1 = DelayedLoad.demo('astro')
>>> chan2 = DelayedLoad.demo('carl')
>>> warped1a = chan1.delayed_warp(kwimage.Affine.scale(1.2).matrix)
>>> warped2a = chan2.delayed_warp(kwimage.Affine.scale(1.5))
>>> warped1b = warped1a.delayed_warp(kwimage.Affine.scale(1.2).matrix)
>>> warped2b = warped2a.delayed_warp(kwimage.Affine.scale(1.5))
>>> #
>>> region_slices = (slice(97, 677), slice(5, 691))
>>> self = warped2b
>>> #
>>> crop1 = warped1b.delayed_crop(region_slices)
>>> crop2 = warped2b.delayed_crop(region_slices)
>>> print(ub.repr2(warped1b.nesting(), nl=-1, sort=0))
>>> print(ub.repr2(warped2b.nesting(), nl=-1, sort=0))
>>> # Notice how the crop merges the two nesting layers
>>> # (via the heuristic optimize step)
>>> print(ub.repr2(crop1.nesting(), nl=-1, sort=0))
>>> print(ub.repr2(crop2.nesting(), nl=-1, sort=0))
>>> frame1 = crop1.finalize(dsize=(500, 500))
>>> frame2 = crop2.finalize(dsize=(500, 500))
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> kwplot.imshow(frame1, pnum=(1, 2, 1), fnum=1)
>>> kwplot.imshow(frame2, pnum=(1, 2, 2), fnum=1)
delayed_warp(self, transform, dsize=None)[source]

Delayed transform the underlying data.

Note

This deviates from kwimage warp functions because instead of “output_dims” (specified in c-style shape) we specify dsize (w, h).

Returns

a new delayed transform with this transform chained on

Return type

DelayedWarp

abstract take_channels(self, channels)[source]
class kwcoco.util.util_delayed_poc.DelayedIdentity(sub_data, dsize=None, channels=None, quantization=None)[source]

Bases: DelayedImageOperation

Noop leaf that does nothing. Can be used to hold raw data.

Typically used to hold raw data, for example:

DelayedIdentity.demo('astro', chan=0, dsize=(32, 32))

Example

>>> from kwcoco.util.util_delayed_poc import *  # NOQA
>>> sub_data = np.random.rand(31, 37, 3)
>>> self = DelayedIdentity(sub_data)
>>> self = DelayedIdentity(sub_data, channels='L|a|b')
>>> # test with quantization
>>> rng = kwarray.ensure_rng(32)
>>> sub_data_quant = (rng.rand(31, 37, 3) * 1000).astype(np.int16)
>>> sub_data_quant[0, 0] = -9999
>>> self = DelayedIdentity(sub_data_quant, channels='L|a|b', quantization={
>>>     'orig_min': 0.,
>>>     'orig_max': 1.,
>>>     'quant_min': 0,
>>>     'quant_max': 1000,
>>>     'nodata': -9999,
>>> })
>>> final1 = self.finalize(dequantize=True)
>>> final2 = self.finalize(dequantize=False)
>>> assert np.all(np.isnan(final1[0, 0]))
>>> scale = final2 / final1
>>> scales = scale[scale > 0]
>>> assert np.all(np.isclose(scales, 1000))
>>> # check that take channels works
>>> new_subdata = self.take_channels('a')
>>> sub_final1 = new_subdata.finalize(dequantize=True)
>>> sub_final2 = new_subdata.finalize(dequantize=False)
>>> assert sub_final1.dtype.kind == 'f'
>>> assert sub_final2.dtype.kind == 'i'
__hack_dont_optimize__ = True[source]
classmethod demo(cls, key='astro', chan=None, dsize=None)[source]
children(self)[source]

Abstract method, which should generate all of the direct children of a node in the operation tree.

_optimize_paths(self, **kwargs)[source]

Iterate through the leaf nodes, which are virtually transformed into the root space.

This returns some sort of heuristically optimized leaf representation with respect to warps.

finalize(self, **kwargs)[source]
take_channels(self, channels)[source]
kwcoco.util.util_delayed_poc.dequantize(quant_data, quantization)[source]

Helper for dequantization
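The quantization dictionaries used in the examples on this page (`orig_min`, `orig_max`, `quant_min`, `quant_max`, `nodata`) suggest a linear rescale from the stored integer range back to the original value range. The following is a sketch under that assumption; `dequantize_sketch` is a hypothetical stand-in, and the real helper may handle more cases.

```python
import numpy as np


def dequantize_sketch(quant_data, quantization):
    """
    Linearly map quantized integers back to the original value range.
    Hypothetical stand-in for illustration only.
    """
    orig_min = quantization.get('orig_min', 0.0)
    orig_max = quantization.get('orig_max', 1.0)
    quant_min = quantization.get('quant_min', 0)
    quant_max = quantization['quant_max']
    nodata = quantization.get('nodata', None)

    quant_extent = quant_max - quant_min
    scale = (orig_max - orig_min) / quant_extent if quant_extent else 0.0

    dequant = (quant_data.astype(np.float32) - quant_min) * scale + orig_min
    if nodata is not None:
        # Pixels flagged as nodata carry no measurement, so mark them nan.
        dequant[quant_data == nodata] = np.nan
    return dequant


quant = np.array([[0, 500], [1000, -9999]], dtype=np.int16)
spec = {'orig_min': 0.0, 'orig_max': 1.0,
        'quant_min': 0, 'quant_max': 1000, 'nodata': -9999}
restored = dequantize_sketch(quant, spec)
```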

class kwcoco.util.util_delayed_poc.DelayedNans(dsize=None, channels=None)[source]

Bases: DelayedImageOperation

Constructs nan channels as needed

Example

self = DelayedNans((10, 10), channel_spec.FusedChannelSpec.coerce('rgb'))
region_slices = (slice(5, 10), slice(1, 12))
delayed = self.delayed_crop(region_slices)

Example

>>> from kwcoco.util.util_delayed_poc import *  # NOQA
>>> dsize = (307, 311)
>>> c1 = DelayedNans(dsize=dsize, channels=channel_spec.FusedChannelSpec.coerce('foo'))
>>> c2 = DelayedLoad.demo('astro', dsize=dsize).load_shape(True)
>>> cat = DelayedChannelConcat([c1, c2])
>>> warped_cat = cat.delayed_warp(kwimage.Affine.scale(1.07), dsize=(328, 332))
>>> warped_cat.finalize()

#>>> cropped = warped_cat.delayed_crop((slice(0, 300), slice(0, 100)))
#>>> cropped.finalize().shape

property shape(self)[source]
property num_bands(self)[source]
property dsize(self)[source]
property channels(self)[source]
children(self)[source]

Abstract method, which should generate all of the direct children of a node in the operation tree.

_optimize_paths(self, **kwargs)[source]

Iterate through the leaf nodes, which are virtually transformed into the root space.

This returns some sort of heuristically optimized leaf representation with respect to warps.

finalize(self, **kwargs)[source]
delayed_crop(self, region_slices)[source]

Create a new delayed image that performs a crop in the transformed “self” space.

Parameters

region_slices (Tuple[slice, slice]) – y-slice and x-slice.

Note

Returns a heuristically “simplified” tree. In the current implementation there are only 3 operations: cat, warp, and crop. All cats go at the top, all crops go at the bottom, and all warps are in the middle.

Returns

lazily executed delayed transform

Return type

DelayedWarp

Example

>>> dsize = (100, 100)
>>> tf2 = kwimage.Affine.affine(scale=3).matrix
>>> self = DelayedWarp(np.random.rand(33, 33), tf2, dsize)
>>> region_slices = (slice(5, 10), slice(1, 12))
>>> delayed_crop = self.delayed_crop(region_slices)
>>> print(ub.repr2(delayed_crop.nesting(), nl=-1, sort=0))
>>> delayed_crop.finalize()

Example

>>> chan1 = DelayedLoad.demo('astro')
>>> chan2 = DelayedLoad.demo('carl')
>>> warped1a = chan1.delayed_warp(kwimage.Affine.scale(1.2).matrix)
>>> warped2a = chan2.delayed_warp(kwimage.Affine.scale(1.5))
>>> warped1b = warped1a.delayed_warp(kwimage.Affine.scale(1.2).matrix)
>>> warped2b = warped2a.delayed_warp(kwimage.Affine.scale(1.5))
>>> #
>>> region_slices = (slice(97, 677), slice(5, 691))
>>> self = warped2b
>>> #
>>> crop1 = warped1b.delayed_crop(region_slices)
>>> crop2 = warped2b.delayed_crop(region_slices)
>>> print(ub.repr2(warped1b.nesting(), nl=-1, sort=0))
>>> print(ub.repr2(warped2b.nesting(), nl=-1, sort=0))
>>> # Notice how the crop merges the two nesting layers
>>> # (via the heuristic optimize step)
>>> print(ub.repr2(crop1.nesting(), nl=-1, sort=0))
>>> print(ub.repr2(crop2.nesting(), nl=-1, sort=0))
>>> frame1 = crop1.finalize(dsize=(500, 500))
>>> frame2 = crop2.finalize(dsize=(500, 500))
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> kwplot.imshow(frame1, pnum=(1, 2, 1), fnum=1)
>>> kwplot.imshow(frame2, pnum=(1, 2, 2), fnum=1)
delayed_warp(self, transform, dsize=None)[source]

Delayed transform the underlying data.

Note

This deviates from kwimage warp functions because instead of “output_dims” (specified in c-style shape) we specify dsize (w, h).

Returns

a new delayed transform with this transform chained on

Return type

DelayedWarp

class kwcoco.util.util_delayed_poc.DelayedLoad(fpath, channels=None, dsize=None, num_bands=None, immediate_crop=None, immediate_chan_idxs=None, immediate_dsize=None, quantization=None)[source]

Bases: DelayedImageOperation

A load operation for a specific sub-region and sub-bands in a specified image.

Note

This class contains support for fusing certain lazy operations into this layer, namely cropping, scaling, and channel selection.

For now, these fused operations are named “immediates”.

Example

>>> fpath = kwimage.grab_test_image_fpath()
>>> self = DelayedLoad(fpath)
>>> print('self = {!r}'.format(self))
>>> self.load_shape()
>>> print('self = {!r}'.format(self))
>>> self.finalize()
>>> f1_img = DelayedLoad.demo('astro', dsize=(300, 300))
>>> f2_img = DelayedLoad.demo('carl', dsize=(256, 320))
>>> print('f1_img = {!r}'.format(f1_img))
>>> print('f2_img = {!r}'.format(f2_img))
>>> print(f2_img.finalize().shape)
>>> print(f1_img.finalize().shape)
>>> fpath = kwimage.grab_test_image_fpath()
>>> channels = channel_spec.FusedChannelSpec.coerce('rgb')
>>> self = DelayedLoad(fpath, channels=channels)

Example

>>> # Test with quantization
>>> fpath = kwimage.grab_test_image_fpath()
>>> channels = channel_spec.FusedChannelSpec.coerce('rgb')
>>> self = DelayedLoad(fpath, channels=channels, quantization={
>>>     'orig_min': 0.,
>>>     'orig_max': 1.,
>>>     'quant_min': 0,
>>>     'quant_max': 256,
>>>     'nodata': None,
>>> })
>>> final1 = self.finalize(dequantize=False)
>>> final2 = self.finalize(dequantize=True)
>>> assert final1.dtype.kind == 'u'
>>> assert final2.dtype.kind == 'f'
>>> assert final2.max() <= 1
__hack_dont_optimize__ = True[source]
classmethod demo(DelayedLoad, key='astro', dsize=None)[source]
abstract classmethod coerce(cls, data)[source]
children(self)[source]

Abstract method, which should generate all of the direct children of a node in the operation tree.

nesting(self)[source]
_optimize_paths(self, **kwargs)[source]

Iterate through the leaf nodes, which are virtually transformed into the root space.

This returns some sort of heuristically optimized leaf representation with respect to warps.

load_shape(self, use_channel_heuristic=False)[source]
_ensure_dsize(self)[source]
property shape(self)[source]
property num_bands(self)[source]
property dsize(self)[source]
property channels(self)[source]
property fpath(self)[source]
finalize(self, **kwargs)[source]

Todo

  • [ ] Load from overviews if a scale will be necessary

Parameters

**kwargs

nodata: if specified, this data item is treated as nodata. The data is then converted to floats and the nodata value is replaced with nan.

delayed_crop(self, region_slices)[source]
Parameters

region_slices (Tuple[slice, slice]) – y-slice and x-slice.

Returns

a new delayed load object with a fused crop operation

Return type

DelayedLoad

Example

>>> # Test chained crop operations
>>> from kwcoco.util.util_delayed_poc import *  # NOQA
>>> self = orig = DelayedLoad.demo('astro').load_shape()
>>> region_slices = slices1 = (slice(0, 90), slice(30, 60))
>>> self = crop1 = orig.delayed_crop(slices1)
>>> region_slices = slices2 = (slice(10, 21), slice(10, 22))
>>> self = crop2 = crop1.delayed_crop(slices2)
>>> region_slices = slices3 = (slice(3, 20), slice(5, 20))
>>> crop3 = crop2.delayed_crop(slices3)
>>> # Spot check internals
>>> print('orig = {}'.format(ub.repr2(orig.__json__(), nl=2)))
>>> print('crop1 = {}'.format(ub.repr2(crop1.__json__(), nl=2)))
>>> print('crop2 = {}'.format(ub.repr2(crop2.__json__(), nl=2)))
>>> print('crop3 = {}'.format(ub.repr2(crop3.__json__(), nl=2)))
>>> # Test internals
>>> assert crop3._immediates['crop'][0].start == 13
>>> assert crop3._immediates['crop'][0].stop == 21
>>> # Test shapes work out correctly
>>> assert crop3.finalize().shape == (8, 7, 3)
>>> assert crop2.finalize().shape == (11, 12, 3)
>>> assert crop1.take_channels([1, 2]).finalize().shape == (90, 30, 2)
>>> assert orig.finalize().shape == (512, 512, 3)

Note

This chart gives an intuition for how new absolute slice coords
are computed from existing absolute coords and relative coords.

      5 7    <- new
      3 5    <- rel
   --------
   01234567  <- relative coordinates
   --------
   2      9  <- curr
 ----------
 0123456789  <- absolute coordinates
 ----------
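The arithmetic in the chart can be written as a small function: the relative slice is offset by the start of the current absolute slice and clipped so it never extends past the current region. This is a sketch of that rule only; `compose_crop` is a hypothetical helper, not a function in this module.

```python
def compose_crop(curr, rel):
    """
    Combine an existing absolute slice with a slice given relative to it.
    Hypothetical helper mirroring the chart: the result is clipped so it
    never extends past the current region.
    """
    new_start = min(curr.start + rel.start, curr.stop)
    new_stop = min(curr.start + rel.stop, curr.stop)
    return slice(new_start, new_stop)


# The chart: curr = 2..9, rel = 3..5  ->  new = 5..7
chart = compose_crop(slice(2, 9), slice(3, 5))

# Row slices of the chained crops in the example above:
# 0:90, then 10:21 relative to that, then 3:20 relative to that.
rows = slice(0, 90)
for rel in [slice(10, 21), slice(3, 20)]:
    rows = compose_crop(rows, rel)
```

The final `rows` value agrees with the start and stop that the example asserts for the fused crop of `crop3`.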
take_channels(self, channels)[source]

This method returns a subset of the vision data with only the specified bands / channels.

Parameters

channels (List[int] | slice | channel_spec.FusedChannelSpec) – List of integer indexes, a slice, or a channel spec, which is typically a pipe (|) delimited list of channel codes. See kwcoco.ChannelSpec for more details.

Returns

a new delayed load with a fused take channel operation

Return type

DelayedLoad

Note

The channel subset must exist here or it will raise an error. A better implementation (via pymbolic) might be able to do better.
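When channel selections are given as integer indexes, fusing two successive selections reduces to indexing one list with the other. The sketch below shows that rule in isolation; `compose_channel_indexes` is a hypothetical helper and does not reflect how the class stores its fused state.

```python
def compose_channel_indexes(current, requested):
    """
    Fuse two successive channel selections into one list of indexes
    into the original data. Hypothetical helper.
    """
    return [current[idx] for idx in requested]


first = [2, 0]                                    # take_channels([2, 0])
second = compose_channel_indexes(first, [1, 0])   # then take_channels([1, 0])
```

Here `second` refers to original channels 0 and 2, in that order, so a single read of those two bands is enough.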

Example

>>> from kwcoco.util.util_delayed_poc import *  # NOQA
>>> import kwcoco
>>> self = DelayedLoad.demo('astro').load_shape()
>>> channels = [2, 0]
>>> new = self.take_channels(channels)
>>> new3 = new.take_channels([1, 0])
>>> final1 = self.finalize()
>>> final2 = new.finalize()
>>> final3 = new3.finalize()
>>> assert np.all(final1[..., 2] == final2[..., 0])
>>> assert np.all(final1[..., 0] == final2[..., 1])
>>> assert final2.shape[2] == 2
>>> assert np.all(final1[..., 2] == final3[..., 1])
>>> assert np.all(final1[..., 0] == final3[..., 0])
>>> assert final3.shape[2] == 2
class kwcoco.util.util_delayed_poc.DelayedFrameConcat(frames, dsize=None)[source]

Bases: DelayedVideoOperation

Represents multiple frames in a video

Note

Video[0]:
    Frame[0]:
        Chan[0]: (32) +--------------------------------+
        Chan[1]: (16) +----------------+
        Chan[2]: ( 8) +--------+
    Frame[1]:
        Chan[0]: (30) +------------------------------+
        Chan[1]: (14) +--------------+
        Chan[2]: ( 6) +------+

Todo

  • [ ] Support computing the transforms when none of the data is loaded

Example

>>> # Simpler case with fewer nesting levels
>>> rng = kwarray.ensure_rng(None)
>>> # Delayed warp each channel into its "image" space
>>> # Note: the images never enter the space we transform through
>>> f1_img = DelayedLoad.demo('astro', (300, 300))
>>> f2_img = DelayedLoad.demo('carl', (256, 256))
>>> # Combine frames into a video
>>> vid_dsize = np.array((100, 100))
>>> self = vid = DelayedFrameConcat([
>>>     f1_img.delayed_warp(kwimage.Affine.scale(vid_dsize / f1_img.dsize)),
>>>     f2_img.delayed_warp(kwimage.Affine.scale(vid_dsize / f2_img.dsize)),
>>> ], dsize=vid_dsize)
>>> print(ub.repr2(vid.nesting(), nl=-1, sort=0))
>>> final = vid.finalize(interpolation='nearest', dsize=(32, 32))
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> kwplot.imshow(final[0], pnum=(1, 2, 1), fnum=1)
>>> kwplot.imshow(final[1], pnum=(1, 2, 2), fnum=1)
>>> region_slices = (slice(0, 90), slice(30, 60))
children(self)[source]

Abstract method, which should generate all of the direct children of a node in the operation tree.

property channels(self)[source]
property shape(self)[source]
finalize(self, **kwargs)[source]

Execute the final transform

delayed_crop(self, region_slices)[source]

Example

>>> from kwcoco.util.util_delayed_poc import *  # NOQA
>>> # Create raw channels in some "native" resolution for frame 1
>>> f1_chan1 = DelayedIdentity.demo('astro', chan=(1, 0), dsize=(300, 300))
>>> f1_chan2 = DelayedIdentity.demo('astro', chan=2, dsize=(10, 10))
>>> # Create raw channels in some "native" resolution for frame 2
>>> f2_chan1 = DelayedIdentity.demo('carl', dsize=(64, 64), chan=(1, 0))
>>> f2_chan2 = DelayedIdentity.demo('carl', dsize=(10, 10), chan=2)
>>> #
>>> f1_dsize = np.array(f1_chan1.dsize)
>>> f2_dsize = np.array(f2_chan1.dsize)
>>> f1_img = DelayedChannelConcat([
>>>     f1_chan1.delayed_warp(kwimage.Affine.scale(f1_dsize / f1_chan1.dsize), dsize=f1_dsize),
>>>     f1_chan2.delayed_warp(kwimage.Affine.scale(f1_dsize / f1_chan2.dsize), dsize=f1_dsize),
>>> ])
>>> f2_img = DelayedChannelConcat([
>>>     f2_chan1.delayed_warp(kwimage.Affine.scale(f2_dsize / f2_chan1.dsize), dsize=f2_dsize),
>>>     f2_chan2.delayed_warp(kwimage.Affine.scale(f2_dsize / f2_chan2.dsize), dsize=f2_dsize),
>>> ])
>>> vid_dsize = np.array((280, 280))
>>> full_vid = DelayedFrameConcat([
>>>     f1_img.delayed_warp(kwimage.Affine.scale(vid_dsize / f1_img.dsize), dsize=vid_dsize),
>>>     f2_img.delayed_warp(kwimage.Affine.scale(vid_dsize / f2_img.dsize), dsize=vid_dsize),
>>> ])
>>> region_slices = (slice(80, 200), slice(80, 200))
>>> print(ub.repr2(full_vid.nesting(), nl=-1, sort=0))
>>> crop_vid = full_vid.delayed_crop(region_slices)
>>> final_full = full_vid.finalize(interpolation='nearest')
>>> final_crop = crop_vid.finalize(interpolation='nearest')
>>> import pytest
>>> with pytest.raises(ValueError):
>>>     # should not be able to crop a crop yet
>>>     crop_vid.delayed_crop(region_slices)
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> kwplot.imshow(final_full[0], pnum=(2, 2, 1), fnum=1)
>>> kwplot.imshow(final_full[1], pnum=(2, 2, 2), fnum=1)
>>> kwplot.imshow(final_crop[0], pnum=(2, 2, 3), fnum=1)
>>> kwplot.imshow(final_crop[1], pnum=(2, 2, 4), fnum=1)
delayed_warp(self, transform, dsize=None)[source]

Delayed transform the underlying data.

Note

This deviates from kwimage warp functions because instead of “output_dims” (specified in c-style shape) we specify dsize (w, h).

Returns

a new delayed transform with this transform chained on

Return type

DelayedWarp

class kwcoco.util.util_delayed_poc.DelayedChannelConcat(components, dsize=None)[source]

Bases: DelayedImageOperation

Represents multiple channels in an image that could be concatenated

Variables

components (List[DelayedWarp]) – a list of stackable channels. Each component may be comprised of multiple channels.

Todo

  • [ ] can this be generalized into a delayed concat?

  • [ ] can all concats be delayed until the very end?

Example

>>> comp1 = DelayedWarp(np.random.rand(11, 7))
>>> comp2 = DelayedWarp(np.random.rand(11, 7, 3))
>>> comp3 = DelayedWarp(
>>>     np.random.rand(3, 5, 2),
>>>     transform=kwimage.Affine.affine(scale=(7/5, 11/3)).matrix,
>>>     dsize=(7, 11)
>>> )
>>> components = [comp1, comp2, comp3]
>>> chans = DelayedChannelConcat(components)
>>> final = chans.finalize()
>>> assert final.shape == chans.shape
>>> assert final.shape == (11, 7, 6)
>>> # We should be able to nest DelayedChannelConcat inside virtual images
>>> frame1 = DelayedWarp(
>>>     chans, transform=kwimage.Affine.affine(scale=2.2).matrix,
>>>     dsize=(20, 26))
>>> frame2 = DelayedWarp(
>>>     np.random.rand(3, 3, 6), dsize=(20, 26))
>>> frame3 = DelayedWarp(
>>>     np.random.rand(3, 3, 6), dsize=(20, 26))
>>> print(ub.repr2(frame1.nesting(), nl=-1, sort=False))
>>> frame1.finalize()
>>> vid = DelayedFrameConcat([frame1, frame2, frame3])
>>> print(ub.repr2(vid.nesting(), nl=-1, sort=False))
children(self)[source]

Abstract method, which should generate all of the direct children of a node in the operation tree.

classmethod random(cls, num_parts=3, rng=None)[source]

Example

>>> self = DelayedChannelConcat.random()
>>> print('self = {!r}'.format(self))
>>> print(ub.repr2(self.nesting(), nl=-1, sort=0))
property channels(self)[source]
property shape(self)[source]
finalize(self, **kwargs)[source]

Execute the final transform

delayed_warp(self, transform, dsize=None)[source]

Delayed transform the underlying data.

Note

This deviates from kwimage warp functions because instead of “output_dims” (specified in c-style shape) we specify dsize (w, h).

Returns

a new delayed transform with this transform chained on

Return type

DelayedWarp

take_channels(self, channels)[source]

This method returns a subset of the vision data with only the specified bands / channels.

Parameters

channels (List[int] | slice | channel_spec.FusedChannelSpec) – List of integer indexes, a slice, or a channel spec, which is typically a pipe (|) delimited list of channel codes. See kwcoco.ChannelSpec for more details.

Returns

a delayed vision operation that only operates on the following channels.

Return type

DelayedVisionOperation
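For a concatenation of channels, selecting by code can be pictured as a lookup into the available codes, with nan-filled bands standing in for codes that are not available (the last example in this entry exercises that case). The following NumPy sketch shows the idea on an in-memory array; `take_named_channels` is a hypothetical helper and works on concrete data rather than delayed nodes.

```python
import numpy as np


def take_named_channels(data, available, requested):
    """
    Select channels from an (H, W, C) array by pipe-delimited codes,
    filling codes that are not available with nan. Hypothetical helper.
    """
    have = available.split('|')
    want = requested.split('|')
    height, width = data.shape[0:2]
    out = np.full((height, width, len(want)), np.nan, dtype=np.float32)
    for out_idx, code in enumerate(want):
        if code in have:
            out[..., out_idx] = data[..., have.index(code)]
    return out


data = np.arange(2 * 3 * 3, dtype=np.float32).reshape(2, 3, 3)
picked = take_named_channels(data, 'B1|B8|B11', 'B1|foobar|B8')
```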

Example

>>> from kwcoco.util.util_delayed_poc import *  # NOQA
>>> import kwcoco
>>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral')
>>> self = delayed = dset.delayed_load(1)
>>> channels = 'B11|B8|B1|B10'
>>> new = self.take_channels(channels)

Example

>>> # Complex case
>>> import kwcoco
>>> from kwcoco.util.util_delayed_poc import *  # NOQA
>>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral')
>>> delayed = dset.delayed_load(1)
>>> astro = DelayedLoad.demo('astro').load_shape(use_channel_heuristic=True)
>>> aligned = astro.warp(kwimage.Affine.scale(600 / 512), dsize='auto')
>>> self = combo = DelayedChannelConcat(delayed.components + [aligned])
>>> channels = 'B1|r|B8|g'
>>> new = self.take_channels(channels)
>>> new_cropped = new.crop((slice(10, 200), slice(12, 350)))
>>> datas = new_cropped.finalize()
>>> vizable = kwimage.normalize_intensity(datas, axis=2)
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> stacked = kwimage.stack_images(vizable.transpose(2, 0, 1))
>>> kwplot.imshow(stacked)

CommandLine

xdoctest -m /home/joncrall/code/kwcoco/kwcoco/util/util_delayed_poc.py DelayedChannelConcat.take_channels:2 --profile

Example

>>> # Test case where requested channel does not exist
>>> import kwcoco
>>> from kwcoco.util.util_delayed_poc import *  # NOQA
>>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral', use_cache=1, verbose=100)
>>> self = dset.delayed_load(1)
>>> channels = 'B1|foobar|bazbiz|B8'
>>> new = self.take_channels(channels)
>>> new_cropped = new.crop((slice(10, 200), slice(12, 350)))
>>> fused = new_cropped.finalize()
>>> assert fused.shape == (190, 338, 4)
>>> assert np.all(np.isnan(fused[..., 1:3]))
>>> assert not np.any(np.isnan(fused[..., 0]))
>>> assert not np.any(np.isnan(fused[..., 3]))
class kwcoco.util.util_delayed_poc.DelayedWarp(sub_data, transform=None, dsize=None)[source]

Bases: DelayedImageOperation

POC for chainable transforms

Note

“sub” is used to refer to the underlying data in its native coordinates and resolution.

“self” is used to refer to the data in the transformed coordinates that are exposed by this class.

Variables
  • sub_data (DelayedWarp | ArrayLike) – array-like image data at a native resolution

  • transform (Transform) – transforms data from native “sub”-image-space to “self”-image-space.
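The relationship between the “sub” and “self” spaces can be checked on a few points: the transform carries native coordinates into the exposed space, and its inverse carries them back. This is a minimal NumPy sketch assuming the transform is a 3x3 homogeneous matrix; `warp_points` is a hypothetical helper, not a method of this class.

```python
import numpy as np


def warp_points(transform, xy):
    """Apply a 3x3 transform to an (N, 2) array of x/y points. Hypothetical helper."""
    xy = np.asarray(xy, dtype=float)
    homog = np.hstack([xy, np.ones((len(xy), 1))])
    warped = homog @ transform.T
    return warped[:, 0:2] / warped[:, 2:3]


# Transform from native "sub" space into the exposed "self" space.
tf_sub_to_self = np.array([[2.0, 0.0, 1.0],
                           [0.0, 4.0, 2.0],
                           [0.0, 0.0, 1.0]])
sub_pts = np.array([[0.0, 0.0], [3.0, 5.0]])
self_pts = warp_points(tf_sub_to_self, sub_pts)
# The inverse matrix maps "self" coordinates back into "sub" coordinates.
back = warp_points(np.linalg.inv(tf_sub_to_self), self_pts)
```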

Example

>>> dsize = (12, 12)
>>> tf1 = np.array([[2, 0, 0], [0, 2, 0], [0, 0, 1]])
>>> tf2 = np.array([[3, 0, 0], [0, 3, 0], [0, 0, 1]])
>>> tf3 = np.array([[4, 0, 0], [0, 4, 0], [0, 0, 1]])
>>> band1 = DelayedWarp(np.random.rand(6, 6), tf1, dsize)
>>> band2 = DelayedWarp(np.random.rand(4, 4), tf2, dsize)
>>> band3 = DelayedWarp(np.random.rand(3, 3), tf3, dsize)
>>> #
>>> # Execute a crop in a one-level transformed space
>>> region_slices = (slice(5, 10), slice(0, 12))
>>> delayed_crop = band2.delayed_crop(region_slices)
>>> final_crop = delayed_crop.finalize()
>>> #
>>> # Execute a crop in a nested transformed space
>>> tf4 = np.array([[1.5, 0, 0], [0, 1.5, 0], [0, 0, 1]])
>>> chained = DelayedWarp(band2, tf4, (18, 18))
>>> delayed_crop = chained.delayed_crop(region_slices)
>>> final_crop = delayed_crop.finalize()
>>> #
>>> tf4 = np.array([[.5, 0, 0], [0, .5, 0], [0, 0, 1]])
>>> chained = DelayedWarp(band2, tf4, (6, 6))
>>> delayed_crop = chained.delayed_crop(region_slices)
>>> final_crop = delayed_crop.finalize()
>>> #
>>> region_slices = (slice(1, 5), slice(2, 4))
>>> delayed_crop = chained.delayed_crop(region_slices)
>>> final_crop = delayed_crop.finalize()

Example

>>> dsize = (17, 12)
>>> tf = np.array([[5.2, 0, 1.1], [0, 3.1, 2.2], [0, 0, 1]])
>>> self = DelayedWarp(np.random.rand(3, 5, 13), tf, dsize=dsize)
>>> self.finalize().shape
classmethod random(cls, nesting=(2, 5), rng=None)

Example

>>> self = DelayedWarp.random(nesting=(4, 7))
>>> print('self = {!r}'.format(self))
>>> print(ub.repr2(self.nesting(), nl=-1, sort=0))
property channels(self)
children(self)

Abstract method, which should generate all of the direct children of a node in the operation tree.
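
To illustrate why each node exposes its direct children, here is a minimal sketch of a depth-first walk that collects the leaves of an operation tree. The `Node` class and `leaf_names` function are hypothetical stand-ins written for this illustration; they are not classes from this module.

```python
class Node:
    """Hypothetical stand-in for a node in a tree of delayed operations."""

    def __init__(self, name, kids=()):
        self.name = name
        self.kids = list(kids)

    def children(self):
        # Generate only the direct children of this node
        yield from self.kids


def leaf_names(node):
    """Depth-first walk that yields the names of all leaf nodes."""
    kids = list(node.children())
    if not kids:
        yield node.name
    for kid in kids:
        yield from leaf_names(kid)


tree = Node('concat', [
    Node('warp', [Node('load_a')]),
    Node('crop', [Node('load_b')]),
])
leaves = list(leaf_names(tree))
```

Because the walk only relies on `children()`, any new layer type that implements that one method can take part in the traversal.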

property dsize(self)
property num_bands(self)
property shape(self)
_optimize_paths(self, **kwargs)

Example

>>> self = DelayedWarp.random()
>>> leafs = list(self._optimize_paths())
>>> print('leafs = {!r}'.format(leafs))
finalize(self, transform=None, dsize=None, interpolation='linear', **kwargs)

Execute the final transform

Can pass a parent transform to augment this underlying transform.

Parameters
  • transform (Transform) – an additional transform to perform

  • dsize (Tuple[int, int]) – overrides destination canvas size
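
The idea of collapsing a stack of warps into one final transform can be sketched with plain matrix algebra. The snippet below is an illustration on points only, assuming every layer is a 3x3 homogeneous matrix; it does not call into this module. It checks that applying the same matrix layer by layer gives the same coordinates as composing the matrices first and applying the product once.

```python
import numpy as np

tf = np.array([[0.9, 0, 3.9], [0, 1.1, -0.5], [0, 0, 1]])
num = 10

# Two xy points stored as homogeneous columns
pts = np.array([[0.0, 0.0, 1.0], [53.0, 64.0, 1.0]]).T

# Nested application: warp the points one layer at a time
nested = pts.copy()
for _ in range(num):
    nested = tf @ nested

# Collapsed application: compose all layers first, then warp once
combined = np.linalg.matrix_power(tf, num)
collapsed = combined @ pts
```

For pixel data the collapsed form has the added benefit that the image is resampled once rather than once per layer.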

Example

>>> tf = np.array([[0.9, 0, 3.9], [0, 1.1, -.5], [0, 0, 1]])
>>> raw = kwimage.grab_test_image(dsize=(54, 65))
>>> raw = kwimage.ensure_float01(raw)
>>> # Test nested finalize
>>> layer1 = raw
>>> num = 10
>>> for _ in range(num):
...     layer1  = DelayedWarp(layer1, tf, dsize='auto')
>>> final1 = layer1.finalize()
>>> # Test non-nested finalize
>>> layer2 = list(layer1._optimize_paths())[0]
>>> final2 = layer2.finalize()
>>> #
>>> print(ub.repr2(layer1.nesting(), nl=-1, sort=0))
>>> print(ub.repr2(layer2.nesting(), nl=-1, sort=0))
>>> print('final1 = {!r}'.format(final1))
>>> print('final2 = {!r}'.format(final2))
>>> print('final1.shape = {!r}'.format(final1.shape))
>>> print('final2.shape = {!r}'.format(final2.shape))
>>> assert np.allclose(final1, final2)
>>> #
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> kwplot.imshow(raw, pnum=(1, 3, 1), fnum=1)
>>> kwplot.imshow(final1, pnum=(1, 3, 2), fnum=1)
>>> kwplot.imshow(final2, pnum=(1, 3, 3), fnum=1)
>>> kwplot.show_if_requested()

Example

>>> # Test aliasing
>>> s = DelayedIdentity.demo()
>>> s = DelayedIdentity.demo('checkerboard')
>>> a = s.delayed_warp(kwimage.Affine.scale(0.05), dsize='auto')
>>> b = s.delayed_warp(kwimage.Affine.scale(3), dsize='auto')
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> # It looks like downsampling linear and area is the same
>>> # Does warpAffine have no alias handling?
>>> pnum_ = kwplot.PlotNums(nRows=2, nCols=4)
>>> kwplot.imshow(a.finalize(interpolation='area'), pnum=pnum_(), title='warpAffine area')
>>> kwplot.imshow(a.finalize(interpolation='linear'), pnum=pnum_(), title='warpAffine linear')
>>> kwplot.imshow(a.finalize(interpolation='nearest'), pnum=pnum_(), title='warpAffine nearest')
>>> kwplot.imshow(a.finalize(interpolation='nearest', antialias=False), pnum=pnum_(), title='warpAffine nearest AA=0')
>>> kwplot.imshow(kwimage.imresize(s.finalize(), dsize=a.dsize, interpolation='area'), pnum=pnum_(), title='resize area')
>>> kwplot.imshow(kwimage.imresize(s.finalize(), dsize=a.dsize, interpolation='linear'), pnum=pnum_(), title='resize linear')
>>> kwplot.imshow(kwimage.imresize(s.finalize(), dsize=a.dsize, interpolation='nearest'), pnum=pnum_(), title='resize nearest')
>>> kwplot.imshow(kwimage.imresize(s.finalize(), dsize=a.dsize, interpolation='cubic'), pnum=pnum_(), title='resize cubic')
take_channels(self, channels)
class kwcoco.util.util_delayed_poc.DelayedCrop(sub_data, sub_slices)

Bases: DelayedImageOperation

Represent a delayed crop operation
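
The general idea of a delayed crop can be sketched as an object that records the slices up front and only indexes the pixels when asked to finalize. The `LazyCropSketch` class below is a hypothetical illustration built on a NumPy array; it is not how `DelayedCrop` is implemented.

```python
import numpy as np

class LazyCropSketch:
    """Hypothetical illustration of a crop that is deferred until finalize."""

    def __init__(self, sub_data, sub_slices):
        self.sub_data = sub_data      # underlying array-like data
        self.sub_slices = sub_slices  # (row_slice, col_slice)

    @property
    def shape(self):
        # The output shape is known from the slices alone, without
        # reading any pixel data
        full = self.sub_data.shape
        cropped = tuple(
            len(range(*sl.indices(dim)))
            for sl, dim in zip(self.sub_slices, full)
        )
        # Dimensions that are not sliced (e.g. channels) are kept as-is
        return cropped + tuple(full[len(cropped):])

    def finalize(self):
        # Only now is the data actually indexed
        return np.asarray(self.sub_data[self.sub_slices])


data = np.zeros((20, 20, 3))
crop = LazyCropSketch(data, (slice(5, 10), slice(1, 12)))
final = crop.finalize()
```

Here `crop.shape` can be inspected before any data is touched, which is what makes it cheap to stack such layers.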

Example

>>> sub_data = DelayedLoad.demo()
>>> sub_slices = (slice(5, 10), slice(1, 12))
>>> self = DelayedCrop(sub_data, sub_slices)
>>> print(ub.repr2(self.nesting(), nl=-1, sort=0))
>>> final = self.finalize()
>>> print('final.shape = {!r}'.format(final.shape))

Example

>>> sub_data = DelayedLoad.demo()
>>> sub_slices = (slice(5, 10), slice(1, 12))
>>> crop1 = DelayedCrop(sub_data, sub_slices)
>>> import pytest
>>> # Should only error while heuristics are in use.
>>> with pytest.raises(ValueError):
...     crop2 = DelayedCrop(crop1, sub_slices)
__hack_dont_optimize__ = True
property channels(self)
children(self)

Abstract method, which should generate all of the direct children of a node in the operation tree.

finalize(self, **kwargs)
abstract _optimize_paths(self, **kwargs)

Iterate through the leaf nodes, which are virtually transformed into the root space.

This returns a heuristically optimized leaf representation with respect to warps.

kwcoco.util.util_delayed_poc._compute_leaf_subcrop(root_region_bounds, tf_leaf_to_root)

Given a region in a “root” image and a transform between that “root” and some “leaf” image, compute the appropriate quantized region in the “leaf” image and the adjusted transformation between that root and leaf.
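
The computation described above can be sketched with NumPy alone. The function `leaf_subcrop_sketch` below is a hypothetical illustration, not the module's implementation. It assumes the root region is given as a (row_slice, col_slice) pair rather than a polygon, that the transform is a 3x3 matrix, and that the adjusted transform should map cropped-leaf coordinates to coordinates relative to the root region.

```python
import numpy as np

def leaf_subcrop_sketch(root_slices, tf_leaf_to_root):
    """Hypothetical NumPy-only illustration of a leaf subcrop."""
    tf_leaf_to_root = np.asarray(tf_leaf_to_root, dtype=float)
    tf_root_to_leaf = np.linalg.inv(tf_leaf_to_root)
    y_sl, x_sl = root_slices

    # Corners of the root region as homogeneous xy columns
    corners = np.array([
        [x_sl.start, x_sl.stop, x_sl.stop, x_sl.start],
        [y_sl.start, y_sl.start, y_sl.stop, y_sl.stop],
        [1, 1, 1, 1],
    ], dtype=float)

    # Map the region into leaf space and take its bounding box
    leaf_pts = tf_root_to_leaf @ corners
    leaf_xy = leaf_pts[:2] / leaf_pts[2:]

    # Quantize outward to whole pixels, clipping at the leaf origin
    x0, y0 = np.maximum(np.floor(leaf_xy.min(axis=1)), 0).astype(int)
    x1, y1 = np.ceil(leaf_xy.max(axis=1)).astype(int)
    leaf_slices = (slice(int(y0), int(y1)), slice(int(x0), int(x1)))

    # Adjusted transform; the right-most matrix is applied first:
    # cropped-leaf -> leaf -> root -> coordinates relative to the region
    shift_leaf = np.array([[1, 0, x0], [0, 1, y0], [0, 0, 1]], dtype=float)
    unshift_root = np.array(
        [[1, 0, -x_sl.start], [0, 1, -y_sl.start], [0, 0, 1]], dtype=float)
    tf_new = unshift_root @ tf_leaf_to_root @ shift_leaf
    return leaf_slices, tf_new


region_slices = (slice(33, 100), slice(22, 62))
tf_leaf_to_root = np.diag([7.0, 7.0, 1.0])  # leaf pixels are 7x coarser
slices, tf_new = leaf_subcrop_sketch(region_slices, tf_leaf_to_root)
```

Quantizing outward means the leaf crop is slightly larger than the requested region, so the adjusted transform carries a small negative offset that undoes the extra padding.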

Example

>>> region_slices = (slice(33, 100), slice(22, 62))
>>> region_shape = (100, 100, 1)
>>> root_region_box = kwimage.Boxes.from_slice(region_slices, shape=region_shape)
>>> root_region_bounds = root_region_box.to_polygons()[0]
>>> tf_leaf_to_root = kwimage.Affine.affine(scale=7).matrix
>>> slices, tf_new = _compute_leaf_subcrop(root_region_bounds, tf_leaf_to_root)
>>> print('tf_new =\n{!r}'.format(tf_new))
>>> print('slices = {!r}'.format(slices))
kwcoco.util.util_delayed_poc._largest_shape(shapes)

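Finds the per-dimension maximum over all shapes. A shape of None, or a None entry within a shape, is treated as unknown and ignored.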
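
One way to read this behavior is as a per-axis maximum that skips unknown values. The function `largest_shape_sketch` below is a hypothetical re-implementation written only for illustration. It assumes that a `None` shape contributes nothing and that a `None` entry is kept only when no other shape supplies a value for that axis.

```python
def largest_shape_sketch(shapes):
    """Hypothetical illustration of a per-axis maximum over shapes."""
    largest = []
    for shape in shapes:
        if shape is None:
            continue  # an unknown shape contributes nothing
        for axis, dim in enumerate(shape):
            if axis == len(largest):
                # First time this axis is seen; record it, even if None
                largest.append(dim)
            elif dim is not None:
                prev = largest[axis]
                largest[axis] = dim if prev is None else max(prev, dim)
    return tuple(largest)


shapes = [(10, 20), None, (None, 30), (40, 50, 60, None), (100,)]
largest = largest_shape_sketch(shapes)
```

The result has as many axes as the longest input shape, so shapes of different lengths can be mixed freely.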

Example

>>> shapes = [
...     (10, 20), None, (None, 30), (40, 50, 60, None), (100,)
... ]
>>> largest = _largest_shape(shapes)
>>> print('largest = {!r}'.format(largest))
>>> assert largest == (100, 50, 60, None)
kwcoco.util.util_delayed_poc._devcheck_corner()
kwcoco.util.util_delayed_poc._auto_dsize(transform, sub_dsize)