kwcoco.util package¶
Subpackages¶
- kwcoco.util.delayed_ops package
- Module contents
  - DelayedArray
  - DelayedAsXarray
  - DelayedChannelConcat: meta, _opt_logs, parts, dsize, num_channels, _finalize(), _push_operation_under(), _validate(), as_xarray(), channels, num_overviews, optimize(), shape, take_channels(), undo_warps()
  - DelayedConcat
  - DelayedCrop
  - DelayedDequantize
  - DelayedFrameStack
  - DelayedIdentity
  - DelayedImage: meta, _opt_logs, subdata, _opt_push_under_concat(), _transform_from_subdata(), _validate(), channels, dsize, evaluate(), get_transform_from_leaf(), num_channels, num_overviews, shape, take_channels(), undo_warp()
  - DelayedImageLeaf
  - DelayedLoad
  - DelayedNans
  - DelayedNaryOperation
  - DelayedOperation: meta, _opt_logs, _finalize(), _leaf_paths(), _leafs(), _set_nested_params(), _traverse(), _traversed_graph(), as_graph(), children(), finalize(), leafs(), nesting(), optimize(), prepare(), print_graph(), shape, write_network_text()
  - DelayedOverview: meta, _opt_logs, subdata, _finalize(), _opt_crop_after_overview(), _opt_dequant_after_overview(), _opt_fuse_overview(), _opt_overview_as_warp(), _opt_warp_after_overview(), _transform_from_subdata(), num_overviews, optimize()
  - DelayedStack
  - DelayedUnaryOperation
  - DelayedWarp: meta, _opt_logs, subdata, _data_keys, _algo_keys, _finalize(), _opt_absorb_overview(), _opt_fuse_warps(), _opt_split_warp_overview(), _transform_from_subdata(), optimize(), transform
  - ImageOpsMixin
- Module contents
Submodules¶
- kwcoco.util.dict_like module
- kwcoco.util.dict_proxy2 module
- kwcoco.util.ijson_ext module
- kwcoco.util.jsonschema_elements module
- kwcoco.util.lazy_frame_backends module
- kwcoco.util.util_archive module
- kwcoco.util.util_deprecate module
- kwcoco.util.util_eval module
- kwcoco.util.util_futures module
- kwcoco.util.util_json module
- kwcoco.util.util_kwutil module
- kwcoco.util.util_monkey module
- kwcoco.util.util_networkx module
- kwcoco.util.util_parallel module
- kwcoco.util.util_reroot module
- kwcoco.util.util_rich module
- kwcoco.util.util_sklearn module
- kwcoco.util.util_special_json module
- kwcoco.util.util_truncate module
- kwcoco.util.util_windows module
Module contents¶
mkinit ~/code/kwcoco/kwcoco/util/__init__.py -w
mkinit ~/code/kwcoco/kwcoco/util/__init__.py --lazy
- kwcoco.util.ALLOF(*TYPES)¶
- kwcoco.util.ANYOF(*TYPES)¶
- kwcoco.util.ARRAY(TYPE={}, **kw)¶
https://json-schema.org/understanding-json-schema/reference/array.html
Example
>>> from kwcoco.util.jsonschema_elements import *  # NOQA
>>> ARRAY(numItems=3)
>>> schema = ARRAY(minItems=3)
>>> schema.validate()
{'type': 'array', 'items': {}, 'minItems': 3}
- class kwcoco.util.Archive(fpath=None, mode='r', backend=None, file=None)[source]¶
Bases: object
Abstraction over zipfile and tarfile
Todo
see if we can use one of these other tools instead
Example
>>> from kwcoco.util.util_archive import Archive
>>> import ubelt as ub
>>> dpath = ub.Path.appdir('kwcoco', 'tests', 'util', 'archive')
>>> dpath.delete().ensuredir()
>>> # Test write mode
>>> mode = 'w'
>>> arc_zip = Archive(str(dpath / 'demo.zip'), mode=mode)
>>> arc_tar = Archive(str(dpath / 'demo.tar.gz'), mode=mode)
>>> open(dpath / 'data_1only.txt', 'w').write('bazbzzz')
>>> open(dpath / 'data_2only.txt', 'w').write('buzzz')
>>> open(dpath / 'data_both.txt', 'w').write('foobar')
>>> #
>>> arc_zip.add(dpath / 'data_both.txt')
>>> arc_zip.add(dpath / 'data_1only.txt')
>>> #
>>> arc_tar.add(dpath / 'data_both.txt')
>>> arc_tar.add(dpath / 'data_2only.txt')
>>> #
>>> arc_zip.close()
>>> arc_tar.close()
>>> #
>>> # Test read mode
>>> arc_zip = Archive(str(dpath / 'demo.zip'), mode='r')
>>> arc_tar = Archive(str(dpath / 'demo.tar.gz'), mode='r')
>>> # Test names
>>> name = 'data_both.txt'
>>> assert name in arc_zip.names()
>>> assert name in arc_tar.names()
>>> # Test read
>>> assert arc_zip.read(name, mode='r') == 'foobar'
>>> assert arc_tar.read(name, mode='r') == 'foobar'
>>> #
>>> # Test extractall
>>> extract_dpath = ub.ensuredir(str(dpath / 'extracted'))
>>> extracted1 = arc_zip.extractall(extract_dpath)
>>> extracted2 = arc_tar.extractall(extract_dpath)
>>> for fpath in extracted2:
>>>     print(open(fpath, 'r').read())
>>> for fpath in extracted1:
>>>     print(open(fpath, 'r').read())
- Parameters:
fpath (str | None) – path to open
mode (str) – either r or w
backend (str | ModuleType | None) – either tarfile, zipfile string or module.
file (tarfile.TarFile | zipfile.ZipFile | None) – the open backend file if it already exists. If not set, then fpath is used to open it.
- _available_backends = {'tarfile': <module 'tarfile' from '/home/docs/.asdf/installs/python/3.13.3/lib/python3.13/tarfile.py'>, 'zipfile': <module 'zipfile' from '/home/docs/.asdf/installs/python/3.13.3/lib/python3.13/zipfile/__init__.py'>}¶
- classmethod coerce(data)[source]¶
Either open an archive file path or coerce an existing ZipFile or tarfile structure into this wrapper class
- read(name, mode='rb')[source]¶
Read data directly out of the archive.
- Parameters:
name (str) – the name of the archive member to read
mode (str) – This is a conceptual parameter that emulates the usual open mode. Defaults to “rb”, which returns data as raw bytes. If “r”, the bytes are decoded into UTF-8 text.
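The “rb” versus “r” distinction described for read can be sketched with the standard library alone. This is an illustration of the idea and not kwcoco's implementation: read_member, make_zip and make_tar are hypothetical helpers, and the archives are built in memory for brevity.

```python
import io
import tarfile
import zipfile


def read_member(archive_bytes, name, mode='rb'):
    """Hypothetical helper: read one member from in-memory zip or tar data."""
    buf = io.BytesIO(archive_bytes)
    if zipfile.is_zipfile(buf):
        buf.seek(0)
        with zipfile.ZipFile(buf) as zf:
            data = zf.read(name)
    else:
        buf.seek(0)
        # 'r:*' lets tarfile detect the compression transparently
        with tarfile.open(fileobj=buf, mode='r:*') as tf:
            data = tf.extractfile(name).read()
    # 'rb' returns raw bytes, 'r' decodes them as UTF-8 text
    return data.decode('utf8') if mode == 'r' else data


def make_zip(name, payload):
    """Build a one-member zip archive in memory (demo data only)."""
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, 'w') as zf:
        zf.writestr(name, payload)
    return buf.getvalue()


def make_tar(name, payload):
    """Build a one-member gzip-compressed tar archive in memory (demo data only)."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode='w:gz') as tf:
        info = tarfile.TarInfo(name)
        info.size = len(payload)
        tf.addfile(info, io.BytesIO(payload))
    return buf.getvalue()


print(read_member(make_zip('data_both.txt', b'foobar'), 'data_both.txt'))
print(read_member(make_tar('data_both.txt', b'foobar'), 'data_both.txt', mode='r'))
```

The same call works for both container formats, which is the convenience the Archive wrapper is documented to provide.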
- class kwcoco.util.ContainerElements[source]¶
Bases: object
Types that contain other types
Example
>>> from kwcoco.util.jsonschema_elements import *  # NOQA
>>> print(elem.ARRAY().validate())
>>> print(elem.OBJECT().validate())
>>> print(elem.OBJECT().validate())
{'type': 'array', 'items': {}}
{'type': 'object', 'properties': {}}
{'type': 'object', 'properties': {}}
- ARRAY(TYPE={}, **kw)[source]¶
https://json-schema.org/understanding-json-schema/reference/array.html
Example
>>> from kwcoco.util.jsonschema_elements import *  # NOQA
>>> ARRAY(numItems=3)
>>> schema = ARRAY(minItems=3)
>>> schema.validate()
{'type': 'array', 'items': {}, 'minItems': 3}
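The examples on this page suggest that numItems is a convenience keyword that expands to both minItems and maxItems. A minimal plain-dict sketch of that idea follows; array_schema is a hypothetical stand-in and not the kwcoco function, and the expansion rule is inferred from the documented output rather than from the source.

```python
def array_schema(items=None, numItems=None, **kw):
    """Hypothetical stand-in that builds a JSON Schema array as a plain dict."""
    schema = {'type': 'array', 'items': {} if items is None else items}
    if numItems is not None:
        # Assumption: a fixed length is expressed as equal min and max bounds
        schema['minItems'] = numItems
        schema['maxItems'] = numItems
    # Any other keyword is passed through as a standard schema key
    schema.update(kw)
    return schema


print(array_schema(minItems=3))
print(array_schema({'type': 'string'}, numItems=2))
```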
- OBJECT(PROPERTIES={}, **kw)[source]¶
https://json-schema.org/understanding-json-schema/reference/object.html
Example
>>> import jsonschema
>>> schema = elem.OBJECT()
>>> jsonschema.validate({}, schema)
>>> #
>>> import jsonschema
>>> schema = elem.OBJECT({
>>>     'key1': elem.ANY(),
>>>     'key2': elem.ANY(),
>>> }, required=['key1'])
>>> jsonschema.validate({'key1': None}, schema)
>>> #
>>> import jsonschema
>>> schema = elem.OBJECT({
>>>     'key1': elem.OBJECT({'arr': elem.ARRAY()}),
>>>     'key2': elem.ANY(),
>>> }, required=['key1'], title='a title')
>>> schema.validate()
>>> print('schema = {}'.format(ub.urepr(schema, sort=1, nl=-1)))
>>> jsonschema.validate({'key1': {'arr': []}}, schema)
schema = {
    'properties': {
        'key1': {
            'properties': {
                'arr': {'items': {}, 'type': 'array'}
            },
            'type': 'object'
        },
        'key2': {}
    },
    'required': ['key1'],
    'title': 'a title',
    'type': 'object'
}
- class kwcoco.util.DictLike[source]¶
Bases: NiceRepr
An inherited class must specify the getitem, setitem, and keys methods.
A class is dictionary like if it has:
__iter__, __len__, __contains__, __getitem__, items, keys, values, get,
and if it should be writable it should have:
__delitem__, __setitem__, update,
And perhaps:
copy,
- asdict()¶
- Return type:
Dict
- class kwcoco.util.Element(base, options={}, _magic=None)[source]¶
Bases: dict
A dictionary used to define an element of a JSON Schema.
The exact keys/values for the element will depend on the type of element being described. The SchemaElements defines exactly what these are for the core elements. (e.g. OBJECT, INTEGER, NULL, ARRAY, ANYOF)
Example
>>> from kwcoco.coco_schema import *  # NOQA
>>> self = Element(base={'type': 'demo'}, options={'opt1', 'opt2'})
>>> new = self(opt1=3)
>>> print('self = {}'.format(ub.urepr(self, nl=1, sort=1)))
>>> print('new = {}'.format(ub.urepr(new, nl=1, sort=1)))
>>> print('new2 = {}'.format(ub.urepr(new(), nl=1, sort=1)))
>>> print('new3 = {}'.format(ub.urepr(new(title='myvar'), nl=1, sort=1)))
>>> print('new4 = {}'.format(ub.urepr(new(title='myvar')(examples=['']), nl=1, sort=1)))
>>> print('new5 = {}'.format(ub.urepr(new(badattr=True), nl=1, sort=1)))
self = {
    'type': 'demo',
}
new = {
    'opt1': 3,
    'type': 'demo',
}
new2 = {
    'opt1': 3,
    'type': 'demo',
}
new3 = {
    'opt1': 3,
    'title': 'myvar',
    'type': 'demo',
}
new4 = {
    'examples': [''],
    'opt1': 3,
    'title': 'myvar',
    'type': 'demo',
}
new5 = {
    'opt1': 3,
    'type': 'demo',
}
- Parameters:
base (dict) – the keys / values this schema must contain
options (dict) – the keys / values this schema may contain
_magic (callable | None) – called when creating an instance of this schema element. Allows convenience attributes to be converted to the formal jsonschema specs. TODO: _magic is a terrible name, we need to rename it with something descriptive.
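The calling behaviour shown in the Element example (each call returns a new element, and unknown keywords are dropped) can be approximated with a small dict subclass. This is a sketch under assumptions and not the kwcoco class: MiniElement is hypothetical, it ignores _magic entirely, and the set of always-allowed generic keys is a guess based on the example output and the "Generics" Todo entry further down this page.

```python
class MiniElement(dict):
    """Hypothetical, simplified imitation of a callable schema element."""

    # Assumption: these generic keys are accepted by every element
    GENERIC = {'title', 'description', 'default', 'examples'}

    def __init__(self, base, options=()):
        super().__init__(base)
        self._options = set(options)

    def __call__(self, **kw):
        allowed = self._options | self.GENERIC
        # Copy the current state, then apply only the recognized keywords
        new = MiniElement(self, self._options)
        new.update({k: v for k, v in kw.items() if k in allowed})
        return new


base = MiniElement({'type': 'demo'}, options={'opt1', 'opt2'})
new = base(opt1=3)
print(base)                  # the original element is left unmodified
print(new(title='myvar'))    # generic keys are accepted
print(new(badattr=True))     # unknown keys are silently ignored
```

Returning a copy on each call keeps a shared element such as a module-level constant safe to reuse in several schemas.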
- class kwcoco.util.IndexableWalker(data, dict_cls=(<class 'dict'>, ), list_cls=(<class 'list'>, <class 'tuple'>))[source]¶
Bases: Generator
Traverses through a nested tree-like indexable structure.
Generates a path and value to each node in the structure. The path is a list of indexes which if applied in order will reach the value.
The __setitem__ method can be used to modify a nested value based on the path returned by the generator.
When generating values, you can use “send” to prevent traversal of a particular branch.
- RelatedWork:
- https://pypi.org/project/python-benedict/ - implements a dictionary
subclass with similar nested indexing abilities.
- Variables:
dict_cls (Tuple[type]) – the types that should be considered dictionary mappings for the purpose of nested iteration. Defaults to dict.
list_cls (Tuple[type]) – the types that should be considered list-like for the purposes of nested iteration. Defaults to (list, tuple).
indexable_cls (Tuple[type]) – combined dict_cls and list_cls
Example
>>> import ubelt as ub
>>> # Given Nested Data
>>> data = {
>>>     'foo': {'bar': 1},
>>>     'baz': [{'biz': 3}, {'buz': [4, 5, 6]}],
>>> }
>>> # Create an IndexableWalker
>>> walker = ub.IndexableWalker(data)
>>> # We iterate over the data as if it was flat
>>> # ignore the <want> string due to order issues on older Pythons
>>> # xdoctest: +IGNORE_WANT
>>> for path, val in walker:
>>>     print(path)
['foo']
['baz']
['baz', 0]
['baz', 1]
['baz', 1, 'buz']
['baz', 1, 'buz', 0]
['baz', 1, 'buz', 1]
['baz', 1, 'buz', 2]
['baz', 0, 'biz']
['foo', 'bar']
>>> # We can use "paths" as keys to getitem into the walker
>>> path = ['baz', 1, 'buz', 2]
>>> val = walker[path]
>>> assert val == 6
>>> # We can use "paths" as keys to setitem into the walker
>>> assert data['baz'][1]['buz'][2] == 6
>>> walker[path] = 7
>>> assert data['baz'][1]['buz'][2] == 7
>>> # We can use "paths" as keys to delitem into the walker
>>> assert data['baz'][1]['buz'][1] == 5
>>> del walker[['baz', 1, 'buz', 1]]
>>> assert data['baz'][1]['buz'][1] == 7
Example
>>> # Create nested data
>>> # xdoctest: +REQUIRES(module:numpy)
>>> import numpy as np
>>> import ubelt as ub
>>> data = ub.ddict(lambda: int)
>>> data['foo'] = ub.ddict(lambda: int)
>>> data['bar'] = np.array([1, 2, 3])
>>> data['foo']['a'] = 1
>>> data['foo']['b'] = np.array([1, 2, 3])
>>> data['foo']['c'] = [1, 2, 3]
>>> data['baz'] = 3
>>> print('data = {}'.format(ub.repr2(data, nl=True)))
>>> # We can walk through every node in the nested tree
>>> walker = ub.IndexableWalker(data)
>>> for path, value in walker:
>>>     print('walk path = {}'.format(ub.repr2(path, nl=0)))
>>>     if path[-1] == 'c':
>>>         # Use send to prevent traversing this branch
>>>         got = walker.send(False)
>>>         # We can modify the value based on the returned path
>>>         walker[path] = 'changed the value of c'
>>> print('data = {}'.format(ub.repr2(data, nl=True)))
>>> assert data['foo']['c'] == 'changed the value of c'
Example
>>> # Test sending false for every data item
>>> import ubelt as ub
>>> data = {1: [1, 2, 3], 2: [1, 2, 3]}
>>> walker = ub.IndexableWalker(data)
>>> # Sending false means you wont traverse any further on that path
>>> num_iters_v1 = 0
>>> for path, value in walker:
>>>     print('[v1] walk path = {}'.format(ub.repr2(path, nl=0)))
>>>     walker.send(False)
>>>     num_iters_v1 += 1
>>> num_iters_v2 = 0
>>> for path, value in walker:
>>>     # When we dont send false we walk all the way down
>>>     print('[v2] walk path = {}'.format(ub.repr2(path, nl=0)))
>>>     num_iters_v2 += 1
>>> assert num_iters_v1 == 2
>>> assert num_iters_v2 == 8
Example
>>> # Test numpy
>>> # xdoctest: +REQUIRES(CPython)
>>> # xdoctest: +REQUIRES(module:numpy)
>>> import ubelt as ub
>>> import numpy as np
>>> # By default we don't recurse into ndarrays because they
>>> # are not registered as an indexable class
>>> data = {2: np.array([1, 2, 3])}
>>> walker = ub.IndexableWalker(data)
>>> num_iters = 0
>>> for path, value in walker:
>>>     print('walk path = {}'.format(ub.repr2(path, nl=0)))
>>>     num_iters += 1
>>> assert num_iters == 1
>>> # Currently to use top-level ndarrays, you need to extend what the
>>> # list class is. This API may change in the future to be easier
>>> # to work with.
>>> data = np.random.rand(3, 5)
>>> walker = ub.IndexableWalker(data, list_cls=(list, tuple, np.ndarray))
>>> num_iters = 0
>>> for path, value in walker:
>>>     print('walk path = {}'.format(ub.repr2(path, nl=0)))
>>>     num_iters += 1
>>> assert num_iters == 3 + 3 * 5
- _abc_impl = <_abc._abc_data object>¶
- _walk(data=None, prefix=[])[source]¶
Defines the underlying generator used by IndexableWalker
- Yields:
- Tuple[List, Any] | None – path (List) - a “path” through the nested data structure
value (Any) - the value indexed by that “path”.
Can also yield None in the case that send is called on the generator.
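The interplay between yielding paths and answering send, as described for _walk, can be sketched with a plain recursive generator. This is an illustration and not the ubelt or kwcoco implementation: walk is a hypothetical function, it handles only dict, list and tuple, and it visits nodes depth-first in insertion order, which may differ from the order shown in the examples above.

```python
def walk(data, prefix=()):
    """Hypothetical sketch: yield (path, value) pairs; send(False) prunes a branch."""
    if isinstance(data, dict):
        items = data.items()
    elif isinstance(data, (list, tuple)):
        items = enumerate(data)
    else:
        return  # leaf values have no children to visit
    for key, value in items:
        path = list(prefix) + [key]
        message = yield path, value
        if message is False:
            # The caller used send(False): answer that call with None
            # and skip the children of this node.
            yield None
            continue
        yield from walk(value, path)


data = {1: [1, 2, 3], 2: [1, 2, 3]}

# Without send, every node is visited
all_paths = [path for path, value in walk(data)]

# Sending False after each item stops descent below the top level
gen = walk(data)
top_paths = []
for path, value in gen:
    top_paths.append(path)
    gen.send(False)

print(len(all_paths), len(top_paths))
```

The extra `yield None` is what makes the generator "also yield None": it is the value returned to the send call, so the enclosing for loop never sees it.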
- allclose(other, rel_tol=1e-09, abs_tol=0.0, equal_nan=False, return_info=False)[source]¶
Walks through this and another nested data structure and checks whether everything is roughly the same.
- Parameters:
other (IndexableWalker | List | Dict) – a nested indexable item to compare against.
rel_tol (float) – maximum difference for being considered “close”, relative to the magnitude of the input values
abs_tol (float) – maximum difference for being considered “close”, regardless of the magnitude of the input values
equal_nan (bool) – if True, numpy must be available, and consider nans as equal.
return_info (bool) – if True, return extra info dict. Defaults to False.
- Returns:
A boolean result if return_info is false, otherwise a tuple of the boolean result and an “info” dict containing detailed results indicating what matched and what did not.
- Return type:
Example
>>> import ubelt as ub
>>> items1 = ub.IndexableWalker({
>>>     'foo': [1.222222, 1.333],
>>>     'bar': 1,
>>>     'baz': [],
>>> })
>>> items2 = ub.IndexableWalker({
>>>     'foo': [1.22222, 1.333],
>>>     'bar': 1,
>>>     'baz': [],
>>> })
>>> flag, return_info = items1.allclose(items2, return_info=True)
>>> print('return_info = {}'.format(ub.repr2(return_info, nl=1)))
>>> print('flag = {!r}'.format(flag))
>>> for p1, v1, v2 in return_info['faillist']:
>>>     v1_ = items1[p1]
>>>     print('*fail p1, v1, v2 = {}, {}, {}'.format(p1, v1, v2))
>>> for p1 in return_info['passlist']:
>>>     v1_ = items1[p1]
>>>     print('*pass p1, v1_ = {}, {}'.format(p1, v1_))
>>> assert not flag

>>> import ubelt as ub
>>> items1 = ub.IndexableWalker({
>>>     'foo': [1.0000000000000000000000001, 1.],
>>>     'bar': 1,
>>>     'baz': [],
>>> })
>>> items2 = ub.IndexableWalker({
>>>     'foo': [0.9999999999999999, 1.],
>>>     'bar': 1,
>>>     'baz': [],
>>> })
>>> flag, return_info = items1.allclose(items2, return_info=True)
>>> print('return_info = {}'.format(ub.repr2(return_info, nl=1)))
>>> print('flag = {!r}'.format(flag))
>>> assert flag
Example
>>> import ubelt as ub
>>> flag, return_info = ub.IndexableWalker([]).allclose(ub.IndexableWalker([]), return_info=True)
>>> print('return_info = {!r}'.format(return_info))
>>> print('flag = {!r}'.format(flag))
>>> assert flag
Example
>>> import ubelt as ub
>>> flag = ub.IndexableWalker([]).allclose([], return_info=False)
>>> print('flag = {!r}'.format(flag))
>>> assert flag
Example
>>> import ubelt as ub
>>> flag, return_info = ub.IndexableWalker([]).allclose([1], return_info=True)
>>> print('return_info = {!r}'.format(return_info))
>>> print('flag = {!r}'.format(flag))
>>> assert not flag
Example
>>> # xdoctest: +REQUIRES(module:numpy)
>>> import ubelt as ub
>>> import numpy as np
>>> a = np.random.rand(3, 5)
>>> b = a + 1
>>> wa = ub.IndexableWalker(a, list_cls=(np.ndarray,))
>>> wb = ub.IndexableWalker(b, list_cls=(np.ndarray,))
>>> flag, return_info = wa.allclose(wb, return_info=True)
>>> print('return_info = {!r}'.format(return_info))
>>> print('flag = {!r}'.format(flag))
>>> assert not flag
>>> a = np.random.rand(3, 5)
>>> b = a.copy() + 1e-17
>>> wa = ub.IndexableWalker([a], list_cls=(np.ndarray, list))
>>> wb = ub.IndexableWalker([b], list_cls=(np.ndarray, list))
>>> flag, return_info = wa.allclose(wb, return_info=True)
>>> assert flag
>>> print('return_info = {!r}'.format(return_info))
>>> print('flag = {!r}'.format(flag))
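The rel_tol and abs_tol parameters of allclose share names and default values with math.isclose in the standard library. Assuming they follow the same convention (an assumption, not something this page states), the per-leaf comparison can be sketched as below; leaves_close is a hypothetical helper.

```python
import math


def leaves_close(a, b, rel_tol=1e-09, abs_tol=0.0):
    """Hypothetical per-leaf check: tolerant for numbers, exact otherwise."""
    numeric = (int, float)
    if isinstance(a, numeric) and isinstance(b, numeric):
        # True when |a - b| <= max(rel_tol * max(|a|, |b|), abs_tol)
        return math.isclose(a, b, rel_tol=rel_tol, abs_tol=abs_tol)
    return a == b


# Differs in the sixth decimal place: outside the default relative tolerance
print(leaves_close(1.222222, 1.22222))
# Differs by roughly one part in 1e16: well inside the default tolerance
print(leaves_close(1.0000000000000000000000001, 0.9999999999999999))
# Near zero a relative tolerance is very strict, so abs_tol matters there
print(leaves_close(0.0, 1e-12), leaves_close(0.0, 1e-12, abs_tol=1e-9))
```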
- diff(other, rel_tol=1e-09, abs_tol=0.0, equal_nan=False)[source]¶
Walks through two nested data structures and finds differences in the structures.
- Parameters:
other (IndexableWalker | List | Dict) – a nested indexable item to compare against.
rel_tol (float) – maximum difference for being considered “close”, relative to the magnitude of the input values
abs_tol (float) – maximum difference for being considered “close”, regardless of the magnitude of the input values
equal_nan (bool) – if True, numpy must be available, and consider nans as equal.
- Returns:
- information about the diff with
“similarity”: a score between 0 and 1
“num_differences”: the number of paths not common plus the number of common paths with differing values
“unique1”: the paths that were unique to self
“unique2”: the paths that were unique to other
“faillist”: a list of 3-tuples of common path and differing values
“num_approximations”: the number of approximately equal items (i.e. floats) there were
- Return type:
Example
>>> import ubelt as ub
>>> dct1 = {
>>>     'foo': [1.222222, 1.333],
>>>     'bar': 1,
>>>     'baz': [],
>>>     'top': [1, 2, 3],
>>>     'L0': {'L1': {'L2': {'K1': 'V1', 'K2': 'V2', 'D1': 1, 'D2': 2}}},
>>> }
>>> dct2 = {
>>>     'foo': [1.22222, 1.333],
>>>     'bar': 1,
>>>     'baz': [],
>>>     'buz': {1: 2},
>>>     'top': [1, 1, 2],
>>>     'L0': {'L1': {'L2': {'K1': 'V1', 'K2': 'V2', 'D1': 10, 'D2': 20}}},
>>> }
>>> info = ub.IndexableWalker(dct1).diff(dct2)
>>> print(f'info = {ub.urepr(info, nl=2)}')
Example
>>> # xdoctest: +REQUIRES(module:numpy)
>>> import ubelt as ub
>>> import numpy as np
>>> a = np.random.rand(3, 5)
>>> b = a + 1
>>> wa = ub.IndexableWalker(a, list_cls=(np.ndarray,))
>>> wb = ub.IndexableWalker(b, list_cls=(np.ndarray,))
>>> info = wa.diff(wb)
>>> print(f'info = {ub.urepr(info, nl=2)}')
>>> a = np.random.rand(3, 5)
>>> b = a.copy() + 1e-17
>>> wa = ub.IndexableWalker([a], list_cls=(np.ndarray, list))
>>> wb = ub.IndexableWalker([b], list_cls=(np.ndarray, list))
>>> info = wa.diff(wb)
>>> print(f'info = {ub.urepr(info, nl=2)}')
Example
>>> import ubelt as ub
>>> # test null similarity
>>> wa = ub.IndexableWalker({}).diff({})
>>> assert wa['similarity'] == 1.0
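The “unique1”, “unique2” and “faillist” entries returned by diff can be understood as set operations over flattened leaf paths. The following is a simplified sketch of that idea, not the library's algorithm: flatten and simple_diff are hypothetical, values are compared exactly instead of with tolerances, empty containers are dropped, and no similarity score is computed.

```python
def flatten(data, prefix=()):
    """Hypothetical helper: map each leaf path (a tuple) to its value."""
    if isinstance(data, dict):
        items = data.items()
    elif isinstance(data, (list, tuple)):
        items = enumerate(data)
    else:
        return {prefix: data}
    flat = {}
    for key, value in items:
        flat.update(flatten(value, prefix + (key,)))
    return flat


def simple_diff(data1, data2):
    """Hypothetical sketch of a structural diff over leaf paths."""
    flat1, flat2 = flatten(data1), flatten(data2)
    common = flat1.keys() & flat2.keys()
    return {
        'unique1': sorted(flat1.keys() - flat2.keys(), key=repr),
        'unique2': sorted(flat2.keys() - flat1.keys(), key=repr),
        # (path, value in first, value in second) for differing common paths
        'faillist': sorted(
            ((p, flat1[p], flat2[p]) for p in common if flat1[p] != flat2[p]),
            key=repr),
    }


info = simple_diff(
    {'foo': [1, 2], 'bar': {'x': 1}},
    {'foo': [1, 3], 'bar': {'x': 1}, 'buz': {1: 2}},
)
print(info)
```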
- keys(non_leaf=False)[source]¶
Iterate over the nested paths in the container.
- Parameters:
non_leaf (bool) – if True, also returns non-leaf indexes, which are indexes to intermediate nested structures. Defaults to False.
- Yields:
List – a path of indexes into the nested structure
Example
>>> import ubelt as ub
>>> data = {
>>>     'foo': {'bar': 1},
>>>     'baz': [{'biz': 3}, 'mid', {'buz': [4, 5, 6]}],
>>>     'emptynest': [[]],
>>> }
>>> keys = list(ub.IndexableWalker(data).keys())
>>> keys_with_nonleafs = list(ub.IndexableWalker(data).keys(non_leaf=True))
>>> print(f'keys = {ub.urepr(keys, nl=1)}')
keys = [
    ['baz', 1],
    ['baz', 2, 'buz', 0],
    ['baz', 2, 'buz', 1],
    ['baz', 2, 'buz', 2],
    ['baz', 0, 'biz'],
    ['foo', 'bar'],
]
>>> assert not any(['emptynest' in p for p in keys])
>>> assert any(['emptynest' in p for p in keys_with_nonleafs])
- throw(typ[, val[, tb]])[source]¶
Raise exception in generator, return next yielded value or raise StopIteration.
- Parameters:
typ (Any) – Type of the exception. Should be a type[BaseException], type checking is not working right here.
val (Optional[object])
tb (Optional[TracebackType])
- Returns:
Any
- Raises:
References
[GeneratorThrow] https://docs.python.org/3/reference/expressions.html#generator.throw
- values(non_leaf=False)[source]¶
Iterate over the values nested within the container.
- Parameters:
non_leaf (bool) – if True, also returns non-leaf values, which are the intermediate nested structures themselves. Defaults to False.
- Yields:
Any – the value at each nested path.
Example
>>> import ubelt as ub
>>> data = {
>>>     'foo': {'bar': 1},
>>>     'baz': [{'biz': 3}, 'mid', {'buz': [4, 5, 6]}],
>>>     'emptynest': [[]],
>>> }
>>> values = list(ub.IndexableWalker(data).values())
>>> keys_with_nonleafs = list(ub.IndexableWalker(data).values(non_leaf=True))
>>> print(values)
['mid', 4, 5, 6, 3, 1]
>>> assert not any(isinstance(v, list) for v in values)
>>> assert any(isinstance(v, list) for v in keys_with_nonleafs)
- kwcoco.util.NOT(TYPE)¶
- kwcoco.util.OBJECT(PROPERTIES={}, **kw)¶
https://json-schema.org/understanding-json-schema/reference/object.html
Example
>>> import jsonschema
>>> schema = elem.OBJECT()
>>> jsonschema.validate({}, schema)
>>> #
>>> import jsonschema
>>> schema = elem.OBJECT({
>>>     'key1': elem.ANY(),
>>>     'key2': elem.ANY(),
>>> }, required=['key1'])
>>> jsonschema.validate({'key1': None}, schema)
>>> #
>>> import jsonschema
>>> schema = elem.OBJECT({
>>>     'key1': elem.OBJECT({'arr': elem.ARRAY()}),
>>>     'key2': elem.ANY(),
>>> }, required=['key1'], title='a title')
>>> schema.validate()
>>> print('schema = {}'.format(ub.urepr(schema, sort=1, nl=-1)))
>>> jsonschema.validate({'key1': {'arr': []}}, schema)
schema = {
    'properties': {
        'key1': {
            'properties': {
                'arr': {'items': {}, 'type': 'array'}
            },
            'type': 'object'
        },
        'key2': {}
    },
    'required': ['key1'],
    'title': 'a title',
    'type': 'object'
}
- kwcoco.util.ONEOF(*TYPES)¶
- class kwcoco.util.QuantifierElements[source]¶
Bases: object
Quantifier types
https://json-schema.org/understanding-json-schema/reference/combining.html#allof
Example
>>> from kwcoco.util.jsonschema_elements import *  # NOQA
>>> elem.ANYOF(elem.STRING, elem.NUMBER).validate()
>>> elem.ONEOF(elem.STRING, elem.NUMBER).validate()
>>> elem.NOT(elem.NULL).validate()
>>> elem.NOT(elem.ANY).validate()
>>> elem.ANY.validate()
- property ANY¶
- class kwcoco.util.ScalarElements[source]¶
Bases: object
Single-valued elements
- property BOOLEAN¶
https://json-schema.org/understanding-json-schema/reference/null.html
- property INTEGER¶
https://json-schema.org/understanding-json-schema/reference/numeric.html#integer
- property NULL¶
https://json-schema.org/understanding-json-schema/reference/null.html
- property NUMBER¶
https://json-schema.org/understanding-json-schema/reference/numeric.html#number
- property STRING¶
https://json-schema.org/understanding-json-schema/reference/string.html
- class kwcoco.util.SchemaElements[source]¶
Bases: ScalarElements, QuantifierElements, ContainerElements
Functional interface into defining jsonschema structures.
See mixin classes for details.
References
https://json-schema.org/understanding-json-schema/
Todo
[ ] Generics: title, description, default, examples
CommandLine
xdoctest -m /home/joncrall/code/kwcoco/kwcoco/util/jsonschema_elements.py SchemaElements
Example
>>> from kwcoco.util.jsonschema_elements import *  # NOQA
>>> elem = SchemaElements()
>>> elem.ARRAY(elem.ANY())
>>> schema = OBJECT({
>>>     'prop1': ARRAY(INTEGER, minItems=3),
>>>     'prop2': ARRAY(STRING, numItems=2),
>>>     'prop3': ARRAY(OBJECT({
>>>         'subprob1': NUMBER,
>>>         'subprob2': NUMBER,
>>>     }))
>>> })
>>> print('schema = {}'.format(ub.urepr(schema, nl=2, sort=1)))
schema = {
    'properties': {
        'prop1': {'items': {'type': 'integer'}, 'minItems': 3, 'type': 'array'},
        'prop2': {'items': {'type': 'string'}, 'maxItems': 2, 'minItems': 2, 'type': 'array'},
        'prop3': {'items': {'properties': {'subprob1': {'type': 'number'}, 'subprob2': {'type': 'number'}}, 'type': 'object'}, 'type': 'array'},
    },
    'type': 'object',
}

>>> TYPE = elem.OBJECT({
>>>     'p1': ANY,
>>>     'p2': ANY,
>>> }, required=['p1'])
>>> import jsonschema
>>> inst = {'p1': None}
>>> jsonschema.validate(inst, schema=TYPE)
>>> #jsonschema.validate({'p2': None}, schema=TYPE)
- class kwcoco.util.StratifiedGroupKFold(n_splits=3, shuffle=False, random_state=None)[source]¶
Bases: _BaseKFold
Stratified K-Folds cross-validator with Grouping
Provides train/test indices to split data in train/test sets.
This cross-validation object is a variation of GroupKFold that returns stratified folds. The folds are made by preserving the percentage of samples for each class.
This is an old interface and should likely be refactored and modernized.
- Parameters:
n_splits (int, default=3) – Number of folds. Must be at least 2.
- _abc_impl = <_abc._abc_data object>¶
- _make_test_folds(X, y=None, groups=None)[source]¶
- Parameters:
X (ndarray) – data
y (ndarray) – labels
groups (ndarray) – groupids for items. Items with the same groupid must be placed in the same group.
- Returns:
test_folds
- Return type:
Example
>>> from kwcoco.util.util_sklearn import *  # NOQA
>>> import kwarray
>>> rng = kwarray.ensure_rng(0)
>>> groups = [1, 1, 3, 4, 2, 2, 7, 8, 8]
>>> y = [1, 1, 1, 1, 2, 2, 2, 3, 3]
>>> X = np.empty((len(y), 0))
>>> self = StratifiedGroupKFold(random_state=rng, shuffle=True)
>>> skf_list = list(self.split(X=X, y=y, groups=groups))
>>> import ubelt as ub
>>> print(ub.urepr(skf_list, nl=1, with_dtype=False))
[
    (np.array([2, 3, 4, 5, 6]), np.array([0, 1, 7, 8])),
    (np.array([0, 1, 2, 7, 8]), np.array([3, 4, 5, 6])),
    (np.array([0, 1, 3, 4, 5, 6, 7, 8]), np.array([2])),
]
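The grouping constraint can be checked on the output printed in the example: no group id should appear on both the train and the test side of a split. The sketch below uses only the standard library and copies the index lists from the example output; groups_are_disjoint is a hypothetical helper and not part of kwcoco.

```python
def groups_are_disjoint(groups, splits):
    """Hypothetical check: True if no group id is on both sides of any split."""
    for train_idx, test_idx in splits:
        train_groups = {groups[i] for i in train_idx}
        test_groups = {groups[i] for i in test_idx}
        if train_groups & test_groups:
            return False
    return True


groups = [1, 1, 3, 4, 2, 2, 7, 8, 8]
# (train indices, test indices) as printed in the example above
splits = [
    ([2, 3, 4, 5, 6], [0, 1, 7, 8]),
    ([0, 1, 2, 7, 8], [3, 4, 5, 6]),
    ([0, 1, 3, 4, 5, 6, 7, 8], [2]),
]
print(groups_are_disjoint(groups, splits))
```

A plain stratified split without grouping could place index 0 in train and index 1 in test even though both belong to group 1, and this check would then fail.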
- kwcoco.util.ensure_json_serializable(dict_, normalize_containers=False, verbose=0)[source]¶
Attempt to convert common types (e.g. numpy) into something json compliant
Convert numpy and tuples into lists
- Parameters:
normalize_containers (bool) – if True, normalizes dict containers to be standard python structures. Defaults to False.
Note
This function is now in kwutil. Might be best to use that variant. See kwutil.util_json.ensure_json_serializable().
Example
>>> data = ub.ddict(lambda: int)
>>> data['foo'] = ub.ddict(lambda: int)
>>> data['bar'] = np.array([1, 2, 3])
>>> data['foo']['a'] = 1
>>> data['foo']['b'] = (1, np.array([1, 2, 3]), {3: np.int32(3), 4: np.float16(1.0)})
>>> dict_ = data
>>> print(ub.urepr(data, nl=-1))
>>> assert list(find_json_unserializable(data))
>>> result = ensure_json_serializable(data, normalize_containers=True)
>>> print(ub.urepr(result, nl=-1))
>>> assert not list(find_json_unserializable(result))
>>> assert type(result) is dict
- kwcoco.util.find_json_unserializable(data, quickcheck=False)[source]¶
Recurse through json datastructure and find any component that causes a serialization error. Record the location of these errors in the datastructure as we recurse through the call tree.
Note
This function is now in kwutil. Might be best to use that variant. See kwutil.util_json.find_json_unserializable().
- Parameters:
data (object) – data that should be json serializable
quickcheck (bool) – if True, first check the entire datastructure at once, assuming it is ok, before doing the python-based recursive logic.
- Returns:
list of “bad part” dictionaries containing items
’value’ - the value that caused the serialization error
’loc’ - which contains a list of key/indexes that can be used to lookup the location of the unserializable value. If the “loc” is a list, then it indicates a rare case where a key in a dictionary is causing the serialization error.
- Return type:
List[Dict]
Example
>>> from kwcoco.util.util_json import *  # NOQA
>>> part = ub.ddict(lambda: int)
>>> part['foo'] = ub.ddict(lambda: int)
>>> part['bar'] = np.array([1, 2, 3])
>>> part['foo']['a'] = 1
>>> # Create a dictionary with two unserializable parts
>>> data = [1, 2, {'nest1': [2, part]}, {frozenset({'badkey'}): 3, 2: 4}]
>>> parts = list(find_json_unserializable(data))
>>> print('parts = {}'.format(ub.urepr(parts, nl=1)))
>>> # Check expected structure of bad parts
>>> assert len(parts) == 2
>>> part = parts[1]
>>> assert list(part['loc']) == [2, 'nest1', 1, 'bar']
>>> # We can use the "loc" to find the bad value
>>> for part in parts:
>>>     # "loc" is a list of directions containing which keys/indexes
>>>     # to traverse at each descent into the data structure.
>>>     directions = part['loc']
>>>     curr = data
>>>     special_flag = False
>>>     for key in directions:
>>>         if isinstance(key, list):
>>>             # special case for bad keys
>>>             special_flag = True
>>>             break
>>>         else:
>>>             # normal case for bad values
>>>             curr = curr[key]
>>>     if special_flag:
>>>         assert part['data'] in curr.keys()
>>>         assert part['data'] is key[1]
>>>     else:
>>>         assert part['data'] is curr
Example
>>> # xdoctest: +SKIP("TODO: circular ref detect algo is wrong, fix it")
>>> from kwcoco.util.util_json import *  # NOQA
>>> import pytest
>>> # Test circular reference
>>> data = [[], {'a': []}]
>>> data[1]['a'].append(data)
>>> with pytest.raises(ValueError, match="Circular reference detected at.*1, 'a', 1*"):
...     parts = list(find_json_unserializable(data))
>>> # Should be ok here
>>> shared_data = {'shared': 1}
>>> data = [[shared_data], shared_data]
>>> parts = list(find_json_unserializable(data))
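The recursive search with a recorded “loc” can be sketched with the standard json module. This is a simplified illustration and not the kwcoco implementation: find_unserializable is hypothetical, it reports only bad values (not bad dictionary keys), and it does no circular-reference detection.

```python
import json


def find_unserializable(data, loc=()):
    """Hypothetical sketch: yield {'loc', 'data'} for each value json rejects."""
    if isinstance(data, dict):
        for key, value in data.items():
            yield from find_unserializable(value, loc + (key,))
    elif isinstance(data, (list, tuple)):
        for index, value in enumerate(data):
            yield from find_unserializable(value, loc + (index,))
    else:
        try:
            json.dumps(data)
        except TypeError:
            # Record where the offending value lives and what it is
            yield {'loc': list(loc), 'data': data}


data = [1, 2, {'nest1': [2, {'bar': {1, 2, 3}}]}]
parts = list(find_unserializable(data))
for part in parts:
    # Follow the recorded keys and indexes back to the bad value
    curr = data
    for key in part['loc']:
        curr = curr[key]
    print(part['loc'], curr is part['data'])
```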
- kwcoco.util.indexable_allclose(dct1, dct2, return_info=False)[source]¶
Walks through two nested data structures and ensures that everything is roughly the same.
Note
DEPRECATED. Use ubelt.IndexableWalker.allclose() instead.
- Parameters:
dct1 – a nested indexable item
dct2 – a nested indexable item
Example
>>> from kwcoco.util.util_json import indexable_allclose
>>> dct1 = {
>>>     'foo': [1.222222, 1.333],
>>>     'bar': 1,
>>>     'baz': [],
>>> }
>>> dct2 = {
>>>     'foo': [1.22222, 1.333],
>>>     'bar': 1,
>>>     'baz': [],
>>> }
>>> assert indexable_allclose(dct1, dct2)
- kwcoco.util.resolve_directory_symlinks(path)[source]¶
Only resolve symlinks of directories, not the base file
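One way to read this description is: resolve the parent directory, then re-attach the final path component untouched, so that a file which is itself a symlink keeps its own name. A minimal pathlib sketch of that reading follows; resolve_directory_symlinks_sketch is hypothetical, and the real function may handle edge cases differently.

```python
from pathlib import Path


def resolve_directory_symlinks_sketch(path):
    """Hypothetical sketch: resolve links in the directory part only."""
    path = Path(path)
    # The parent is fully resolved; the final component is left as given,
    # even if it is itself a symlink.
    return path.parent.resolve() / path.name


print(resolve_directory_symlinks_sketch('some_dir/../some_file.txt'))
```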
- kwcoco.util.resolve_relative_to(path, dpath, strict=False)[source]¶
Given a path, try to resolve its symlinks such that it is relative to the given dpath.
Example
>>> from kwcoco.util.util_reroot import *  # NOQA
>>> import os
>>> def _symlink(self, target, verbose=0):
>>>     return ub.Path(ub.symlink(target, self, verbose=verbose))
>>> ub.Path._symlink = _symlink
>>> #
>>> # TODO: try to enumerate all basic cases
>>> #
>>> base = ub.Path.appdir('kwcoco/tests/reroot')
>>> base.delete().ensuredir()
>>> #
>>> drive1 = (base / 'drive1').ensuredir()
>>> drive2 = (base / 'drive2').ensuredir()
>>> #
>>> data_repo1 = (drive1 / 'data_repo1').ensuredir()
>>> cache = (data_repo1 / '.cache').ensuredir()
>>> real_file1 = (cache / 'real_file1').touch()
>>> #
>>> real_bundle = (data_repo1 / 'real_bundle').ensuredir()
>>> real_assets = (real_bundle / 'assets').ensuredir()
>>> #
>>> # Symlink file outside of the bundle
>>> link_file1 = (real_assets / 'link_file1')._symlink(real_file1)
>>> real_file2 = (real_assets / 'real_file2').touch()
>>> link_file2 = (real_assets / 'link_file2')._symlink(real_file2)
>>> #
>>> #
>>> # A symlink to the data repo
>>> data_repo2 = (drive1 / 'data_repo2')._symlink(data_repo1)
>>> data_repo3 = (drive2 / 'data_repo3')._symlink(data_repo1)
>>> data_repo4 = (drive2 / 'data_repo4')._symlink(data_repo2)
>>> #
>>> # A prediction repo TODO
>>> pred_repo5 = (drive2 / 'pred_repo5').ensuredir()
>>> #
>>> # _ = ub.cmd(f'tree -a {base}', verbose=3)
>>> #
>>> fpaths = []
>>> for r, ds, fs in os.walk(base, followlinks=True):
>>>     for f in fs:
>>>         if 'file' in f:
>>>             fpath = ub.Path(r) / f
>>>             fpaths.append(fpath)
>>> #
>>> #
>>> dpath = real_bundle.resolve()
>>> #
>>> for path in fpaths:
>>>     # print(f'{path}')
>>>     # print(f'{path.resolve()=}')
>>>     resolved_rel = resolve_relative_to(path, dpath)
>>>     print('resolved_rel = {!r}'.format(resolved_rel))
- kwcoco.util.smart_truncate(string, max_length=0, separator=' ', trunc_loc=0.5, trunc_char='~')[source]¶
Truncate a string.
- Parameters:
string (str) – string for modification
max_length (int) – output string length
word_boundary (bool)
save_order (bool) – if True then word order of output string is like input string
separator (str) – separator between words
trunc_loc (float) – fraction of location where to remove the text
trunc_char (str) – the character to denote where truncation is starting
- Returns: