# -*- coding: utf-8 -*-
import copy
import numpy as np
import ubelt as ub
import json
from collections import OrderedDict
import pathlib
# backwards compat
# Re-exported alias kept for backwards compatibility; the canonical
# implementation now lives in ubelt.
IndexableWalker = ub.IndexableWalker
def ensure_json_serializable(dict_, normalize_containers=False, verbose=0):
    """
    Attempt to convert common types (e.g. numpy) into something json compliant.

    Converts numpy arrays and tuples into lists, numpy scalars into the
    corresponding Python scalars, ``pathlib.Path`` into ``str``, and calls
    ``__json__()`` on any object that defines it. Works on a deep copy, so
    the input is never modified.

    Args:
        dict_ (object): data to convert. Despite the name, any nested
            indexable structure (dict / list / tuple) is accepted.
        normalize_containers (bool, default=False):
            if True, normalizes dict containers to be standard python
            structures.
        verbose (int, default=0): currently unused.

    Returns:
        object: a deep-copied, json-serializable version of the input.

    Example:
        >>> data = ub.ddict(lambda: int)
        >>> data['foo'] = ub.ddict(lambda: int)
        >>> data['bar'] = np.array([1, 2, 3])
        >>> data['foo']['a'] = 1
        >>> data['foo']['b'] = (1, np.array([1, 2, 3]), {3: np.int32(3), 4: np.float16(1.0)})
        >>> dict_ = data
        >>> print(ub.repr2(data, nl=-1))
        >>> assert list(find_json_unserializable(data))
        >>> result = ensure_json_serializable(data, normalize_containers=True)
        >>> print(ub.repr2(result, nl=-1))
        >>> assert not list(find_json_unserializable(result))
        >>> assert type(result) is dict
    """
    # Deep copy so the caller's data is never mutated in place.
    dict_ = copy.deepcopy(dict_)

    def _norm_container(c):
        """ Cast dict subclasses to a plain dict (or plain OrderedDict) """
        if isinstance(c, dict):
            if isinstance(c, OrderedDict):
                # Keep explicit ordering semantics, but drop any subclass
                # (e.g. a defaultdict that also inherits OrderedDict).
                if type(c) is not OrderedDict:
                    c = OrderedDict(c)
            else:
                if type(c) is not dict:
                    c = dict(c)
        return c

    walker = ub.IndexableWalker(dict_)
    for prefix, value in walker:
        if isinstance(value, tuple):
            # tuples are not json; the walker will descend into the new list
            # so nested numpy values inside the tuple are also converted.
            walker[prefix] = list(value)
        elif isinstance(value, np.ndarray):
            walker[prefix] = value.tolist()
        elif isinstance(value, np.bool_):
            # np.bool_ is NOT a subclass of np.integer, so it needs its own
            # branch; json.dumps cannot handle it otherwise.
            walker[prefix] = bool(value)
        elif isinstance(value, np.integer):
            walker[prefix] = int(value)
        elif isinstance(value, np.floating):
            walker[prefix] = float(value)
        elif isinstance(value, np.complexfloating):
            # NOTE: builtin complex is still not json serializable; this only
            # normalizes the numpy type (preserves original behavior).
            walker[prefix] = complex(value)
        elif isinstance(value, pathlib.Path):
            walker[prefix] = str(value)
        elif hasattr(value, '__json__'):
            # Objects may opt in to custom serialization.
            walker[prefix] = value.__json__()
        elif normalize_containers:
            if isinstance(value, dict):
                walker[prefix] = _norm_container(value)

    if normalize_containers:
        # normalize the outer layer
        dict_ = _norm_container(dict_)
    return dict_
def find_json_unserializable(data, quickcheck=False):
    """
    Recurse through json datastructure and find any component that
    causes a serialization error. Record the location of these errors
    in the datastructure as we recurse through the call tree.

    This is a generator: nothing is checked until it is iterated.

    Args:
        data (object): data that should be json serializable
        quickcheck (bool): if True, check the entire datastructure assuming
            its ok before doing the python-based recursive logic.

    Yields:
        Dict: "bad part" dictionaries containing items

            'value' - the value that caused the serialization error

            'loc' - which contains a list of key/indexes that can be used
            to lookup the location of the unserializable value.
            If the "loc" is a list, then it indicates a rare case where
            a key in a dictionary is causing the serialization error.

    Example:
        >>> from kwcoco.util.util_json import *  # NOQA
        >>> part = ub.ddict(lambda: int)
        >>> part['foo'] = ub.ddict(lambda: int)
        >>> part['bar'] = np.array([1, 2, 3])
        >>> part['foo']['a'] = 1
        >>> # Create a dictionary with two unserializable parts
        >>> data = [1, 2, {'nest1': [2, part]}, {frozenset({'badkey'}): 3, 2: 4}]
        >>> parts = list(find_json_unserializable(data))
        >>> print('parts = {}'.format(ub.repr2(parts, nl=1)))
        >>> # Check expected structure of bad parts
        >>> assert len(parts) == 2
        >>> part = parts[1]
        >>> assert list(part['loc']) == [2, 'nest1', 1, 'bar']
        >>> # We can use the "loc" to find the bad value
        >>> for part in parts:
        >>>     # "loc" is a list of directions containing which keys/indexes
        >>>     # to traverse at each descent into the data structure.
        >>>     directions = part['loc']
        >>>     curr = data
        >>>     special_flag = False
        >>>     for key in directions:
        >>>         if isinstance(key, list):
        >>>             # special case for bad keys
        >>>             special_flag = True
        >>>             break
        >>>         else:
        >>>             # normal case for bad values
        >>>             curr = curr[key]
        >>>     if special_flag:
        >>>         assert part['data'] in curr.keys()
        >>>         assert part['data'] is key[1]
        >>>     else:
        >>>         assert part['data'] is curr
    """
    needs_check = True

    if quickcheck:
        try:
            # Might be a more efficient way to do this check. We duplicate a
            # lot of work by doing the check for unserializable data this way.
            json.dumps(data)
        except Exception:
            # If there is unserializable data, find out where it is below.
            pass
        else:
            # Everything serialized cleanly; skip the recursive walk.
            needs_check = False

    if needs_check:
        # Only these types serialize without error under the default encoder.
        # NOTE: bool is a subclass of int, so bools pass the scalar check.
        scalar_types = (int, float, str, type(None))
        container_types = (tuple, list, dict)
        serializable_types = scalar_types + container_types
        walker = ub.IndexableWalker(data)
        for prefix, value in walker:
            *root, key = prefix
            if not isinstance(key, scalar_types):
                # Special case where a dict key is the error value
                # Purposely make loc non-hashable so its not confused with
                # an address. All we can know in this case is that they key
                # is at this level, there is no concept of where.
                yield {'loc': root + [['.keys', key]], 'data': key}
            elif not isinstance(value, serializable_types):
                yield {'loc': prefix, 'data': value}
def indexable_allclose(dct1, dct2, return_info=False):
    """
    Walks through two nested data structures and ensures that everything is
    roughly the same.

    Leaf values are compared with ``==``; pairs of floats that are not
    exactly equal additionally pass if ``np.isclose`` holds.

    Args:
        dct1: a nested indexable item
        dct2: a nested indexable item
        return_info (bool, default=False): if True, also return a dict with
            'passlist' / 'faillist' details (on length mismatch only
            'faillist' is present).

    Returns:
        bool | Tuple[bool, Dict]: the overall flag, plus info if requested.

    Example:
        >>> from kwcoco.util.util_json import indexable_allclose
        >>> dct1 = {
        >>>     'foo': [1.222222, 1.333],
        >>>     'bar': 1,
        >>>     'baz': [],
        >>> }
        >>> dct2 = {
        >>>     'foo': [1.22222, 1.333],
        >>>     'bar': 1,
        >>>     'baz': [],
        >>> }
        >>> assert indexable_allclose(dct1, dct2)
    """
    walker1 = ub.IndexableWalker(dct1)
    walker2 = ub.IndexableWalker(dct2)
    # Flatten to (path, leaf) pairs; empty containers count as leaves so a
    # structure difference like [] vs [1] is detected.
    flat_items1 = [
        (path, value) for path, value in walker1
        if not isinstance(value, walker1.indexable_cls) or len(value) == 0]
    flat_items2 = [
        (path, value) for path, value in walker2
        if not isinstance(value, walker2.indexable_cls) or len(value) == 0]

    # Sort so corresponding paths line up regardless of iteration order.
    # NOTE(review): sorting (path, value) tuples can raise TypeError when
    # paths mix incomparable key types (e.g. int and str dict keys).
    flat_items1 = sorted(flat_items1)
    flat_items2 = sorted(flat_items2)

    if len(flat_items1) != len(flat_items2):
        info = {
            'faillist': ['length mismatch']
        }
        final_flag = False
    else:
        passlist = []
        faillist = []
        for t1, t2 in zip(flat_items1, flat_items2):
            p1, v1 = t1
            p2, v2 = t2
            assert p1 == p2, 'paths to the nested items should be the same'
            flag = (v1 == v2)
            if not flag:
                # Fall back to approximate equality for float leaves.
                if isinstance(v1, float) and isinstance(v2, float) and np.isclose(v1, v2):
                    flag = True
            if flag:
                passlist.append(p1)
            else:
                faillist.append((p1, v1, v2))

        final_flag = len(faillist) == 0
        info = {
            'passlist': passlist,
            'faillist': faillist,
        }

    if return_info:
        return final_flag, info
    else:
        return final_flag