kwcoco.util package¶
Subpackages¶
- kwcoco.util.delayed_ops package
- Module contents
DelayedArray
DelayedAsXarray
DelayedChannelConcat
DelayedChannelConcat.channels
DelayedChannelConcat.shape
DelayedChannelConcat._finalize()
DelayedChannelConcat.optimize()
DelayedChannelConcat.take_channels()
DelayedChannelConcat.num_overviews
DelayedChannelConcat.as_xarray()
DelayedChannelConcat._push_operation_under()
DelayedChannelConcat._validate()
DelayedChannelConcat.undo_warps()
DelayedConcat
DelayedCrop
DelayedDequantize
DelayedFrameStack
DelayedIdentity
DelayedImage
DelayedImage.shape
DelayedImage.num_channels
DelayedImage.dsize
DelayedImage.channels
DelayedImage.num_overviews
DelayedImage.take_channels()
DelayedImage._validate()
DelayedImage._transform_from_subdata()
DelayedImage.get_transform_from_leaf()
DelayedImage.evaluate()
DelayedImage._opt_push_under_concat()
DelayedImage.undo_warp()
DelayedImageLeaf
DelayedLoad
DelayedNans
DelayedNaryOperation
DelayedOperation
DelayedOperation.nesting()
DelayedOperation.as_graph()
DelayedOperation._traverse()
DelayedOperation.leafs()
DelayedOperation._leafs()
DelayedOperation._leaf_paths()
DelayedOperation._traversed_graph()
DelayedOperation.print_graph()
DelayedOperation.write_network_text()
DelayedOperation.shape
DelayedOperation.children()
DelayedOperation.prepare()
DelayedOperation._finalize()
DelayedOperation.finalize()
DelayedOperation.optimize()
DelayedOperation._set_nested_params()
DelayedOverview
DelayedOverview.num_overviews
DelayedOverview._finalize()
DelayedOverview.optimize()
DelayedOverview._transform_from_subdata()
DelayedOverview._opt_overview_as_warp()
DelayedOverview._opt_crop_after_overview()
DelayedOverview._opt_fuse_overview()
DelayedOverview._opt_dequant_after_overview()
DelayedOverview._opt_warp_after_overview()
DelayedStack
DelayedUnaryOperation
DelayedWarp
ImageOpsMixin
- Module contents
Submodules¶
- kwcoco.util.dict_like module
- kwcoco.util.dict_proxy2 module
- kwcoco.util.jsonschema_elements module
- kwcoco.util.lazy_frame_backends module
- kwcoco.util.util_archive module
- kwcoco.util.util_deprecate module
- kwcoco.util.util_eval module
- kwcoco.util.util_futures module
- kwcoco.util.util_json module
- kwcoco.util.util_monkey module
- kwcoco.util.util_parallel module
- kwcoco.util.util_reroot module
- kwcoco.util.util_sklearn module
- kwcoco.util.util_special_json module
- kwcoco.util.util_truncate module
- kwcoco.util.util_windows module
Module contents¶
mkinit ~/code/kwcoco/kwcoco/util/__init__.py -w
mkinit ~/code/kwcoco/kwcoco/util/__init__.py --lazy
- kwcoco.util.ALLOF(*TYPES)¶
- kwcoco.util.ANYOF(*TYPES)¶
- kwcoco.util.ARRAY(TYPE={}, **kw)¶
https://json-schema.org/understanding-json-schema/reference/array.html
Example
>>> from kwcoco.util.jsonschema_elements import *  # NOQA
>>> ARRAY(numItems=3)
>>> schema = ARRAY(minItems=3)
>>> schema.validate()
{'type': 'array', 'items': {}, 'minItems': 3}
- class kwcoco.util.Archive(fpath=None, mode='r', backend=None, file=None)[source]¶
Bases:
object
Abstraction over zipfile and tarfile
Todo
see if we can use one of these other tools instead
Example
>>> from kwcoco.util.util_archive import Archive
>>> import ubelt as ub
>>> dpath = ub.Path.appdir('kwcoco', 'tests', 'util', 'archive')
>>> dpath.delete().ensuredir()
>>> # Test write mode
>>> mode = 'w'
>>> arc_zip = Archive(str(dpath / 'demo.zip'), mode=mode)
>>> arc_tar = Archive(str(dpath / 'demo.tar.gz'), mode=mode)
>>> open(dpath / 'data_1only.txt', 'w').write('bazbzzz')
>>> open(dpath / 'data_2only.txt', 'w').write('buzzz')
>>> open(dpath / 'data_both.txt', 'w').write('foobar')
>>> #
>>> arc_zip.add(dpath / 'data_both.txt')
>>> arc_zip.add(dpath / 'data_1only.txt')
>>> #
>>> arc_tar.add(dpath / 'data_both.txt')
>>> arc_tar.add(dpath / 'data_2only.txt')
>>> #
>>> arc_zip.close()
>>> arc_tar.close()
>>> #
>>> # Test read mode
>>> arc_zip = Archive(str(dpath / 'demo.zip'), mode='r')
>>> arc_tar = Archive(str(dpath / 'demo.tar.gz'), mode='r')
>>> # Test names
>>> name = 'data_both.txt'
>>> assert name in arc_zip.names()
>>> assert name in arc_tar.names()
>>> # Test read
>>> assert arc_zip.read(name, mode='r') == 'foobar'
>>> assert arc_tar.read(name, mode='r') == 'foobar'
>>> #
>>> # Test extractall
>>> extract_dpath = ub.ensuredir(str(dpath / 'extracted'))
>>> extracted1 = arc_zip.extractall(extract_dpath)
>>> extracted2 = arc_tar.extractall(extract_dpath)
>>> for fpath in extracted2:
>>>     print(open(fpath, 'r').read())
>>> for fpath in extracted1:
>>>     print(open(fpath, 'r').read())
- Parameters:
fpath (str | None) – path to open
mode (str) – either r or w
backend (str | ModuleType | None) – either tarfile, zipfile string or module.
file (tarfile.TarFile | zipfile.ZipFile | None) – the open backend file if it already exists. If not set, than fpath will open it.
- _available_backends = {'tarfile': <module 'tarfile'>, 'zipfile': <module 'zipfile'>}¶
- read(name, mode='rb')[source]¶
Read data directly out of the archive.
- Parameters:
name (str) – the name of the archive member to read
mode (str) – This is a conceptual parameter that emulates the usual open mode. Defaults to “rb”, which returns data as raw bytes. If “r”, the bytes are decoded as utf-8 text.
- class kwcoco.util.ContainerElements[source]¶
Bases:
object
Types that contain other types
Example
>>> from kwcoco.util.jsonschema_elements import *  # NOQA
>>> print(elem.ARRAY().validate())
>>> print(elem.OBJECT().validate())
>>> print(elem.OBJECT().validate())
{'type': 'array', 'items': {}}
{'type': 'object', 'properties': {}}
{'type': 'object', 'properties': {}}
- ARRAY(TYPE={}, **kw)[source]¶
https://json-schema.org/understanding-json-schema/reference/array.html
Example
>>> from kwcoco.util.jsonschema_elements import *  # NOQA
>>> ARRAY(numItems=3)
>>> schema = ARRAY(minItems=3)
>>> schema.validate()
{'type': 'array', 'items': {}, 'minItems': 3}
- OBJECT(PROPERTIES={}, **kw)[source]¶
https://json-schema.org/understanding-json-schema/reference/object.html
Example
>>> import jsonschema
>>> schema = elem.OBJECT()
>>> jsonschema.validate({}, schema)
>>> #
>>> import jsonschema
>>> schema = elem.OBJECT({
>>>     'key1': elem.ANY(),
>>>     'key2': elem.ANY(),
>>> }, required=['key1'])
>>> jsonschema.validate({'key1': None}, schema)
>>> #
>>> import jsonschema
>>> schema = elem.OBJECT({
>>>     'key1': elem.OBJECT({'arr': elem.ARRAY()}),
>>>     'key2': elem.ANY(),
>>> }, required=['key1'], title='a title')
>>> schema.validate()
>>> print('schema = {}'.format(ub.urepr(schema, sort=1, nl=-1)))
>>> jsonschema.validate({'key1': {'arr': []}}, schema)
schema = {
    'properties': {
        'key1': {
            'properties': {
                'arr': {'items': {}, 'type': 'array'}
            },
            'type': 'object'
        },
        'key2': {}
    },
    'required': ['key1'],
    'title': 'a title',
    'type': 'object'
}
- class kwcoco.util.DictLike[source]¶
Bases:
NiceRepr
- An inherited class must specify the getitem, setitem, and keys methods.
A class is dictionary-like if it has: __iter__, __len__, __contains__, __getitem__, items, keys, values, and get.
If it should be writable it should have: __delitem__, __setitem__, and update.
And perhaps: copy.
,- asdict()¶
- Return type:
Dict
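The protocol above can be sketched without importing kwcoco. The following hypothetical class (the names AttrBackedDict and _Point are illustrative, not part of kwcoco) supplies the three required primitives, keys, getitem, and setitem, and derives the rest of the dictionary interface from them, in the spirit of what DictLike provides:

```python
class AttrBackedDict:
    """Expose selected attributes of an object through a dict interface."""

    def __init__(self, obj, fields):
        self._obj = obj
        self._fields = list(fields)

    # --- the three primitives a DictLike-style subclass must provide ---
    def keys(self):
        return iter(self._fields)

    def getitem(self, key):
        if key not in self._fields:
            raise KeyError(key)
        return getattr(self._obj, key)

    def setitem(self, key, value):
        if key not in self._fields:
            raise KeyError(key)
        setattr(self._obj, key, value)

    # --- derived methods, analogous to what the mixin supplies ---
    def __getitem__(self, key):
        return self.getitem(key)

    def __setitem__(self, key, value):
        self.setitem(key, value)

    def __iter__(self):
        return self.keys()

    def __len__(self):
        return len(self._fields)

    def __contains__(self, key):
        return key in self._fields

    def items(self):
        return ((k, self.getitem(k)) for k in self.keys())

    def values(self):
        return (self.getitem(k) for k in self.keys())

    def get(self, key, default=None):
        try:
            return self.getitem(key)
        except KeyError:
            return default

    def asdict(self):
        return dict(self.items())


class _Point:
    def __init__(self):
        self.x, self.y = 1, 2


proxy = AttrBackedDict(_Point(), ['x', 'y'])
proxy['x'] = 10
print(proxy.asdict())  # {'x': 10, 'y': 2}
```

The point of the pattern is that only the three primitives touch the backing storage; everything else is derived, so a subclass stays small.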
- class kwcoco.util.Element(base, options={}, _magic=None)[source]¶
Bases:
dict
A dictionary used to define an element of a JSON Schema.
The exact keys/values for the element will depend on the type of element being described. The
SchemaElements
defines exactly what these are for the core elements. (e.g. OBJECT, INTEGER, NULL, ARRAY, ANYOF)Example
>>> from kwcoco.coco_schema import *  # NOQA
>>> self = Element(base={'type': 'demo'}, options={'opt1', 'opt2'})
>>> new = self(opt1=3)
>>> print('self = {}'.format(ub.urepr(self, nl=1, sort=1)))
>>> print('new = {}'.format(ub.urepr(new, nl=1, sort=1)))
>>> print('new2 = {}'.format(ub.urepr(new(), nl=1, sort=1)))
>>> print('new3 = {}'.format(ub.urepr(new(title='myvar'), nl=1, sort=1)))
>>> print('new4 = {}'.format(ub.urepr(new(title='myvar')(examples=['']), nl=1, sort=1)))
>>> print('new5 = {}'.format(ub.urepr(new(badattr=True), nl=1, sort=1)))
self = {
    'type': 'demo',
}
new = {
    'opt1': 3,
    'type': 'demo',
}
new2 = {
    'opt1': 3,
    'type': 'demo',
}
new3 = {
    'opt1': 3,
    'title': 'myvar',
    'type': 'demo',
}
new4 = {
    'examples': [''],
    'opt1': 3,
    'title': 'myvar',
    'type': 'demo',
}
new5 = {
    'opt1': 3,
    'type': 'demo',
}
- Parameters:
base (dict) – the keys / values this schema must contain
options (dict) – the keys / values this schema may contain
_magic (callable | None) – called when creating an instance of this schema element. Allows convenience attributes to be converted to the formal jsonschema specs. TODO: _magic is a terrible name, we need to rename it with something descriptive.
- class kwcoco.util.IndexableWalker(data, dict_cls=(<class 'dict'>, ), list_cls=(<class 'list'>, <class 'tuple'>))[source]¶
Bases:
Generator
Traverses through a nested tree-liked indexable structure.
Generates a path and value to each node in the structure. The path is a list of indexes which if applied in order will reach the value.
The __setitem__ method can be used to modify a nested value based on the path returned by the generator.
When generating values, you can use “send” to prevent traversal of a particular branch.
- RelatedWork:
- https://pypi.org/project/python-benedict/ - implements a dictionary
subclass with similar nested indexing abilities.
Example
>>> import ubelt as ub
>>> # Given Nested Data
>>> data = {
>>>     'foo': {'bar': 1},
>>>     'baz': [{'biz': 3}, {'buz': [4, 5, 6]}],
>>> }
>>> # Create an IndexableWalker
>>> walker = ub.IndexableWalker(data)
>>> # We iterate over the data as if it was flat
>>> # ignore the <want> string due to order issues on older Pythons
>>> # xdoctest: +IGNORE_WANT
>>> for path, val in walker:
>>>     print(path)
['foo']
['baz']
['baz', 0]
['baz', 1]
['baz', 1, 'buz']
['baz', 1, 'buz', 0]
['baz', 1, 'buz', 1]
['baz', 1, 'buz', 2]
['baz', 0, 'biz']
['foo', 'bar']
>>> # We can use "paths" as keys to getitem into the walker
>>> path = ['baz', 1, 'buz', 2]
>>> val = walker[path]
>>> assert val == 6
>>> # We can use "paths" as keys to setitem into the walker
>>> assert data['baz'][1]['buz'][2] == 6
>>> walker[path] = 7
>>> assert data['baz'][1]['buz'][2] == 7
>>> # We can use "paths" as keys to delitem into the walker
>>> assert data['baz'][1]['buz'][1] == 5
>>> del walker[['baz', 1, 'buz', 1]]
>>> assert data['baz'][1]['buz'][1] == 7
Example
>>> # Create nested data
>>> # xdoctest: +REQUIRES(module:numpy)
>>> import numpy as np
>>> import ubelt as ub
>>> data = ub.ddict(lambda: int)
>>> data['foo'] = ub.ddict(lambda: int)
>>> data['bar'] = np.array([1, 2, 3])
>>> data['foo']['a'] = 1
>>> data['foo']['b'] = np.array([1, 2, 3])
>>> data['foo']['c'] = [1, 2, 3]
>>> data['baz'] = 3
>>> print('data = {}'.format(ub.repr2(data, nl=True)))
>>> # We can walk through every node in the nested tree
>>> walker = ub.IndexableWalker(data)
>>> for path, value in walker:
>>>     print('walk path = {}'.format(ub.repr2(path, nl=0)))
>>>     if path[-1] == 'c':
>>>         # Use send to prevent traversing this branch
>>>         got = walker.send(False)
>>>         # We can modify the value based on the returned path
>>>         walker[path] = 'changed the value of c'
>>> print('data = {}'.format(ub.repr2(data, nl=True)))
>>> assert data['foo']['c'] == 'changed the value of c'
Example
>>> # Test sending false for every data item
>>> import ubelt as ub
>>> data = {1: [1, 2, 3], 2: [1, 2, 3]}
>>> walker = ub.IndexableWalker(data)
>>> # Sending false means you wont traverse any further on that path
>>> num_iters_v1 = 0
>>> for path, value in walker:
>>>     print('[v1] walk path = {}'.format(ub.repr2(path, nl=0)))
>>>     walker.send(False)
>>>     num_iters_v1 += 1
>>> num_iters_v2 = 0
>>> for path, value in walker:
>>>     # When we dont send false we walk all the way down
>>>     print('[v2] walk path = {}'.format(ub.repr2(path, nl=0)))
>>>     num_iters_v2 += 1
>>> assert num_iters_v1 == 2
>>> assert num_iters_v2 == 8
Example
>>> # Test numpy
>>> # xdoctest: +REQUIRES(CPython)
>>> # xdoctest: +REQUIRES(module:numpy)
>>> import ubelt as ub
>>> import numpy as np
>>> # By default we don't recurse into ndarrays because they
>>> # Are registered as an indexable class
>>> data = {2: np.array([1, 2, 3])}
>>> walker = ub.IndexableWalker(data)
>>> num_iters = 0
>>> for path, value in walker:
>>>     print('walk path = {}'.format(ub.repr2(path, nl=0)))
>>>     num_iters += 1
>>> assert num_iters == 1
>>> # Currently to use top-level ndarrays, you need to extend what the
>>> # list class is. This API may change in the future to be easier
>>> # to work with.
>>> data = np.random.rand(3, 5)
>>> walker = ub.IndexableWalker(data, list_cls=(list, tuple, np.ndarray))
>>> num_iters = 0
>>> for path, value in walker:
>>>     print('walk path = {}'.format(ub.repr2(path, nl=0)))
>>>     num_iters += 1
>>> assert num_iters == 3 + 3 * 5
- throw(typ[, val[, tb]])[source]¶
Raise an exception in the generator; return the next yielded value or raise StopIteration.
- _walk(data=None, prefix=[])[source]¶
Defines the underlying generator used by IndexableWalker
- Yields:
- Tuple[List, Any] | None – path (List) - a “path” through the nested data structure
value (Any) - the value indexed by that “path”.
Can also yield None in the case that send is called on the generator.
- allclose(other, rel_tol=1e-09, abs_tol=0.0, return_info=False)[source]¶
Walks through this and another nested data structures and checks if everything is roughly the same.
- Parameters:
other (IndexableWalker | List | Dict) – a nested indexable item to compare against.
rel_tol (float) – maximum difference for being considered “close”, relative to the magnitude of the input values
abs_tol (float) – maximum difference for being considered “close”, regardless of the magnitude of the input values
return_info (bool, default=False) – if true, return extra info dict
- Returns:
A boolean result if return_info is false, otherwise a tuple of the boolean result and an “info” dict containing detailed results indicating what matched and what did not.
- Return type:
Example
>>> import ubelt as ub
>>> items1 = ub.IndexableWalker({
>>>     'foo': [1.222222, 1.333],
>>>     'bar': 1,
>>>     'baz': [],
>>> })
>>> items2 = ub.IndexableWalker({
>>>     'foo': [1.22222, 1.333],
>>>     'bar': 1,
>>>     'baz': [],
>>> })
>>> flag, return_info = items1.allclose(items2, return_info=True)
>>> print('return_info = {}'.format(ub.repr2(return_info, nl=1)))
>>> print('flag = {!r}'.format(flag))
>>> for p1, v1, v2 in return_info['faillist']:
>>>     v1_ = items1[p1]
>>>     print('*fail p1, v1, v2 = {}, {}, {}'.format(p1, v1, v2))
>>> for p1 in return_info['passlist']:
>>>     v1_ = items1[p1]
>>>     print('*pass p1, v1_ = {}, {}'.format(p1, v1_))
>>> assert not flag
>>> import ubelt as ub
>>> items1 = ub.IndexableWalker({
>>>     'foo': [1.0000000000000000000000001, 1.],
>>>     'bar': 1,
>>>     'baz': [],
>>> })
>>> items2 = ub.IndexableWalker({
>>>     'foo': [0.9999999999999999, 1.],
>>>     'bar': 1,
>>>     'baz': [],
>>> })
>>> flag, return_info = items1.allclose(items2, return_info=True)
>>> print('return_info = {}'.format(ub.repr2(return_info, nl=1)))
>>> print('flag = {!r}'.format(flag))
>>> assert flag
Example
>>> import ubelt as ub
>>> flag, return_info = ub.IndexableWalker([]).allclose(ub.IndexableWalker([]), return_info=True)
>>> print('return_info = {!r}'.format(return_info))
>>> print('flag = {!r}'.format(flag))
>>> assert flag
Example
>>> import ubelt as ub
>>> flag = ub.IndexableWalker([]).allclose([], return_info=False)
>>> print('flag = {!r}'.format(flag))
>>> assert flag
Example
>>> import ubelt as ub
>>> flag, return_info = ub.IndexableWalker([]).allclose([1], return_info=True)
>>> print('return_info = {!r}'.format(return_info))
>>> print('flag = {!r}'.format(flag))
>>> assert not flag
Example
>>> # xdoctest: +REQUIRES(module:numpy)
>>> import ubelt as ub
>>> import numpy as np
>>> a = np.random.rand(3, 5)
>>> b = a + 1
>>> wa = ub.IndexableWalker(a, list_cls=(np.ndarray,))
>>> wb = ub.IndexableWalker(b, list_cls=(np.ndarray,))
>>> flag, return_info = wa.allclose(wb, return_info=True)
>>> print('return_info = {!r}'.format(return_info))
>>> print('flag = {!r}'.format(flag))
>>> assert not flag
>>> a = np.random.rand(3, 5)
>>> b = a.copy() + 1e-17
>>> wa = ub.IndexableWalker([a], list_cls=(np.ndarray, list))
>>> wb = ub.IndexableWalker([b], list_cls=(np.ndarray, list))
>>> flag, return_info = wa.allclose(wb, return_info=True)
>>> assert flag
>>> print('return_info = {!r}'.format(return_info))
>>> print('flag = {!r}'.format(flag))
- _abc_impl = <_abc._abc_data object>¶
- kwcoco.util.NOT(TYPE)¶
- kwcoco.util.OBJECT(PROPERTIES={}, **kw)¶
https://json-schema.org/understanding-json-schema/reference/object.html
Example
>>> import jsonschema
>>> schema = elem.OBJECT()
>>> jsonschema.validate({}, schema)
>>> #
>>> import jsonschema
>>> schema = elem.OBJECT({
>>>     'key1': elem.ANY(),
>>>     'key2': elem.ANY(),
>>> }, required=['key1'])
>>> jsonschema.validate({'key1': None}, schema)
>>> #
>>> import jsonschema
>>> schema = elem.OBJECT({
>>>     'key1': elem.OBJECT({'arr': elem.ARRAY()}),
>>>     'key2': elem.ANY(),
>>> }, required=['key1'], title='a title')
>>> schema.validate()
>>> print('schema = {}'.format(ub.urepr(schema, sort=1, nl=-1)))
>>> jsonschema.validate({'key1': {'arr': []}}, schema)
schema = {
    'properties': {
        'key1': {
            'properties': {
                'arr': {'items': {}, 'type': 'array'}
            },
            'type': 'object'
        },
        'key2': {}
    },
    'required': ['key1'],
    'title': 'a title',
    'type': 'object'
}
- kwcoco.util.ONEOF(*TYPES)¶
- class kwcoco.util.QuantifierElements[source]¶
Bases:
object
Quantifier types
https://json-schema.org/understanding-json-schema/reference/combining.html#allof
Example
>>> from kwcoco.util.jsonschema_elements import *  # NOQA
>>> elem.ANYOF(elem.STRING, elem.NUMBER).validate()
>>> elem.ONEOF(elem.STRING, elem.NUMBER).validate()
>>> elem.NOT(elem.NULL).validate()
>>> elem.NOT(elem.ANY).validate()
>>> elem.ANY.validate()
- property ANY¶
- class kwcoco.util.ScalarElements[source]¶
Bases:
object
Single-valued elements
- property NULL¶
https://json-schema.org/understanding-json-schema/reference/null.html
- property BOOLEAN¶
https://json-schema.org/understanding-json-schema/reference/null.html
- property STRING¶
https://json-schema.org/understanding-json-schema/reference/string.html
- property NUMBER¶
https://json-schema.org/understanding-json-schema/reference/numeric.html#number
- property INTEGER¶
https://json-schema.org/understanding-json-schema/reference/numeric.html#integer
- class kwcoco.util.SchemaElements[source]¶
Bases:
ScalarElements
,QuantifierElements
,ContainerElements
Functional interface into defining jsonschema structures.
See mixin classes for details.
References
https://json-schema.org/understanding-json-schema/
Todo
[ ] Generics: title, description, default, examples
CommandLine
xdoctest -m /home/joncrall/code/kwcoco/kwcoco/util/jsonschema_elements.py SchemaElements
Example
>>> from kwcoco.util.jsonschema_elements import *  # NOQA
>>> elem = SchemaElements()
>>> elem.ARRAY(elem.ANY())
>>> schema = OBJECT({
>>>     'prop1': ARRAY(INTEGER, minItems=3),
>>>     'prop2': ARRAY(STRING, numItems=2),
>>>     'prop3': ARRAY(OBJECT({
>>>         'subprob1': NUMBER,
>>>         'subprob2': NUMBER,
>>>     }))
>>> })
>>> print('schema = {}'.format(ub.urepr(schema, nl=2, sort=1)))
schema = {
    'properties': {
        'prop1': {'items': {'type': 'integer'}, 'minItems': 3, 'type': 'array'},
        'prop2': {'items': {'type': 'string'}, 'maxItems': 2, 'minItems': 2, 'type': 'array'},
        'prop3': {'items': {'properties': {'subprob1': {'type': 'number'}, 'subprob2': {'type': 'number'}}, 'type': 'object'}, 'type': 'array'},
    },
    'type': 'object',
}
>>> TYPE = elem.OBJECT({
>>>     'p1': ANY,
>>>     'p2': ANY,
>>> }, required=['p1'])
>>> import jsonschema
>>> inst = {'p1': None}
>>> jsonschema.validate(inst, schema=TYPE)
>>> #jsonschema.validate({'p2': None}, schema=TYPE)
- class kwcoco.util.StratifiedGroupKFold(n_splits=3, shuffle=False, random_state=None)[source]¶
Bases:
_BaseKFold
Stratified K-Folds cross-validator with Grouping
Provides train/test indices to split data in train/test sets.
This cross-validation object is a variation of GroupKFold that returns stratified folds. The folds are made by preserving the percentage of samples for each class.
This is an old interface and should likely be refactored and modernized.
- Parameters:
n_splits (int, default=3) – Number of folds. Must be at least 2.
- _make_test_folds(X, y=None, groups=None)[source]¶
- Parameters:
X (ndarray) – data
y (ndarray) – labels
groups (ndarray) – groupids for items. Items with the same groupid must be placed in the same group.
- Returns:
test_folds
- Return type:
Example
>>> from kwcoco.util.util_sklearn import *  # NOQA
>>> import kwarray
>>> rng = kwarray.ensure_rng(0)
>>> groups = [1, 1, 3, 4, 2, 2, 7, 8, 8]
>>> y = [1, 1, 1, 1, 2, 2, 2, 3, 3]
>>> X = np.empty((len(y), 0))
>>> self = StratifiedGroupKFold(random_state=rng, shuffle=True)
>>> skf_list = list(self.split(X=X, y=y, groups=groups))
>>> import ubelt as ub
>>> print(ub.urepr(skf_list, nl=1, with_dtype=False))
[
    (np.array([2, 3, 4, 5, 6]), np.array([0, 1, 7, 8])),
    (np.array([0, 1, 2, 7, 8]), np.array([3, 4, 5, 6])),
    (np.array([0, 1, 3, 4, 5, 6, 7, 8]), np.array([2])),
]
- _abc_impl = <_abc._abc_data object>¶
- kwcoco.util.ensure_json_serializable(dict_, normalize_containers=False, verbose=0)[source]¶
Attempt to convert common types (e.g. numpy) into something JSON compliant.
Converts numpy types and tuples into lists.
- Parameters:
normalize_containers (bool) – if True, normalizes dict containers to be standard python structures. Defaults to False.
Example
>>> data = ub.ddict(lambda: int)
>>> data['foo'] = ub.ddict(lambda: int)
>>> data['bar'] = np.array([1, 2, 3])
>>> data['foo']['a'] = 1
>>> data['foo']['b'] = (1, np.array([1, 2, 3]), {3: np.int32(3), 4: np.float16(1.0)})
>>> dict_ = data
>>> print(ub.urepr(data, nl=-1))
>>> assert list(find_json_unserializable(data))
>>> result = ensure_json_serializable(data, normalize_containers=True)
>>> print(ub.urepr(result, nl=-1))
>>> assert not list(find_json_unserializable(result))
>>> assert type(result) is dict
- kwcoco.util.find_json_unserializable(data, quickcheck=False)[source]¶
Recurse through a JSON data structure and find any component that causes a serialization error. Record the location of these errors in the data structure as we recurse through the call tree.
- Parameters:
data (object) – data that should be json serializable
quickcheck (bool) – if True, check the entire data structure, assuming it is ok, before doing the python-based recursive logic.
- Returns:
list of “bad part” dictionaries containing items:
'value' – the value that caused the serialization error.
'loc' – a list of keys/indexes that can be used to look up the location of the unserializable value. If the “loc” is a list, then it indicates a rare case where a key in a dictionary is causing the serialization error.
- Return type:
List[Dict]
Example
>>> from kwcoco.util.util_json import *  # NOQA
>>> part = ub.ddict(lambda: int)
>>> part['foo'] = ub.ddict(lambda: int)
>>> part['bar'] = np.array([1, 2, 3])
>>> part['foo']['a'] = 1
>>> # Create a dictionary with two unserializable parts
>>> data = [1, 2, {'nest1': [2, part]}, {frozenset({'badkey'}): 3, 2: 4}]
>>> parts = list(find_json_unserializable(data))
>>> print('parts = {}'.format(ub.urepr(parts, nl=1)))
>>> # Check expected structure of bad parts
>>> assert len(parts) == 2
>>> part = parts[1]
>>> assert list(part['loc']) == [2, 'nest1', 1, 'bar']
>>> # We can use the "loc" to find the bad value
>>> for part in parts:
>>>     # "loc" is a list of directions containing which keys/indexes
>>>     # to traverse at each descent into the data structure.
>>>     directions = part['loc']
>>>     curr = data
>>>     special_flag = False
>>>     for key in directions:
>>>         if isinstance(key, list):
>>>             # special case for bad keys
>>>             special_flag = True
>>>             break
>>>         else:
>>>             # normal case for bad values
>>>             curr = curr[key]
>>>     if special_flag:
>>>         assert part['data'] in curr.keys()
>>>         assert part['data'] is key[1]
>>>     else:
>>>         assert part['data'] is curr
- kwcoco.util.indexable_allclose(dct1, dct2, return_info=False)[source]¶
Walks through two nested data structures and checks that everything is roughly the same.
Note
Use the version in ubelt instead
- Parameters:
dct1 – a nested indexable item
dct2 – a nested indexable item
Example
>>> from kwcoco.util.util_json import indexable_allclose
>>> dct1 = {
>>>     'foo': [1.222222, 1.333],
>>>     'bar': 1,
>>>     'baz': [],
>>> }
>>> dct2 = {
>>>     'foo': [1.22222, 1.333],
>>>     'bar': 1,
>>>     'baz': [],
>>> }
>>> assert indexable_allclose(dct1, dct2)
- kwcoco.util.resolve_directory_symlinks(path)[source]¶
Only resolve symlinks of directories, not the base file
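A minimal sketch of the idea, assuming the intended behavior is to resolve symlinks in the directory portion of a path while leaving the final component untouched (the helper name `_resolve_directory_symlinks` is hypothetical, not the kwcoco implementation):

```python
import os
import tempfile
from pathlib import Path


def _resolve_directory_symlinks(path):
    # Resolve symlinks in the parent directories, but keep the last
    # path component as-is, even if it is itself a symlink.
    path = Path(path)
    return Path(os.path.realpath(path.parent)) / path.name


# Demo: a symlinked directory containing a symlinked file.
root = Path(tempfile.mkdtemp())
real_dir = root / 'real_dir'
real_dir.mkdir()
(real_dir / 'real_file').touch()
(root / 'link_dir').symlink_to(real_dir)
(real_dir / 'link_file').symlink_to(real_dir / 'real_file')

p = root / 'link_dir' / 'link_file'
resolved = _resolve_directory_symlinks(p)
# The directory symlink is resolved ...
assert resolved.parent == Path(os.path.realpath(real_dir))
# ... but the file itself keeps its (unresolved) symlink name.
assert resolved.name == 'link_file'
```

This is useful when the symlink name of a file is meaningful (e.g. inside a bundle) but the directories above it should be canonicalized.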
- kwcoco.util.resolve_relative_to(path, dpath, strict=False)[source]¶
Given a path, try to resolve its symlinks such that it is relative to the given dpath.
Example
>>> from kwcoco.util.util_reroot import *  # NOQA
>>> import os
>>> def _symlink(self, target, verbose=0):
>>>     return ub.Path(ub.symlink(target, self, verbose=verbose))
>>> ub.Path._symlink = _symlink
>>> #
>>> # TODO: try to enumerate all basic cases
>>> #
>>> base = ub.Path.appdir('kwcoco/tests/reroot')
>>> base.delete().ensuredir()
>>> #
>>> drive1 = (base / 'drive1').ensuredir()
>>> drive2 = (base / 'drive2').ensuredir()
>>> #
>>> data_repo1 = (drive1 / 'data_repo1').ensuredir()
>>> cache = (data_repo1 / '.cache').ensuredir()
>>> real_file1 = (cache / 'real_file1').touch()
>>> #
>>> real_bundle = (data_repo1 / 'real_bundle').ensuredir()
>>> real_assets = (real_bundle / 'assets').ensuredir()
>>> #
>>> # Symlink file outside of the bundle
>>> link_file1 = (real_assets / 'link_file1')._symlink(real_file1)
>>> real_file2 = (real_assets / 'real_file2').touch()
>>> link_file2 = (real_assets / 'link_file2')._symlink(real_file2)
>>> #
>>> #
>>> # A symlink to the data repo
>>> data_repo2 = (drive1 / 'data_repo2')._symlink(data_repo1)
>>> data_repo3 = (drive2 / 'data_repo3')._symlink(data_repo1)
>>> data_repo4 = (drive2 / 'data_repo4')._symlink(data_repo2)
>>> #
>>> # A prediction repo TODO
>>> pred_repo5 = (drive2 / 'pred_repo5').ensuredir()
>>> #
>>> # _ = ub.cmd(f'tree -a {base}', verbose=3)
>>> #
>>> fpaths = []
>>> for r, ds, fs in os.walk(base, followlinks=True):
>>>     for f in fs:
>>>         if 'file' in f:
>>>             fpath = ub.Path(r) / f
>>>             fpaths.append(fpath)
>>> #
>>> #
>>> dpath = real_bundle.resolve()
>>> #
>>> for path in fpaths:
>>>     # print(f'{path}')
>>>     # print(f'{path.resolve()=}')
>>>     resolved_rel = resolve_relative_to(path, dpath)
>>>     print('resolved_rel = {!r}'.format(resolved_rel))
- kwcoco.util.smart_truncate(string, max_length=0, separator=' ', trunc_loc=0.5, trunc_char='~')[source]¶
Truncate a string.
- Parameters:
string (str) – string for modification
max_length (int) – output string length
word_boundary (bool) –
save_order (bool) – if True then word order of output string is like input string
separator (str) – separator between words
trunc_loc (float) – fraction of location where to remove the text
trunc_char (str) – the character to denote where truncation is starting
- Returns: