"""
Terminal nodes
"""
import kwarray
import kwimage
import numpy as np
import warnings
from delayed_image.delayed_nodes import DelayedImage, TRACE_OPTIMIZE
# from delayed_image.delayed_nodes import DelayedArray
try:
from xdev import profile
except ImportError:
from ubelt import identity as profile
__docstubs__ = """
from delayed_image.channel_spec import FusedChannelSpec
"""
[docs]
class DelayedImageLeaf(DelayedImage):
[docs]
@profile
def optimize(self):
return self
[docs]
class DelayedLoad(DelayedImageLeaf):
"""
Points to an image on disk to be loaded.
This is the starting point for most delayed operations. Disk IO is avoided
until the ``finalize`` operation is called. Calling ``prepare`` can read
image headers if metadata like the image width, height, and number of
channels is not provided, but most operations can be performed while these
are still unknown.
If a gdal backend is available, and the underlying image is in the
appropriate formate (e.g. COG), finalize will return a lazy reference that
enables fast overviews and crops. For image formats that do not allow for
tiling / overviews, then there is no way to avoid reading entire image as
an ndarray.
Example:
>>> from delayed_image import * # NOQA
>>> self = DelayedLoad.demo(dsize=(16, 16)).prepare()
>>> data1 = self.finalize()
Example:
>>> # xdoctest: +REQUIRES(module:osgeo)
>>> # Demo code to develop support for overviews
>>> from delayed_image import * # NOQA
>>> import kwimage
>>> import ubelt as ub
>>> fpath = kwimage.grab_test_image_fpath(overviews=3)
>>> self = DelayedLoad(fpath, channels='r|g|b').prepare()
>>> print(f'self={self}')
>>> print('self.meta = {}'.format(ub.repr2(self.meta, nl=1)))
>>> quantization = {
>>> 'quant_max': 255,
>>> 'nodata': 0,
>>> }
>>> node0 = self
>>> node1 = node0.get_overview(2)
>>> node2 = node1[13:900, 11:700]
>>> node3 = node2.dequantize(quantization)
>>> node4 = node3.warp({'scale': 0.05})
>>> #
>>> data0 = node0._validate().finalize()
>>> data1 = node1._validate().finalize()
>>> data2 = node2._validate().finalize()
>>> data3 = node3._validate().finalize()
>>> data4 = node4._validate().finalize()
>>> node4.write_network_text()
Example:
>>> # xdoctest: +REQUIRES(module:osgeo)
>>> # Test delayed ops with int16 and nodata values
>>> from delayed_image import * # NOQA
>>> import kwimage
>>> from delayed_image.helpers import quantize_float01
>>> import ubelt as ub
>>> dpath = ub.Path.appdir('delayed_image/tests/test_delay_nodata').ensuredir()
>>> fpath = dpath / 'data.tif'
>>> data = kwimage.ensure_float01(kwimage.grab_test_image())
>>> poly = kwimage.Polygon.random(rng=321032).scale(data.shape[0])
>>> poly.fill(data, np.nan)
>>> data_uint16, quantization = quantize_float01(data)
>>> nodata = quantization['nodata']
>>> kwimage.imwrite(fpath, data_uint16, nodata=nodata, backend='gdal', overviews=3)
>>> # Test loading the data
>>> self = DelayedLoad(fpath, channels='r|g|b', nodata_method='float').prepare()
>>> node0 = self
>>> node1 = node0.dequantize(quantization)
>>> node2 = node1.warp({'scale': 0.51}, interpolation='lanczos')
>>> node3 = node2[13:900, 11:700]
>>> node4 = node3.warp({'scale': 0.9}, interpolation='lanczos')
>>> node4.write_network_text()
>>> node5 = node4.optimize()
>>> node5.write_network_text()
>>> node6 = node5.warp({'scale': 8}, interpolation='lanczos').optimize()
>>> node6.write_network_text()
>>> #
>>> data0 = node0._validate().finalize()
>>> data1 = node1._validate().finalize()
>>> data2 = node2._validate().finalize()
>>> data3 = node3._validate().finalize()
>>> data4 = node4._validate().finalize()
>>> data5 = node5._validate().finalize()
>>> data6 = node6._validate().finalize()
>>> # xdoctest: +REQUIRES(--show)
>>> import kwplot
>>> kwplot.autompl()
>>> stack1 = kwimage.stack_images([data1, data2, data3, data4, data5])
>>> stack2 = kwimage.stack_images([stack1, data6], axis=1)
>>> kwplot.imshow(stack2)
"""
def __init__(self, fpath, channels=None, dsize=None, nodata_method=None):
"""
Args:
fpath (str | PathLike):
URI pointing at the image data to load
channels (int | str | FusedChannelSpec | None):
the underlying channels of the image if known a-priori
dsize (Tuple[int, int]):
The width / height of the image if known a-priori
nodata_method (str | None):
How to handle nodata values in the file itself.
Can be "auto", "float", or "ma".
"""
super().__init__(channels=channels, dsize=dsize)
self.meta['fpath'] = fpath
self.meta['nodata_method'] = nodata_method
self.lazy_ref = None
@property
def fpath(self):
return self.meta['fpath']
[docs]
@classmethod
def demo(DelayedLoad, key='astro', channels=None, dsize=None,
nodata_method=None, overviews=None):
"""
Creates a demo DelayedLoad node that points to a file generated by
:func:`kwimage.grab_test_image_fpath`.
If metadata like dsize and channels are not provided, then the
:func:`prepare` can be used to auto-populate them at the cost of the
disk IO to read image headers.
Args:
key (str): which test image to grab. Valid choices are:
astro - an astronaught
carl - Carl Sagan
paraview - ParaView logo
stars - picture of stars in the sky
channels (str):
if specified, these channels will be stored in the delayed load
metadata. Note: these are not auto-populated. Usually the
key corresponds to 3-channel data,
dsize (None | Tuple[int, int]):
if specified, we will return a variant of the data with the
specific dsize
nodata_method (str | None):
How to handle nodata values in the file itself.
Can be "auto", "float", or "ma".
overviews (None | int):
if specified, will return a variant of the data with overviews
Returns:
DelayedLoad
Example:
>>> from delayed_image.delayed_leafs import * # NOQA
>>> import delayed_image
>>> delayed = delayed_image.DelayedLoad.demo()
>>> print(f'delayed={delayed}')
>>> delayed.prepare()
>>> print(f'delayed={delayed}')
>>> delayed = DelayedLoad.demo(channels='r|g|b', nodata_method='float')
>>> print(f'delayed={delayed}')
>>> delayed.prepare()
>>> print(f'delayed={delayed}')
>>> delayed.finalize()
"""
fpath = kwimage.grab_test_image_fpath(key, dsize=dsize,
overviews=overviews)
self = DelayedLoad(fpath, channels=channels, dsize=dsize,
nodata_method=nodata_method)
return self
[docs]
@profile
def _load_reference(self):
nodata_method = self.meta.get('nodata_method', None)
if self.lazy_ref is None:
from delayed_image import lazy_loaders
using_gdal = lazy_loaders.LazyGDalFrameFile.available()
if using_gdal:
# the nodata arg here isn't named that great
self.lazy_ref = lazy_loaders.LazyGDalFrameFile(
self.fpath, nodata_method=nodata_method)
else:
if nodata_method == 'auto':
raise Exception('need gdal for auto no-data')
self.lazy_ref = NotImplemented
return self
[docs]
@profile
def prepare(self):
"""
If metadata is missing, perform minimal IO operations in order to
prepopulate metadata that could help us better optimize the operation
tree.
Returns:
DelayedLoad
"""
self._load_metadata()
return self
[docs]
@profile
def _finalize(self):
"""
Returns:
ArrayLike
Example:
>>> # Check difference between finalize and _finalize
>>> from delayed_image.delayed_leafs import * # NOQA
>>> self = DelayedLoad.demo().prepare()
>>> final_arr = self.finalize()
>>> assert isinstance(final_arr, np.ndarray), 'finalize should always return an array'
>>> final_ref = self._finalize()
>>> if self.lazy_ref is not NotImplemented:
>>> assert not isinstance(final_ref, np.ndarray), (
>>> 'A pure load with gdal should return a reference that is '
>>> 'similiar to but not quite an array')
"""
self._load_reference()
if self.lazy_ref is NotImplemented:
warnings.warn('DelayedLoad may not be efficient without gdal')
pre_final = kwimage.imread(self.fpath)
pre_final = kwarray.atleast_nd(pre_final, 3)
return pre_final
else:
# Need to ensure that if any metadata changed, we modify the
# underlying lazy ref.
self.lazy_ref.nodata_method = self.meta.get('nodata_method', None)
return self.lazy_ref
[docs]
class DelayedNans(DelayedImageLeaf):
"""
Constructs nan channels as needed
Example:
self = DelayedNans((10, 10), channel_spec.FusedChannelSpec.coerce('rgb'))
region_slices = (slice(5, 10), slice(1, 12))
delayed = self.crop(region_slices)
Example:
>>> from delayed_image.delayed_leafs import * # NOQA
>>> from delayed_image import DelayedChannelConcat
>>> dsize = (307, 311)
>>> c1 = DelayedNans(dsize=dsize, channels='foo')
>>> c2 = DelayedLoad.demo('astro', dsize=dsize, channels='R|G|B').prepare()
>>> cat = DelayedChannelConcat([c1, c2])
>>> warped_cat = cat.warp({'scale': 1.07}, dsize=(328, 332))._validate()
>>> warped_cat._validate().optimize().finalize()
"""
def __init__(self, dsize=None, channels=None):
super().__init__(channels=channels, dsize=dsize)
self._kwargs = {}
[docs]
@profile
def _finalize(self):
"""
Returns:
ArrayLike
"""
shape = self.shape
from delayed_image.helpers import _ensure_valid_shape
shape = _ensure_valid_shape(shape)
final = np.full(shape, fill_value=np.nan)
return final
[docs]
@profile
def _optimized_crop(self, space_slice=None, chan_idxs=None):
"""
Crops an image along integer pixel coordinates.
Args:
space_slice (Tuple[slice, slice]): y-slice and x-slice.
chan_idxs (List[int]): indexes of bands to take
Returns:
DelayedImage
"""
if chan_idxs is None:
channels = self.channels
else:
channels = self.channels[chan_idxs]
dsize = self.dsize
data_dims = dsize[::-1]
data_slice, extra_pad = kwarray.embed_slice(space_slice, data_dims)
box = kwimage.Boxes.from_slice(data_slice)
new_width = box.width.ravel()[0]
new_height = box.height.ravel()[0]
new_dsize = (new_width, new_height)
new = self.__class__(new_dsize, channels=channels, **self._kwargs)
if TRACE_OPTIMIZE:
new._opt_logs.append('Nans._optimized_crop')
return new
[docs]
@profile
def _optimized_warp(self, transform, dsize=None, **warp_kwargs):
"""
Returns:
DelayedImage
"""
# Warping does nothing to nans, except maybe changing the dsize
new = self.__class__(dsize, channels=self.channels, **self._kwargs)
if TRACE_OPTIMIZE:
new._opt_logs.append('Nans._optimized_warp')
return new
class DelayedNodata(DelayedNans):
"""
Constructs nan or masked array depending on what is needed
Example:
>>> from delayed_image.delayed_leafs import * # NOQA
>>> dsize = (307, 311)
>>> self1 = DelayedNodata(dsize=dsize, channels='foo', nodata_method='float')
>>> self2 = DelayedNodata(dsize=dsize, channels='foo', nodata_method='ma')
>>> im1 = self1.finalize()
>>> im2 = self2.finalize()
>>> assert im1.dtype.kind == 'f'
>>> assert not hasattr(im1, 'mask')
>>> assert hasattr(im2, 'mask')
"""
def __init__(self, dsize=None, channels=None, nodata_method='float'):
super().__init__(channels=channels, dsize=dsize)
self.meta['nodata_method'] = nodata_method
self._kwargs['nodata_method'] = nodata_method
@profile
def _finalize(self):
"""
Returns:
ArrayLike
"""
shape = self.shape
from delayed_image.helpers import _ensure_valid_shape
shape = _ensure_valid_shape(shape)
nodata_method = self.meta['nodata_method']
if nodata_method == 'ma':
# TODO: dtype should probably depend on what it will be combined
# with?
wrapped = np.empty(shape, dtype=np.uint8)
final = np.ma.array(wrapped, dtype=np.uint8, mask=True)
elif nodata_method is None or nodata_method in {'float', 'nan'}:
final = np.full(shape, fill_value=np.nan)
else:
raise KeyError(nodata_method)
return final
[docs]
class DelayedIdentity(DelayedImageLeaf):
"""
Returns an ndarray as-is
Example:
self = DelayedNans((10, 10), channel_spec.FusedChannelSpec.coerce('rgb'))
region_slices = (slice(5, 10), slice(1, 12))
delayed = self.crop(region_slices)
Example:
>>> from delayed_image import * # NOQA
>>> arr = kwimage.checkerboard()
>>> self = DelayedIdentity(arr, channels='gray')
>>> warp = self.warp({'scale': 1.07})
>>> warp.optimize().finalize()
"""
def __init__(self, data, channels=None, dsize=None):
super().__init__(channels=channels)
self.data = data
self.meta['num_channels'] = kwimage.num_channels(data)
if dsize is None:
dsize = data.shape[0:2][::-1]
self.meta['dsize'] = dsize
[docs]
def _finalize(self):
"""
Returns:
ArrayLike
"""
return self.data