Source code for kwcoco.util.util_json

import copy
import numpy as np
import ubelt as ub
import json
from collections import OrderedDict
import pathlib

# backwards compat
IndexableWalker = ub.IndexableWalker
def ensure_json_serializable(dict_, normalize_containers=False, verbose=0):
    """
    Attempt to convert common types (e.g. numpy) into something json compliant.

    Converts numpy arrays and tuples into lists.

    Args:
        normalize_containers (bool, default=False):
            if True, normalizes dict containers to be standard python
            structures.

    Example:
        >>> data = ub.ddict(lambda: int)
        >>> data['foo'] = ub.ddict(lambda: int)
        >>> data['bar'] = np.array([1, 2, 3])
        >>> data['foo']['a'] = 1
        >>> data['foo']['b'] = (1, np.array([1, 2, 3]), {3: np.int32(3), 4: np.float16(1.0)})
        >>> dict_ = data
        >>> print(ub.repr2(data, nl=-1))
        >>> assert list(find_json_unserializable(data))
        >>> result = ensure_json_serializable(data, normalize_containers=True)
        >>> print(ub.repr2(result, nl=-1))
        >>> assert not list(find_json_unserializable(result))
        >>> assert type(result) is dict
    """
    dict_ = copy.deepcopy(dict_)

    def _norm_container(c):
        if isinstance(c, dict):
            # Cast to a normal dictionary
            if isinstance(c, OrderedDict):
                if type(c) is not OrderedDict:
                    c = OrderedDict(c)
            else:
                if type(c) is not dict:
                    c = dict(c)
        return c

    walker = ub.IndexableWalker(dict_)
    for prefix, value in walker:
        if isinstance(value, tuple):
            new_value = list(value)
            walker[prefix] = new_value
        elif isinstance(value, np.ndarray):
            new_value = value.tolist()
            walker[prefix] = new_value
        elif isinstance(value, np.integer):
            new_value = int(value)
            walker[prefix] = new_value
        elif isinstance(value, np.floating):
            new_value = float(value)
            walker[prefix] = new_value
        elif isinstance(value, np.complexfloating):
            new_value = complex(value)
            walker[prefix] = new_value
        elif isinstance(value, pathlib.Path):
            new_value = str(value)
            walker[prefix] = new_value
        elif hasattr(value, '__json__'):
            new_value = value.__json__()
            walker[prefix] = new_value
        elif normalize_containers:
            if isinstance(value, dict):
                new_value = _norm_container(value)
                walker[prefix] = new_value

    if normalize_containers:
        # normalize the outer layer
        dict_ = _norm_container(dict_)
    return dict_
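# An illustrative usage sketch (not part of the original module). It covers
# only the types handled above (numpy scalars/arrays, tuples, pathlib.Path);
# the variable names and data are hypothetical.
# >>> raw = {'arr': np.array([1, 2, 3]), 'num': np.float32(0.5),
# ...        'path': pathlib.Path('/tmp/out.json'), 'pair': (1, 2)}
# >>> fixed = ensure_json_serializable(raw)
# >>> json.dumps(fixed)  # succeeds: arrays and tuples became lists, Path became str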
def find_json_unserializable(data, quickcheck=False):
    """
    Recurse through a json data structure and find any component that causes
    a serialization error. Record the location of these errors in the data
    structure as we recurse through the call tree.

    Args:
        data (object): data that should be json serializable
        quickcheck (bool): if True, check the entire data structure assuming
            it is ok before doing the python-based recursive logic.

    Returns:
        List[Dict]: list of "bad part" dictionaries containing items:

            'data' - the value that caused the serialization error

            'loc' - a list of keys/indexes that can be used to lookup the
            location of the unserializable value. If an element of "loc" is
            itself a list, it indicates a rare case where a key in a
            dictionary (rather than a value) is causing the serialization
            error.

    Example:
        >>> from kwcoco.util.util_json import *  # NOQA
        >>> part = ub.ddict(lambda: int)
        >>> part['foo'] = ub.ddict(lambda: int)
        >>> part['bar'] = np.array([1, 2, 3])
        >>> part['foo']['a'] = 1
        >>> # Create a dictionary with two unserializable parts
        >>> data = [1, 2, {'nest1': [2, part]}, {frozenset({'badkey'}): 3, 2: 4}]
        >>> parts = list(find_json_unserializable(data))
        >>> print('parts = {}'.format(ub.repr2(parts, nl=1)))
        >>> # Check expected structure of bad parts
        >>> assert len(parts) == 2
        >>> part = parts[1]
        >>> assert list(part['loc']) == [2, 'nest1', 1, 'bar']
        >>> # We can use the "loc" to find the bad value
        >>> for part in parts:
        >>>     # "loc" is a list of directions containing which keys/indexes
        >>>     # to traverse at each descent into the data structure.
        >>>     directions = part['loc']
        >>>     curr = data
        >>>     special_flag = False
        >>>     for key in directions:
        >>>         if isinstance(key, list):
        >>>             # special case for bad keys
        >>>             special_flag = True
        >>>             break
        >>>         else:
        >>>             # normal case for bad values
        >>>             curr = curr[key]
        >>>     if special_flag:
        >>>         assert part['data'] in curr.keys()
        >>>         assert part['data'] is key[1]
        >>>     else:
        >>>         assert part['data'] is curr
    """
    needs_check = True

    if quickcheck:
        try:
            # Might be a more efficient way to do this check. We duplicate a
            # lot of work by doing the check for unserializable data this way.
            json.dumps(data)
        except Exception:
            # If there is unserializable data, find out where it is.
            pass
        else:
            # Everything serialized cleanly, so no recursive check is needed.
            needs_check = False

    if needs_check:
        scalar_types = (int, float, str, type(None))
        container_types = (tuple, list, dict)
        serializable_types = scalar_types + container_types
        walker = ub.IndexableWalker(data)
        for prefix, value in walker:
            *root, key = prefix
            if not isinstance(key, scalar_types):
                # Special case where a dict key is the error value.
                # Purposely make loc non-hashable so it is not confused with
                # an address. All we can know in this case is that the key is
                # at this level; there is no concept of where.
                yield {'loc': root + [['.keys', key]], 'data': key}
            elif not isinstance(value, serializable_types):
                yield {'loc': prefix, 'data': value}
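# An illustrative repair sketch (not part of the original module): the 'loc'
# paths reported above can be fed back into ub.IndexableWalker to overwrite
# the offending values in place. Variable names are hypothetical, and this
# simple repair only handles bad values, not the rare bad-key case.
# >>> data = {'stats': {'mean': np.float32(1.5)}}
# >>> bad_parts = list(find_json_unserializable(data))
# >>> walker = ub.IndexableWalker(data)
# >>> for bad in bad_parts:
# ...     walker[bad['loc']] = float(bad['data'])
# >>> json.dumps(data)  # now succeeds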
def indexable_allclose(dct1, dct2, return_info=False):
    """
    Walks through two nested data structures and ensures that everything is
    roughly the same.

    Args:
        dct1: a nested indexable item
        dct2: a nested indexable item

    Example:
        >>> from kwcoco.util.util_json import indexable_allclose
        >>> dct1 = {
        >>>     'foo': [1.222222, 1.333],
        >>>     'bar': 1,
        >>>     'baz': [],
        >>> }
        >>> dct2 = {
        >>>     'foo': [1.22222, 1.333],
        >>>     'bar': 1,
        >>>     'baz': [],
        >>> }
        >>> assert indexable_allclose(dct1, dct2)
    """
    walker1 = ub.IndexableWalker(dct1)
    walker2 = ub.IndexableWalker(dct2)
    flat_items1 = [
        (path, value) for path, value in walker1
        if not isinstance(value, walker1.indexable_cls) or len(value) == 0]
    flat_items2 = [
        (path, value) for path, value in walker2
        if not isinstance(value, walker2.indexable_cls) or len(value) == 0]

    flat_items1 = sorted(flat_items1)
    flat_items2 = sorted(flat_items2)

    if len(flat_items1) != len(flat_items2):
        info = {
            'faillist': ['length mismatch']
        }
        final_flag = False
    else:
        passlist = []
        faillist = []
        for t1, t2 in zip(flat_items1, flat_items2):
            p1, v1 = t1
            p2, v2 = t2
            assert p1 == p2
            flag = (v1 == v2)
            if not flag:
                if isinstance(v1, float) and isinstance(v2, float) and np.isclose(v1, v2):
                    flag = True
            if flag:
                passlist.append(p1)
            else:
                faillist.append((p1, v1, v2))
        final_flag = len(faillist) == 0
        info = {
            'passlist': passlist,
            'faillist': faillist,
        }

    if return_info:
        return final_flag, info
    else:
        return final_flag
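# An illustrative sketch (not part of the original module) showing the
# return_info output when two structures differ; the example values are
# hypothetical.
# >>> a = {'foo': [1.0000001, 2.0], 'bar': 'baz'}
# >>> b = {'foo': [1.0000002, 2.0], 'bar': 'qux'}
# >>> flag, info = indexable_allclose(a, b, return_info=True)
# >>> assert not flag
# >>> print(info['faillist'])  # [(['bar'], 'baz', 'qux')]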