Source code for kwcoco._helpers

"""
These items were split out of coco_dataset.py which is becoming too big

These are helper data structures used to do things like auto-increment ids,
recycle ids, do renaming, extend sortedcontainers etc...
"""
import sortedcontainers


class _NextId:
    """
    Helper that hands out ids which are not yet used by items in a table.

    The next id of a table is computed lazily from ``parent.dataset`` the
    first time it is requested and is simply incremented after that.
    """

    def __init__(self, parent):
        self.parent = parent
        # TODO: use a single source of truth for what the top-level tables with
        # ids are.
        # A value of None means the table has not been scanned yet.
        self.unused = dict.fromkeys(
            ['categories', 'images', 'annotations', 'videos', 'tracks'])

    def _update_unused(self, key):
        """
        Scan the ``key`` table and record the next id that is safe to use.
        """
        try:
            table = self.parent.dataset[key]
            if table:
                largest = max(row['id'] for row in table)
            else:
                largest = 0
            # Never go below the number of rows already in the table.
            candidate = max(largest + 1, len(table))
        except KeyError:
            # The table doesn't exist, so we can use anything
            candidate = 1
        self.unused[key] = candidate

    def get(self, key):
        """
        Return the next safe item id for ``key`` and reserve it.
        """
        if self.unused[key] is None:
            self._update_unused(key)
        reserved = self.unused[key]
        self.unused[key] = reserved + 1
        return reserved
class _ID_Remapper:
    """
    Helper to recycle ids for unions.

    For each dataset we create a mapping between each old id and a new id. If
    possible and reuse=True we allow the new id to match the old id. After
    each dataset is finished we mark all those ids as used and subsequent
    new-ids cannot be chosen from that pool.

    Within a single dataset two different old ids are never mapped to the
    same new id: an old id is only reused if it has neither been blocked nor
    already been handed out as a new id for the current dataset.

    Args:
        reuse (bool): if True we are allowed to reuse ids as long as they
            haven't been used before.

    Example:
        >>> video_trackids = [[1, 1, 3, 3, 200, 4], [204, 1, 2, 3, 3, 4, 5, 9]]
        >>> self = _ID_Remapper(reuse=True)
        >>> for tids in video_trackids:
        >>>     new_tids = [self.remap(old_tid) for old_tid in tids]
        >>>     self.block_seen()
        >>>     print('new_tids = {!r}'.format(new_tids))
        new_tids = [1, 1, 3, 3, 200, 4]
        new_tids = [204, 205, 2, 206, 206, 207, 5, 9]
        >>> #
        >>> self = _ID_Remapper(reuse=False)
        >>> for tids in video_trackids:
        >>>     new_tids = [self.remap(old_tid) for old_tid in tids]
        >>>     self.block_seen()
        >>>     print('new_tids = {!r}'.format(new_tids))
        new_tids = [0, 0, 1, 1, 2, 3]
        new_tids = [4, 5, 6, 7, 7, 8, 9, 10]

    Example:
        >>> # A freshly generated id is never handed out a second time
        >>> self = _ID_Remapper(reuse=True)
        >>> assert self.remap(5) == 5
        >>> self.block_seen()
        >>> assert self.remap(5) == 6
        >>> assert self.remap(6) == 7
    """

    def __init__(self, reuse=False):
        self.blocklist = set()
        self.mapping = dict()
        self.reuse = reuse
        self._nextid = 0
        # New ids handed out since the last call to block_seen. Needed so a
        # generated id is not given out again when it later shows up as an
        # old id in the same dataset.
        self._issued = set()

    def remap(self, old_id):
        """
        Convert a old-id into a new-id. If self.reuse is True then we will
        return the same id if it hasn't been blocked yet.

        Args:
            old_id (Hashable): the id used in the current dataset

        Returns:
            the id to use in the union. Repeated calls with the same
            ``old_id`` return the same value until :func:`block_seen` is
            called.
        """
        if old_id in self.mapping:
            return self.mapping[old_id]

        can_reuse = (
            self.reuse and
            old_id not in self.blocklist and
            old_id not in self._issued
        )
        if can_reuse:
            # We can reuse the old-id
            new_id = old_id
            if isinstance(old_id, int) and old_id >= self._nextid:
                # Keep generated ids strictly above every reused integer id
                self._nextid = old_id + 1
        else:
            # We cannot reuse the old-id
            new_id = self.next_id()

        self.mapping[old_id] = new_id
        self._issued.add(new_id)
        return new_id

    def block_seen(self):
        """
        Mark all seen ids as unable to be used.
        Any ids sent to remap will now generate new ids.
        """
        self.blocklist.update(self.mapping.values())
        self.blocklist.update(self._issued)
        self.mapping = dict()
        self._issued = set()

    def next_id(self):
        """
        Generate a new id that hasnt been used yet
        """
        next_id = self._nextid
        self._nextid += 1
        return next_id
# maybe use an enum? the StrEnum requires 3.11 # import enum # class RenamePolicy(enum.StrEnum): # IGNORE = enum.auto() # WARN = enum.auto() # ERROR = enum.auto()
class UniqueNameRemapper:
    """
    Helper to ensure names will be unique by appending suffixes.

    By default will notify users about this action based on `policy`.

    Example:
        >>> from kwcoco._helpers import UniqueNameRemapper
        >>> self = UniqueNameRemapper(policy='ignore')
        >>> assert self.remap('foo') == 'foo'
        >>> assert self.remap('foo') == 'foo_v001'
        >>> assert self.remap('foo') == 'foo_v002'
        >>> assert self.remap('foo_v001') == 'foo_v003'
        >>> assert 'foo' in self

    Example:
        >>> from kwcoco._helpers import UniqueNameRemapper
        >>> import pytest
        >>> # Test error policy
        >>> self = UniqueNameRemapper(policy='error')
        >>> assert self.remap('foo') == 'foo'
        >>> with pytest.raises(Exception) as ex:
        >>>     self.remap('foo')
        >>> print(f'ex={ex}')

    Example:
        >>> # Only a trailing version suffix is treated as a version
        >>> from kwcoco._helpers import UniqueNameRemapper
        >>> self = UniqueNameRemapper(policy='ignore')
        >>> assert self.remap('clip_v2_left') == 'clip_v2_left'
        >>> assert self.remap('clip_v2_left') == 'clip_v2_left_v001'
    """

    def __init__(self, policy='warn', name_type='unspecified'):
        """
        Args:
            policy (str):
                if "ignore", will not notify the user of a rename.
                if "warn", will emit a warning when a rename occurs.
                if "error", will raise an exception if a rename occurs.

            name_type (str):
                A hint to the user about what type of name this was when an
                error or warning message is emitted.

        Raises:
            ValueError: if ``policy`` is not one of the accepted values.
        """
        import re
        self._seen = set()
        self.suffix_pat = re.compile(r'(.*)_v(\d+)')
        if policy not in {'ignore', 'warn', 'error'}:
            raise ValueError('policy must be ignore, warn, or error')
        self.policy = policy
        self.name_type = name_type

    def __contains__(self, name):
        return name in self._seen

    def remap(self, name):
        """
        Args:
            name (str): name to check / rename

        Returns:
            str: a name guarenteed to be unique

        Raises:
            Exception: if a rename is required and the policy is "error".
        """
        # The whole name must end in the version suffix. An unanchored match
        # would treat "a_v1_b" as prefix "a" and drop "_b" when renaming.
        match = self.suffix_pat.fullmatch(name)
        if match:
            prefix, num_text = match.groups()
            num = int(num_text)
        else:
            prefix = name
            num = 0

        input_name = name
        did_rename = False
        while name in self._seen:
            # Bump the version until an unused name is found
            num += 1
            name = '{}_v{:03d}'.format(prefix, num)
            did_rename = True

        if did_rename:
            if self.policy == 'warn':
                import warnings
                warnings.warn(f'A {self.name_type!r} name was renamed from {input_name!r} to {name!r}')
            elif self.policy == 'error':
                raise Exception(f'A {self.name_type!r} name was renamed from {input_name!r} to {name!r}')

        self._seen.add(name)
        return name
class _CategoryID_Remapper:
    """
    Helper for a category union that re-uses ids whenever possible.

    Given an old category dictionary, calling :func:`remap` will return a new
    category dictionary with updated properties if necessary.

    Example:
        >>> from kwcoco._helpers import _CategoryID_Remapper
        >>> self = _CategoryID_Remapper()
        >>> self.remap({'name': 'cat5', 'id': 5})
        >>> self.remap({'name': 'cat6', 'id': 9})
        >>> self.remap({'name': 'cat9', 'id': 5})
        >>> self.remap({'name': 'cat5', 'id': 9, 'special_property': 5})
        >>> assert self._id_to_cat == {
        >>>     5: {'name': 'cat5', 'id': 5, 'special_property': 5},
        >>>     9: {'name': 'cat6', 'id': 9},
        >>>     10: {'name': 'cat9', 'id': 10}}
    """

    def __init__(self):
        self._name_to_cat = {}
        self._id_to_cat = {}
        self._categories = []
        self._nextid = 1

    def remap(self, old_cat):
        """
        Return the unified category dictionary for ``old_cat``.

        Categories are identified by name. The first time a name is seen a
        copy of ``old_cat`` is registered, keeping its id unless that id is
        already taken by a different name. When the name was seen before, the
        registered dictionary is returned after filling in any keys it does
        not have yet; existing values are never overwritten.

        Args:
            old_cat (Dict): category dictionary with a 'name' and 'id'. It is
                not modified.

        Returns:
            Dict: the category dictionary registered for this name
        """
        catname = old_cat['name']
        new_cat = self._name_to_cat.get(catname, None)
        if new_cat is None:
            old_id = old_cat['id']
            if old_id in self._id_to_cat:
                # Need to update the ID
                new_id = self._nextid
                self._nextid += 1
            else:
                new_id = old_id
                if new_id >= self._nextid:
                    # Keep generated ids above every id that was reused
                    self._nextid = new_id + 1
            new_cat = {**old_cat}
            new_cat['id'] = new_id
            self._id_to_cat[new_id] = new_cat
            self._name_to_cat[catname] = new_cat
            self._categories.append(new_cat)
        else:
            # add in any special properties that dont disagree with
            # what already has been seen
            for key, value in old_cat.items():
                new_cat.setdefault(key, value)
        return new_cat
# Defined as a global for pickle
def _lut_image_frame_index(imgs, gid):
    """
    Look up the ``frame_index`` of the image with id ``gid``.

    Kept as a module-level function (rather than a lambda or closure) so
    that it can be pickled.

    Args:
        imgs (Mapping): maps an image id to its image dictionary
        gid: id of the image to look up

    Returns:
        the value stored under 'frame_index' for that image
    """
    img = imgs[gid]
    return img['frame_index']
# backwards compat for pickles _lut_frame_index = _lut_image_frame_index
def _lut_annot_frame_index(imgs, anns, aid):
    """
    Look up the ``frame_index`` of the image that annotation ``aid`` is on.

    Args:
        imgs (Mapping): maps an image id to its image dictionary
        anns (Mapping): maps an annotation id to its annotation dictionary
        aid: id of the annotation to look up

    Returns:
        the image's 'frame_index', or -1 if the image does not have one
    """
    gid = anns[aid]['image_id']
    img = imgs[gid]
    return img.get('frame_index', -1)
class SortedSet(sortedcontainers.SortedSet):
    def __repr__(self):
        """
        Return the string representation of the sorted set.

        ``ss.__repr__()`` <==> ``repr(ss)``

        The representation is the class name followed by the items as a
        plain list.

        :return: string representation
        """
        contents = list(self)
        return f'{type(self).__name__}({contents!r})'
# Do not use. # Just exist for backwards compatibility with older pickeled data. SortedSetQuiet = SortedSet
def _delitems(items, remove_idxs, thresh=750):
    """
    Remove the items at the given positions from a list, in place.

    Args:
        items (List): list which will be modified
        remove_idxs (List[int]): positions to remove. Repeated positions are
            treated as a single removal.
        thresh (int): when more than this many positions are given the list
            contents are rebuilt instead of deleting one item at a time.

    Example:
        >>> items = list(range(10))
        >>> _delitems(items, [2, 2, 5])
        >>> assert items == [0, 1, 3, 4, 6, 7, 8, 9]
    """
    if len(remove_idxs) > thresh:
        # Its typically faster to just make a new list when there are
        # lots and lots of items to remove.
        drop = set(remove_idxs)
        items[:] = [item for idx, item in enumerate(items) if idx not in drop]
    else:
        # However, when there are a few hundred items to remove, del is faster.
        # Delete from the back so earlier positions stay valid, and drop
        # repeats so a duplicated index cannot delete a neighbouring item.
        for idx in sorted(set(remove_idxs), reverse=True):
            del items[idx]
def _load_and_postprocess(data, loader, postprocess, **loadkw):
    """
    Helper for CocoDataset.load_multiple

    Args:
        data: passed to ``loader`` as its first argument
        loader (Callable): called as ``loader(data, **loadkw)``
        postprocess (Callable | None): if given, called on the loaded result
        **loadkw: extra keyword arguments forwarded to ``loader``

    Returns:
        the loaded result, or the output of ``postprocess`` if one was given
    """
    loaded = loader(data, **loadkw)
    if postprocess is None:
        return loaded
    return postprocess(loaded)
def _image_corruption_check(fpath, only_shape=False, imread_kwargs=None):
    """
    Helper that checks if an image is readable or not

    Args:
        fpath (str | PathLike): path of the image to check
        only_shape (bool): if True only the image shape is loaded (via
            ``kwimage.load_image_shape``) instead of reading the full image.
        imread_kwargs (Dict | None): extra keyword arguments for
            ``kwimage.imread``. Not used when ``only_shape`` is True.

    Returns:
        Dict: contains 'fpath' and a boolean 'failed'. When the check failed
        an 'error' string describing the reason is included as well.
    """
    from os.path import exists
    info = {'fpath': fpath}
    if not exists(fpath):
        # Nothing to read, so the image library is not needed for this answer
        info['failed'] = True
        info['error'] = 'does not exist'
        return info

    # Imported outside of the try block so that a missing dependency is not
    # misreported as a corrupted image.
    import kwimage
    imread_kwargs = imread_kwargs or {}
    try:
        if only_shape:
            kwimage.load_image_shape(fpath)
        else:
            kwimage.imread(fpath, **imread_kwargs)
    except Exception as ex:
        info['failed'] = True
        info['error'] = str(ex)
    else:
        info['failed'] = False
    return info
def _resolve_id_selection(coco_dset, selection, table, kind):
    """
    Resolve a user selection into the set of ids it picks from ``table``.

    Args:
        coco_dset: dataset whose ``dataset`` dictionary is queried
        selection (str | List[int]): explicit ids, a YAML list of ids, or the
            body of a jq ``select`` expression.
        table (str): name of the top-level table to query (e.g. 'images')
        kind (str): singular name of the item type, used in messages

    Returns:
        Set: the selected ids
    """
    if isinstance(selection, (list, tuple, set, frozenset)):
        # Explicit ids need neither YAML parsing nor jq
        return set(selection)

    try:
        import kwutil
    except Exception:
        kwutil = None

    # TODO: sane messages depending on if kwutil / jq are installed or not
    if kwutil is not None:
        try:
            coerced = kwutil.Yaml.coerce(selection)
        except Exception:
            # yaml coerce failed, try jq
            coerced = None
        if isinstance(coerced, list):
            # Allow the user to specify a YAML list of ids
            return set(coerced)
        # Anything else is probably a query string

    try:
        import jq
    except Exception:
        print('The jq library is required to run a generic {} query'.format(kind))
        raise

    query_text = ".{}[] | select({}) | .id".format(table, selection)
    try:
        query = jq.compile(query_text)
        return set(query.input(coco_dset.dataset).all())
    except Exception as ex:
        print('JQ Query Failed: {}, ex={}'.format(query_text, ex))
        raise


def _query_image_ids(coco_dset, select_images=None, select_videos=None, valid_image_ids=None):
    """
    Filters to a specific set of images given query parameters based on
    json-query (jq).

    Args:
        select_images (str | List[int] | None):
            Can be a list of image ids, a coercable YAML list of image ids,
            or...

            A json query (via the jq spec) that specifies which images
            belong in the subset. Note, this is a passed as the body of
            the following jq query format string to filter valid ids
            '.images[] | select({select_images}) | .id'.

            Examples for this argument are as follows:
            '.id < 3' will select all image ids less than 3.
            '.file_name | test(".*png")' will select only images with
            file names that end with png.
            '.file_name | test(".*png") | not' will select only images
            with file names that do not end with png.
            '.myattr == "foo"' will select only image dictionaries
            where the value of myattr is "foo".
            '.id < 3 and (.file_name | test(".*png"))' will select only
            images with id less than 3 that are also pngs.
            '.myattr | in({"val1": 1, "val4": 1})' will take images
            where myattr is either val1 or val4.
            An alternative syntax is:
            '[.myattr] | inside(["val1", "val4"])'

            Requires the "jq" python library is installed.

        select_videos (str | List[int] | None):
            Can be a list of video ids, a coercable YAML list of video ids,
            or...

            A json query (via the jq spec) that specifies which videos
            belong in the subset. Note, this is a passed as the body of
            the following jq query format string to filter valid ids
            '.videos[] | select({select_videos}) | .id'.

            Examples for this argument are as follows:
            '.file_name | startswith("foo")' will select only videos
            where the name starts with foo.
            or
            '.file_name | contains("foo")' will select videos where any
            part of the filename contains foo.

            Only applicable for dataset that contain videos.

            Requires the "jq" python library is installed.

        valid_image_ids (Set[int] | None):
            if specified use this initial set of image ids, otherwise
            select from all available. The given collection is not modified.

    Returns:
        List[int]: sorted list of filtered image ids

    SeeAlso:
        Based on ~/code/geowatch/geowatch/utils/kwcoco_extensions.py::filter_image_ids

    Example:
        >>> # xdoctest: +REQUIRES(module:jq)
        >>> from kwcoco._helpers import _query_image_ids
        >>> import kwcoco
        >>> coco_dset = kwcoco.CocoDataset.demo('vidshapes8', verbose=0)
        >>> _query_image_ids(coco_dset, select_images='.id < 3')
        >>> _query_image_ids(coco_dset, select_images='.file_name | test(".*.png")')
        >>> _query_image_ids(coco_dset, select_images='.file_name | test(".*.png") | not')
        >>> _query_image_ids(coco_dset, select_images='.id < 3 and (.file_name | test(".*.png"))')
        >>> _query_image_ids(coco_dset, select_images='.id < 3 or (.file_name | test(".*.png"))')

    Example:
        >>> # xdoctest: +REQUIRES(module:kwutil)
        >>> from kwcoco._helpers import _query_image_ids
        >>> import kwcoco
        >>> coco_dset = kwcoco.CocoDataset.demo('vidshapes8', verbose=0)
        >>> assert _query_image_ids(coco_dset, select_images='[2, 3, 4]') == [2, 3, 4]
        >>> assert _query_image_ids(coco_dset, select_videos='[3]') == [5, 6]

    Ignore:
        # JQ Dev examples
        import jq
        dataset = [
            {'id': 1, 'name': 'foo'},
            {'id': 2, 'name': 'bar'},
            {'id': 3, 'name': 'baz'},
            {'id': 4, 'name': 'biz'},
        ]
        # The IN keyword doesnt seem to do what I want very well
        # jq.compile('.[] | select(.id | IN([1, 3]))').input(dataset).all()
        # This sorta works
        jq.compile('.[] | select(.id as $id | [1, 3] | index($id) != null)').input(dataset).all()
        # THERE WE GO, this is more reasonable
        jq.compile('.[] | select([.id] | inside([1, 3]))').input(dataset).all()
        jq.compile('.[] | select([.id] | inside([2, 4]))').input(dataset).all()
        jq.compile('.[] | select([.name] | inside(["foo", "baz"]))').input(dataset).all()
        jq.compile('.[] | select(.id < 3)').input(dataset).all()
        jq.compile('.[] | select(.name | test("b.*"))').input(dataset).all()
    """
    if valid_image_ids is None:
        # Start with all images
        valid_image_ids = set(coco_dset.images())
    else:
        # Work on a copy so the caller's collection is never changed in place
        valid_image_ids = set(valid_image_ids)

    if select_images is not None:
        valid_image_ids &= _resolve_id_selection(
            coco_dset, select_images, 'images', 'image')

    if select_videos is not None:
        selected_vidids = _resolve_id_selection(
            coco_dset, select_videos, 'videos', 'video')
        vidid_to_gids = coco_dset.index.vidid_to_gids
        # Keep only images that belong to one of the selected videos
        valid_image_ids &= {
            gid
            for vidid in selected_vidids
            for gid in vidid_to_gids[vidid]
        }

    return sorted(valid_image_ids)