"""
This module defines the KWCOCO Channel Specification and API.
The KWCOCO Channel specification is a way to semantically express how a
combination of image channels are grouped. This can specify how these channels
(sometimes called bands or features) are arranged on disk or input to an
algorithm. The core idea reduces to a ``Set[List[str]]`` --- or an unordered set
of ordered sequences of strings corresponding to channel "names". The way these
are specified is with a "," to separate lists in an unordered set and with a
"|" to separate the channel names. Other syntax exists for convenience, but
a strict normalized channel spec only contains these core symbols.
Another way to think of a channel spec is that splitting the spec by ","
gives groups of channels that should be processed together and "late-fused".
Within each group the "|" operator "early-fuses" the channels.
For instance, say we had a network and we wanted to process 3-channel rgb
images in one stream and 1-channel infrared images in a second stream and then
fuse them together. The channel specification for channels labeled as
'red', 'green', 'blue', and 'infrared' would be:
``infrared,red|green|blue``
Note, it is up to an algorithm to do any early-late fusion. KWCoco simply
provides the specification as a tool to quickly access a particular combination
of channels from disk.
The ChannelSpec has these simple rules:
.. code::
* each 1D channel is an alphanumeric string.
* The pipe ('|') separates aligned early fused streams (non-commutative)
* The comma (',') separates late-fused streams, (happens after pipe operations, and is commutative)
* Certain common sets of early fused channels have codenames, for example:
rgb = r|g|b
rgba = r|g|b|a
dxdy = dx|dy
* Multiple channels can be specified via a "slice" notation. For example:
mychan.0:4
represents 4 channels:
mychan.0, mychan.1, mychan.2, and mychan.3
slices after the "." work like python slices
The detailed grammar for the spec is
.. code::
?start: stream
// An identifier can contain spaces
IDEN: ("_"|LETTER) ("_"|" "|LETTER|DIGIT)*
chan_single : IDEN
chan_getitem : IDEN "." INT
chan_getslice_0b : IDEN ":" INT
chan_getslice_ab : IDEN "." INT ":" INT
// A channel code can just be an ID, or it can have a getitem
// style syntax with a scalar or slice as an argument
chan_code : chan_single | chan_getslice_0b | chan_getslice_ab | chan_getitem
// Fused channels are an ordered sequence of channel codes (without sensors)
fused : chan_code ("|" chan_code)*
// Channels can be specified in a sequence but must contain parens
fused_seq : "(" fused ("," fused)* ")"
channel_rhs : fused | fused_seq
stream : channel_rhs ("," channel_rhs)*
%import common.DIGIT
%import common.LETTER
%import common.INT
Note that a stream refers to the full ChannelSpec and fused refers to
FusedChannelSpec.
For single arrays, the spec is always an early fused spec.
TODO:
- [X] : normalize representations? e.g: rgb = r|g|b? - OPTIONAL
- [X] : rename to BandsSpec or SensorSpec? - REJECTED
- [ ] : allow bands to be coerced, i.e. rgb -> gray, or gray->rgb
TODO:
- [x]: Use FusedChannelSpec as a member of ChannelSpec
- [x]: Handle special slice suffix for length calculations
SeeAlso:
:module:delayed_image.sensorchan_spec - The generalized sensor / channel specification
Note:
* do not specify the same channel in FusedChannelSpec twice
Example:
>>> import delayed_image
>>> spec = delayed_image.ChannelSpec('b1|b2|b3,m.0:4|x1|x2,x.3|x.4|x.5')
>>> print(spec)
<ChannelSpec(b1|b2|b3,m.0:4|x1|x2,x.3|x.4|x.5)>
>>> for stream in spec.streams():
>>> print(stream)
<FusedChannelSpec(b1|b2|b3)>
<FusedChannelSpec(m.0:4|x1|x2)>
<FusedChannelSpec(x.3|x.4|x.5)>
>>> # Normalization
>>> normalized = spec.normalize()
>>> print(normalized)
<ChannelSpec(b1|b2|b3,m.0|m.1|m.2|m.3|x1|x2,x.3|x.4|x.5)>
>>> print(normalized.fuse().spec)
b1|b2|b3|m.0|m.1|m.2|m.3|x1|x2|x.3|x.4|x.5
>>> print(normalized.fuse().concise().spec)
b1|b2|b3|m:4|x1|x2|x.3:6
"""
import abc
import functools
import ubelt as ub
import warnings
class BaseChannelSpec(ub.NiceRepr):
    """
    Common code API between :class:`FusedChannelSpec` and :class:`ChannelSpec`

    Subclasses provide the abstract accessors / set operations below; this
    base class maps the Python operators (``-``, ``&``, ``|``, ``+``) onto
    them and implements late fusion and path sanitization.

    Note:
        The ``abc.abstractmethod`` markers only prevent instantiation when
        the metaclass is ``abc.ABCMeta``; the metaclass of ``ub.NiceRepr`` is
        not visible here, so treat them as documentation — TODO confirm.

    TODO:
        - [ ] Keep working on this base spec and ensure the inheriting classes
              conform to it.
    """

    @property
    @abc.abstractmethod
    def spec(self):
        """
        The string encoding of this spec

        Returns:
            str
        """
        ...

    @classmethod
    @abc.abstractmethod
    def coerce(cls, data):
        """
        Try and interpret the input data as some sort of spec

        Args:
            data (str | int | list | dict | BaseChannelSpec):
                any input data that is known to represent a spec

        Returns:
            BaseChannelSpec
        """
        ...

    @abc.abstractmethod
    def streams(self):
        """
        Breakup this spec into individual early-fused components

        Returns:
            List[FusedChannelSpec]
        """
        ...

    @abc.abstractmethod
    def normalize(self):
        """
        Expand all channel codes into their normalized long-form

        Returns:
            BaseChannelSpec
        """
        ...

    @abc.abstractmethod
    def intersection(self, other):
        """
        Channels of this spec that are also in ``other``.
        """
        ...

    @abc.abstractmethod
    def union(self, other):
        """
        Combination of this spec with ``other``.
        """
        ...

    @abc.abstractmethod
    def difference(self, other):
        """
        Channels of this spec that are not in ``other``.

        Note:
            The signature takes ``other`` so it agrees with :func:`__sub__`,
            which calls ``self.difference(other)``.
        """
        ...

    @abc.abstractmethod
    def issubset(self, other):
        """
        True if every channel of this spec is in ``other``.
        """
        ...

    @abc.abstractmethod
    def issuperset(self, other):
        """
        True if every channel of ``other`` is in this spec.
        """
        ...

    def __sub__(self, other):
        """
        Subtraction operator is overloaded to :func:`difference`
        """
        return self.difference(other)

    def __nice__(self):
        """
        Text placed inside the ``<ClassName(...)>`` representation
        """
        return self.spec

    def __json__(self):
        """
        The spec string is the serialized form
        """
        return self.spec

    def __and__(self, other):
        # the parent implementation of this is backwards
        return self.intersection(other)

    def __or__(self, other):
        """
        Union operator is overloaded to early fusion
        """
        return self.union(other)

    def late_fuse(self, other):
        """
        Join this spec and another with a comma (late fusion).

        Args:
            other (BaseChannelSpec): spec appended after this one

        Returns:
            BaseChannelSpec: if either side has an empty spec string the
            other operand is returned unchanged, otherwise a new
            :class:`ChannelSpec` coerced from the joined string.

        Example:
            >>> import delayed_image
            >>> a = delayed_image.ChannelSpec.coerce('A|B|C,edf')
            >>> b = delayed_image.ChannelSpec.coerce('A12')
            >>> c = delayed_image.ChannelSpec.coerce('')
            >>> d = delayed_image.ChannelSpec.coerce('rgb')
            >>> print(a.late_fuse(b).spec)
            >>> print((a + b).spec)
            >>> print((b + a).spec)
            >>> print((a + b + c).spec)
            >>> print(sum([a, b, c, d]).spec)
            A|B|C,edf,A12
            A|B|C,edf,A12
            A12,A|B|C,edf
            A|B|C,edf,A12
            A|B|C,edf,A12,rgb
        """
        if not self.spec:
            return other
        if not other.spec:
            return self
        return ChannelSpec.coerce(self.spec + ',' + other.spec)

    def __add__(self, other):
        """
        Addition operator is overloaded to late fusion
        """
        return self.late_fuse(other)

    def __radd__(self, other):
        """
        Addition operator is overloaded to late fusion

        The ``0`` case lets the builtin :func:`sum` (which starts from the
        integer zero) be used on a sequence of specs.
        """
        if other == 0:
            return self
        return other + self

    def path_sanitize(self, maxlen=128):
        """
        Clean up the channel spec so it can be used in a pathname.

        Args:
            maxlen (int):
                if specified, and the name is longer than this length, it is
                shortened. Must be 8 or greater.

        Returns:
            str: path suitable for usage in a filename

        Note:
            This mapping is not invertible and should not be relied on
            to reconstruct the path spec. This is only a convenience.

        Example:
            >>> import delayed_image
            >>> print(delayed_image.FusedChannelSpec.coerce('a chan with space|bar|baz').path_sanitize())
            a chan with space_bar_baz
            >>> print(delayed_image.ChannelSpec.coerce('foo|bar|baz,biz').path_sanitize())
            foo_bar_baz,biz

        Example:
            >>> import delayed_image
            >>> print(delayed_image.ChannelSpec.coerce('foo.0:3').normalize().path_sanitize(24))
            foo.0_foo.1_foo.2
            >>> print(delayed_image.ChannelSpec.coerce('foo.0:256').normalize().path_sanitize(24))
            tuuxtfnrsvdhezkdndysxo_256
        """
        # NOTE(review): ``self.numel`` is handed over as a bound method (it is
        # not called here); presumably the helper invokes it lazily only when
        # a hash suffix is needed — confirm against _path_sanitize_v2.
        pname = _path_sanitize_v2(self.spec, maxlen=maxlen, hash_suffix=self.numel)
        return pname
[docs]
class FusedChannelSpec(BaseChannelSpec):
    """
    A specific type of channel spec with only one early fused stream.

    The channels in this stream are non-commutative

    Behaves like a list of atomic-channel codes
    (which may represent more than 1 channel), normalized codes always
    represent exactly 1 channel.

    Note:
        This class name and API is in flux and subject to change.

    TODO:
        A special code indicating a name and some number of bands that that
        name contains, this would primarily be used for large numbers of
        channels produced by a network. Like:

            resnet_d35d060_L5:512

            or

            resnet_d35d060_L5[:512]

        might refer to a very specific (hashed) set of resnet parameters
        with 512 bands

        maybe we can do something slice-like:

            resnet_d35d060_L5[A:B]
            resnet_d35d060_L5:A:B

        Do we want to "just store the code" and allow for parsing later?

        Or do we want to ensure the serialization is parsed before we
        construct the data structure?

    Example:
        >>> from delayed_image.channel_spec import *  # NOQA
        >>> import pickle
        >>> self = FusedChannelSpec.coerce(3)
        >>> recon = pickle.loads(pickle.dumps(self))
        >>> self = ChannelSpec.coerce('a|b,c|d')
        >>> recon = pickle.loads(pickle.dumps(self))
    """

    # Alias codes mapped to the ordered single-band codes they stand for
    _alias_lut = {
        'rgb': ['r', 'g', 'b'],
        'rgba': ['r', 'g', 'b', 'a'],
        'dxdy': ['dx', 'dy'],
        'fxfy': ['fx', 'fy'],
    }

    # Efficiency memoization of coerced codes (class-level, shared by all
    # instances)
    _memo = {}

    # Number of bands each alias code expands to
    _size_lut = {k: len(v) for k, v in _alias_lut.items()}
def __init__(self, parsed, _is_normalized=False):
    """
    Build a fused spec directly from a list of channel codes.

    Args:
        parsed (List[str]): ordered channel codes of the stream
        _is_normalized (bool): set when the caller already knows every
            code is in normalized form, so normalization can be skipped.

    Raises:
        TypeError: if ``parsed`` is not a list (only checked when
            ``__debug__`` is true).
    """
    if __debug__:
        if not isinstance(parsed, list):
            message = (
                'FusedChannelSpec is only directly constructable via a list. '
                'Use coerce for a general constructor')
            raise TypeError(message)
    # denote if we are already normalized or not for speed.
    self._is_normalized = _is_normalized
    self.parsed = parsed
def __len__(self):
    """
    Number of atomic codes stored in ``parsed``.

    A warning is emitted when the spec is not flagged as normalized,
    because the meaning of the length is ambiguous in that case.
    """
    if not self._is_normalized:
        warnings.warn(ub.paragraph(
            '''
            Length Definition for unormalized FusedChannelSpec is in flux.
            It is unclear if it should be the (1) number of atomic codes or
            (2) the expanded "numel", which is the number of "normalized"
            atomic codes. Currently it returns the number "unnormalized"
            atomic codes. Normalizing the FusedChannelSpec object or using
            "numel" will supress this warning.
            '''))
    num_codes = len(self.parsed)
    return num_codes
def __getitem__(self, index):
    """
    Index into the normalized code list.

    Args:
        index (int | slice | Iterable[int]): position(s) to take

    Returns:
        FusedChannelSpec | str: a new spec for a slice or an iterable of
        indices, otherwise the single code at that position.
    """
    codes = self.normalize().parsed
    if isinstance(index, slice):
        return self.__class__(codes[index])
    if ub.iterable(index):
        picked = list(ub.take(codes, index))
        return self.__class__(picked)
    return codes[index]
[docs]
@classmethod
def concat(cls, items):
    """
    Chain the codes of several fused specs into one new spec.

    Args:
        items (Iterable[FusedChannelSpec]): specs to join, in order

    Returns:
        FusedChannelSpec
    """
    chained = ub.flatten(item.parsed for item in items)
    return cls(list(chained))
@ub.memoize_property
def spec(self):
    """
    The codes joined by the early-fusion separator.

    Returns:
        str
    """
    separator = '|'
    return separator.join(self.parsed)
[docs]
@ub.memoize_method
def unique(self):
    """
    The set of codes exactly as stored in ``parsed`` (no normalization
    is applied here).

    Returns:
        Set[str]
    """
    codes = self.parsed
    return set(codes)
[docs]
@classmethod
def parse(cls, spec):
    """
    Split a ``|`` separated string into codes and wrap them in a spec.

    Args:
        spec (str): fused channel string; a falsy value gives an empty spec

    Returns:
        FusedChannelSpec
    """
    codes = spec.split('|') if spec else []
    return cls(codes)
def __eq__(self, other):
    """
    Two specs are equal when their code lists are equal.

    Objects that do not expose a ``parsed`` attribute are not comparable;
    ``NotImplemented`` is returned for them so Python falls back to its
    default handling (``==`` evaluates to False) instead of raising
    ``AttributeError``.
    """
    try:
        other_parsed = other.parsed
    except AttributeError:
        return NotImplemented
    return self.parsed == other_parsed
[docs]
@classmethod
def coerce(cls, data):
    """
    Interpret ``data`` as a fused channel spec.

    Args:
        data (list | str | int | FusedChannelSpec | ChannelSpec):
            a list of codes, a ``|`` separated string, a number of
            unnamed channels (named ``u0``, ``u1``, ...), an existing
            fused spec (returned as-is), or a :class:`ChannelSpec` with at
            most one stream.

    Returns:
        FusedChannelSpec

    Raises:
        ValueError: if a :class:`ChannelSpec` with multiple streams is given
        TypeError: if the type of ``data`` is not supported

    Example:
        >>> from delayed_image.channel_spec import *  # NOQA
        >>> FusedChannelSpec.coerce(['a', 'b', 'c'])
        >>> FusedChannelSpec.coerce('a|b|c')
        >>> FusedChannelSpec.coerce(3)
        >>> FusedChannelSpec.coerce(FusedChannelSpec(['a']))
        >>> assert FusedChannelSpec.coerce('').numel() == 0
    """
    # Efficiency hack: only str / int inputs are ever stored in the memo,
    # so only those consult it. This also keeps e.g. the float 1.0 from
    # silently hitting the entry cached for the int 1.
    if isinstance(data, (str, int)):
        try:
            return cls._memo[data]
        except KeyError:
            pass
    if isinstance(data, list):
        self = cls(data)
    elif isinstance(data, str):
        self = cls.parse(data)
        cls._memo[data] = self
    elif isinstance(data, int):
        # we know the number of channels, but not their names
        self = cls(['u{}'.format(i) for i in range(data)])
        cls._memo[data] = self
    elif isinstance(data, cls):
        self = data
    elif isinstance(data, ChannelSpec):
        parsed = data.parse()
        if len(parsed) == 0:
            # An empty ChannelSpec has no streams; it maps to the empty
            # fused spec rather than a "multiple streams" error.
            self = cls([])
        elif len(parsed) == 1:
            self = cls(ub.peek(parsed.values()).parsed)
        else:
            raise ValueError(
                'Cannot coerce ChannelSpec to a FusedChannelSpec '
                'when there are multiple streams')
    else:
        raise TypeError('unknown type {}'.format(type(data)))
    return self
[docs]
def concise(self):
    """
    Shorten the channel spec by collapsing runs of consecutive indexed
    codes (``root.i``) back into slice syntax.

    The spec is normalized first, then scanned left to right. Consecutive
    codes that share a root and have contiguous indices are accumulated
    into one segment, which is emitted as ``root.i`` (single index),
    ``root:stop`` (segment starting at 0) or ``root.start:stop``.

    Returns:
        FusedChannelSpec : concise spec

    Example:
        >>> from delayed_image.channel_spec import *  # NOQA
        >>> self = FusedChannelSpec.coerce(
        >>>      'b|a|a.0|a.1|a.2|a.5|c|a.8|a.9|b.0:3|c.0')
        >>> short = self.concise()
        >>> long = short.normalize()
        >>> numels = [c.numel() for c in [self, short, long]]
        >>> print('self.spec  = {!r}'.format(self.spec))
        >>> print('short.spec = {!r}'.format(short.spec))
        >>> print('long.spec  = {!r}'.format(long.spec))
        >>> print('numels     = {!r}'.format(numels))
        self.spec  = 'b|a|a.0|a.1|a.2|a.5|c|a.8|a.9|b.0:3|c.0'
        short.spec = 'b|a|a:3|a.5|c|a.8:10|b:3|c.0'
        long.spec  = 'b|a|a.0|a.1|a.2|a.5|c|a.8|a.9|b.0|b.1|b.2|c.0'
        numels     = [13, 13, 13]
        >>> assert long.concise().spec == short.spec
    """
    self_norm = self.normalize()
    # TODO: build some helper API for building this sort of contiguous
    # chain, I think we do several similar things in other places
    # This accum logic is hard to reason about, so an API would be better.
    new_parts = []
    # State of the segment currently being accumulated (None => no segment)
    accum_root = None
    accum_stop = None
    accum_start = None
    # A finished segment code waiting to be appended to new_parts
    ready = None

    def format_ready(r, start, stop):
        # Encode the half-open index range [start, stop) of root ``r``
        if start + 1 == stop:
            code = '{}.{}'.format(r, start)
        elif start == 0:
            code = '{}:{}'.format(r, stop)
        else:
            code = '{}.{}:{}'.format(r, start, stop)
        return code

    for part in self_norm.parsed:
        # print('---')
        # print('part = {!r}'.format(part))
        # print('accum_root = {!r}'.format(accum_root))
        if '.' in part:
            # Part might be part of a contiguous streak
            # (There should be a library for this)
            # NOTE(review): unpacking assumes exactly one "." in the code;
            # a name with several dots would raise here — confirm inputs.
            root, index_suffix = part.split('.')
            index = int(index_suffix)
            if accum_root == root:
                # Check if we can continue an existing segment
                if index == accum_stop:
                    # print('continue segment')
                    accum_stop = index + 1
                else:
                    # print('cannot continue, v1')
                    ready = format_ready(accum_root, accum_start, accum_stop)
                    accum_root = None
            elif accum_root is not None:
                # print('cannot continue, v2')
                ready = format_ready(accum_root, accum_start, accum_stop)
                accum_root = None
            if accum_root is None:
                # print('Start new segment')
                accum_root = root
                accum_start = index
                accum_stop = index + 1
        else:
            # A code without an index always terminates the open segment
            if accum_root is not None:
                # print('cannot continue, v3')
                ready = format_ready(accum_root, accum_start, accum_stop)
                accum_root = None
        # Flush the finished segment before handling the current part so
        # the output order follows the input order.
        if ready is not None:
            # print('Append ready={}'.format(ready))
            new_parts.append(ready)
            ready = None
        # No open segment at this point means ``part`` was a plain code
        # (an indexed part always opens or continues a segment above).
        if accum_root is None:
            # print('Append part={}'.format(part))
            new_parts.append(part)
            ready = None
    if accum_root is not None:
        # print('end of iter, finalize last accum')
        ready = format_ready(accum_root, accum_start, accum_stop)
        new_parts.append(ready)
    new = FusedChannelSpec(new_parts, _is_normalized=False)
    return new
[docs]
def normalize(self):
    """
    Replace aliases with explicit single-band-per-code specs

    Alias codes are expanded through ``_alias_lut`` and codes containing a
    ``:`` are expanded into one ``root.index`` code per index. When nothing
    had to be expanded, this object is flagged as normalized in place and
    returned itself.

    Returns:
        FusedChannelSpec : normalize spec

    Example:
        >>> from delayed_image.channel_spec import *  # NOQA
        >>> self = FusedChannelSpec.coerce('b1|b2|b3|rgb')
        >>> normed = self.normalize()
        >>> print('self = {}'.format(self))
        >>> print('normed = {}'.format(normed))
        self = <FusedChannelSpec(b1|b2|b3|rgb)>
        normed = <FusedChannelSpec(b1|b2|b3|r|g|b)>
        >>> self = FusedChannelSpec.coerce('B:1:11')
        >>> normed = self.normalize()
        >>> print('self = {}'.format(self))
        >>> print('normed = {}'.format(normed))
        self = <FusedChannelSpec(B:1:11)>
        normed = <FusedChannelSpec(B.1|B.2|B.3|B.4|B.5|B.6|B.7|B.8|B.9|B.10)>
        >>> self = FusedChannelSpec.coerce('B.1:11')
        >>> normed = self.normalize()
        >>> print('self = {}'.format(self))
        >>> print('normed = {}'.format(normed))
        self = <FusedChannelSpec(B.1:11)>
        normed = <FusedChannelSpec(B.1|B.2|B.3|B.4|B.5|B.6|B.7|B.8|B.9|B.10)>
    """
    if self._is_normalized:
        return self

    expanded = []
    changed = False
    for code in self.parsed:
        if code in self._alias_lut:
            expanded.extend(self._alias_lut.get(code))
            changed = True
        elif ':' in code:
            # Handle concise slice notation
            root, start, stop, step = _parse_concise_slice_syntax(code)
            expanded.extend(
                '{}.{}'.format(root, idx) for idx in range(start, stop, step))
            changed = True
        else:
            expanded.append(code)

    if changed:
        return FusedChannelSpec(expanded, _is_normalized=True)

    # Nothing needed expanding: remember that so the scan is not redone.
    self._is_normalized = True
    return self
[docs]
def numel(self):
    """
    Total number of channels in this spec

    For a normalized spec this is the number of codes; otherwise it is the
    sum of the per-code sizes.

    Returns:
        int
    """
    return len(self.parsed) if self._is_normalized else sum(self.sizes())
[docs]
def sizes(self):
    """
    Returns a list indicating the size of each atomic code

    Returns:
        List[int]

    Example:
        >>> from delayed_image.channel_spec import *  # NOQA
        >>> self = FusedChannelSpec.coerce('b1|Z:3|b2|b3|rgb')
        >>> self.sizes()
        [1, 3, 1, 1, 3]
        >>> assert(FusedChannelSpec.parse('a.0').numel()) == 1
        >>> assert(FusedChannelSpec.parse('a:0').numel()) == 0
        >>> assert(FusedChannelSpec.parse('a:1').numel()) == 1
    """
    if self._is_normalized:
        # every normalized code is exactly one channel
        return [1] * len(self.parsed)

    def _num_bands(code):
        # alias -> length of its expansion, slice -> length of its range
        if code in self._alias_lut:
            return len(self._alias_lut.get(code))
        if ':' in code:
            root, start, stop, step = _parse_concise_slice_syntax(code)
            return len(range(start, stop, step))
        return 1

    return [_num_bands(code) for code in self.parsed]
def __contains__(self, key):
    """
    Membership test against the set returned by :func:`unique`.

    Example:
        >>> FCS = FusedChannelSpec.coerce
        >>> 'disparity' in FCS('rgb|disparity|flowx|flowy')
        True
        >>> 'gray' in FCS('rgb|disparity|flowx|flowy')
        False
    """
    known_codes = self.unique()
    return key in known_codes
# def can_coerce(self, other):
# # return if we can coerce this band repr to another, like
# # gray to rgb or rgb to gray
[docs]
def code_list(self):
    """
    Return the stored code list.

    Note:
        This is the ``parsed`` list itself (not a copy), and it is returned
        without normalizing.

    Returns:
        List[str]
    """
    codes = self.parsed
    return codes
# @ub.memoize_property
# def code_oset(self):
# return ub.oset(self.normalize().parsed)
[docs]
@ub.memoize_method
def as_list(self):
    """
    The normalized codes as a list.

    Returns:
        List[str]
    """
    normed = self.normalize()
    return normed.parsed
[docs]
@ub.memoize_method
def as_oset(self):
    """
    The normalized codes as an ordered set.

    Returns:
        ub.oset
    """
    normed = self.normalize()
    return ub.oset(normed.parsed)
[docs]
@ub.memoize_method
def as_set(self):
    """
    The normalized codes as an unordered set.

    Returns:
        Set[str]
    """
    normed = self.normalize()
    return set(normed.parsed)
# TODO: deprecate "as" methods in favor of "to" methods
# The "to_*" names are bound to the same objects as the "as_*" methods.
to_set = as_set
to_oset = as_oset
to_list = as_list
[docs]
def as_path(self):
    """
    Returns a string suitable for use in a path.

    Thin wrapper that calls :func:`path_sanitize` with its defaults.

    Note, this may no longer be a valid channel spec

    Example:
        >>> from delayed_image.channel_spec import *  # NOQA
        >>> self = FusedChannelSpec.coerce('b1|Z:3|b2|b3|rgb')
        >>> self.as_path()
        b1_Z..3_b2_b3_rgb
    """
    sanitized = self.path_sanitize()
    return sanitized
def __set__(self):
    """
    Return the normalized codes as a set (same result as :func:`as_set`).

    NOTE(review): ``__set__`` is also the name of the descriptor-protocol
    hook, whose signature is ``(self, instance, value)``; this looks like
    a conversion helper rather than a descriptor — confirm the intent.
    """
    channel_set = self.as_set()
    return channel_set
[docs]
def difference(self, other):
    """
    Set difference

    Both sides are compared in normalized form and the order of this
    spec is kept.

    Args:
        other (FusedChannelSpec | Iterable[str]): codes to remove

    Returns:
        FusedChannelSpec: normalized spec without the codes of ``other``

    Example:
        >>> FCS = FusedChannelSpec.coerce
        >>> self = FCS('rgb|disparity|flowx|flowy')
        >>> other = FCS('r|b')
        >>> self.difference(other)
        >>> other = FCS('flowx')
        >>> self.difference(other)
        >>> FCS = FusedChannelSpec.coerce
        >>> assert len((FCS('a') - {'a'}).parsed) == 0
        >>> assert len((FCS('a.0:3') - {'a.0'}).parsed) == 2
    """
    try:
        rhs_codes = ub.oset(other.normalize().parsed)
    except Exception:
        # ``other`` is not spec-like (e.g. a plain set): use it directly
        rhs_codes = other
    lhs_codes = ub.oset(self.normalize().parsed)
    remaining = lhs_codes - rhs_codes
    return self.__class__(list(remaining), _is_normalized=True)
[docs]
def intersection(self, other):
    """
    Set intersection of the normalized codes.

    Args:
        other (FusedChannelSpec | Iterable[str]): codes to keep

    Returns:
        FusedChannelSpec: normalized spec with the codes common to both

    Example:
        >>> FCS = FusedChannelSpec.coerce
        >>> self = FCS('rgb|disparity|flowx|flowy')
        >>> other = FCS('r|b|XX')
        >>> self.intersection(other)
    """
    try:
        rhs_codes = ub.oset(other.normalize().parsed)
    except Exception:
        # ``other`` is not spec-like: use it directly
        rhs_codes = other
    lhs_codes = ub.oset(self.normalize().parsed)
    common = lhs_codes & rhs_codes
    return self.__class__(list(common), _is_normalized=True)
[docs]
def union(self, other):
    """
    Set union of the normalized codes.

    Args:
        other (FusedChannelSpec | Iterable[str]): codes to add

    Returns:
        FusedChannelSpec: normalized spec with the codes of both operands

    Example:
        >>> from delayed_image.channel_spec import *  # NOQA
        >>> FCS = FusedChannelSpec.coerce
        >>> self = FCS('rgb|disparity|flowx|flowy')
        >>> other = FCS('r|b|XX')
        >>> self.union(other)
    """
    try:
        rhs_codes = ub.oset(other.normalize().parsed)
    except Exception:
        # ``other`` is not spec-like: use it directly
        rhs_codes = other
    lhs_codes = ub.oset(self.normalize().parsed)
    merged = lhs_codes | rhs_codes
    return self.__class__(list(merged), _is_normalized=True)
[docs]
def issubset(self, other):
    """
    Check whether every normalized code of this spec is in ``other``.

    Args:
        other (FusedChannelSpec | Iterable[str]): candidate superset

    Returns:
        bool
    """
    try:
        rhs_codes = ub.oset(other.normalize().parsed)
    except Exception:
        # ``other`` is not spec-like: use it directly
        rhs_codes = other
    lhs_codes = ub.oset(self.normalize().parsed)
    is_sub = lhs_codes.issubset(rhs_codes)
    return is_sub
[docs]
def issuperset(self, other):
    """
    Check whether every normalized code of ``other`` is in this spec.

    Args:
        other (FusedChannelSpec | Iterable[str]): candidate subset

    Returns:
        bool
    """
    try:
        rhs_codes = ub.oset(other.normalize().parsed)
    except Exception:
        # ``other`` is not spec-like: use it directly
        rhs_codes = other
    lhs_codes = ub.oset(self.normalize().parsed)
    is_super = lhs_codes.issuperset(rhs_codes)
    return is_super
[docs]
def component_indices(self, axis=2):
    """
    Look up component indices within this stream

    Each code is mapped to an index tuple that selects its channels along
    dimension ``axis`` (all leading dimensions are taken whole). Code
    widths come from :func:`sizes`, so alias codes and slice codes such as
    ``B:3`` both occupy the correct number of channels.

    Args:
        axis (int): position of the channel dimension

    Returns:
        Dict[str, Tuple[slice, ...]]

    Example:
        >>> FCS = FusedChannelSpec.coerce
        >>> self = FCS('disparity|rgb|flowx|flowy')
        >>> component_indices = self.component_indices()
        >>> print('component_indices = {}'.format(ub.urepr(component_indices, nl=1, _dict_sort_behavior='old')))
        component_indices = {
            'disparity': (slice(...), slice(...), slice(0, 1, None)),
            'flowx': (slice(...), slice(...), slice(4, 5, None)),
            'flowy': (slice(...), slice(...), slice(5, 6, None)),
            'rgb': (slice(...), slice(...), slice(1, 4, None)),
        }

    Example:
        >>> FCS = FusedChannelSpec.coerce
        >>> self = FCS('B:3|ir')
        >>> self.component_indices(axis=0)
        {'B:3': (slice(0, 3, None),), 'ir': (slice(3, 4, None),)}
    """
    component_indices = {}
    start = 0
    for code, size in zip(self.parsed, self.sizes()):
        stop = start + size
        # take everything in the leading dims, then this code's channels
        component_indices[code] = tuple(
            [slice(None)] * axis + [slice(start, stop)])
        start = stop
    return component_indices
[docs]
def streams(self):
    """
    A fused spec is its own single stream.

    Mirrors :func:`ChannelSpec.streams` so both classes can be used the
    same way.

    Returns:
        List[FusedChannelSpec]: a one-item list holding this object
    """
    only_stream = [self]
    return only_stream
[docs]
def fuse(self):
    """
    A fused spec is already fused, so this returns the object itself.

    Mirrors :func:`ChannelSpec.fuse` so both classes can be used the same
    way.

    Returns:
        FusedChannelSpec
    """
    already_fused = self
    return already_fused
[docs]
class ChannelSpec(BaseChannelSpec):
"""
Parse and extract information about network input channel specs for
early or late fusion networks.
Behaves like a dictionary of FusedChannelSpec objects
TODO:
- [ ] Rename to something that indicates this is a collection of
FusedChannelSpec? MultiChannelSpec?
Note:
This class name and API is in flux and subject to change.
Note:
The pipe ('|') character represents an early-fused input stream, and
order matters (it is non-commutative).
The comma (',') character separates different input streams/branches
for a multi-stream/branch network which will be late fused. Order does
not matter
Example:
>>> from delayed_image.channel_spec import * # NOQA
>>> # Integer spec
>>> ChannelSpec.coerce(3)
<ChannelSpec(u0|u1|u2) ...>
>>> # single mode spec
>>> ChannelSpec.coerce('rgb')
<ChannelSpec(rgb) ...>
>>> # early fused input spec
>>> ChannelSpec.coerce('rgb|disprity')
<ChannelSpec(rgb|disprity) ...>
>>> # late fused input spec
>>> ChannelSpec.coerce('rgb,disprity')
<ChannelSpec(rgb,disprity) ...>
>>> # early and late fused input spec
>>> ChannelSpec.coerce('rgb|ir,disprity')
<ChannelSpec(rgb|ir,disprity) ...>
Example:
>>> self = ChannelSpec('gray')
>>> print('self.info = {}'.format(ub.urepr(self.info, nl=1)))
>>> self = ChannelSpec('rgb')
>>> print('self.info = {}'.format(ub.urepr(self.info, nl=1)))
>>> self = ChannelSpec('rgb|disparity')
>>> print('self.info = {}'.format(ub.urepr(self.info, nl=1)))
>>> self = ChannelSpec('rgb|disparity,disparity')
>>> print('self.info = {}'.format(ub.urepr(self.info, nl=1)))
>>> self = ChannelSpec('rgb,disparity,flowx|flowy')
>>> print('self.info = {}'.format(ub.urepr(self.info, nl=1)))
Example:
>>> specs = [
>>> 'rgb', # and rgb input
>>> 'rgb|disprity', # rgb early fused with disparity
>>> 'rgb,disprity', # rgb early late with disparity
>>> 'rgb|ir,disprity', # rgb early fused with ir and late fused with disparity
>>> 3, # 3 unknown channels
>>> ]
>>> for spec in specs:
>>> print('=======================')
>>> print('spec = {!r}'.format(spec))
>>> #
>>> self = ChannelSpec.coerce(spec)
>>> print('self = {!r}'.format(self))
>>> sizes = self.sizes()
>>> print('sizes = {!r}'.format(sizes))
>>> print('self.info = {}'.format(ub.urepr(self.info, nl=1)))
>>> #
>>> item = self._demo_item((1, 1), rng=0)
>>> inputs = self.encode(item)
>>> components = self.decode(inputs)
>>> input_shapes = ub.map_vals(lambda x: x.shape, inputs)
>>> component_shapes = ub.map_vals(lambda x: x.shape, components)
>>> print('item = {}'.format(ub.urepr(item, precision=1)))
>>> print('inputs = {}'.format(ub.urepr(inputs, precision=1)))
>>> print('input_shapes = {}'.format(ub.urepr(input_shapes)))
>>> print('components = {}'.format(ub.urepr(components, precision=1)))
>>> print('component_shapes = {}'.format(ub.urepr(component_shapes, nl=1)))
"""
def __init__(self, spec, parsed=None):
    """
    Args:
        spec (str): the channel spec string
        parsed (dict | None): optional pre-built mapping from stream code
            to its fused spec; when None it is built on the first call to
            :func:`parse`.
    """
    # TODO: allow integer specs
    self._info = dict(spec=spec, parsed=parsed)
    self._spec = spec
@property
def spec(self):
    """
    The spec string given at construction.

    Returns:
        str
    """
    encoded = self._spec
    return encoded
def __contains__(self, key):
    """
    Membership test against the set returned by :func:`unique`.

    Example:
        >>> 'disparity' in ChannelSpec('rgb,disparity,flowx|flowy')
        True
        >>> 'gray' in ChannelSpec('rgb,disparity,flowx|flowy')
        False
    """
    known_codes = self.unique()
    return key in known_codes
@property
def info(self):
    """
    A new dict with the stored ``spec`` / ``parsed`` entries plus the
    ``unique`` codes and the ``normed`` spec.

    Returns:
        dict
    """
    derived = {
        'unique': self.unique(),
        'normed': self.normalize(),
    }
    return ub.dict_union(self._info, derived)
[docs]
@classmethod
def coerce(cls, data):
    """
    Attempt to interpret the data as a channel specification

    Args:
        data (ChannelSpec | FusedChannelSpec | int | str):
            an existing spec (returned as-is), a fused spec (wrapped as a
            single stream), a number of unnamed channels, or a spec string.

    Returns:
        ChannelSpec

    Raises:
        TypeError: if the type of ``data`` is not supported

    Example:
        >>> from delayed_image.channel_spec import *  # NOQA
        >>> data = FusedChannelSpec.coerce(3)
        >>> assert ChannelSpec.coerce(data).spec == 'u0|u1|u2'
        >>> data = ChannelSpec.coerce(3)
        >>> assert data.spec == 'u0|u1|u2'
        >>> assert ChannelSpec.coerce(data).spec == 'u0|u1|u2'
        >>> data = ChannelSpec.coerce('u:3')
        >>> assert data.normalize().spec == 'u.0|u.1|u.2'
    """
    if isinstance(data, cls):
        return data

    if isinstance(data, FusedChannelSpec):
        fused_code = data.spec
        return cls(fused_code, {fused_code: data})

    if isinstance(data, int):
        # we know the number of channels, but not their names
        spec = '|'.join(['u{}'.format(i) for i in range(data)])
    elif isinstance(data, str):
        spec = data
    else:
        raise TypeError('type(data)={}, data={!r}'.format(
            type(data), data))
    return cls(spec)
[docs]
def parse(self):
    """
    Build internal representation

    The result is cached in ``self._info['parsed']``; later calls return
    the cached mapping.

    Returns:
        Dict[str, FusedChannelSpec]: stream code mapped to its fused spec;
        empty stream codes are skipped.

    Example:
        >>> from delayed_image.channel_spec import *  # NOQA
        >>> self = ChannelSpec('b1|b2|b3|rgb,B:3')
        >>> print(self.parse())
        >>> print(self.normalize().parse())
        >>> ChannelSpec('').parse()

    Example:
        >>> base = ChannelSpec('rgb|disparity,flowx|r|flowy')
        >>> other = ChannelSpec('rgb')
        >>> self = base.intersection(other)
        >>> assert self.numel() == 4
    """
    info = self._info
    if info.get('parsed', None) is None:
        stream_lut = {}
        # commas break inputs into multiple streams
        for stream_code in self.spec.split(','):
            if stream_code:
                stream_lut[stream_code] = FusedChannelSpec(
                    stream_code.split('|'))
        info['parsed'] = stream_lut
    return info['parsed']
[docs]
def concise(self):
    """
    Apply :func:`FusedChannelSpec.concise` to every stream.

    Returns:
        ChannelSpec: new spec whose streams are the concise forms

    Example:
        >>> self = ChannelSpec('b1|b2,b3|rgb|B.0,B.1|B.2')
        >>> print(self.concise().spec)
        b1|b2,b3|r|g|b|B.0,B.1:3
    """
    shortened = {}
    for stream in self.parse().values():
        brief = stream.concise()
        shortened[brief.spec] = brief
    joined = ','.join(shortened)
    return ChannelSpec(joined, parsed=shortened)
[docs]
def normalize(self):
    """
    Replace aliases with explicit single-band-per-code specs

    Every stream is normalized with :func:`FusedChannelSpec.normalize`
    and the results are re-keyed by their normalized spec strings.

    Returns:
        ChannelSpec : normalized spec

    Example:
        >>> self = ChannelSpec('b1|b2,b3|rgb,B:3')
        >>> normed = self.normalize()
        >>> print('self   = {}'.format(self))
        >>> print('normed = {}'.format(normed))
        self   = <ChannelSpec(b1|b2,b3|rgb,B:3)>
        normed = <ChannelSpec(b1|b2,b3|r|g|b,B.0|B.1|B.2)>
    """
    expanded = {}
    for stream in self.parse().values():
        full = stream.normalize()
        expanded[full.spec] = full
    joined = ','.join(expanded)
    return ChannelSpec(joined, parsed=expanded)
[docs]
def keys(self):
    """
    Iterate over the comma separated stream codes of the spec string.

    Note:
        The codes come straight from splitting the string, so an empty
        spec yields a single empty code.

    Yields:
        str
    """
    yield from self.spec.split(',')
[docs]
def values(self):
    """
    The fused specs of the parsed streams.

    Returns:
        the ``values()`` view of the mapping returned by :func:`parse`
    """
    stream_lut = self.parse()
    return stream_lut.values()
[docs]
def items(self):
    """
    Pairs of (stream code, fused spec) for the parsed streams.

    Returns:
        the ``items()`` view of the mapping returned by :func:`parse`
    """
    stream_lut = self.parse()
    return stream_lut.items()
[docs]
def fuse(self):
    """
    Fuse all parts into an early fused channel spec

    A spec with exactly one stream returns that stream object; otherwise
    the codes of all streams are chained into a new fused spec.

    Returns:
        FusedChannelSpec

    Example:
        >>> from delayed_image.channel_spec import *  # NOQA
        >>> self = ChannelSpec.coerce('b1|b2,b3|rgb,B:3')
        >>> fused = self.fuse()
        >>> print('self  = {}'.format(self))
        >>> print('fused = {}'.format(fused))
        self  = <ChannelSpec(b1|b2,b3|rgb,B:3)>
        fused = <FusedChannelSpec(b1|b2|b3|rgb|B:3)>
    """
    parts = self.streams()
    if len(parts) != 1:
        all_codes = ub.flatten([p.parsed for p in parts])
        return FusedChannelSpec(list(all_codes))
    return parts[0]
[docs]
def streams(self):
    """
    Breaks this spec up into one spec for each early-fused input stream

    Each code yielded by :func:`keys` is coerced to a fused spec.

    Returns:
        List[FusedChannelSpec]

    Example:
        self = ChannelSpec.coerce('r|g,B1|B2,fx|fy')
        list(map(len, self.streams()))
    """
    fused_streams = []
    for stream_code in self.keys():
        fused_streams.append(FusedChannelSpec.coerce(stream_code))
    return fused_streams
[docs]
def code_list(self):
    """
    Return the only stream of a single-stream spec.

    NOTE(review): this returns the fused spec object of the stream, not a
    plain list of codes like :func:`FusedChannelSpec.code_list` — confirm
    which is intended.

    Raises:
        Exception: if the spec has more than one stream
    """
    stream_lut = self.parse()
    if len(stream_lut) > 1:
        raise Exception(
            'Can only work on single-streams. '
            'TODO make class for single streams')
    only_stream = ub.peek(stream_lut.values())
    return only_stream
[docs]
def as_path(self):
    """
    Returns a string suitable for use in a path.

    Thin wrapper that calls :func:`path_sanitize` with its defaults.

    Note, this may no longer be a valid channel spec

    Example:
        >>> from delayed_image.channel_spec import *
        >>> self = ChannelSpec('rgb|disparity,flowx|r|flowy')
        >>> self.as_path()
        rgb_disparity,flowx_r_flowy
    """
    sanitized = self.path_sanitize()
    return sanitized
[docs]
def difference(self, other):
    """
    Set difference. Remove all instances of other channels from
    this set of channels.

    The removal is applied to each normalized stream; streams that end up
    empty are dropped from the result.

    Args:
        other (ChannelSpec | FusedChannelSpec | str | Iterable[str]):
            channels to remove

    Returns:
        ChannelSpec

    Example:
        >>> from delayed_image.channel_spec import *
        >>> self = ChannelSpec('rgb|disparity,flowx|r|flowy')
        >>> other = ChannelSpec('rgb')
        >>> print(self.difference(other))
        >>> other = ChannelSpec('flowx')
        >>> print(self.difference(other))
        <ChannelSpec(disparity,flowx|flowy)>
        <ChannelSpec(r|g|b|disparity,r|flowy)>

    Example:
        >>> from delayed_image.channel_spec import *
        >>> self = ChannelSpec('a|b,c|d')
        >>> new = self - {'a', 'b'}
        >>> len(new.sizes()) == 1
        >>> empty = new - 'c|d'
        >>> assert empty.numel() == 0
    """
    # assert len(list(other.keys())) == 1, 'can take diff with one stream'
    try:
        removed = ChannelSpec.coerce(other).fuse().normalize()
    except Exception:
        # not coercible (e.g. a plain set of codes): use it directly
        removed = other
    normed_self = self.normalize()
    candidates = (
        stream.difference(removed) for stream in normed_self.values())
    kept = [stream for stream in candidates if len(stream.parsed) > 0]
    return self.__class__(','.join([stream.spec for stream in kept]))
[docs]
def intersection(self, other):
    """
    Set intersection. Keep only the channels of this spec that also
    appear in other.

    The intersection is applied to each normalized stream; streams that
    end up empty are dropped from the result.

    Args:
        other (ChannelSpec | FusedChannelSpec | str | Iterable[str]):
            channels to keep

    Returns:
        ChannelSpec

    Example:
        >>> from delayed_image.channel_spec import *
        >>> self = ChannelSpec('rgb|disparity,flowx|r|flowy')
        >>> other = ChannelSpec('rgb')
        >>> new = self.intersection(other)
        >>> print(new)
        >>> print(new.numel())
        >>> other = ChannelSpec('flowx')
        >>> new = self.intersection(other)
        >>> print(new)
        >>> print(new.numel())
        <ChannelSpec(r|g|b,r)>
        4
        <ChannelSpec(flowx)>
        1
    """
    # assert len(list(other.keys())) == 1, 'can take diff with one stream'
    try:
        wanted = ChannelSpec.coerce(other).fuse().normalize()
    except Exception:
        # not coercible (e.g. a plain set of codes): use it directly
        wanted = other
    normed_self = self.normalize()
    candidates = (
        stream.intersection(wanted) for stream in normed_self.values())
    kept = [stream for stream in candidates if len(stream.parsed) > 0]
    return self.__class__(','.join([stream.spec for stream in kept]))
[docs]
def union(self, other):
    """
    Union simply tags on a second channel spec onto this one.
    Duplicates are maintained.

    The normalized streams of ``other`` are appended after the normalized
    streams of this spec and the stream specs are joined with commas.

    Args:
        other (ChannelSpec | FusedChannelSpec | str | int): spec to append

    Returns:
        ChannelSpec

    Example:
        >>> from delayed_image.channel_spec import *
        >>> self = ChannelSpec('rgb|disparity,flowx|r|flowy')
        >>> other = ChannelSpec('rgb')
        >>> new = self.union(other)
        >>> print(new)
        >>> print(new.numel())
        >>> other = ChannelSpec('flowx')
        >>> new = self.union(other)
        >>> print(new)
        >>> print(new.numel())
        <ChannelSpec(r|g|b|disparity,flowx|r|flowy,r|g|b)>
        10
        <ChannelSpec(r|g|b|disparity,flowx|r|flowy,flowx)>
        8
    """
    # assert len(list(other.keys())) == 1, 'can take diff with one stream'
    try:
        appended = ChannelSpec.coerce(other).normalize()
    except Exception:
        appended = other
    normed_self = self.normalize()
    combined = [*normed_self.values(), *appended.values()]
    return self.__class__(','.join([stream.spec for stream in combined]))
[docs]
def issubset(self, other):
    """
    Not supported for multi-stream specs.

    Raises:
        NotImplementedError: always
    """
    message = 'Implemention of subset for a general channel spec is unclear. The fused implemenation does exist'
    raise NotImplementedError(message)
[docs]
def issuperset(self, other):
    """
    Not supported for multi-stream specs.

    Raises:
        NotImplementedError: always
    """
    message = 'Implemention of superset for a general channel spec is unclear. The fused implemenation does exist'
    raise NotImplementedError(message)
[docs]
def numel(self):
    """
    Total number of channels in this spec

    This is the sum of the per-stream sizes from :func:`sizes`.

    Returns:
        int
    """
    per_stream = self.sizes()
    return sum(per_stream.values())
[docs]
def sizes(self):
    """
    Number of dimensions for each fused stream channel

    IE: The EARLY-FUSED channel sizes

    Returns:
        Dict[str, int]: stream code mapped to the ``numel()`` of its stream

    Example:
        >>> self = ChannelSpec('rgb|disparity,flowx|flowy,B:10')
        >>> self.normalize().concise()
        >>> self.sizes()
    """
    stream_sizes = {}
    for stream_code, stream in self.parse().items():
        stream_sizes[stream_code] = stream.numel()
    return stream_sizes
[docs]
def unique(self, normalize=False):
    """
    Returns the unique channels that will need to be given or loaded

    Args:
        normalize (bool): flagged as broken by the FIXME warning below.
            When true the streams from :func:`parse` are flattened,
            otherwise the streams of the normalized spec are.

    Returns:
        set
    """
    import warnings
    if normalize:
        warnings.warn(
            'FIXME: These kwargs are broken, but does anything use it?')
        stream_values = self.parse().values()
    else:
        stream_values = self.normalize().values()
    return set(ub.flatten(stream_values))
[docs]
def _item_shapes(self, dims):
    """
    Expected shape for an input item

    Args:
        dims (Tuple[int, int]): the spatial dimension

    Returns:
        dict: maps each parsed component of every stream to the tuple
            ``(size, *dims)`` where ``size`` comes from the stream's
            ``sizes()``. A component appearing in several streams keeps the
            shape from the last stream visited.
    """
    parsed = self.parse()
    shapes = {}
    for stream_key in list(self.keys()):
        stream = parsed[stream_key]
        for component, num_chans in zip(stream.parsed, stream.sizes()):
            shapes[component] = (num_chans, *dims)
    return shapes
[docs]
def _demo_item(self, dims=(4, 4), rng=None):
    """
    Create an input that satisfies this spec

    Args:
        dims (Tuple[int, int]): spatial dimensions forwarded to
            :func:`_item_shapes`
        rng: seed or random state, passed through ``kwarray.ensure_rng``

    Returns:
        dict: an item like it might appear when its returned from the
            `__getitem__` method of a :class:`torch...Dataset`. Each value
            is drawn with ``rng.rand`` using the shape from
            :func:`_item_shapes`.

    Example:
        >>> dims = (1, 1)
        >>> ChannelSpec.coerce(3)._demo_item(dims, rng=0)
        >>> ChannelSpec.coerce('r|g|b|disaprity')._demo_item(dims, rng=0)
        >>> ChannelSpec.coerce('rgb|disaprity')._demo_item(dims, rng=0)
        >>> ChannelSpec.coerce('rgb,disaprity')._demo_item(dims, rng=0)
        >>> ChannelSpec.coerce('rgb')._demo_item(dims, rng=0)
        >>> ChannelSpec.coerce('gray')._demo_item(dims, rng=0)
    """
    import kwarray
    rng = kwarray.ensure_rng(rng)
    item = {}
    for key, shape in self._item_shapes(dims).items():
        item[key] = rng.rand(*shape)
    return item
[docs]
def encode(self, item, axis=0, mode=1):
    """
    Given a dictionary containing preloaded components of the network
    inputs, build a concatenated (fused) network representations of each
    input stream.

    Args:
        item (Dict[str, Tensor]): a batch item containing unfused parts.
            each key should be a single-stream (optionally early fused)
            channel key.

        axis (int, default=0): concatenation dimension

        mode (int, default=1): implementation selector. ``1`` looks up a
            cached index plan in which adjacent slices of the same item
            entry are merged, so fewer concatenations are needed. ``0``
            decodes every item entry into its components and always
            concatenates them. Any other value raises ``KeyError``.

    Returns:
        Dict[str, Tensor]:
            mapping between input stream and its early fused tensor input.

    Raises:
        ValueError: if ``item`` is empty
        KeyError: if ``mode`` is neither 0 nor 1

    Example:
        >>> from delayed_image.channel_spec import *  # NOQA
        >>> import numpy as np
        >>> dims = (4, 4)
        >>> item = {
        >>>     'rgb': np.random.rand(3, *dims),
        >>>     'disparity': np.random.rand(1, *dims),
        >>>     'flowx': np.random.rand(1, *dims),
        >>>     'flowy': np.random.rand(1, *dims),
        >>> }
        >>> # Complex Case
        >>> self = ChannelSpec('rgb,disparity,rgb|disparity|flowx|flowy,flowx|flowy')
        >>> fused = self.encode(item)
        >>> input_shapes = ub.map_vals(lambda x: x.shape, fused)
        >>> print('input_shapes = {}'.format(ub.urepr(input_shapes, nl=1)))
        >>> # Simpler case
        >>> self = ChannelSpec('rgb|disparity')
        >>> fused = self.encode(item)
        >>> input_shapes = ub.map_vals(lambda x: x.shape, fused)
        >>> print('input_shapes = {}'.format(ub.urepr(input_shapes, nl=1)))

    Example:
        >>> # Case where we have to break up early fused data
        >>> import numpy as np
        >>> dims = (40, 40)
        >>> item = {
        >>>     'rgb|disparity': np.random.rand(4, *dims),
        >>>     'flowx': np.random.rand(1, *dims),
        >>>     'flowy': np.random.rand(1, *dims),
        >>> }
        >>> # Complex Case
        >>> self = ChannelSpec('rgb,disparity,rgb|disparity,rgb|disparity|flowx|flowy,flowx|flowy,flowx,disparity')
        >>> inputs = self.encode(item)
        >>> input_shapes = ub.map_vals(lambda x: x.shape, inputs)
        >>> print('input_shapes = {}'.format(ub.urepr(input_shapes, nl=1)))
        >>> # xdoctest: +REQUIRES(--bench)
        >>> #self = ChannelSpec('rgb|disparity,flowx|flowy')
        >>> import timerit
        >>> ti = timerit.Timerit(100, bestof=10, verbose=2)
        >>> for timer in ti.reset('mode=simple'):
        >>>     with timer:
        >>>         inputs = self.encode(item, mode=0)
        >>> for timer in ti.reset('mode=minimize-concat'):
        >>>     with timer:
        >>>         inputs = self.encode(item, mode=1)

    Ignore:
        import xdev
        _ = xdev.profile_now(self.encode)(item, mode=1)
    """
    import kwarray
    if len(item) == 0:
        raise ValueError('Cannot encode empty item')
    # The array backend is chosen from the first value in the item
    _impl = kwarray.ArrayAPI.coerce(ub.peek(item.values()))
    parsed = self.parse()

    # TODO: This can be made much more efficient by determining if the
    # channels item can be directly translated to the result inputs. We
    # probably don't need to do the full decoding each and every time.
    if mode == 1:
        # Plan-based path: the helper is cached, so its arguments are
        # converted to sorted (hashable) tuples first.
        sorted_item_keys = tuple(sorted(item.keys()))
        sorted_streams = tuple(sorted(
            [(stream_key, tuple(stream.parsed))
             for stream_key, stream in parsed.items()]
        ))
        index_plan = _cached_single_fused_mapping(
            sorted_item_keys, sorted_streams, axis=axis)
        fused = {}
        for stream_key, pieces in index_plan.items():
            chunks = [item[src_key][src_index]
                      for src_key, src_index in pieces]
            if len(chunks) == 1:
                # A single chunk needs no concatenation
                fused[stream_key] = chunks[0]
            else:
                fused[stream_key] = _impl.cat(chunks, axis=axis)
    elif mode == 0:
        # Simple path: fully break every item entry into components, then
        # rebuild each requested stream from those components.
        key_specs = {key: ChannelSpec(key) for key in item.keys()}
        components = {}
        for key, spec in key_specs.items():
            components.update(spec.decode({key: item[key]}, axis=axis))
        fused = {
            stream_key: _impl.cat([components[k] for k in parts], axis=axis)
            for stream_key, parts in parsed.items()
        }
    else:
        raise KeyError(mode)
    return fused
[docs]
def decode(self, inputs, axis=1):
    """
    break an early fused item into its components

    Args:
        inputs (Dict[str, Tensor]): dictionary of components
        axis (int, default=1): channel dimension

    Returns:
        dict: maps every parsed component of every stream to the slice of
            ``inputs[stream]`` it occupies along ``axis``. Slices are laid
            out consecutively using the stream's ``sizes()``.

    Example:
        >>> from delayed_image.channel_spec import *  # NOQA
        >>> import numpy as np
        >>> dims = (4, 4)
        >>> item_components = {
        >>>     'rgb': np.random.rand(3, *dims),
        >>>     'ir': np.random.rand(1, *dims),
        >>> }
        >>> self = ChannelSpec('rgb|ir')
        >>> item_encoded = self.encode(item_components)
        >>> batch = {k: np.concatenate([v[None, :], v[None, :]], axis=0)
        ...          for k, v in item_encoded.items()}
        >>> components = self.decode(batch)

    Example:
        >>> # xdoctest: +REQUIRES(module:netharn, module:torch)
        >>> import torch
        >>> import numpy as np
        >>> dims = (4, 4)
        >>> components = {
        >>>     'rgb': np.random.rand(3, *dims),
        >>>     'ir': np.random.rand(1, *dims),
        >>> }
        >>> components = ub.map_vals(torch.from_numpy, components)
        >>> self = ChannelSpec('rgb|ir')
        >>> encoded = self.encode(components)
        >>> from netharn.data import data_containers
        >>> item = {k: data_containers.ItemContainer(v, stack=True)
        >>>         for k, v in encoded.items()}
        >>> batch = data_containers.container_collate([item, item])
        >>> components = self.decode(batch)
    """
    components = {}
    for stream_key, stream in self.parse().items():
        lo = 0
        for part, size in zip(stream.parsed, stream.sizes()):
            hi = lo + size
            fused = inputs[stream_key]
            # Take everything on the leading axes, [lo, hi) on ``axis``
            index = tuple([slice(None)] * axis + [slice(lo, hi)])
            components[part] = fused[index]
            lo = hi
    return components
[docs]
def component_indices(self, axis=2):
    """
    Look up component indices within fused streams

    Args:
        axis (int, default=2): position of the channel slice inside each
            returned index tuple; all earlier positions are full slices.

    Returns:
        dict: maps each parsed component to a ``(stream_key, index)`` pair,
            where ``index`` is a tuple of slices selecting that component
            from the fused stream.

    Example:
        >>> dims = (4, 4)
        >>> inputs = ['flowx', 'flowy', 'disparity']
        >>> self = ChannelSpec('disparity,flowx|flowy')
        >>> component_indices = self.component_indices()
        >>> print('component_indices = {}'.format(ub.urepr(component_indices, nl=1)))
        component_indices = {
            'disparity': ('disparity', (slice(None, None, None), slice(None, None, None), slice(0, 1, None))),
            'flowx': ('flowx|flowy', (slice(None, None, None), slice(None, None, None), slice(0, 1, None))),
            'flowy': ('flowx|flowy', (slice(None, None, None), slice(None, None, None), slice(1, 2, None))),
        }
    """
    lookup = {}
    for stream_key, stream in self.parse().items():
        lo = 0
        for part, size in zip(stream.parsed, stream.sizes()):
            hi = lo + size
            index = tuple([slice(None)] * axis + [slice(lo, hi)])
            lookup[part] = (stream_key, index)
            lo = hi
    return lookup
@functools.lru_cache(maxsize=None)
def _cached_single_fused_mapping(item_keys, parsed_items, axis=0):
item_indices = {}
for key in item_keys:
key_idxs = _cached_single_stream_idxs(key, axis=axis)
for subkey, subsl in key_idxs.items():
item_indices[subkey] = subsl
fused_indices = {}
for key, parts in parsed_items:
fused_indices[key] = [item_indices[k] for k in parts]
new_fused_indices = {}
for key, idx_list in fused_indices.items():
# Determine which continguous slices can be merged into a
# single slice
prev_key = None
prev_sl = None
accepted = []
accum = []
for item_key, item_sl in idx_list:
if prev_key == item_key:
if prev_sl.stop == item_sl[-1].start and prev_sl.step == item_sl[-1].step:
accum.append((item_key, item_sl))
continue
if accum:
accepted.append(accum)
accum = []
prev_key = item_key
prev_sl = item_sl[-1]
accum.append((item_key, item_sl))
if accum:
accepted.append(accum)
accum = []
# Merge the accumulated contiguous slices
new_idx_list = []
for accum in accepted:
if len(accum) > 1:
item_key = accum[0][0]
first = accum[0][1]
last = accum[-1][1]
new_sl = list(first)
new_sl[-1] = slice(first[-1].start, last[-1].stop, last[-1].step)
new_sl = tuple(new_sl)
new_idx_list.append((item_key, new_sl))
else:
new_idx_list.append(accum[0])
val = new_idx_list
new_fused_indices[key] = val
return new_fused_indices
@functools.lru_cache(maxsize=None)
def _cached_single_stream_idxs(key, axis=0):
    """
    Cached lookup of ``ChannelSpec(key).component_indices(axis=axis)``.

    Exists purely as a speed hack: results are memoized per ``(key, axis)``.

    Ignore:
        axis = 0
        key = 'rgb|disparity'
        # xdoctest: +REQUIRES(--bench)
        import timerit
        ti = timerit.Timerit(100, bestof=10, verbose=2)
        for timer in ti.reset('time'):
            with timer:
                _cached_single_stream_idxs(key, axis=axis)
        for timer in ti.reset('time'):
            with timer:
                ChannelSpec(key).component_indices(axis=axis)
    """
    return ChannelSpec(key).component_indices(axis=axis)
def subsequence_index(oset1, oset2):
    """
    Returns a slice into the first items indicating the position of
    the second items if they exist.

    This is a variant of the substring problem.

    Args:
        oset1: ordered container supporting ``index`` and slicing
        oset2: ordered container supporting ``len`` and indexing

    Returns:
        None | slice: the slice of ``oset1`` that equals ``oset2``, or None
            when ``oset2`` does not occur as a contiguous run starting at
            the position of its first item. An empty ``oset2`` yields
            ``slice(0, 0)``.

    Example:
        >>> oset1 = ub.oset([1, 2, 3, 4, 5, 6])
        >>> oset2 = ub.oset([2, 3, 4])
        >>> index = subsequence_index(oset1, oset2)
        >>> assert index
        >>> oset1 = ub.oset([1, 2, 3, 4, 5, 6])
        >>> oset2 = ub.oset([2, 4, 3])
        >>> index = subsequence_index(oset1, oset2)
        >>> assert not index
    """
    if len(oset2) == 0:
        base = 0
    else:
        item1 = oset2[0]
        try:
            base = oset1.index(item1)
        except (IndexError, KeyError, ValueError):
            # Ordered sets signal a missing item with KeyError, while lists
            # and tuples use ValueError. Either way there is no match.
            return None
    sl = slice(base, base + len(oset2))
    if oset1[sl] == oset2:
        return sl
    return None
def _parse_concise_slice_syntax(v):
"""
Helper for our slice syntax, which is may be a bit strange
Example:
>>> print(_parse_concise_slice_syntax('B:10'))
>>> print(_parse_concise_slice_syntax('B.0:10:3'))
>>> print(_parse_concise_slice_syntax('B.:10:3'))
>>> print(_parse_concise_slice_syntax('B::10:3'))
>>> # Careful, this next one is quite different
>>> print(_parse_concise_slice_syntax('B:10:3'))
>>> print(_parse_concise_slice_syntax('B:3:10:3'))
>>> print(_parse_concise_slice_syntax('B.:10'))
>>> print(_parse_concise_slice_syntax('B.:3:'))
>>> print(_parse_concise_slice_syntax('B.:3:2'))
>>> print(_parse_concise_slice_syntax('B::2:3'))
>>> print(_parse_concise_slice_syntax('B.0:10:3'))
>>> print(_parse_concise_slice_syntax('B.:10:3'))
('B', 0, 10, 1)
('B', 0, 10, 3)
('B', 0, 10, 3)
('B', 0, 10, 3)
('B', 10, 3, 1)
('B', 3, 10, 3)
('B', 0, 10, 1)
('B', 0, 3, 1)
('B', 0, 3, 2)
('B', 0, 2, 3)
('B', 0, 10, 3)
('B', 0, 10, 3)
>>> import pytest
>>> with pytest.raises(ValueError):
>>> _parse_concise_slice_syntax('B.0')
>>> with pytest.raises(ValueError):
>>> _parse_concise_slice_syntax('B0')
>>> with pytest.raises(ValueError):
>>> _parse_concise_slice_syntax('B:')
>>> with pytest.raises(ValueError):
>>> _parse_concise_slice_syntax('B:0.10')
>>> with pytest.raises(ValueError):
>>> _parse_concise_slice_syntax('B.::')
"""
# The separator can be a ':' or a '.'
if '.' in v:
root, slice_suffix = v.split('.', 1)
slice_args = slice_suffix.split(':')
if len(slice_args) <= 1:
raise ValueError('invalid slice syntax: {}'.format(v))
else:
# import warnings
# warnings.warn('It is recommended to use . as the getitem op')
root, slice_suffix = v.split(':', 1)
slice_args = slice_suffix.split(':')
if len(slice_args) == 1:
start = 0
stop, = map(int, slice_args)
step = 1
elif len(slice_args) == 2:
start = int(slice_args[0]) if slice_args[0] else 0
stop = int(slice_args[1]) if slice_args[1] else None
step = 1
elif len(slice_args) == 3:
start = int(slice_args[0]) if slice_args[0] else 0
stop = int(slice_args[1]) if slice_args[1] else None
step = int(slice_args[2]) if slice_args[2] else 1
else:
raise ValueError('invalid slice syntax: {}'.format(v))
if stop is None:
raise ValueError('Must explicitly specify the endpoint: {}'.format(v))
CHECK_ERRORS = 1
if CHECK_ERRORS:
if '.' in root or ':' in root:
raise ValueError('invalid slice syntax: {}'.format(v))
return root, start, stop, step
def _path_sanitize_v2(path, maxlen=128, hash_suffix=None):
"""
Clean input text so it can be used as a path.
Args:
path (str): the path name to santaize
maxlen (int | None):
if specified, and the name is longer than this length, it is
shortened. Must be 8 or greater.
hash_suffix (str | None | callable):
if specified, add an extra suffix to the name if it was hashed.
Can also be a callable.
Returns:
str: path suitable for usage in a filename
Example:
>>> from delayed_image.channel_spec import _path_sanitize_v2
>>> print(_path_sanitize_v2('a chan with space|bar|baz'))
a chan with space_bar_baz
>>> print(_path_sanitize_v2('dont|use<these>chars:in?a*path.'))
dont_use-LT-these-GT-chars..in_Q_a_S_path._
>>> print(_path_sanitize_v2('dont|use<these>chars:in?a*path.', maxlen=8))
iderkhwc
"""
# https://stackoverflow.com/questions/1976007/what-characters-are-forbidden-in-windows-and-linux-directory-names
illegal_character_mapping = {
'|': '_',
'<': '-LT-',
'>': '-GT-',
':': '..',
'?': '_Q_',
'*': '_S_',
}
new_name = path
for k, v in illegal_character_mapping.items():
new_name = new_name.replace(k, v)
if new_name.endswith(('.', ' ')):
# filenames cannot end with a dot or space.
new_name = new_name + '_'
if maxlen is not None and len(new_name) > maxlen:
# prevent long names for docker (limit is 242 chars)
hashlen = maxlen - 2
hashlen = max(8, hashlen)
hashstr = ub.hash_data(new_name, base='abc')[0:hashlen]
if hash_suffix is not None:
if callable(hash_suffix):
hash_suffix = hash_suffix()
new_name = '{}_{}'.format(hashstr, hash_suffix)
else:
new_name = hashstr
return new_name
def oset_insert(self, index, obj):
    """
    Insert ``obj`` into an ordered set at position ``index``, in place.

    Does nothing when ``obj`` is already a member. Works directly on the
    ``items`` list and ``map`` dict of the ordered set.

    Args:
        self: the ordered set to modify
        index (int): insert position. Negative and past-the-end values are
            clamped the same way ``list.insert`` clamps them, so the stored
            position always matches where the item actually lands.
        obj: the item to insert

    Ignore:
        self = ub.oset()
        oset_insert(self, 0, 'a')
        oset_insert(self, 0, 'b')
        oset_insert(self, 0, 'c')
        oset_insert(self, 1, 'd')
        oset_insert(self, 2, 'e')
        oset_insert(self, 0, 'f')
    """
    if obj in self:
        return
    # Resolve the index up front. Recording the raw value would leave the
    # index map out of sync with the list for out-of-range positions.
    num_items = len(self.items)
    if index < 0:
        index = max(index + num_items, 0)
    else:
        index = min(index, num_items)
    # Bump index of every item after the insert position
    for key in self.items[index:]:
        self.map[key] = self.map[key] + 1
    self.items.insert(index, obj)
    self.map[obj] = index
def oset_delitem(self, index):
    """
    Delete the item(s) at ``index`` from a ubelt ordered set, in place.

    TODO: contribute back to luminosoinsight

    Args:
        self: the ordered set to modify
        index: a slice, an iterable of positions, or anything with
            ``__index__``. A full slice clears the whole set.

    Raises:
        TypeError: if ``index`` is none of the supported kinds

    Example:
        >>> self = ub.oset([1, 2, 3, 4, 5, 6, 7, 8, 9])
        >>> index = slice(3, 5)
        >>> oset_delitem(self, index)

    Ignore:
        self = ub.oset(['r', 'g', 'b', 'disparity'])
        index = slice(0, 3)
        oset_delitem(self, index)
    """
    if isinstance(index, slice) and index == ub.orderedset.SLICE_ALL:
        self.clear()
        return

    if ub.orderedset.is_iterable(index):
        to_remove = [self.items[i] for i in index]
    elif isinstance(index, slice) or hasattr(index, "__index__"):
        to_remove = self.items[index]
    else:
        raise TypeError("Don't know how to index an OrderedSet by %r" % index)

    if not isinstance(to_remove, list):
        # A single item: defer to the regular discard
        self.discard(to_remove)
        return

    # Modified version of discard slightly more efficient for multiple
    # items. Positions are removed from the back so that earlier positions
    # stay valid while deleting.
    remove_idxs = sorted((self.map[key] for key in to_remove), reverse=True)
    for key in to_remove:
        del self.map[key]
    for idx in remove_idxs:
        del self.items[idx]
    # Shift each surviving item down by the number of removed positions
    # that were in front of it.
    for k, v in self.map.items():
        # I think there is a more efficient way to do this?
        num_after = sum(v >= i for i in remove_idxs)
        if num_after:
            self.map[k] = v - num_after