kwcoco.util package

Subpackages

Submodules

Module contents

mkinit ~/code/kwcoco/kwcoco/util/__init__.py -w
mkinit ~/code/kwcoco/kwcoco/util/__init__.py --lazy

kwcoco.util.ALLOF(*TYPES)
kwcoco.util.ANYOF(*TYPES)
kwcoco.util.ARRAY(TYPE={}, **kw)

https://json-schema.org/understanding-json-schema/reference/array.html

Example

>>> from kwcoco.util.jsonschema_elements import *  # NOQA
>>> ARRAY(numItems=3)
>>> schema = ARRAY(minItems=3)
>>> schema.validate()
{'type': 'array', 'items': {}, 'minItems': 3}
class kwcoco.util.Archive(fpath=None, mode='r', backend=None, file=None)[source]

Bases: object

Abstraction over zipfile and tarfile

Todo

see if we can use one of these other tools instead

SeeAlso:

https://github.com/RKrahl/archive-tools https://pypi.org/project/arlib/

Example

>>> from kwcoco.util.util_archive import Archive
>>> import ubelt as ub
>>> dpath = ub.Path.appdir('kwcoco', 'tests', 'util', 'archive')
>>> dpath.delete().ensuredir()
>>> # Test write mode
>>> mode = 'w'
>>> arc_zip = Archive(str(dpath / 'demo.zip'), mode=mode)
>>> arc_tar = Archive(str(dpath / 'demo.tar.gz'), mode=mode)
>>> open(dpath / 'data_1only.txt', 'w').write('bazbzzz')
>>> open(dpath / 'data_2only.txt', 'w').write('buzzz')
>>> open(dpath / 'data_both.txt', 'w').write('foobar')
>>> #
>>> arc_zip.add(dpath / 'data_both.txt')
>>> arc_zip.add(dpath / 'data_1only.txt')
>>> #
>>> arc_tar.add(dpath / 'data_both.txt')
>>> arc_tar.add(dpath / 'data_2only.txt')
>>> #
>>> arc_zip.close()
>>> arc_tar.close()
>>> #
>>> # Test read mode
>>> arc_zip = Archive(str(dpath / 'demo.zip'), mode='r')
>>> arc_tar = Archive(str(dpath / 'demo.tar.gz'), mode='r')
>>> # Test names
>>> name = 'data_both.txt'
>>> assert name in arc_zip.names()
>>> assert name in arc_tar.names()
>>> # Test read
>>> assert arc_zip.read(name, mode='r') == 'foobar'
>>> assert arc_tar.read(name, mode='r') == 'foobar'
>>> #
>>> # Test extractall
>>> extract_dpath = ub.ensuredir(str(dpath / 'extracted'))
>>> extracted1 = arc_zip.extractall(extract_dpath)
>>> extracted2 = arc_tar.extractall(extract_dpath)
>>> for fpath in extracted2:
>>>     print(open(fpath, 'r').read())
>>> for fpath in extracted1:
>>>     print(open(fpath, 'r').read())
Parameters:
  • fpath (str | None) – path to open

  • mode (str) – either r or w

  • backend (str | ModuleType | None) – either tarfile, zipfile string or module.

  • file (tarfile.TarFile | zipfile.ZipFile | None) – the open backend file if it already exists. If not set, then fpath will be used to open it.

_available_backends = {'tarfile': <module 'tarfile' from '/home/docs/.asdf/installs/python/3.11.6/lib/python3.11/tarfile.py'>, 'zipfile': <module 'zipfile' from '/home/docs/.asdf/installs/python/3.11.6/lib/python3.11/zipfile.py'>}
classmethod _open(fpath, mode, backend=None)[source]
names()[source]
read(name, mode='rb')[source]

Read data directly out of the archive.

Parameters:
  • name (str) – the name of the archive member to read

  • mode (str) – This is a conceptual parameter that emulates the usual open mode. Defaults to “rb”, which returns data as raw bytes. If “r” will decode the bytes into utf8-text.

classmethod coerce(data)[source]

Either open an archive file path or coerce an existing ZipFile or tarfile structure into this wrapper class

add(fpath, arcname=None)[source]
close()[source]
extractall(output_dpath='.', verbose=1, overwrite=True)[source]
class kwcoco.util.ContainerElements[source]

Bases: object

Types that contain other types

Example

>>> from kwcoco.util.jsonschema_elements import *  # NOQA
>>> print(elem.ARRAY().validate())
>>> print(elem.OBJECT().validate())
>>> print(elem.OBJECT().validate())
{'type': 'array', 'items': {}}
{'type': 'object', 'properties': {}}
{'type': 'object', 'properties': {}}
ARRAY(TYPE={}, **kw)[source]

https://json-schema.org/understanding-json-schema/reference/array.html

Example

>>> from kwcoco.util.jsonschema_elements import *  # NOQA
>>> ARRAY(numItems=3)
>>> schema = ARRAY(minItems=3)
>>> schema.validate()
{'type': 'array', 'items': {}, 'minItems': 3}
OBJECT(PROPERTIES={}, **kw)[source]

https://json-schema.org/understanding-json-schema/reference/object.html

Example

>>> import jsonschema
>>> schema = elem.OBJECT()
>>> jsonschema.validate({}, schema)
>>> #
>>> import jsonschema
>>> schema = elem.OBJECT({
>>>     'key1': elem.ANY(),
>>>     'key2': elem.ANY(),
>>> }, required=['key1'])
>>> jsonschema.validate({'key1': None}, schema)
>>> #
>>> import jsonschema
>>> schema = elem.OBJECT({
>>>     'key1': elem.OBJECT({'arr': elem.ARRAY()}),
>>>     'key2': elem.ANY(),
>>> }, required=['key1'], title='a title')
>>> schema.validate()
>>> print('schema = {}'.format(ub.urepr(schema, sort=1, nl=-1)))
>>> jsonschema.validate({'key1': {'arr': []}}, schema)
schema = {
    'properties': {
        'key1': {
            'properties': {
                'arr': {'items': {}, 'type': 'array'}
            },
            'type': 'object'
        },
        'key2': {}
    },
    'required': ['key1'],
    'title': 'a title',
    'type': 'object'
}
class kwcoco.util.DictLike[source]

Bases: NiceRepr

An inherited class must specify the getitem, setitem, and

keys methods.

A class is dictionary like if it has:

__iter__, __len__, __contains__, __getitem__, items, keys, values, get,

and if it should be writable it should have: __delitem__, __setitem__, update,

And perhaps: copy,

__iter__, __len__, __contains__, __getitem__, items, keys, values, get,

and if it should be writable it should have: __delitem__, __setitem__, update,

And perhaps: copy,

getitem(key)[source]
Parameters:

key (Any) – a key

Returns:

a value

Return type:

Any

setitem(key, value)[source]
Parameters:
  • key (Any)

  • value (Any)

delitem(key)[source]
Parameters:

key (Any)

keys()[source]
Yields:

Any – a key

items()[source]
Yields:

Tuple[Any, Any] – a key value pair

values()[source]
Yields:

Any – a value

copy()[source]
Return type:

Dict

to_dict()[source]
Return type:

Dict

asdict()
Return type:

Dict

update(other)[source]
get(key, default=None)[source]
Parameters:
  • key (Any)

  • default (Any)

Return type:

Any

class kwcoco.util.Element(base, options={}, _magic=None)[source]

Bases: dict

A dictionary used to define an element of a JSON Schema.

The exact keys/values for the element will depend on the type of element being described. The SchemaElements defines exactly what these are for the core elements. (e.g. OBJECT, INTEGER, NULL, ARRAY, ANYOF)

Example

>>> from kwcoco.coco_schema import *  # NOQA
>>> self = Element(base={'type': 'demo'}, options={'opt1', 'opt2'})
>>> new = self(opt1=3)
>>> print('self = {}'.format(ub.urepr(self, nl=1, sort=1)))
>>> print('new = {}'.format(ub.urepr(new, nl=1, sort=1)))
>>> print('new2 = {}'.format(ub.urepr(new(), nl=1, sort=1)))
>>> print('new3 = {}'.format(ub.urepr(new(title='myvar'), nl=1, sort=1)))
>>> print('new4 = {}'.format(ub.urepr(new(title='myvar')(examples=['']), nl=1, sort=1)))
>>> print('new5 = {}'.format(ub.urepr(new(badattr=True), nl=1, sort=1)))
self = {
    'type': 'demo',
}
new = {
    'opt1': 3,
    'type': 'demo',
}
new2 = {
    'opt1': 3,
    'type': 'demo',
}
new3 = {
    'opt1': 3,
    'title': 'myvar',
    'type': 'demo',
}
new4 = {
    'examples': [''],
    'opt1': 3,
    'title': 'myvar',
    'type': 'demo',
}
new5 = {
    'opt1': 3,
    'type': 'demo',
}
Parameters:
  • base (dict) – the keys / values this schema must contain

  • options (dict) – the keys / values this schema may contain

  • _magic (callable | None) – called when creating an instance of this schema element. Allows convenience attributes to be converted to the formal jsonschema specs. TODO: _magic is a terrible name, we need to rename it with something descriptive.

validate(instance=NoParam)[source]

If instance is given, validates that the given dictionary conforms to this schema. Otherwise validates that this is a valid schema element.

Parameters:

instance (dict) – a dictionary to validate

class kwcoco.util.IndexableWalker(data, dict_cls=(<class 'dict'>, ), list_cls=(<class 'list'>, <class 'tuple'>))[source]

Bases: Generator

Traverses through a nested tree-like indexable structure.

Generates a path and value to each node in the structure. The path is a list of indexes which if applied in order will reach the value.

The __setitem__ method can be used to modify a nested value based on the path returned by the generator.

When generating values, you can use “send” to prevent traversal of a particular branch.

RelatedWork:
Variables:
  • data (dict | list | tuple) – the wrapped indexable data

  • dict_cls (Tuple[type]) – the types that should be considered dictionary mappings for the purpose of nested iteration. Defaults to dict.

  • list_cls (Tuple[type]) – the types that should be considered list-like for the purposes of nested iteration. Defaults to (list, tuple).

Example

>>> import ubelt as ub
>>> # Given Nested Data
>>> data = {
>>>     'foo': {'bar': 1},
>>>     'baz': [{'biz': 3}, {'buz': [4, 5, 6]}],
>>> }
>>> # Create an IndexableWalker
>>> walker = ub.IndexableWalker(data)
>>> # We iterate over the data as if it was flat
>>> # ignore the <want> string due to order issues on older Pythons
>>> # xdoctest: +IGNORE_WANT
>>> for path, val in walker:
>>>     print(path)
['foo']
['baz']
['baz', 0]
['baz', 1]
['baz', 1, 'buz']
['baz', 1, 'buz', 0]
['baz', 1, 'buz', 1]
['baz', 1, 'buz', 2]
['baz', 0, 'biz']
['foo', 'bar']
>>> # We can use "paths" as keys to getitem into the walker
>>> path = ['baz', 1, 'buz', 2]
>>> val = walker[path]
>>> assert val == 6
>>> # We can use "paths" as keys to setitem into the walker
>>> assert data['baz'][1]['buz'][2] == 6
>>> walker[path] = 7
>>> assert data['baz'][1]['buz'][2] == 7
>>> # We can use "paths" as keys to delitem into the walker
>>> assert data['baz'][1]['buz'][1] == 5
>>> del walker[['baz', 1, 'buz', 1]]
>>> assert data['baz'][1]['buz'][1] == 7

Example

>>> # Create nested data
>>> # xdoctest: +REQUIRES(module:numpy)
>>> import numpy as np
>>> import ubelt as ub
>>> data = ub.ddict(lambda: int)
>>> data['foo'] = ub.ddict(lambda: int)
>>> data['bar'] = np.array([1, 2, 3])
>>> data['foo']['a'] = 1
>>> data['foo']['b'] = np.array([1, 2, 3])
>>> data['foo']['c'] = [1, 2, 3]
>>> data['baz'] = 3
>>> print('data = {}'.format(ub.repr2(data, nl=True)))
>>> # We can walk through every node in the nested tree
>>> walker = ub.IndexableWalker(data)
>>> for path, value in walker:
>>>     print('walk path = {}'.format(ub.repr2(path, nl=0)))
>>>     if path[-1] == 'c':
>>>         # Use send to prevent traversing this branch
>>>         got = walker.send(False)
>>>         # We can modify the value based on the returned path
>>>         walker[path] = 'changed the value of c'
>>> print('data = {}'.format(ub.repr2(data, nl=True)))
>>> assert data['foo']['c'] == 'changed the value of c'

Example

>>> # Test sending false for every data item
>>> import ubelt as ub
>>> data = {1: [1, 2, 3], 2: [1, 2, 3]}
>>> walker = ub.IndexableWalker(data)
>>> # Sending false means you wont traverse any further on that path
>>> num_iters_v1 = 0
>>> for path, value in walker:
>>>     print('[v1] walk path = {}'.format(ub.repr2(path, nl=0)))
>>>     walker.send(False)
>>>     num_iters_v1 += 1
>>> num_iters_v2 = 0
>>> for path, value in walker:
>>>     # When we dont send false we walk all the way down
>>>     print('[v2] walk path = {}'.format(ub.repr2(path, nl=0)))
>>>     num_iters_v2 += 1
>>> assert num_iters_v1 == 2
>>> assert num_iters_v2 == 8

Example

>>> # Test numpy
>>> # xdoctest: +REQUIRES(CPython)
>>> # xdoctest: +REQUIRES(module:numpy)
>>> import ubelt as ub
>>> import numpy as np
>>> # By default we don't recurse into ndarrays because they
>>> # Are registered as an indexable class
>>> data = {2: np.array([1, 2, 3])}
>>> walker = ub.IndexableWalker(data)
>>> num_iters = 0
>>> for path, value in walker:
>>>     print('walk path = {}'.format(ub.repr2(path, nl=0)))
>>>     num_iters += 1
>>> assert num_iters == 1
>>> # Currently to use top-level ndarrays, you need to extend what the
>>> # list class is. This API may change in the future to be easier
>>> # to work with.
>>> data = np.random.rand(3, 5)
>>> walker = ub.IndexableWalker(data, list_cls=(list, tuple, np.ndarray))
>>> num_iters = 0
>>> for path, value in walker:
>>>     print('walk path = {}'.format(ub.repr2(path, nl=0)))
>>>     num_iters += 1
>>> assert num_iters == 3 + 3 * 5
send(arg) send 'arg' into generator,[source]

return next yielded value or raise StopIteration.

throw(typ[, val[, tb]]) raise exception in generator,[source]

return next yielded value or raise StopIteration.

_walk(data=None, prefix=[])[source]

Defines the underlying generator used by IndexableWalker

Yields:
Tuple[List, Any] | None – path (List) - a “path” through the nested data structure

value (Any) - the value indexed by that “path”.

Can also yield None in the case that send is called on the generator.

allclose(other, rel_tol=1e-09, abs_tol=0.0, return_info=False)[source]

Walks through this and another nested data structures and checks if everything is roughly the same.

Parameters:
  • other (IndexableWalker | List | Dict) – a nested indexable item to compare against.

  • rel_tol (float) – maximum difference for being considered “close”, relative to the magnitude of the input values

  • abs_tol (float) – maximum difference for being considered “close”, regardless of the magnitude of the input values

  • return_info (bool, default=False) – if true, return extra info dict

Returns:

A boolean result if return_info is false, otherwise a tuple of the boolean result and an “info” dict containing detailed results indicating what matched and what did not.

Return type:

bool | Tuple[bool, Dict]

Example

>>> import ubelt as ub
>>> items1 = ub.IndexableWalker({
>>>     'foo': [1.222222, 1.333],
>>>     'bar': 1,
>>>     'baz': [],
>>> })
>>> items2 = ub.IndexableWalker({
>>>     'foo': [1.22222, 1.333],
>>>     'bar': 1,
>>>     'baz': [],
>>> })
>>> flag, return_info =  items1.allclose(items2, return_info=True)
>>> print('return_info = {}'.format(ub.repr2(return_info, nl=1)))
>>> print('flag = {!r}'.format(flag))
>>> for p1, v1, v2  in return_info['faillist']:
>>>     v1_ = items1[p1]
>>>     print('*fail p1, v1, v2 = {}, {}, {}'.format(p1, v1, v2))
>>> for p1 in return_info['passlist']:
>>>     v1_ = items1[p1]
>>>     print('*pass p1, v1_ = {}, {}'.format(p1, v1_))
>>> assert not flag
>>> import ubelt as ub
>>> items1 = ub.IndexableWalker({
>>>     'foo': [1.0000000000000000000000001, 1.],
>>>     'bar': 1,
>>>     'baz': [],
>>> })
>>> items2 = ub.IndexableWalker({
>>>     'foo': [0.9999999999999999, 1.],
>>>     'bar': 1,
>>>     'baz': [],
>>> })
>>> flag, return_info =  items1.allclose(items2, return_info=True)
>>> print('return_info = {}'.format(ub.repr2(return_info, nl=1)))
>>> print('flag = {!r}'.format(flag))
>>> assert flag

Example

>>> import ubelt as ub
>>> flag, return_info =  ub.IndexableWalker([]).allclose(ub.IndexableWalker([]), return_info=True)
>>> print('return_info = {!r}'.format(return_info))
>>> print('flag = {!r}'.format(flag))
>>> assert flag

Example

>>> import ubelt as ub
>>> flag =  ub.IndexableWalker([]).allclose([], return_info=False)
>>> print('flag = {!r}'.format(flag))
>>> assert flag

Example

>>> import ubelt as ub
>>> flag, return_info =  ub.IndexableWalker([]).allclose([1], return_info=True)
>>> print('return_info = {!r}'.format(return_info))
>>> print('flag = {!r}'.format(flag))
>>> assert not flag

Example

>>> # xdoctest: +REQUIRES(module:numpy)
>>> import ubelt as ub
>>> import numpy as np
>>> a = np.random.rand(3, 5)
>>> b = a + 1
>>> wa = ub.IndexableWalker(a, list_cls=(np.ndarray,))
>>> wb = ub.IndexableWalker(b, list_cls=(np.ndarray,))
>>> flag, return_info =  wa.allclose(wb, return_info=True)
>>> print('return_info = {!r}'.format(return_info))
>>> print('flag = {!r}'.format(flag))
>>> assert not flag
>>> a = np.random.rand(3, 5)
>>> b = a.copy() + 1e-17
>>> wa = ub.IndexableWalker([a], list_cls=(np.ndarray, list))
>>> wb = ub.IndexableWalker([b], list_cls=(np.ndarray, list))
>>> flag, return_info =  wa.allclose(wb, return_info=True)
>>> assert flag
>>> print('return_info = {!r}'.format(return_info))
>>> print('flag = {!r}'.format(flag))
_abc_impl = <_abc._abc_data object>
kwcoco.util.NOT(TYPE)
kwcoco.util.OBJECT(PROPERTIES={}, **kw)

https://json-schema.org/understanding-json-schema/reference/object.html

Example

>>> import jsonschema
>>> schema = elem.OBJECT()
>>> jsonschema.validate({}, schema)
>>> #
>>> import jsonschema
>>> schema = elem.OBJECT({
>>>     'key1': elem.ANY(),
>>>     'key2': elem.ANY(),
>>> }, required=['key1'])
>>> jsonschema.validate({'key1': None}, schema)
>>> #
>>> import jsonschema
>>> schema = elem.OBJECT({
>>>     'key1': elem.OBJECT({'arr': elem.ARRAY()}),
>>>     'key2': elem.ANY(),
>>> }, required=['key1'], title='a title')
>>> schema.validate()
>>> print('schema = {}'.format(ub.urepr(schema, sort=1, nl=-1)))
>>> jsonschema.validate({'key1': {'arr': []}}, schema)
schema = {
    'properties': {
        'key1': {
            'properties': {
                'arr': {'items': {}, 'type': 'array'}
            },
            'type': 'object'
        },
        'key2': {}
    },
    'required': ['key1'],
    'title': 'a title',
    'type': 'object'
}
kwcoco.util.ONEOF(*TYPES)
class kwcoco.util.QuantifierElements[source]

Bases: object

Quantifier types

https://json-schema.org/understanding-json-schema/reference/combining.html#allof

Example

>>> from kwcoco.util.jsonschema_elements import *  # NOQA
>>> elem.ANYOF(elem.STRING, elem.NUMBER).validate()
>>> elem.ONEOF(elem.STRING, elem.NUMBER).validate()
>>> elem.NOT(elem.NULL).validate()
>>> elem.NOT(elem.ANY).validate()
>>> elem.ANY.validate()
property ANY
ALLOF(*TYPES)[source]
ANYOF(*TYPES)[source]
ONEOF(*TYPES)[source]
NOT(TYPE)[source]
class kwcoco.util.ScalarElements[source]

Bases: object

Single-valued elements

property NULL

https://json-schema.org/understanding-json-schema/reference/null.html

property BOOLEAN

https://json-schema.org/understanding-json-schema/reference/null.html

property STRING

https://json-schema.org/understanding-json-schema/reference/string.html

property NUMBER

https://json-schema.org/understanding-json-schema/reference/numeric.html#number

property INTEGER

https://json-schema.org/understanding-json-schema/reference/numeric.html#integer

class kwcoco.util.SchemaElements[source]

Bases: ScalarElements, QuantifierElements, ContainerElements

Functional interface into defining jsonschema structures.

See mixin classes for details.

References

https://json-schema.org/understanding-json-schema/

Todo

  • [ ] Generics: title, description, default, examples

CommandLine

xdoctest -m /home/joncrall/code/kwcoco/kwcoco/util/jsonschema_elements.py SchemaElements

Example

>>> from kwcoco.util.jsonschema_elements import *  # NOQA
>>> elem = SchemaElements()
>>> elem.ARRAY(elem.ANY())
>>> schema = OBJECT({
>>>     'prop1': ARRAY(INTEGER, minItems=3),
>>>     'prop2': ARRAY(STRING, numItems=2),
>>>     'prop3': ARRAY(OBJECT({
>>>         'subprob1': NUMBER,
>>>         'subprob2': NUMBER,
>>>     }))
>>> })
>>> print('schema = {}'.format(ub.urepr(schema, nl=2, sort=1)))
schema = {
    'properties': {
        'prop1': {'items': {'type': 'integer'}, 'minItems': 3, 'type': 'array'},
        'prop2': {'items': {'type': 'string'}, 'maxItems': 2, 'minItems': 2, 'type': 'array'},
        'prop3': {'items': {'properties': {'subprob1': {'type': 'number'}, 'subprob2': {'type': 'number'}}, 'type': 'object'}, 'type': 'array'},
    },
    'type': 'object',
}
>>> TYPE = elem.OBJECT({
>>>     'p1': ANY,
>>>     'p2': ANY,
>>> }, required=['p1'])
>>> import jsonschema
>>> inst = {'p1': None}
>>> jsonschema.validate(inst, schema=TYPE)
>>> #jsonschema.validate({'p2': None}, schema=TYPE)
class kwcoco.util.StratifiedGroupKFold(n_splits=3, shuffle=False, random_state=None)[source]

Bases: _BaseKFold

Stratified K-Folds cross-validator with Grouping

Provides train/test indices to split data in train/test sets.

This cross-validation object is a variation of GroupKFold that returns stratified folds. The folds are made by preserving the percentage of samples for each class.

This is an old interface and should likely be refactored and modernized.

Parameters:

n_splits (int, default=3) – Number of folds. Must be at least 2.

_make_test_folds(X, y=None, groups=None)[source]
Parameters:
  • X (ndarray) – data

  • y (ndarray) – labels

  • groups (ndarray) – groupids for items. Items with the same groupid must be placed in the same group.

Returns:

test_folds

Return type:

list

Example

>>> from kwcoco.util.util_sklearn import *  # NOQA
>>> import kwarray
>>> rng = kwarray.ensure_rng(0)
>>> groups = [1, 1, 3, 4, 2, 2, 7, 8, 8]
>>> y      = [1, 1, 1, 1, 2, 2, 2, 3, 3]
>>> X = np.empty((len(y), 0))
>>> self = StratifiedGroupKFold(random_state=rng, shuffle=True)
>>> skf_list = list(self.split(X=X, y=y, groups=groups))
>>> import ubelt as ub
>>> print(ub.urepr(skf_list, nl=1, with_dtype=False))
[
    (np.array([2, 3, 4, 5, 6]), np.array([0, 1, 7, 8])),
    (np.array([0, 1, 2, 7, 8]), np.array([3, 4, 5, 6])),
    (np.array([0, 1, 3, 4, 5, 6, 7, 8]), np.array([2])),
]
_iter_test_masks(X, y=None, groups=None)[source]
split(X, y, groups=None)[source]

Generate indices to split data into training and test set.

_abc_impl = <_abc._abc_data object>
kwcoco.util.ensure_json_serializable(dict_, normalize_containers=False, verbose=0)[source]

Attempt to convert common types (e.g. numpy) into something json complient

Convert numpy and tuples into lists

Parameters:

normalize_containers (bool) – if True, normalizes dict containers to be standard python structures. Defaults to False.

Example

>>> data = ub.ddict(lambda: int)
>>> data['foo'] = ub.ddict(lambda: int)
>>> data['bar'] = np.array([1, 2, 3])
>>> data['foo']['a'] = 1
>>> data['foo']['b'] = (1, np.array([1, 2, 3]), {3: np.int32(3), 4: np.float16(1.0)})
>>> dict_ = data
>>> print(ub.urepr(data, nl=-1))
>>> assert list(find_json_unserializable(data))
>>> result = ensure_json_serializable(data, normalize_containers=True)
>>> print(ub.urepr(result, nl=-1))
>>> assert not list(find_json_unserializable(result))
>>> assert type(result) is dict
kwcoco.util.find_json_unserializable(data, quickcheck=False)[source]

Recurse through json datastructure and find any component that causes a serialization error. Record the location of these errors in the datastructure as we recurse through the call tree.

Parameters:
  • data (object) – data that should be json serializable

  • quickcheck (bool) – if True, check the entire datastructure assuming its ok before doing the python-based recursive logic.

Returns:

list of “bad part” dictionaries containing items

’value’ - the value that caused the serialization error

’loc’ - which contains a list of key/indexes that can be used to lookup the location of the unserializable value. If the “loc” is a list, then it indicates a rare case where a key in a dictionary is causing the serialization error.

Return type:

List[Dict]

Example

>>> from kwcoco.util.util_json import *  # NOQA
>>> part = ub.ddict(lambda: int)
>>> part['foo'] = ub.ddict(lambda: int)
>>> part['bar'] = np.array([1, 2, 3])
>>> part['foo']['a'] = 1
>>> # Create a dictionary with two unserializable parts
>>> data = [1, 2, {'nest1': [2, part]}, {frozenset({'badkey'}): 3, 2: 4}]
>>> parts = list(find_json_unserializable(data))
>>> print('parts = {}'.format(ub.urepr(parts, nl=1)))
>>> # Check expected structure of bad parts
>>> assert len(parts) == 2
>>> part = parts[1]
>>> assert list(part['loc']) == [2, 'nest1', 1, 'bar']
>>> # We can use the "loc" to find the bad value
>>> for part in parts:
>>>     # "loc" is a list of directions containing which keys/indexes
>>>     # to traverse at each descent into the data structure.
>>>     directions = part['loc']
>>>     curr = data
>>>     special_flag = False
>>>     for key in directions:
>>>         if isinstance(key, list):
>>>             # special case for bad keys
>>>             special_flag = True
>>>             break
>>>         else:
>>>             # normal case for bad values
>>>             curr = curr[key]
>>>     if special_flag:
>>>         assert part['data'] in curr.keys()
>>>         assert part['data'] is key[1]
>>>     else:
>>>         assert part['data'] is curr

Example

>>> # xdoctest: +SKIP("TODO: circular ref detect algo is wrong, fix it")
>>> from kwcoco.util.util_json import *  # NOQA
>>> import pytest
>>> # Test circular reference
>>> data = [[], {'a': []}]
>>> data[1]['a'].append(data)
>>> with pytest.raises(ValueError, match="Circular reference detected at.*1, 'a', 1*"):
...     parts = list(find_json_unserializable(data))
>>> # Should be ok here
>>> shared_data = {'shared': 1}
>>> data = [[shared_data], shared_data]
>>> parts = list(find_json_unserializable(data))
kwcoco.util.indexable_allclose(dct1, dct2, return_info=False)[source]

Walks through two nested data structures and ensures that everything is roughly the same.

Note

Use the version in ubelt instead

Parameters:
  • dct1 – a nested indexable item

  • dct2 – a nested indexable item

Example

>>> from kwcoco.util.util_json import indexable_allclose
>>> dct1 = {
>>>     'foo': [1.222222, 1.333],
>>>     'bar': 1,
>>>     'baz': [],
>>> }
>>> dct2 = {
>>>     'foo': [1.22222, 1.333],
>>>     'bar': 1,
>>>     'baz': [],
>>> }
>>> assert indexable_allclose(dct1, dct2)

Only resolve symlinks of directories, not the base file

kwcoco.util.resolve_relative_to(path, dpath, strict=False)[source]

Given a path, try to resolve its symlinks such that it is relative to the given dpath.

Example

>>> from kwcoco.util.util_reroot import *  # NOQA
>>> import os
>>> def _symlink(self, target, verbose=0):
>>>     return ub.Path(ub.symlink(target, self, verbose=verbose))
>>> ub.Path._symlink = _symlink
>>> #
>>> # TODO: try to enumerate all basic cases
>>> #
>>> base = ub.Path.appdir('kwcoco/tests/reroot')
>>> base.delete().ensuredir()
>>> #
>>> drive1 = (base / 'drive1').ensuredir()
>>> drive2 = (base / 'drive2').ensuredir()
>>> #
>>> data_repo1 = (drive1 / 'data_repo1').ensuredir()
>>> cache = (data_repo1 / '.cache').ensuredir()
>>> real_file1 = (cache / 'real_file1').touch()
>>> #
>>> real_bundle = (data_repo1 / 'real_bundle').ensuredir()
>>> real_assets = (real_bundle / 'assets').ensuredir()
>>> #
>>> # Symlink file outside of the bundle
>>> link_file1 = (real_assets / 'link_file1')._symlink(real_file1)
>>> real_file2 = (real_assets / 'real_file2').touch()
>>> link_file2 = (real_assets / 'link_file2')._symlink(real_file2)
>>> #
>>> #
>>> # A symlink to the data repo
>>> data_repo2 = (drive1 / 'data_repo2')._symlink(data_repo1)
>>> data_repo3 = (drive2 / 'data_repo3')._symlink(data_repo1)
>>> data_repo4 = (drive2 / 'data_repo4')._symlink(data_repo2)
>>> #
>>> # A prediction repo TODO
>>> pred_repo5 = (drive2 / 'pred_repo5').ensuredir()
>>> #
>>> # _ = ub.cmd(f'tree -a {base}', verbose=3)
>>> #
>>> fpaths = []
>>> for r, ds, fs in os.walk(base, followlinks=True):
>>>     for f in fs:
>>>         if 'file' in f:
>>>             fpath = ub.Path(r) / f
>>>             fpaths.append(fpath)
>>> #
>>> #
>>> dpath = real_bundle.resolve()
>>> #
>>> for path in fpaths:
>>>     # print(f'{path}')
>>>     # print(f'{path.resolve()=}')
>>>     resolved_rel = resolve_relative_to(path, dpath)
>>>     print('resolved_rel = {!r}'.format(resolved_rel))
kwcoco.util.smart_truncate(string, max_length=0, separator=' ', trunc_loc=0.5, trunc_char='~')[source]

Truncate a string.

Parameters:
  • string (str) – string for modification

  • max_length (int) – output string length

  • word_boundary (bool)

  • save_order (bool) – if True then word order of output string is like input string

  • separator (str) – separator between words

  • trunc_loc (float) – fraction of location where to remove the text

  • trunc_char (str) – the character to denote where truncation is starting

Returns:

kwcoco.util.special_reroot_single(dset, verbose=0)[source]
kwcoco.util.unarchive_file(archive_fpath, output_dpath='.', verbose=1, overwrite=True)[source]