kwcoco.util package¶
Subpackages¶
- kwcoco.util.delayed_ops package
- Submodules
- Module contents
Submodules¶
- kwcoco.util.dict_like module
- kwcoco.util.jsonschema_elements module
- kwcoco.util.lazy_frame_backends module
- kwcoco.util.util_archive module
- kwcoco.util.util_delayed_poc module
- kwcoco.util.util_futures module
- kwcoco.util.util_json module
- kwcoco.util.util_monkey module
- kwcoco.util.util_reroot module
- kwcoco.util.util_sklearn module
- kwcoco.util.util_truncate module
Module contents¶
mkinit ~/code/kwcoco/kwcoco/util/__init__.py -w mkinit ~/code/kwcoco/kwcoco/util/__init__.py –lazy
- kwcoco.util.ALLOF(*TYPES)¶
- kwcoco.util.ANYOF(*TYPES)¶
- kwcoco.util.ARRAY(TYPE={}, **kw)¶
https://json-schema.org/understanding-json-schema/reference/array.html
Example
>>> from kwcoco.util.jsonschema_elements import * # NOQA >>> ARRAY(numItems=3) >>> schema = ARRAY(minItems=3) >>> schema.validate() {'type': 'array', 'items': {}, 'minItems': 3}
- class kwcoco.util.Archive(fpath=None, mode='r', backend=None, file=None)[source]¶
Bases:
object
Abstraction over zipfile and tarfile
Todo
see if we can use one of these other tools instead
Example
>>> from kwcoco.util.util_archive import Archive >>> import ubelt as ub >>> dpath = ub.Path.appdir('kwcoco', 'tests', 'util', 'archive') >>> dpath.delete().ensuredir() >>> # Test write mode >>> mode = 'w' >>> arc_zip = Archive(str(dpath / 'demo.zip'), mode=mode) >>> arc_tar = Archive(str(dpath / 'demo.tar.gz'), mode=mode) >>> open(dpath / 'data_1only.txt', 'w').write('bazbzzz') >>> open(dpath / 'data_2only.txt', 'w').write('buzzz') >>> open(dpath / 'data_both.txt', 'w').write('foobar') >>> # >>> arc_zip.add(dpath / 'data_both.txt') >>> arc_zip.add(dpath / 'data_1only.txt') >>> # >>> arc_tar.add(dpath / 'data_both.txt') >>> arc_tar.add(dpath / 'data_2only.txt') >>> # >>> arc_zip.close() >>> arc_tar.close() >>> # >>> # Test read mode >>> arc_zip = Archive(str(dpath / 'demo.zip'), mode='r') >>> arc_tar = Archive(str(dpath / 'demo.tar.gz'), mode='r') >>> # Test names >>> name = 'data_both.txt' >>> assert name in arc_zip.names() >>> assert name in arc_tar.names() >>> # Test read >>> assert arc_zip.read(name, mode='r') == 'foobar' >>> assert arc_tar.read(name, mode='r') == 'foobar' >>> # >>> # Test extractall >>> extract_dpath = ub.ensuredir(str(dpath / 'extracted')) >>> extracted1 = arc_zip.extractall(extract_dpath) >>> extracted2 = arc_tar.extractall(extract_dpath) >>> for fpath in extracted2: >>> print(open(fpath, 'r').read()) >>> for fpath in extracted1: >>> print(open(fpath, 'r').read())
- read(name, mode='rb')[source]¶
Read data directly out of the archive.
- Parameters
name (str) – the name of the archive member to read
mode (str) – This is a conceptual parameter that emulates the usual open mode. Defaults to “rb”, which returns data as raw bytes. If “r” will decode the bytes into utf8-text.
- class kwcoco.util.ContainerElements[source]¶
Bases:
object
Types that contain other types
Example
>>> from kwcoco.util.jsonschema_elements import * # NOQA >>> print(elem.ARRAY().validate()) >>> print(elem.OBJECT().validate()) >>> print(elem.OBJECT().validate()) {'type': 'array', 'items': {}} {'type': 'object', 'properties': {}} {'type': 'object', 'properties': {}}
- ARRAY(TYPE={}, **kw)[source]¶
https://json-schema.org/understanding-json-schema/reference/array.html
Example
>>> from kwcoco.util.jsonschema_elements import * # NOQA >>> ARRAY(numItems=3) >>> schema = ARRAY(minItems=3) >>> schema.validate() {'type': 'array', 'items': {}, 'minItems': 3}
- OBJECT(PROPERTIES={}, **kw)[source]¶
https://json-schema.org/understanding-json-schema/reference/object.html
Example
>>> import jsonschema >>> schema = elem.OBJECT() >>> jsonschema.validate({}, schema) >>> # >>> import jsonschema >>> schema = elem.OBJECT({ >>> 'key1': elem.ANY(), >>> 'key2': elem.ANY(), >>> }, required=['key1']) >>> jsonschema.validate({'key1': None}, schema) >>> # >>> import jsonschema >>> schema = elem.OBJECT({ >>> 'key1': elem.OBJECT({'arr': elem.ARRAY()}), >>> 'key2': elem.ANY(), >>> }, required=['key1'], title='a title') >>> schema.validate() >>> print('schema = {}'.format(ub.repr2(schema, sort=1, nl=-1))) >>> jsonschema.validate({'key1': {'arr': []}}, schema) schema = { 'properties': { 'key1': { 'properties': { 'arr': {'items': {}, 'type': 'array'} }, 'type': 'object' }, 'key2': {} }, 'required': ['key1'], 'title': 'a title', 'type': 'object' }
- class kwcoco.util.DictLike[source]¶
Bases:
NiceRepr
- An inherited class must specify the
getitem
,setitem
, and keys
methods.
A class is dictionary like if it has:
__iter__
,__len__
,__contains__
,__getitem__
,items
,keys
,values
,get
,and if it should be writable it should have:
__delitem__
,__setitem__
,update
,And perhaps:
copy
,__iter__
,__len__
,__contains__
,__getitem__
,items
,keys
,values
,get
,and if it should be writable it should have:
__delitem__
,__setitem__
,update
,And perhaps:
copy
,- asdict()¶
- Return type
Dict
- An inherited class must specify the
- class kwcoco.util.Element(base, options={}, _magic=None)[source]¶
Bases:
dict
A dictionary used to define an element of a JSON Schema.
The exact keys/values for the element will depend on the type of element being described. The
SchemaElements
defines exactly what these are for the core elements. (e.g. OBJECT, INTEGER, NULL, ARRAY, ANYOF)Example
>>> from kwcoco.coco_schema import * # NOQA >>> self = Element(base={'type': 'demo'}, options={'opt1', 'opt2'}) >>> new = self(opt1=3) >>> print('self = {}'.format(ub.repr2(self, nl=1, sort=1))) >>> print('new = {}'.format(ub.repr2(new, nl=1, sort=1))) >>> print('new2 = {}'.format(ub.repr2(new(), nl=1, sort=1))) >>> print('new3 = {}'.format(ub.repr2(new(title='myvar'), nl=1, sort=1))) >>> print('new4 = {}'.format(ub.repr2(new(title='myvar')(examples=['']), nl=1, sort=1))) >>> print('new5 = {}'.format(ub.repr2(new(badattr=True), nl=1, sort=1))) self = { 'type': 'demo', } new = { 'opt1': 3, 'type': 'demo', } new2 = { 'opt1': 3, 'type': 'demo', } new3 = { 'opt1': 3, 'title': 'myvar', 'type': 'demo', } new4 = { 'examples': [''], 'opt1': 3, 'title': 'myvar', 'type': 'demo', } new5 = { 'opt1': 3, 'type': 'demo', }
- class kwcoco.util.IndexableWalker(data, dict_cls=(<class 'dict'>, ), list_cls=(<class 'list'>, <class 'tuple'>))[source]¶
Bases:
Generator
Traverses through a nested tree-liked indexable structure.
Generates a path and value to each node in the structure. The path is a list of indexes which if applied in order will reach the value.
The
__setitem__
method can be used to modify a nested value based on the path returned by the generator.When generating values, you can use “send” to prevent traversal of a particular branch.
- RelatedWork:
- https://pypi.org/project/python-benedict/ - implements a dictionary
subclass with similar nested indexing abilities.
Example
>>> # Given Nested Data >>> data = { >>> 'foo': {'bar': 1}, >>> 'baz': [{'biz': 3}, {'buz': [4, 5, 6]}], >>> } >>> # Create an IndexableWalker >>> walker = IndexableWalker(data) >>> # We iterate over the data as if it was flat >>> # ignore the <want> string due to order issues on older Pythons >>> # xdoctest: +IGNORE_WANT >>> for path, val in walker: >>> print(path) ['foo'] ['baz'] ['baz', 0] ['baz', 1] ['baz', 1, 'buz'] ['baz', 1, 'buz', 0] ['baz', 1, 'buz', 1] ['baz', 1, 'buz', 2] ['baz', 0, 'biz'] ['foo', 'bar'] >>> # We can use "paths" as keys to getitem into the walker >>> path = ['baz', 1, 'buz', 2] >>> val = walker[path] >>> assert val == 6 >>> # We can use "paths" as keys to setitem into the walker >>> assert data['baz'][1]['buz'][2] == 6 >>> walker[path] = 7 >>> assert data['baz'][1]['buz'][2] == 7 >>> # We can use "paths" as keys to delitem into the walker >>> assert data['baz'][1]['buz'][1] == 5 >>> del walker[['baz', 1, 'buz', 1]] >>> assert data['baz'][1]['buz'][1] == 7
Example
>>> # Create nested data >>> # xdoctest: +REQUIRES(module:numpy) >>> import numpy as np >>> import ubelt as ub >>> data = ub.ddict(lambda: int) >>> data['foo'] = ub.ddict(lambda: int) >>> data['bar'] = np.array([1, 2, 3]) >>> data['foo']['a'] = 1 >>> data['foo']['b'] = np.array([1, 2, 3]) >>> data['foo']['c'] = [1, 2, 3] >>> data['baz'] = 3 >>> print('data = {}'.format(ub.repr2(data, nl=True))) >>> # We can walk through every node in the nested tree >>> walker = IndexableWalker(data) >>> for path, value in walker: >>> print('walk path = {}'.format(ub.repr2(path, nl=0))) >>> if path[-1] == 'c': >>> # Use send to prevent traversing this branch >>> got = walker.send(False) >>> # We can modify the value based on the returned path >>> walker[path] = 'changed the value of c' >>> print('data = {}'.format(ub.repr2(data, nl=True))) >>> assert data['foo']['c'] == 'changed the value of c'
Example
>>> # Test sending false for every data item >>> # xdoctest: +REQUIRES(CPython) >>> # xdoctest: +REQUIRES(module:numpy) >>> import ubelt as ub >>> import numpy as np >>> data = {1: 1} >>> walker = IndexableWalker(data) >>> for path, value in walker: >>> print('walk path = {}'.format(ub.repr2(path, nl=0))) >>> walker.send(False) >>> data = {} >>> walker = IndexableWalker(data) >>> for path, value in walker: >>> walker.send(False)
- kwcoco.util.NOT(TYPE)¶
- kwcoco.util.OBJECT(PROPERTIES={}, **kw)¶
https://json-schema.org/understanding-json-schema/reference/object.html
Example
>>> import jsonschema >>> schema = elem.OBJECT() >>> jsonschema.validate({}, schema) >>> # >>> import jsonschema >>> schema = elem.OBJECT({ >>> 'key1': elem.ANY(), >>> 'key2': elem.ANY(), >>> }, required=['key1']) >>> jsonschema.validate({'key1': None}, schema) >>> # >>> import jsonschema >>> schema = elem.OBJECT({ >>> 'key1': elem.OBJECT({'arr': elem.ARRAY()}), >>> 'key2': elem.ANY(), >>> }, required=['key1'], title='a title') >>> schema.validate() >>> print('schema = {}'.format(ub.repr2(schema, sort=1, nl=-1))) >>> jsonschema.validate({'key1': {'arr': []}}, schema) schema = { 'properties': { 'key1': { 'properties': { 'arr': {'items': {}, 'type': 'array'} }, 'type': 'object' }, 'key2': {} }, 'required': ['key1'], 'title': 'a title', 'type': 'object' }
- kwcoco.util.ONEOF(*TYPES)¶
- class kwcoco.util.QuantifierElements[source]¶
Bases:
object
Quantifier types
https://json-schema.org/understanding-json-schema/reference/combining.html#allof
Example
>>> from kwcoco.util.jsonschema_elements import * # NOQA >>> elem.ANYOF(elem.STRING, elem.NUMBER).validate() >>> elem.ONEOF(elem.STRING, elem.NUMBER).validate() >>> elem.NOT(elem.NULL).validate() >>> elem.NOT(elem.ANY).validate() >>> elem.ANY.validate()
- property ANY¶
- class kwcoco.util.ScalarElements[source]¶
Bases:
object
Single-valued elements
- property NULL¶
//json-schema.org/understanding-json-schema/reference/null.html
- Type
https
- property BOOLEAN¶
//json-schema.org/understanding-json-schema/reference/null.html
- Type
https
- property STRING¶
//json-schema.org/understanding-json-schema/reference/string.html
- Type
https
- property NUMBER¶
//json-schema.org/understanding-json-schema/reference/numeric.html#number
- Type
https
- property INTEGER¶
//json-schema.org/understanding-json-schema/reference/numeric.html#integer
- Type
https
- class kwcoco.util.SchemaElements[source]¶
Bases:
ScalarElements
,QuantifierElements
,ContainerElements
Functional interface into defining jsonschema structures.
See mixin classes for details.
References
https://json-schema.org/understanding-json-schema/
Todo
[ ] Generics: title, description, default, examples
CommandLine
xdoctest -m /home/joncrall/code/kwcoco/kwcoco/util/jsonschema_elements.py SchemaElements
Example
>>> from kwcoco.util.jsonschema_elements import * # NOQA >>> elem = SchemaElements() >>> elem.ARRAY(elem.ANY()) >>> schema = OBJECT({ >>> 'prop1': ARRAY(INTEGER, minItems=3), >>> 'prop2': ARRAY(STRING, numItems=2), >>> 'prop3': ARRAY(OBJECT({ >>> 'subprob1': NUMBER, >>> 'subprob2': NUMBER, >>> })) >>> }) >>> print('schema = {}'.format(ub.repr2(schema, nl=2, sort=1))) schema = { 'properties': { 'prop1': {'items': {'type': 'integer'}, 'minItems': 3, 'type': 'array'}, 'prop2': {'items': {'type': 'string'}, 'maxItems': 2, 'minItems': 2, 'type': 'array'}, 'prop3': {'items': {'properties': {'subprob1': {'type': 'number'}, 'subprob2': {'type': 'number'}}, 'type': 'object'}, 'type': 'array'}, }, 'type': 'object', }
>>> TYPE = elem.OBJECT({ >>> 'p1': ANY, >>> 'p2': ANY, >>> }, required=['p1']) >>> import jsonschema >>> inst = {'p1': None} >>> jsonschema.validate(inst, schema=TYPE) >>> #jsonschema.validate({'p2': None}, schema=TYPE)
- class kwcoco.util.StratifiedGroupKFold(n_splits=3, shuffle=False, random_state=None)[source]¶
Bases:
_BaseKFold
Stratified K-Folds cross-validator with Grouping
Provides train/test indices to split data in train/test sets.
This cross-validation object is a variation of GroupKFold that returns stratified folds. The folds are made by preserving the percentage of samples for each class.
This is an old interface and should likely be refactored and modernized.
- Parameters
n_splits (int, default=3) – Number of folds. Must be at least 2.
- kwcoco.util.ensure_json_serializable(dict_, normalize_containers=False, verbose=0)[source]¶
Attempt to convert common types (e.g. numpy) into something json complient
Convert numpy and tuples into lists
- Parameters
normalize_containers (bool) – if True, normalizes dict containers to be standard python structures. Defaults to False.
Example
>>> data = ub.ddict(lambda: int) >>> data['foo'] = ub.ddict(lambda: int) >>> data['bar'] = np.array([1, 2, 3]) >>> data['foo']['a'] = 1 >>> data['foo']['b'] = (1, np.array([1, 2, 3]), {3: np.int32(3), 4: np.float16(1.0)}) >>> dict_ = data >>> print(ub.repr2(data, nl=-1)) >>> assert list(find_json_unserializable(data)) >>> result = ensure_json_serializable(data, normalize_containers=True) >>> print(ub.repr2(result, nl=-1)) >>> assert not list(find_json_unserializable(result)) >>> assert type(result) is dict
- kwcoco.util.find_json_unserializable(data, quickcheck=False)[source]¶
Recurse through json datastructure and find any component that causes a serialization error. Record the location of these errors in the datastructure as we recurse through the call tree.
- Parameters
data (object) – data that should be json serializable
quickcheck (bool) – if True, check the entire datastructure assuming its ok before doing the python-based recursive logic.
- Returns
list of “bad part” dictionaries containing items
’value’ - the value that caused the serialization error
’loc’ - which contains a list of key/indexes that can be used to lookup the location of the unserializable value. If the “loc” is a list, then it indicates a rare case where a key in a dictionary is causing the serialization error.
- Return type
List[Dict]
Example
>>> from kwcoco.util.util_json import * # NOQA >>> part = ub.ddict(lambda: int) >>> part['foo'] = ub.ddict(lambda: int) >>> part['bar'] = np.array([1, 2, 3]) >>> part['foo']['a'] = 1 >>> # Create a dictionary with two unserializable parts >>> data = [1, 2, {'nest1': [2, part]}, {frozenset({'badkey'}): 3, 2: 4}] >>> parts = list(find_json_unserializable(data)) >>> print('parts = {}'.format(ub.repr2(parts, nl=1))) >>> # Check expected structure of bad parts >>> assert len(parts) == 2 >>> part = parts[1] >>> assert list(part['loc']) == [2, 'nest1', 1, 'bar'] >>> # We can use the "loc" to find the bad value >>> for part in parts: >>> # "loc" is a list of directions containing which keys/indexes >>> # to traverse at each descent into the data structure. >>> directions = part['loc'] >>> curr = data >>> special_flag = False >>> for key in directions: >>> if isinstance(key, list): >>> # special case for bad keys >>> special_flag = True >>> break >>> else: >>> # normal case for bad values >>> curr = curr[key] >>> if special_flag: >>> assert part['data'] in curr.keys() >>> assert part['data'] is key[1] >>> else: >>> assert part['data'] is curr
- kwcoco.util.indexable_allclose(dct1, dct2, return_info=False)[source]¶
Walks through two nested data structures and ensures that everything is roughly the same.
Note
Use the version in ubelt instead
- Parameters
dct1 – a nested indexable item
dct2 – a nested indexable item
Example
>>> from kwcoco.util.util_json import indexable_allclose >>> dct1 = { >>> 'foo': [1.222222, 1.333], >>> 'bar': 1, >>> 'baz': [], >>> } >>> dct2 = { >>> 'foo': [1.22222, 1.333], >>> 'bar': 1, >>> 'baz': [], >>> } >>> assert indexable_allclose(dct1, dct2)
- kwcoco.util.resolve_directory_symlinks(path)[source]¶
Only resolve symlinks of directories, not the base file
- kwcoco.util.resolve_relative_to(path, dpath, strict=False)[source]¶
Given a path, try to resolve its symlinks such that it is relative to the given dpath.
Example
>>> from kwcoco.util.util_reroot import * # NOQA >>> import os >>> def _symlink(self, target, verbose=0): >>> return ub.Path(ub.symlink(target, self, verbose=verbose)) >>> ub.Path._symlink = _symlink >>> # >>> # TODO: try to enumerate all basic cases >>> # >>> base = ub.Path.appdir('kwcoco/tests/reroot') >>> base.delete().ensuredir() >>> # >>> drive1 = (base / 'drive1').ensuredir() >>> drive2 = (base / 'drive2').ensuredir() >>> # >>> data_repo1 = (drive1 / 'data_repo1').ensuredir() >>> cache = (data_repo1 / '.cache').ensuredir() >>> real_file1 = (cache / 'real_file1').touch() >>> # >>> real_bundle = (data_repo1 / 'real_bundle').ensuredir() >>> real_assets = (real_bundle / 'assets').ensuredir() >>> # >>> # Symlink file outside of the bundle >>> link_file1 = (real_assets / 'link_file1')._symlink(real_file1) >>> real_file2 = (real_assets / 'real_file2').touch() >>> link_file2 = (real_assets / 'link_file2')._symlink(real_file2) >>> # >>> # >>> # A symlink to the data repo >>> data_repo2 = (drive1 / 'data_repo2')._symlink(data_repo1) >>> data_repo3 = (drive2 / 'data_repo3')._symlink(data_repo1) >>> data_repo4 = (drive2 / 'data_repo4')._symlink(data_repo2) >>> # >>> # A prediction repo TODO >>> pred_repo5 = (drive2 / 'pred_repo5').ensuredir() >>> # >>> # _ = ub.cmd(f'tree -a {base}', verbose=3) >>> # >>> fpaths = [] >>> for r, ds, fs in os.walk(base, followlinks=True): >>> for f in fs: >>> if 'file' in f: >>> fpath = ub.Path(r) / f >>> fpaths.append(fpath) >>> # >>> # >>> dpath = real_bundle.resolve() >>> # >>> for path in fpaths: >>> # print(f'{path}') >>> # print(f'{path.resolve()=}') >>> resolved_rel = resolve_relative_to(path, dpath) >>> print('resolved_rel = {!r}'.format(resolved_rel))
- kwcoco.util.smart_truncate(string, max_length=0, separator=' ', trunc_loc=0.5)[source]¶
Truncate a string. :param string (str): string for modification :param max_length (int): output string length :param word_boundary (bool): :param save_order (bool): if True then word order of output string is like input string :param separator (str): separator between words :param trunc_loc (float): fraction of location where to remove the text :return: