:py:mod:`kwcoco.util` ===================== .. py:module:: kwcoco.util .. autoapi-nested-parse:: mkinit ~/code/kwcoco/kwcoco/util/__init__.py -w Submodules ---------- .. toctree:: :titlesonly: :maxdepth: 1 dict_like/index.rst jsonschema_elements/index.rst lazy_frame_backends/index.rst util_archive/index.rst util_delayed_poc/index.rst util_futures/index.rst util_json/index.rst util_monkey/index.rst util_sklearn/index.rst util_truncate/index.rst Package Contents ---------------- Classes ~~~~~~~ .. autoapisummary:: kwcoco.util.DictLike kwcoco.util.ContainerElements kwcoco.util.Element kwcoco.util.QuantifierElements kwcoco.util.ScalarElements kwcoco.util.SchemaElements kwcoco.util.LazyGDalFrameFile kwcoco.util.LazyRasterIOFrameFile kwcoco.util.LazySpectralFrameFile kwcoco.util.Archive kwcoco.util.DelayedChannelConcat kwcoco.util.DelayedCrop kwcoco.util.DelayedFrameConcat kwcoco.util.DelayedIdentity kwcoco.util.DelayedImageOperation kwcoco.util.DelayedLoad kwcoco.util.DelayedNans kwcoco.util.DelayedVideoOperation kwcoco.util.DelayedVisionOperation kwcoco.util.DelayedWarp kwcoco.util.SupressPrint kwcoco.util.StratifiedGroupKFold Functions ~~~~~~~~~ .. autoapisummary:: kwcoco.util.unarchive_file kwcoco.util.ensure_json_serializable kwcoco.util.find_json_unserializable kwcoco.util.indexable_allclose kwcoco.util.smart_truncate Attributes ~~~~~~~~~~ .. autoapisummary:: kwcoco.util.ALLOF kwcoco.util.ANY kwcoco.util.ANYOF kwcoco.util.ARRAY kwcoco.util.BOOLEAN kwcoco.util.INTEGER kwcoco.util.NOT kwcoco.util.NULL kwcoco.util.NUMBER kwcoco.util.OBJECT kwcoco.util.ONEOF kwcoco.util.STRING kwcoco.util.elem kwcoco.util.IndexableWalker .. py:class:: DictLike Bases: :py:obj:`ubelt.NiceRepr` An inherited class must specify the ``getitem``, ``setitem``, and ``keys`` methods. 
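The subclassing contract can be illustrated with a minimal, self-contained sketch. ``MiniDictLike`` and ``UpperKeyStore`` are hypothetical names introduced only for illustration. This is not the kwcoco implementation, just an assumption about the general pattern of deriving the dunder methods from ``getitem``, ``setitem``, and ``keys``.

.. code-block:: python

    class MiniDictLike:
        """Hypothetical stand-in for the pattern (not kwcoco's code)."""

        # Primitives that a subclass is expected to provide.
        def getitem(self, key):
            raise NotImplementedError

        def setitem(self, key, value):
            raise NotImplementedError

        def keys(self):
            raise NotImplementedError

        # Dictionary protocol derived from the primitives above.
        def __getitem__(self, key):
            return self.getitem(key)

        def __setitem__(self, key, value):
            self.setitem(key, value)

        def __iter__(self):
            return iter(self.keys())

        def __len__(self):
            return sum(1 for _ in self.keys())

        def __contains__(self, key):
            return any(k == key for k in self.keys())

        def get(self, key, default=None):
            try:
                return self[key]
            except KeyError:
                return default


    class UpperKeyStore(MiniDictLike):
        """Toy subclass that normalizes every key to upper case."""

        def __init__(self):
            self._data = {}

        def getitem(self, key):
            return self._data[key.upper()]

        def setitem(self, key, value):
            self._data[key.upper()] = value

        def keys(self):
            return self._data.keys()


    store = UpperKeyStore()
    store['a'] = 1   # routed through setitem, stored under 'A'
    store['b'] = 2

Only the three primitives are written by hand in the subclass; indexing, iteration, ``len``, ``in``, and ``get`` all fall out of the base class.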
A class is dictionary-like if it has: ``__iter__``, ``__len__``, ``__contains__``, ``__getitem__``, ``items``, ``keys``, ``values``, ``get``. If it should be writable, it should also have: ``__delitem__``, ``__setitem__``, ``update``. And perhaps: ``copy``. .. py:attribute:: asdict .. py:method:: getitem(self, key) :abstractmethod: .. py:method:: setitem(self, key, value) :abstractmethod: .. py:method:: delitem(self, key) :abstractmethod: .. py:method:: keys(self) :abstractmethod: .. py:method:: __len__(self) .. py:method:: __iter__(self) .. py:method:: __contains__(self, key) .. py:method:: __delitem__(self, key) .. py:method:: __getitem__(self, key) .. py:method:: __setitem__(self, key, value) .. py:method:: items(self) .. py:method:: values(self) .. py:method:: copy(self) .. py:method:: to_dict(self) .. py:method:: update(self, other) .. py:method:: get(self, key, default=None) .. py:data:: ALLOF .. py:data:: ANY .. py:data:: ANYOF .. py:data:: ARRAY .. py:data:: BOOLEAN .. py:class:: ContainerElements Types that contain other types .. rubric:: Example >>> from kwcoco.util.jsonschema_elements import * # NOQA >>> print(elem.ARRAY().validate()) >>> print(elem.OBJECT().validate()) >>> print(elem.OBJECT().validate()) {'type': 'array', 'items': {}} {'type': 'object', 'properties': {}} {'type': 'object', 'properties': {}} .. py:method:: ARRAY(self, TYPE={}, **kw) https://json-schema.org/understanding-json-schema/reference/array.html .. rubric:: Example >>> from kwcoco.util.jsonschema_elements import * # NOQA >>> ARRAY(numItems=3) >>> schema = ARRAY(minItems=3) >>> schema.validate() {'type': 'array', 'items': {}, 'minItems': 3} .. py:method:: OBJECT(self, PROPERTIES={}, **kw) https://json-schema.org/understanding-json-schema/reference/object.html .. 
rubric:: Example >>> import jsonschema >>> schema = elem.OBJECT() >>> jsonschema.validate({}, schema) >>> # >>> import jsonschema >>> schema = elem.OBJECT({ >>> 'key1': elem.ANY(), >>> 'key2': elem.ANY(), >>> }, required=['key1']) >>> jsonschema.validate({'key1': None}, schema) >>> # >>> import jsonschema >>> schema = elem.OBJECT({ >>> 'key1': elem.OBJECT({'arr': elem.ARRAY()}), >>> 'key2': elem.ANY(), >>> }, required=['key1'], title='a title') >>> schema.validate() >>> print('schema = {}'.format(ub.repr2(schema, sort=1, nl=-1))) >>> jsonschema.validate({'key1': {'arr': []}}, schema) schema = { 'properties': { 'key1': { 'properties': { 'arr': {'items': {}, 'type': 'array'} }, 'type': 'object' }, 'key2': {} }, 'required': ['key1'], 'title': 'a title', 'type': 'object' } .. py:class:: Element(base, options={}, _magic=None) Bases: :py:obj:`dict` A dictionary used to define an element of a JSON Schema. The exact keys/values for the element will depend on the type of element being described. The :class:`SchemaElements` defines exactly what these are for the core elements. (e.g. OBJECT, INTEGER, NULL, ARRAY, ANYOF) .. rubric:: Example >>> from kwcoco.coco_schema import * # NOQA >>> self = Element(base={'type': 'demo'}, options={'opt1', 'opt2'}) >>> new = self(opt1=3) >>> print('self = {}'.format(ub.repr2(self, nl=1, sort=1))) >>> print('new = {}'.format(ub.repr2(new, nl=1, sort=1))) >>> print('new2 = {}'.format(ub.repr2(new(), nl=1, sort=1))) >>> print('new3 = {}'.format(ub.repr2(new(title='myvar'), nl=1, sort=1))) >>> print('new4 = {}'.format(ub.repr2(new(title='myvar')(examples=['']), nl=1, sort=1))) >>> print('new5 = {}'.format(ub.repr2(new(badattr=True), nl=1, sort=1))) self = { 'type': 'demo', } new = { 'opt1': 3, 'type': 'demo', } new2 = { 'opt1': 3, 'type': 'demo', } new3 = { 'opt1': 3, 'title': 'myvar', 'type': 'demo', } new4 = { 'examples': [''], 'opt1': 3, 'title': 'myvar', 'type': 'demo', } new5 = { 'opt1': 3, 'type': 'demo', } .. 
py:attribute:: __generics__ .. py:method:: __call__(self, *args, **kw) .. py:method:: validate(self, instance=ub.NoParam) If ``instance`` is given, validates that the dictionary conforms to this schema. Otherwise validates that this is a valid schema element. :Parameters: **instance** (*dict*) -- a dictionary to validate .. py:method:: __or__(self, other) Syntax for making an anyOf relationship .. rubric:: Example >>> from kwcoco.util.jsonschema_elements import * # NOQA >>> obj1 = OBJECT(dict(opt1=NUMBER())) >>> obj2 = OBJECT(dict(opt2=STRING())) >>> obj3 = OBJECT(dict(opt3=ANY())) >>> any_v1 = obj1 | obj2 >>> any_v2 = ANYOF(obj1, obj2) >>> assert any_v1 == any_v2 >>> any_v3 = any_v1 | obj3 >>> any_v4 = ANYOF(obj1, obj2, obj3) >>> assert any_v3 == any_v4 .. py:data:: INTEGER .. py:data:: NOT .. py:data:: NULL .. py:data:: NUMBER .. py:data:: OBJECT .. py:data:: ONEOF .. py:class:: QuantifierElements Bases: :py:obj:`object` Quantifier types https://json-schema.org/understanding-json-schema/reference/combining.html#allof .. rubric:: Example >>> from kwcoco.util.jsonschema_elements import * # NOQA >>> elem.ANYOF(elem.STRING, elem.NUMBER).validate() >>> elem.ONEOF(elem.STRING, elem.NUMBER).validate() >>> elem.NOT(elem.NULL).validate() >>> elem.NOT(elem.ANY).validate() >>> elem.ANY.validate() .. py:method:: ANY(self) :property: .. py:method:: ALLOF(self, *TYPES) .. py:method:: ANYOF(self, *TYPES) .. py:method:: ONEOF(self, *TYPES) .. py:method:: NOT(self, TYPE) .. py:data:: STRING .. py:class:: ScalarElements Bases: :py:obj:`object` Single-valued elements .. py:method:: NULL(self) :property: https://json-schema.org/understanding-json-schema/reference/null.html .. py:method:: BOOLEAN(self) :property: https://json-schema.org/understanding-json-schema/reference/boolean.html .. py:method:: STRING(self) :property: https://json-schema.org/understanding-json-schema/reference/string.html .. 
py:method:: NUMBER(self) :property: https://json-schema.org/understanding-json-schema/reference/numeric.html#number .. py:method:: INTEGER(self) :property: https://json-schema.org/understanding-json-schema/reference/numeric.html#integer .. py:class:: SchemaElements Bases: :py:obj:`ScalarElements`, :py:obj:`QuantifierElements`, :py:obj:`ContainerElements` Functional interface into defining jsonschema structures. See mixin classes for details. .. rubric:: References https://json-schema.org/understanding-json-schema/ .. todo:: - [ ] Generics: title, description, default, examples .. rubric:: CommandLine .. code-block:: bash xdoctest -m /home/joncrall/code/kwcoco/kwcoco/util/jsonschema_elements.py SchemaElements .. rubric:: Example >>> from kwcoco.util.jsonschema_elements import * # NOQA >>> elem = SchemaElements() >>> elem.ARRAY(elem.ANY()) >>> schema = OBJECT({ >>> 'prop1': ARRAY(INTEGER, minItems=3), >>> 'prop2': ARRAY(STRING, numItems=2), >>> 'prop3': ARRAY(OBJECT({ >>> 'subprob1': NUMBER, >>> 'subprob2': NUMBER, >>> })) >>> }) >>> print('schema = {}'.format(ub.repr2(schema, nl=2, sort=1))) schema = { 'properties': { 'prop1': {'items': {'type': 'integer'}, 'minItems': 3, 'type': 'array'}, 'prop2': {'items': {'type': 'string'}, 'maxItems': 2, 'minItems': 2, 'type': 'array'}, 'prop3': {'items': {'properties': {'subprob1': {'type': 'number'}, 'subprob2': {'type': 'number'}}, 'type': 'object'}, 'type': 'array'}, }, 'type': 'object', } >>> TYPE = elem.OBJECT({ >>> 'p1': ANY, >>> 'p2': ANY, >>> }, required=['p1']) >>> import jsonschema >>> inst = {'p1': None} >>> jsonschema.validate(inst, schema=TYPE) >>> #jsonschema.validate({'p2': None}, schema=TYPE) .. py:data:: elem .. py:class:: LazyGDalFrameFile(fpath, nodata=None) Bases: :py:obj:`ubelt.NiceRepr` .. todo:: - [ ] Move to its own backend module - [ ] When used with COCO, allow the image metadata to populate the height, width, and channels if possible. .. 
rubric:: Example >>> # xdoctest: +REQUIRES(module:osgeo) >>> self = LazyGDalFrameFile.demo() >>> print('self = {!r}'.format(self)) >>> self[0:3, 0:3] >>> self[:, :, 0] >>> self[0] >>> self[0, 3] >>> # import kwplot >>> # kwplot.imshow(self[:]) :Parameters: * **nodata** * **masking_method** .. rubric:: Example >>> # See if we can reproduce the INTERLEAVE bug data = np.random.rand(128, 128, 64) import kwimage import ubelt as ub from os.path import join dpath = ub.ensure_app_cache_dir('kwcoco/tests/reader') fpath = join(dpath, 'foo.tiff') kwimage.imwrite(fpath, data, backend='skimage') recon1 = kwimage.imread(fpath) recon1.shape self = LazyGDalFrameFile(fpath) self.shape self[:] .. py:method:: available(self) :classmethod: Returns True if this backend is available .. py:method:: _ds(self) .. py:method:: demo(cls, key='astro', dsize=None) :classmethod: .. py:method:: ndim(self) :property: .. py:method:: shape(self) :property: .. py:method:: dtype(self) :property: .. py:method:: __nice__(self) .. py:method:: __getitem__(self, index) .. rubric:: References https://gis.stackexchange.com/questions/162095/gdal-driver-create-typeerror .. rubric:: Example >>> # Test nodata works correctly >>> # xdoctest: +REQUIRES(module:osgeo) >>> from kwcoco.util.lazy_frame_backends import * # NOQA >>> from kwcoco.util.lazy_frame_backends import _demo_geoimg_with_nodata >>> fpath = _demo_geoimg_with_nodata() >>> self = LazyGDalFrameFile(fpath, nodata='auto') >>> imdata = self[:] >>> # xdoctest: +REQUIRES(--show) >>> import kwplot >>> import kwarray >>> kwplot.autompl() >>> imdata = kwimage.normalize_intensity(imdata) >>> imdata = np.nan_to_num(imdata) >>> kwplot.imshow(imdata) .. py:method:: __array__(self) Allow this object to be passed to np.asarray .. rubric:: References https://numpy.org/doc/stable/user/basics.dispatch.html .. 
py:class:: LazyRasterIOFrameFile(fpath) Bases: :py:obj:`ubelt.NiceRepr` fpath = '/home/joncrall/.cache/kwcoco/demo/large_hyperspectral/big_img_128.bsq' lazy_rio = LazyRasterIOFrameFile(fpath) ds = lazy_rio._ds .. py:method:: available(self) :classmethod: Returns True if this backend is available .. py:method:: _ds(self) .. py:method:: ndim(self) :property: .. py:method:: shape(self) :property: .. py:method:: dtype(self) :property: .. py:method:: __nice__(self) .. py:method:: __getitem__(self, index) .. py:class:: LazySpectralFrameFile(fpath) Bases: :py:obj:`ubelt.NiceRepr` Potentially faster than GDAL for HDR formats. .. py:method:: _ds(self) .. py:method:: available(self) :classmethod: Returns True if this backend is available .. py:method:: ndim(self) :property: .. py:method:: shape(self) :property: .. py:method:: dtype(self) :property: .. py:method:: __nice__(self) .. py:method:: __getitem__(self, index) .. py:class:: Archive(fpath=None, mode='r', backend=None, file=None) Bases: :py:obj:`object` Abstraction over zipfile and tarfile .. todo:: see if we can use one of these other tools instead SeeAlso: https://github.com/RKrahl/archive-tools https://pypi.org/project/arlib/ .. 
rubric:: Example >>> from os.path import join >>> dpath = ub.ensure_app_cache_dir('ubelt', 'tests', 'archive') >>> ub.delete(dpath) >>> dpath = ub.ensure_app_cache_dir(dpath) >>> import pathlib >>> dpath = pathlib.Path(dpath) >>> # >>> # >>> mode = 'w' >>> self1 = Archive(str(dpath / 'demo.zip'), mode=mode) >>> self2 = Archive(str(dpath / 'demo.tar.gz'), mode=mode) >>> # >>> open(dpath / 'data_1only.txt', 'w').write('bazbzzz') >>> open(dpath / 'data_2only.txt', 'w').write('buzzz') >>> open(dpath / 'data_both.txt', 'w').write('foobar') >>> # >>> self1.add(dpath / 'data_both.txt') >>> self1.add(dpath / 'data_1only.txt') >>> # >>> self2.add(dpath / 'data_both.txt') >>> self2.add(dpath / 'data_2only.txt') >>> # >>> self1.close() >>> self2.close() >>> # >>> self1 = Archive(str(dpath / 'demo.zip'), mode='r') >>> self2 = Archive(str(dpath / 'demo.tar.gz'), mode='r') >>> # >>> extract_dpath = ub.ensuredir(str(dpath / 'extracted')) >>> extracted1 = self1.extractall(extract_dpath) >>> extracted2 = self2.extractall(extract_dpath) >>> for fpath in extracted2: >>> print(open(fpath, 'r').read()) >>> for fpath in extracted1: >>> print(open(fpath, 'r').read()) .. py:method:: _open(cls, fpath, mode) :classmethod: .. py:method:: __iter__(self) .. py:method:: coerce(cls, data) :classmethod: Either open an archive file path or coerce an existing ZipFile or tarfile structure into this wrapper class .. py:method:: add(self, fpath, arcname=None) .. py:method:: close(self) .. py:method:: __enter__(self) .. py:method:: __exit__(self, *args) .. py:method:: extractall(self, output_dpath='.', verbose=1, overwrite=True) .. py:function:: unarchive_file(archive_fpath, output_dpath='.', verbose=1, overwrite=True) .. py:class:: DelayedChannelConcat(components, dsize=None) Bases: :py:obj:`DelayedImageOperation` Represents multiple channels in an image that could be concatenated :ivar components: a list of stackable channels. Each component may be comprised of multiple channels. 
:vartype components: List[DelayedWarp] .. todo:: - [ ] can this be generalized into a delayed concat? - [ ] can all concats be delayed until the very end? .. rubric:: Example >>> comp1 = DelayedWarp(np.random.rand(11, 7)) >>> comp2 = DelayedWarp(np.random.rand(11, 7, 3)) >>> comp3 = DelayedWarp( >>> np.random.rand(3, 5, 2), >>> transform=kwimage.Affine.affine(scale=(7/5, 11/3)).matrix, >>> dsize=(7, 11) >>> ) >>> components = [comp1, comp2, comp3] >>> chans = DelayedChannelConcat(components) >>> final = chans.finalize() >>> assert final.shape == chans.shape >>> assert final.shape == (11, 7, 6) >>> # We should be able to nest DelayedChannelConcat inside virtual images >>> frame1 = DelayedWarp( >>> chans, transform=kwimage.Affine.affine(scale=2.2).matrix, >>> dsize=(20, 26)) >>> frame2 = DelayedWarp( >>> np.random.rand(3, 3, 6), dsize=(20, 26)) >>> frame3 = DelayedWarp( >>> np.random.rand(3, 3, 6), dsize=(20, 26)) >>> print(ub.repr2(frame1.nesting(), nl=-1, sort=False)) >>> frame1.finalize() >>> vid = DelayedFrameConcat([frame1, frame2, frame3]) >>> print(ub.repr2(vid.nesting(), nl=-1, sort=False)) .. py:method:: children(self) Abstract method, which should generate all of the direct children of a node in the operation tree. .. py:method:: random(cls, num_parts=3, rng=None) :classmethod: .. rubric:: Example >>> self = DelayedChannelConcat.random() >>> print('self = {!r}'.format(self)) >>> print(ub.repr2(self.nesting(), nl=-1, sort=0)) .. py:method:: channels(self) :property: .. py:method:: shape(self) :property: .. py:method:: finalize(self, **kwargs) Execute the final transform .. py:method:: delayed_warp(self, transform, dsize=None) Applies a delayed transform to the underlying data. .. note:: This deviates from kwimage warp functions because instead of "output_dims" (specified in c-style shape) we specify dsize (w, h). :returns: a new delayed transform (a chained transform) :rtype: DelayedWarp .. 
py:method:: take_channels(self, channels) This method returns a subset of the vision data with only the specified bands / channels. :Parameters: **channels** (*List[int] | slice | channel_spec.FusedChannelSpec*) -- List of integer indexes, a slice, or a channel spec, which is typically a pipe (`|`) delimited list of channel codes. See kwcoco.ChannelSpec for more details. :returns: a delayed vision operation that only operates on the specified channels. :rtype: DelayedVisionOperation .. rubric:: Example >>> from kwcoco.util.util_delayed_poc import * # NOQA >>> import kwcoco >>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral') >>> self = delayed = dset.delayed_load(1) >>> channels = 'B11|B8|B1|B10' >>> new = self.take_channels(channels) .. rubric:: Example >>> # Complex case >>> import kwcoco >>> from kwcoco.util.util_delayed_poc import * # NOQA >>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral') >>> delayed = dset.delayed_load(1) >>> astro = DelayedLoad.demo('astro').load_shape(use_channel_heuristic=True) >>> aligned = astro.warp(kwimage.Affine.scale(600 / 512), dsize='auto') >>> self = combo = DelayedChannelConcat(delayed.components + [aligned]) >>> channels = 'B1|r|B8|g' >>> new = self.take_channels(channels) >>> new_cropped = new.crop((slice(10, 200), slice(12, 350))) >>> datas = new_cropped.finalize() >>> vizable = kwimage.normalize_intensity(datas, axis=2) >>> # xdoctest: +REQUIRES(--show) >>> import kwplot >>> kwplot.autompl() >>> stacked = kwimage.stack_images(vizable.transpose(2, 0, 1)) >>> kwplot.imshow(stacked) .. rubric:: CommandLine .. code-block:: bash xdoctest -m /home/joncrall/code/kwcoco/kwcoco/util/util_delayed_poc.py DelayedChannelConcat.take_channels:2 --profile .. 
rubric:: Example >>> # Test case where requested channel does not exist >>> import kwcoco >>> from kwcoco.util.util_delayed_poc import * # NOQA >>> dset = kwcoco.CocoDataset.demo('vidshapes8-multispectral', use_cache=1, verbose=100) >>> self = dset.delayed_load(1) >>> channels = 'B1|foobar|bazbiz|B8' >>> new = self.take_channels(channels) >>> new_cropped = new.crop((slice(10, 200), slice(12, 350))) >>> fused = new_cropped.finalize() >>> assert fused.shape == (190, 338, 4) >>> assert np.all(np.isnan(fused[..., 1:3])) >>> assert not np.any(np.isnan(fused[..., 0])) >>> assert not np.any(np.isnan(fused[..., 3])) .. py:class:: DelayedCrop(sub_data, sub_slices) Bases: :py:obj:`DelayedImageOperation` Represent a delayed crop operation .. rubric:: Example >>> sub_data = DelayedLoad.demo() >>> sub_slices = (slice(5, 10), slice(1, 12)) >>> self = DelayedCrop(sub_data, sub_slices) >>> print(ub.repr2(self.nesting(), nl=-1, sort=0)) >>> final = self.finalize() >>> print('final.shape = {!r}'.format(final.shape)) .. rubric:: Example >>> sub_data = DelayedLoad.demo() >>> sub_slices = (slice(5, 10), slice(1, 12)) >>> crop1 = DelayedCrop(sub_data, sub_slices) >>> import pytest >>> # Should only error while heuristics are in use. >>> with pytest.raises(ValueError): >>> crop2 = DelayedCrop(crop1, sub_slices) .. py:attribute:: __hack_dont_optimize__ :annotation: = True .. py:method:: channels(self) :property: .. py:method:: children(self) Abstract method, which should generate all of the direct children of a node in the operation tree. .. py:method:: finalize(self, **kwargs) .. py:method:: _optimize_paths(self, **kwargs) :abstractmethod: Iterate through the leaf nodes, which are virtually transformed into the root space. This returns some sort of heuristically optimized leaf repr wrt warps. .. py:class:: DelayedFrameConcat(frames, dsize=None) Bases: :py:obj:`DelayedVideoOperation` Represents multiple frames in a video .. note:: .. 
code:: Video[0]: Frame[0]: Chan[0]: (32) +--------------------------------+ Chan[1]: (16) +----------------+ Chan[2]: ( 8) +--------+ Frame[1]: Chan[0]: (30) +------------------------------+ Chan[1]: (14) +--------------+ Chan[2]: ( 6) +------+ .. todo:: - [ ] Support computing the transforms when none of the data is loaded .. rubric:: Example >>> # Simpler case with fewer nesting levels >>> rng = kwarray.ensure_rng(None) >>> # Delayed warp each channel into its "image" space >>> # Note: the images never enter the space we transform through >>> f1_img = DelayedLoad.demo('astro', (300, 300)) >>> f2_img = DelayedLoad.demo('carl', (256, 256)) >>> # Combine frames into a video >>> vid_dsize = np.array((100, 100)) >>> self = vid = DelayedFrameConcat([ >>> f1_img.delayed_warp(kwimage.Affine.scale(vid_dsize / f1_img.dsize)), >>> f2_img.delayed_warp(kwimage.Affine.scale(vid_dsize / f2_img.dsize)), >>> ], dsize=vid_dsize) >>> print(ub.repr2(vid.nesting(), nl=-1, sort=0)) >>> final = vid.finalize(interpolation='nearest', dsize=(32, 32)) >>> # xdoctest: +REQUIRES(--show) >>> import kwplot >>> kwplot.autompl() >>> kwplot.imshow(final[0], pnum=(1, 2, 1), fnum=1) >>> kwplot.imshow(final[1], pnum=(1, 2, 2), fnum=1) >>> region_slices = (slice(0, 90), slice(30, 60)) .. py:method:: children(self) Abstract method, which should generate all of the direct children of a node in the operation tree. .. py:method:: channels(self) :property: .. py:method:: shape(self) :property: .. py:method:: finalize(self, **kwargs) Execute the final transform .. py:method:: delayed_crop(self, region_slices) .. 
rubric:: Example >>> from kwcoco.util.util_delayed_poc import * # NOQA >>> # Create raw channels in some "native" resolution for frame 1 >>> f1_chan1 = DelayedIdentity.demo('astro', chan=(1, 0), dsize=(300, 300)) >>> f1_chan2 = DelayedIdentity.demo('astro', chan=2, dsize=(10, 10)) >>> # Create raw channels in some "native" resolution for frame 2 >>> f2_chan1 = DelayedIdentity.demo('carl', dsize=(64, 64), chan=(1, 0)) >>> f2_chan2 = DelayedIdentity.demo('carl', dsize=(10, 10), chan=2) >>> # >>> f1_dsize = np.array(f1_chan1.dsize) >>> f2_dsize = np.array(f2_chan1.dsize) >>> f1_img = DelayedChannelConcat([ >>> f1_chan1.delayed_warp(kwimage.Affine.scale(f1_dsize / f1_chan1.dsize), dsize=f1_dsize), >>> f1_chan2.delayed_warp(kwimage.Affine.scale(f1_dsize / f1_chan2.dsize), dsize=f1_dsize), >>> ]) >>> f2_img = DelayedChannelConcat([ >>> f2_chan1.delayed_warp(kwimage.Affine.scale(f2_dsize / f2_chan1.dsize), dsize=f2_dsize), >>> f2_chan2.delayed_warp(kwimage.Affine.scale(f2_dsize / f2_chan2.dsize), dsize=f2_dsize), >>> ]) >>> vid_dsize = np.array((280, 280)) >>> full_vid = DelayedFrameConcat([ >>> f1_img.delayed_warp(kwimage.Affine.scale(vid_dsize / f1_img.dsize), dsize=vid_dsize), >>> f2_img.delayed_warp(kwimage.Affine.scale(vid_dsize / f2_img.dsize), dsize=vid_dsize), >>> ]) >>> region_slices = (slice(80, 200), slice(80, 200)) >>> print(ub.repr2(full_vid.nesting(), nl=-1, sort=0)) >>> crop_vid = full_vid.delayed_crop(region_slices) >>> final_full = full_vid.finalize(interpolation='nearest') >>> final_crop = crop_vid.finalize(interpolation='nearest') >>> import pytest >>> with pytest.raises(ValueError): >>> # should not be able to crop a crop yet >>> crop_vid.delayed_crop(region_slices) >>> # xdoctest: +REQUIRES(--show) >>> import kwplot >>> kwplot.autompl() >>> kwplot.imshow(final_full[0], pnum=(2, 2, 1), fnum=1) >>> kwplot.imshow(final_full[1], pnum=(2, 2, 2), fnum=1) >>> kwplot.imshow(final_crop[0], pnum=(2, 2, 3), fnum=1) >>> kwplot.imshow(final_crop[1], pnum=(2, 2, 
4), fnum=1) .. py:method:: delayed_warp(self, transform, dsize=None) Delayed transform the underlying data. .. note:: this deviates from kwimage warp functions because instead of "output_dims" (specified in c-style shape) we specify dsize (w, h). :returns: new delayed transform a chained transform :rtype: DelayedWarp .. py:class:: DelayedIdentity(sub_data, dsize=None, channels=None, quantization=None) Bases: :py:obj:`DelayedImageOperation` Noop leaf that does nothing. Can be used to hold raw data. Typically used to just hold raw data. DelayedIdentity.demo('astro', chan=0, dsize=(32, 32)) .. rubric:: Example >>> from kwcoco.util.util_delayed_poc import * # NOQA >>> sub_data = np.random.rand(31, 37, 3) >>> self = DelayedIdentity(sub_data) >>> self = DelayedIdentity(sub_data, channels='L|a|b') >>> # test with quantization >>> rng = kwarray.ensure_rng(32) >>> sub_data_quant = (rng.rand(31, 37, 3) * 1000).astype(np.int16) >>> sub_data_quant[0, 0] = -9999 >>> self = DelayedIdentity(sub_data_quant, channels='L|a|b', quantization={ >>> 'orig_min': 0., >>> 'orig_max': 1., >>> 'quant_min': 0, >>> 'quant_max': 1000, >>> 'nodata': -9999, >>> }) >>> final1 = self.finalize(dequantize=True) >>> final2 = self.finalize(dequantize=False) >>> assert np.all(np.isnan(final1[0, 0])) >>> scale = final2 / final1 >>> scales = scale[scale > 0] >>> assert np.all(np.isclose(scales, 1000)) >>> # check that take channels works >>> new_subdata = self.take_channels('a') >>> sub_final1 = new_subdata.finalize(dequantize=True) >>> sub_final2 = new_subdata.finalize(dequantize=False) >>> assert sub_final1.dtype.kind == 'f' >>> assert sub_final2.dtype.kind == 'i' .. py:attribute:: __hack_dont_optimize__ :annotation: = True .. py:method:: demo(cls, key='astro', chan=None, dsize=None) :classmethod: .. py:method:: children(self) Abstract method, which should generate all of the direct children of a node in the operation tree. .. 
py:method:: _optimize_paths(self, **kwargs) Iterate through the leaf nodes, which are virtually transformed into the root space. This returns some sort of heuristically optimized leaf repr wrt warps. .. py:method:: finalize(self, **kwargs) .. py:method:: take_channels(self, channels) .. py:class:: DelayedImageOperation Bases: :py:obj:`DelayedVisionOperation` Operations that pertain only to images .. py:method:: delayed_crop(self, region_slices) Create a new delayed image that performs a crop in the transformed "self" space. :Parameters: **region_slices** (*Tuple[slice, slice]*) -- y-slice and x-slice. .. note:: Returns a heuristically "simplified" tree. In the current implementation there are only 3 operations, cat, warp, and crop. All cats go at the top, all crops go at the bottom, all warps are in the middle. :returns: lazy executed delayed transform :rtype: DelayedWarp .. rubric:: Example >>> dsize = (100, 100) >>> tf2 = kwimage.Affine.affine(scale=3).matrix >>> self = DelayedWarp(np.random.rand(33, 33), tf2, dsize) >>> region_slices = (slice(5, 10), slice(1, 12)) >>> delayed_crop = self.delayed_crop(region_slices) >>> print(ub.repr2(delayed_crop.nesting(), nl=-1, sort=0)) >>> delayed_crop.finalize() .. 
rubric:: Example >>> chan1 = DelayedLoad.demo('astro') >>> chan2 = DelayedLoad.demo('carl') >>> warped1a = chan1.delayed_warp(kwimage.Affine.scale(1.2).matrix) >>> warped2a = chan2.delayed_warp(kwimage.Affine.scale(1.5)) >>> warped1b = warped1a.delayed_warp(kwimage.Affine.scale(1.2).matrix) >>> warped2b = warped2a.delayed_warp(kwimage.Affine.scale(1.5)) >>> # >>> region_slices = (slice(97, 677), slice(5, 691)) >>> self = warped2b >>> # >>> crop1 = warped1b.delayed_crop(region_slices) >>> crop2 = warped2b.delayed_crop(region_slices) >>> print(ub.repr2(warped1b.nesting(), nl=-1, sort=0)) >>> print(ub.repr2(warped2b.nesting(), nl=-1, sort=0)) >>> # Notice how the crop merges the two nesting layers >>> # (via the hueristic optimize step) >>> print(ub.repr2(crop1.nesting(), nl=-1, sort=0)) >>> print(ub.repr2(crop2.nesting(), nl=-1, sort=0)) >>> frame1 = crop1.finalize(dsize=(500, 500)) >>> frame2 = crop2.finalize(dsize=(500, 500)) >>> # xdoctest: +REQUIRES(--show) >>> import kwplot >>> kwplot.autompl() >>> kwplot.imshow(frame1, pnum=(1, 2, 1), fnum=1) >>> kwplot.imshow(frame2, pnum=(1, 2, 2), fnum=1) .. py:method:: delayed_warp(self, transform, dsize=None) Delayed transform the underlying data. .. note:: this deviates from kwimage warp functions because instead of "output_dims" (specified in c-style shape) we specify dsize (w, h). :returns: new delayed transform a chained transform :rtype: DelayedWarp .. py:method:: take_channels(self, channels) :abstractmethod: .. py:class:: DelayedLoad(fpath, channels=None, dsize=None, num_bands=None, immediate_crop=None, immediate_chan_idxs=None, immediate_dsize=None, quantization=None) Bases: :py:obj:`DelayedImageOperation` A load operation for a specific sub-region and sub-bands in a specified image. .. note:: This class contains support for fusing certain lazy operations into this layer, namely cropping, scaling, and channel selection. For now these are named ``immediates`` .. 
rubric:: Example >>> fpath = kwimage.grab_test_image_fpath() >>> self = DelayedLoad(fpath) >>> print('self = {!r}'.format(self)) >>> self.load_shape() >>> print('self = {!r}'.format(self)) >>> self.finalize() >>> f1_img = DelayedLoad.demo('astro', dsize=(300, 300)) >>> f2_img = DelayedLoad.demo('carl', dsize=(256, 320)) >>> print('f1_img = {!r}'.format(f1_img)) >>> print('f2_img = {!r}'.format(f2_img)) >>> print(f2_img.finalize().shape) >>> print(f1_img.finalize().shape) >>> fpath = kwimage.grab_test_image_fpath() >>> channels = channel_spec.FusedChannelSpec.coerce('rgb') >>> self = DelayedLoad(fpath, channels=channels) .. rubric:: Example >>> # Test with quantization >>> fpath = kwimage.grab_test_image_fpath() >>> channels = channel_spec.FusedChannelSpec.coerce('rgb') >>> self = DelayedLoad(fpath, channels=channels, quantization={ >>> 'orig_min': 0., >>> 'orig_max': 1., >>> 'quant_min': 0, >>> 'quant_max': 256, >>> 'nodata': None, >>> }) >>> final1 = self.finalize(dequantize=False) >>> final2 = self.finalize(dequantize=True) >>> assert final1.dtype.kind == 'u' >>> assert final2.dtype.kind == 'f' >>> assert final2.max() <= 1 .. py:attribute:: __hack_dont_optimize__ :annotation: = True .. py:method:: demo(DelayedLoad, key='astro', dsize=None) :classmethod: .. py:method:: coerce(cls, data) :classmethod: :abstractmethod: .. py:method:: children(self) Abstract method, which should generate all of the direct children of a node in the operation tree. .. py:method:: nesting(self) .. py:method:: _optimize_paths(self, **kwargs) Iterate through the leaf nodes, which are virtually transformed into the root space. This returns some sort of hueristically optimized leaf repr wrt warps. .. py:method:: load_shape(self, use_channel_heuristic=False) .. py:method:: _ensure_dsize(self) .. py:method:: shape(self) :property: .. py:method:: num_bands(self) :property: .. py:method:: dsize(self) :property: .. py:method:: channels(self) :property: .. py:method:: fpath(self) :property: .. 
py:method:: finalize(self, **kwargs) .. todo:: - [ ] Load from overviews if a scale will be necessary :Parameters: **\*\*kwargs** -- nodata : if specified this data item is treated as nodata, the data is then converted to floats and the nodata value is replaced with nan. .. py:method:: delayed_crop(self, region_slices) :Parameters: **region_slices** (*Tuple[slice, slice]*) -- y-slice and x-slice. :returns: a new delayed load object with a fused crop operation :rtype: DelayedLoad .. rubric:: Example >>> # Test chained crop operations >>> from kwcoco.util.util_delayed_poc import * # NOQA >>> self = orig = DelayedLoad.demo('astro').load_shape() >>> region_slices = slices1 = (slice(0, 90), slice(30, 60)) >>> self = crop1 = orig.delayed_crop(slices1) >>> region_slices = slices2 = (slice(10, 21), slice(10, 22)) >>> self = crop2 = crop1.delayed_crop(slices2) >>> region_slices = slices3 = (slice(3, 20), slice(5, 20)) >>> crop3 = crop2.delayed_crop(slices3) >>> # Spot check internals >>> print('orig = {}'.format(ub.repr2(orig.__json__(), nl=2))) >>> print('crop1 = {}'.format(ub.repr2(crop1.__json__(), nl=2))) >>> print('crop2 = {}'.format(ub.repr2(crop2.__json__(), nl=2))) >>> print('crop3 = {}'.format(ub.repr2(crop3.__json__(), nl=2))) >>> # Test internals >>> assert crop3._immediates['crop'][0].start == 13 >>> assert crop3._immediates['crop'][0].stop == 21 >>> # Test shapes work out correctly >>> assert crop3.finalize().shape == (8, 7, 3) >>> assert crop2.finalize().shape == (11, 12, 3) >>> assert crop1.take_channels([1, 2]).finalize().shape == (90, 30, 2) >>> assert orig.finalize().shape == (512, 512, 3) .. note:: .. code:: This chart gives an intuition on how new absolute slice coords are computed from existing absolute coords ane relative coords. 5 7 <- new 3 5 <- rel -------- 01234567 <- relative coordinates -------- 2 9 <- curr ---------- 0123456789 <- absolute coordinates ---------- .. 
py:method:: take_channels(self, channels) This method returns a subset of the vision data with only the specified bands / channels. :Parameters: **channels** (*List[int] | slice | channel_spec.FusedChannelSpec*) -- List of integer indexes, a slice, or a channel spec, which is typically a pipe (`|`) delimited list of channel codes. See kwcoco.ChannelSpec for more details. :returns: a new delayed load with a fused take channel operation :rtype: DelayedLoad .. note:: The channel subset must exist here or it will raise an error. A better implementation (via pymbolic) might be able to do better .. rubric:: Example >>> from kwcoco.util.util_delayed_poc import * # NOQA >>> import kwcoco >>> self = DelayedLoad.demo('astro').load_shape() >>> channels = [2, 0] >>> new = self.take_channels(channels) >>> new3 = new.take_channels([1, 0]) >>> final1 = self.finalize() >>> final2 = new.finalize() >>> final3 = new3.finalize() >>> assert np.all(final1[..., 2] == final2[..., 0]) >>> assert np.all(final1[..., 0] == final2[..., 1]) >>> assert final2.shape[2] == 2 >>> assert np.all(final1[..., 2] == final3[..., 1]) >>> assert np.all(final1[..., 0] == final3[..., 0]) >>> assert final3.shape[2] == 2 .. py:class:: DelayedNans(dsize=None, channels=None) Bases: :py:obj:`DelayedImageOperation` Constructs nan channels as needed .. rubric:: Example self = DelayedNans((10, 10), channel_spec.FusedChannelSpec.coerce('rgb')) region_slices = (slice(5, 10), slice(1, 12)) delayed = self.delayed_crop(region_slices) ..
rubric:: Example >>> from kwcoco.util.util_delayed_poc import * # NOQA >>> dsize = (307, 311) >>> c1 = DelayedNans(dsize=dsize, channels=channel_spec.FusedChannelSpec.coerce('foo')) >>> c2 = DelayedLoad.demo('astro', dsize=dsize).load_shape(True) >>> cat = DelayedChannelConcat([c1, c2]) >>> warped_cat = cat.delayed_warp(kwimage.Affine.scale(1.07), dsize=(328, 332)) >>> warped_cat.finalize() #>>> cropped = warped_cat.delayed_crop((slice(0, 300), slice(0, 100))) #>>> cropped.finalize().shape .. py:method:: shape(self) :property: .. py:method:: num_bands(self) :property: .. py:method:: dsize(self) :property: .. py:method:: channels(self) :property: .. py:method:: children(self) Abstract method, which should generate all of the direct children of a node in the operation tree. .. py:method:: _optimize_paths(self, **kwargs) Iterate through the leaf nodes, which are virtually transformed into the root space. This returns a heuristically optimized leaf representation with respect to warps. .. py:method:: finalize(self, **kwargs) .. py:method:: delayed_crop(self, region_slices) Create a new delayed image that performs a crop in the transformed "self" space. :Parameters: **region_slices** (*Tuple[slice, slice]*) -- y-slice and x-slice. .. note:: Returns a heuristically "simplified" tree. In the current implementation there are only 3 operations: cat, warp, and crop. All cats go at the top, all crops go at the bottom, and all warps are in the middle. :returns: lazily executed delayed transform :rtype: DelayedWarp .. rubric:: Example >>> dsize = (100, 100) >>> tf2 = kwimage.Affine.affine(scale=3).matrix >>> self = DelayedWarp(np.random.rand(33, 33), tf2, dsize) >>> region_slices = (slice(5, 10), slice(1, 12)) >>> delayed_crop = self.delayed_crop(region_slices) >>> print(ub.repr2(delayed_crop.nesting(), nl=-1, sort=0)) >>> delayed_crop.finalize() ..
rubric:: Example >>> chan1 = DelayedLoad.demo('astro') >>> chan2 = DelayedLoad.demo('carl') >>> warped1a = chan1.delayed_warp(kwimage.Affine.scale(1.2).matrix) >>> warped2a = chan2.delayed_warp(kwimage.Affine.scale(1.5)) >>> warped1b = warped1a.delayed_warp(kwimage.Affine.scale(1.2).matrix) >>> warped2b = warped2a.delayed_warp(kwimage.Affine.scale(1.5)) >>> # >>> region_slices = (slice(97, 677), slice(5, 691)) >>> self = warped2b >>> # >>> crop1 = warped1b.delayed_crop(region_slices) >>> crop2 = warped2b.delayed_crop(region_slices) >>> print(ub.repr2(warped1b.nesting(), nl=-1, sort=0)) >>> print(ub.repr2(warped2b.nesting(), nl=-1, sort=0)) >>> # Notice how the crop merges the two nesting layers >>> # (via the heuristic optimize step) >>> print(ub.repr2(crop1.nesting(), nl=-1, sort=0)) >>> print(ub.repr2(crop2.nesting(), nl=-1, sort=0)) >>> frame1 = crop1.finalize(dsize=(500, 500)) >>> frame2 = crop2.finalize(dsize=(500, 500)) >>> # xdoctest: +REQUIRES(--show) >>> import kwplot >>> kwplot.autompl() >>> kwplot.imshow(frame1, pnum=(1, 2, 1), fnum=1) >>> kwplot.imshow(frame2, pnum=(1, 2, 2), fnum=1) .. py:method:: delayed_warp(self, transform, dsize=None) Apply a delayed transform to the underlying data. .. note:: this deviates from kwimage warp functions because instead of "output_dims" (specified in c-style shape) we specify dsize (w, h). :returns: a new delayed transform, chained onto the existing one :rtype: DelayedWarp .. py:class:: DelayedVideoOperation Bases: :py:obj:`DelayedVisionOperation` Base class for nodes in a tree of delayed computer-vision operations .. py:class:: DelayedVisionOperation Bases: :py:obj:`ubelt.NiceRepr` Base class for nodes in a tree of delayed computer-vision operations .. py:method:: __nice__(self) .. py:method:: finalize(self, **kwargs) :abstractmethod: .. py:method:: children(self) :abstractmethod: Abstract method, which should generate all of the direct children of a node in the operation tree. ..
py:method:: _optimize_paths(self, **kwargs) Iterate through the leaf nodes, which are virtually transformed into the root space. This returns a heuristically optimized leaf representation with respect to warps. .. py:method:: __json__(self) .. py:method:: nesting(self) .. py:method:: warp(self, *args, **kwargs) alias for delayed_warp, might change to this API in the future .. py:method:: crop(self, *args, **kwargs) alias for delayed_crop, might change to this API in the future .. py:class:: DelayedWarp(sub_data, transform=None, dsize=None) Bases: :py:obj:`DelayedImageOperation` POC for chainable transforms .. note:: "sub" is used to refer to the underlying data in its native coordinates and resolution. "self" is used to refer to the data in the transformed coordinates that are exposed by this class. :ivar sub_data: array-like image data at a native resolution :vartype sub_data: DelayedWarp | ArrayLike :ivar transform: transforms data from native "sub"-image-space to "self"-image-space. :vartype transform: Transform ..
rubric:: Example >>> dsize = (12, 12) >>> tf1 = np.array([[2, 0, 0], [0, 2, 0], [0, 0, 1]]) >>> tf2 = np.array([[3, 0, 0], [0, 3, 0], [0, 0, 1]]) >>> tf3 = np.array([[4, 0, 0], [0, 4, 0], [0, 0, 1]]) >>> band1 = DelayedWarp(np.random.rand(6, 6), tf1, dsize) >>> band2 = DelayedWarp(np.random.rand(4, 4), tf2, dsize) >>> band3 = DelayedWarp(np.random.rand(3, 3), tf3, dsize) >>> # >>> # Execute a crop in a one-level transformed space >>> region_slices = (slice(5, 10), slice(0, 12)) >>> delayed_crop = band2.delayed_crop(region_slices) >>> final_crop = delayed_crop.finalize() >>> # >>> # Execute a crop in a nested transformed space >>> tf4 = np.array([[1.5, 0, 0], [0, 1.5, 0], [0, 0, 1]]) >>> chained = DelayedWarp(band2, tf4, (18, 18)) >>> delayed_crop = chained.delayed_crop(region_slices) >>> final_crop = delayed_crop.finalize() >>> # >>> tf4 = np.array([[.5, 0, 0], [0, .5, 0], [0, 0, 1]]) >>> chained = DelayedWarp(band2, tf4, (6, 6)) >>> delayed_crop = chained.delayed_crop(region_slices) >>> final_crop = delayed_crop.finalize() >>> # >>> region_slices = (slice(1, 5), slice(2, 4)) >>> delayed_crop = chained.delayed_crop(region_slices) >>> final_crop = delayed_crop.finalize() .. rubric:: Example >>> dsize = (17, 12) >>> tf = np.array([[5.2, 0, 1.1], [0, 3.1, 2.2], [0, 0, 1]]) >>> self = DelayedWarp(np.random.rand(3, 5, 13), tf, dsize=dsize) >>> self.finalize().shape .. py:method:: random(cls, nesting=(2, 5), rng=None) :classmethod: .. rubric:: Example >>> self = DelayedWarp.random(nesting=(4, 7)) >>> print('self = {!r}'.format(self)) >>> print(ub.repr2(self.nesting(), nl=-1, sort=0)) .. py:method:: channels(self) :property: .. py:method:: children(self) Abstract method, which should generate all of the direct children of a node in the operation tree. .. py:method:: dsize(self) :property: .. py:method:: num_bands(self) :property: .. py:method:: shape(self) :property: .. py:method:: _optimize_paths(self, **kwargs) .. 
rubric:: Example >>> self = DelayedWarp.random() >>> leafs = list(self._optimize_paths()) >>> print('leafs = {!r}'.format(leafs)) .. py:method:: finalize(self, transform=None, dsize=None, interpolation='linear', **kwargs) Execute the final transform Can pass a parent transform to augment this underlying transform. :Parameters: * **transform** (*Transform*) -- an additional transform to perform * **dsize** (*Tuple[int, int]*) -- overrides destination canvas size .. rubric:: Example >>> tf = np.array([[0.9, 0, 3.9], [0, 1.1, -.5], [0, 0, 1]]) >>> raw = kwimage.grab_test_image(dsize=(54, 65)) >>> raw = kwimage.ensure_float01(raw) >>> # Test nested finalize >>> layer1 = raw >>> num = 10 >>> for _ in range(num): ... layer1 = DelayedWarp(layer1, tf, dsize='auto') >>> final1 = layer1.finalize() >>> # Test non-nested finalize >>> layer2 = list(layer1._optimize_paths())[0] >>> final2 = layer2.finalize() >>> # >>> print(ub.repr2(layer1.nesting(), nl=-1, sort=0)) >>> print(ub.repr2(layer2.nesting(), nl=-1, sort=0)) >>> print('final1 = {!r}'.format(final1)) >>> print('final2 = {!r}'.format(final2)) >>> print('final1.shape = {!r}'.format(final1.shape)) >>> print('final2.shape = {!r}'.format(final2.shape)) >>> assert np.allclose(final1, final2) >>> # >>> # xdoctest: +REQUIRES(--show) >>> import kwplot >>> kwplot.autompl() >>> kwplot.imshow(raw, pnum=(1, 3, 1), fnum=1) >>> kwplot.imshow(final1, pnum=(1, 3, 2), fnum=1) >>> kwplot.imshow(final2, pnum=(1, 3, 3), fnum=1) >>> kwplot.show_if_requested() .. rubric:: Example >>> # Test aliasing >>> s = DelayedIdentity.demo() >>> s = DelayedIdentity.demo('checkerboard') >>> a = s.delayed_warp(kwimage.Affine.scale(0.05), dsize='auto') >>> b = s.delayed_warp(kwimage.Affine.scale(3), dsize='auto') >>> # xdoctest: +REQUIRES(--show) >>> import kwplot >>> kwplot.autompl() >>> # It looks like downsampling linear and area is the same >>> # Does warpAffine have no alias handling? 
>>> pnum_ = kwplot.PlotNums(nRows=2, nCols=4) >>> kwplot.imshow(a.finalize(interpolation='area'), pnum=pnum_(), title='warpAffine area') >>> kwplot.imshow(a.finalize(interpolation='linear'), pnum=pnum_(), title='warpAffine linear') >>> kwplot.imshow(a.finalize(interpolation='nearest'), pnum=pnum_(), title='warpAffine nearest') >>> kwplot.imshow(a.finalize(interpolation='nearest', antialias=False), pnum=pnum_(), title='warpAffine nearest AA=0') >>> kwplot.imshow(kwimage.imresize(s.finalize(), dsize=a.dsize, interpolation='area'), pnum=pnum_(), title='resize area') >>> kwplot.imshow(kwimage.imresize(s.finalize(), dsize=a.dsize, interpolation='linear'), pnum=pnum_(), title='resize linear') >>> kwplot.imshow(kwimage.imresize(s.finalize(), dsize=a.dsize, interpolation='nearest'), pnum=pnum_(), title='resize nearest') >>> kwplot.imshow(kwimage.imresize(s.finalize(), dsize=a.dsize, interpolation='cubic'), pnum=pnum_(), title='resize cubic') .. py:method:: take_channels(self, channels) .. py:data:: IndexableWalker .. py:function:: ensure_json_serializable(dict_, normalize_containers=False, verbose=0) Attempt to convert common types (e.g. numpy) into something JSON compliant. Converts numpy arrays and tuples into lists. :Parameters: **normalize_containers** (*bool, default=False*) -- if True, normalizes dict containers to be standard python structures. .. rubric:: Example >>> data = ub.ddict(lambda: int) >>> data['foo'] = ub.ddict(lambda: int) >>> data['bar'] = np.array([1, 2, 3]) >>> data['foo']['a'] = 1 >>> data['foo']['b'] = (1, np.array([1, 2, 3]), {3: np.int32(3), 4: np.float16(1.0)}) >>> dict_ = data >>> print(ub.repr2(data, nl=-1)) >>> assert list(find_json_unserializable(data)) >>> result = ensure_json_serializable(data, normalize_containers=True) >>> print(ub.repr2(result, nl=-1)) >>> assert not list(find_json_unserializable(result)) >>> assert type(result) is dict ..
py:function:: find_json_unserializable(data, quickcheck=False) Recurse through a JSON data structure and find any component that causes a serialization error. Record the location of these errors in the data structure as we recurse through the call tree. :Parameters: * **data** (*object*) -- data that should be json serializable * **quickcheck** (*bool*) -- if True, check the entire data structure assuming it's ok before doing the python-based recursive logic. :returns: list of "bad part" dictionaries containing items 'value' - the value that caused the serialization error 'loc' - which contains a list of key/indexes that can be used to lookup the location of the unserializable value. If the "loc" is a list, then it indicates a rare case where a key in a dictionary is causing the serialization error. :rtype: List[Dict] .. rubric:: Example >>> from kwcoco.util.util_json import * # NOQA >>> part = ub.ddict(lambda: int) >>> part['foo'] = ub.ddict(lambda: int) >>> part['bar'] = np.array([1, 2, 3]) >>> part['foo']['a'] = 1 >>> # Create a dictionary with two unserializable parts >>> data = [1, 2, {'nest1': [2, part]}, {frozenset({'badkey'}): 3, 2: 4}] >>> parts = list(find_json_unserializable(data)) >>> print('parts = {}'.format(ub.repr2(parts, nl=1))) >>> # Check expected structure of bad parts >>> assert len(parts) == 2 >>> part = parts[1] >>> assert list(part['loc']) == [2, 'nest1', 1, 'bar'] >>> # We can use the "loc" to find the bad value >>> for part in parts: >>> # "loc" is a list of directions containing which keys/indexes >>> # to traverse at each descent into the data structure. >>> directions = part['loc'] >>> curr = data >>> special_flag = False >>> for key in directions: >>> if isinstance(key, list): >>> # special case for bad keys >>> special_flag = True >>> break >>> else: >>> # normal case for bad values >>> curr = curr[key] >>> if special_flag: >>> assert part['data'] in curr.keys() >>> assert part['data'] is key[1] >>> else: >>> assert part['data'] is curr ..
py:function:: indexable_allclose(dct1, dct2, return_info=False) Walks through two nested data structures and ensures that everything is roughly the same. :Parameters: * **dct1** -- a nested indexable item * **dct2** -- a nested indexable item .. rubric:: Example >>> from kwcoco.util.util_json import indexable_allclose >>> dct1 = { >>> 'foo': [1.222222, 1.333], >>> 'bar': 1, >>> 'baz': [], >>> } >>> dct2 = { >>> 'foo': [1.22222, 1.333], >>> 'bar': 1, >>> 'baz': [], >>> } >>> assert indexable_allclose(dct1, dct2) .. py:class:: SupressPrint(*mods, **kw) Bases: :py:obj:`object` Temporarily replace the print function in a module with a noop :Parameters: * **\*mods** -- the modules to disable print in * **\*\*kw** -- only accepts "enabled" enabled (bool, default=True): enables or disables this context .. py:method:: __enter__(self) .. py:method:: __exit__(self, a, b, c) .. py:class:: StratifiedGroupKFold(n_splits=3, shuffle=False, random_state=None) Bases: :py:obj:`sklearn.model_selection._split._BaseKFold` Stratified K-Folds cross-validator with Grouping Provides train/test indices to split data in train/test sets. This cross-validation object is a variation of GroupKFold that returns stratified folds. The folds are made by preserving the percentage of samples for each class. Read more in the :ref:`User Guide `. :Parameters: **n_splits** (*int, default=3*) -- Number of folds. Must be at least 2. .. py:method:: _make_test_folds(self, X, y=None, groups=None) :Parameters: * **X** (*ndarray*) -- data * **y** (*ndarray*) -- labels * **groups** (*ndarray*) -- groupids for items. Items with the same groupid must be placed in the same group. :returns: test_folds :rtype: list .. rubric:: Example >>> import kwarray >>> rng = kwarray.ensure_rng(0) >>> groups = [1, 1, 3, 4, 2, 2, 7, 8, 8] >>> y = [1, 1, 1, 1, 2, 2, 2, 3, 3] >>> X = np.empty((len(y), 0)) >>> self = StratifiedGroupKFold(random_state=rng, shuffle=True) >>> skf_list = list(self.split(X=X, y=y, groups=groups)) ... 
>>> import ubelt as ub >>> print(ub.repr2(skf_list, nl=1, with_dtype=False)) [ (np.array([2, 3, 4, 5, 6]), np.array([0, 1, 7, 8])), (np.array([0, 1, 2, 7, 8]), np.array([3, 4, 5, 6])), (np.array([0, 1, 3, 4, 5, 6, 7, 8]), np.array([2])), ] .. py:method:: _iter_test_masks(self, X, y=None, groups=None) Generates boolean masks corresponding to test sets. By default, delegates to _iter_test_indices(X, y, groups) .. py:method:: split(self, X, y, groups=None) Generate indices to split data into training and test set. .. py:function:: smart_truncate(string, max_length=0, separator=' ', trunc_loc=0.5) Truncate a string. :param string (str): string for modification :param max_length (int): output string length :param word_boundary (bool): :param save_order (bool): if True then word order of output string is like input string :param separator (str): separator between words :param trunc_loc (float): fraction of location where to remove the text :return:
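.. note:: The ``trunc_loc`` parameter of ``smart_truncate`` is described above only as the fraction of the string at which text is removed. The sketch below is one possible reading of that description, not kwcoco's implementation: the helper name ``truncate_at_fraction`` and its ``marker`` argument are hypothetical, and the ``separator`` handling of the real function is left out.

```python
def truncate_at_fraction(string, max_length=0, trunc_loc=0.5, marker="~"):
    """Hypothetical helper: shorten ``string`` to ``max_length`` characters.

    ``trunc_loc`` is interpreted here as the fraction of the kept text that
    comes before the removed span (0.0 cuts the start, 1.0 cuts the end).
    This interpretation is an assumption, not documented kwcoco behaviour.
    """
    # A max_length of 0 (the default) means "no limit" in this sketch.
    if not max_length or len(string) <= max_length:
        return string
    # Degenerate case: no room for anything except (part of) the marker.
    if max_length <= len(marker):
        return marker[:max_length]
    # Clamp the fraction so out-of-range values still behave sensibly.
    trunc_loc = min(max(trunc_loc, 0.0), 1.0)
    keep = max_length - len(marker)   # characters of the original to keep
    head = int(keep * trunc_loc)      # kept characters before the cut
    tail = keep - head                # kept characters after the cut
    suffix = string[-tail:] if tail else ""
    return string[:head] + marker + suffix


print(truncate_at_fraction("abcdefghij", max_length=7))                 # abc~hij
print(truncate_at_fraction("abcdefghij", max_length=7, trunc_loc=0.0))  # ~efghij
print(truncate_at_fraction("abcdefghij", max_length=7, trunc_loc=1.0))  # abcdef~
```

Under these assumptions the result never exceeds ``max_length``, and the default ``trunc_loc=0.5`` keeps both ends of the string, which is usually what you want for file paths or identifiers where the prefix and suffix carry the most information.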