kwcoco.channel_spec
The ChannelSpec has these simple rules:
* Each 1D channel is an alphanumeric string.
* The pipe ('|') separates aligned early-fused streams (non-commutative: order matters).
* The comma (',') separates late-fused streams (applied after pipe operations; commutative).
* Certain common sets of early fused channels have codenames, for example:
rgb = r|g|b
rgba = r|g|b|a
dxdy = dx|dy
For single arrays, the spec is always an early fused spec.
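The rules above can be sketched in a few lines of plain Python. This is an illustrative sketch only, not the kwcoco implementation: the `ALIASES` table and the `parse_spec` helper are hypothetical names, and only the rgb and rgba codenames are assumed.

```python
# Hypothetical alias table (an assumption for illustration only).
ALIASES = {
    'rgb': ['r', 'g', 'b'],
    'rgba': ['r', 'g', 'b', 'a'],
}


def parse_spec(spec):
    """Split a spec string into late-fused streams of early-fused channels."""
    streams = []
    for stream in spec.split(','):      # comma: late-fused streams
        channels = []
        for code in stream.split('|'):  # pipe: early fusion, order is kept
            channels.extend(ALIASES.get(code, [code]))
        streams.append(channels)
    return streams


parsed = parse_spec('rgb|ir,disparity')
print(parsed)
```

Each inner list is one early-fused stream with its codenames expanded; the outer list holds the late-fused streams.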
Todo
[X] : normalize representations? e.g: rgb = r|g|b? - OPTIONAL
[X] : rename to BandsSpec or SensorSpec? - REJECTED
[ ] : allow bands to be coerced, i.e. rgb -> gray, or gray->rgb
Module Contents
- class kwcoco.channel_spec.FusedChannelSpec(parsed)
Bases: ubelt.NiceRepr
A specific type of channel spec with only one early fused stream.
The channels in this stream are non-commutative (their order matters).
Note
This class name and API are in flux and subject to change.
Todo
A special code indicating a name and the number of bands that name contains. This would primarily be used for large numbers of channels produced by a network, like:
resnet_d35d060_L5:512
or
resnet_d35d060_L5[:512]
which might refer to a very specific (hashed) set of resnet parameters with 512 bands.
Maybe we can do something slice-like:
resnet_d35d060_L5[A:B]
resnet_d35d060_L5:A:B
Do we want to “just store the code” and allow for parsing later?
Or do we want to ensure the serialization is parsed before we construct the data structure?
- classmethod coerce(cls, data)
Example
>>> FusedChannelSpec.coerce(['a', 'b', 'c'])
>>> FusedChannelSpec.coerce('a|b|c')
>>> FusedChannelSpec.coerce(3)
>>> FusedChannelSpec.coerce(FusedChannelSpec(['a']))
- normalize(self)
Replace aliases with explicit single-band-per-code specs
Example
>>> import ubelt as ub
>>> self = FusedChannelSpec.coerce('b1|b2|b3|rgb')
>>> normed = self.normalize()
>>> print('normed = {}'.format(ub.repr2(normed, nl=1)))
- __contains__(self, key)
Example
>>> FCS = FusedChannelSpec.coerce
>>> 'disparity' in FCS('rgb|disparity|flowx|flowy')
True
>>> 'gray' in FCS('rgb|disparity|flowx|flowy')
False
- difference(self, other)
Set difference
Example
>>> FCS = FusedChannelSpec.coerce
>>> self = FCS('rgb|disparity|flowx|flowy')
>>> other = FCS('r|b')
>>> self.difference(other)
>>> other = FCS('flowx')
>>> self.difference(other)
- intersection(self, other)
Example
>>> FCS = FusedChannelSpec.coerce
>>> self = FCS('rgb|disparity|flowx|flowy')
>>> other = FCS('r|b|XX')
>>> self.intersection(other)
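Difference and intersection on a fused stream can be pictured as order-preserving set operations on the list of channel codes. The sketch below works on plain lists under that assumption; `ordered_difference` and `ordered_intersection` are hypothetical helpers, and the real methods may handle aliases such as rgb differently.

```python
def ordered_difference(channels, other):
    """Keep the channels that are not in `other`, preserving their order."""
    exclude = set(other)
    return [c for c in channels if c not in exclude]


def ordered_intersection(channels, other):
    """Keep the channels that also appear in `other`, preserving their order."""
    keep = set(other)
    return [c for c in channels if c in keep]


# An already-expanded stream (one code per band) is assumed here.
stream = ['r', 'g', 'b', 'disparity', 'flowx', 'flowy']
diff = ordered_difference(stream, ['r', 'b'])
isect = ordered_intersection(stream, ['r', 'b', 'XX'])
print(diff)
print(isect)
```

Because the stream is non-commutative, both helpers keep the order of the first operand instead of returning an unordered set.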
- component_indices(self, axis=2)
Look up component indices within this stream
Example
>>> import ubelt as ub
>>> FCS = FusedChannelSpec.coerce
>>> self = FCS('disparity|rgb|flowx|flowy')
>>> component_indices = self.component_indices()
>>> print('component_indices = {}'.format(ub.repr2(component_indices, nl=1)))
component_indices = {
    'disparity': (slice(...), slice(...), slice(0, 1, None)),
    'flowx': (slice(...), slice(...), slice(4, 5, None)),
    'flowy': (slice(...), slice(...), slice(5, 6, None)),
    'rgb': (slice(...), slice(...), slice(1, 4, None)),
}
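One way to arrive at index tuples like these is to walk the codes in order and accumulate band offsets. This is a sketch under assumptions, not the library's code: `component_slices` is a hypothetical helper and the band counts in `sizes` are supplied by hand.

```python
def component_slices(codes, sizes, axis=2):
    """Map each code to an index tuple selecting its bands along `axis`."""
    indices = {}
    start = 0
    for code in codes:
        stop = start + sizes[code]
        # Full slices for the leading axes, then the band range on `axis`.
        indices[code] = (slice(None),) * axis + (slice(start, stop),)
        start = stop
    return indices


# Assumed band counts: rgb has 3 bands, every other code has 1.
sizes = {'disparity': 1, 'rgb': 3, 'flowx': 1, 'flowy': 1}
indices = component_slices(['disparity', 'rgb', 'flowx', 'flowy'], sizes)
print(indices['rgb'])
```

Each tuple can be used directly to index an array whose channel axis is `axis`, for example `array[indices['rgb']]`.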
- class kwcoco.channel_spec.ChannelSpec(spec, parsed=None)
Bases: ubelt.NiceRepr
Parse and extract information about network input channel specs for early or late fusion networks.
Note
This class name and API are in flux and subject to change.
Note
The pipe (‘|’) character represents an early-fused input stream, and order matters (it is non-commutative).
The comma (‘,’) character separates different input streams/branches for a multi-stream/branch network, which will be late fused. Order does not matter.
Example
>>> # Integer spec
>>> ChannelSpec.coerce(3)
<ChannelSpec(u0|u1|u2) ...>
>>> # single mode spec
>>> ChannelSpec.coerce('rgb')
<ChannelSpec(rgb) ...>
>>> # early fused input spec
>>> ChannelSpec.coerce('rgb|disparity')
<ChannelSpec(rgb|disparity) ...>
>>> # late fused input spec
>>> ChannelSpec.coerce('rgb,disparity')
<ChannelSpec(rgb,disparity) ...>
>>> # early and late fused input spec
>>> ChannelSpec.coerce('rgb|ir,disparity')
<ChannelSpec(rgb|ir,disparity) ...>
Example
>>> import ubelt as ub
>>> self = ChannelSpec('gray')
>>> print('self.info = {}'.format(ub.repr2(self.info, nl=1)))
>>> self = ChannelSpec('rgb')
>>> print('self.info = {}'.format(ub.repr2(self.info, nl=1)))
>>> self = ChannelSpec('rgb|disparity')
>>> print('self.info = {}'.format(ub.repr2(self.info, nl=1)))
>>> self = ChannelSpec('rgb|disparity,disparity')
>>> print('self.info = {}'.format(ub.repr2(self.info, nl=1)))
>>> self = ChannelSpec('rgb,disparity,flowx|flowy')
>>> print('self.info = {}'.format(ub.repr2(self.info, nl=1)))
Example
>>> import ubelt as ub
>>> specs = [
>>>     'rgb',               # an rgb input
>>>     'rgb|disparity',     # rgb early fused with disparity
>>>     'rgb,disparity',     # rgb late fused with disparity
>>>     'rgb|ir,disparity',  # rgb early fused with ir and late fused with disparity
>>>     3,                   # 3 unknown channels
>>> ]
>>> for spec in specs:
>>>     print('=======================')
>>>     print('spec = {!r}'.format(spec))
>>>     #
>>>     self = ChannelSpec.coerce(spec)
>>>     print('self = {!r}'.format(self))
>>>     sizes = self.sizes()
>>>     print('sizes = {!r}'.format(sizes))
>>>     print('self.info = {}'.format(ub.repr2(self.info, nl=1)))
>>>     #
>>>     item = self._demo_item((1, 1), rng=0)
>>>     inputs = self.encode(item)
>>>     components = self.decode(inputs)
>>>     input_shapes = ub.map_vals(lambda x: x.shape, inputs)
>>>     component_shapes = ub.map_vals(lambda x: x.shape, components)
>>>     print('item = {}'.format(ub.repr2(item, precision=1)))
>>>     print('inputs = {}'.format(ub.repr2(inputs, precision=1)))
>>>     print('input_shapes = {}'.format(ub.repr2(input_shapes)))
>>>     print('components = {}'.format(ub.repr2(components, precision=1)))
>>>     print('component_shapes = {}'.format(ub.repr2(component_shapes, nl=1)))
- __contains__(self, key)
Example
>>> 'disparity' in ChannelSpec('rgb,disparity,flowx|flowy')
True
>>> 'gray' in ChannelSpec('rgb,disparity,flowx|flowy')
False
- normalize(self)
Replace aliases with explicit single-band-per-code specs
Example
>>> self = ChannelSpec('b1|b2|b3|rgb')
>>> self.normalize()
>>> list(self.keys())
- difference(self, other)
Set difference
Example
>>> self = ChannelSpec('rgb|disparity,flowx|flowy')
>>> other = ChannelSpec('rgb')
>>> self.difference(other)
>>> other = ChannelSpec('flowx')
>>> self.difference(other)
- sizes(self)
Number of dimensions for each fused stream channel,
i.e. the EARLY-FUSED channel sizes.
Example
>>> self = ChannelSpec('rgb|disparity,flowx|flowy')
>>> self.sizes()
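As a rough illustration of what "early-fused channel sizes" means, the sketch below sums the band counts of the codes in each stream. It is not the library's implementation: `stream_sizes` and the `BAND_COUNTS` table are hypothetical, and any code missing from the table is assumed to have one band.

```python
# Hypothetical band counts for codenames (an assumption for illustration).
BAND_COUNTS = {'rgb': 3, 'rgba': 4}


def stream_sizes(spec):
    """Return the total number of bands in each late-fused stream."""
    sizes = {}
    for stream in spec.split(','):
        # Codes without an entry in BAND_COUNTS are treated as single-band.
        sizes[stream] = sum(BAND_COUNTS.get(code, 1)
                            for code in stream.split('|'))
    return sizes


sizes = stream_sizes('rgb|disparity,flowx|flowy')
print(sizes)
```

Under these assumptions the first stream has four bands (three from rgb plus disparity) and the second has two.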
- unique(self, normalize=False)
Returns the unique channels that will need to be given or loaded
- _item_shapes(self, dims)
Expected shape for an input item
- Parameters
dims (Tuple[int, int]) – the spatial dimension
- Returns
Dict[int, tuple]
- _demo_item(self, dims=(4, 4), rng=None)
Create an input that satisfies this spec
- Returns
an item like it might appear when it is returned from the __getitem__ method of a torch...Dataset.
- Return type
Example
>>> dims = (1, 1)
>>> ChannelSpec.coerce(3)._demo_item(dims, rng=0)
>>> ChannelSpec.coerce('r|g|b|disparity')._demo_item(dims, rng=0)
>>> ChannelSpec.coerce('rgb|disparity')._demo_item(dims, rng=0)
>>> ChannelSpec.coerce('rgb,disparity')._demo_item(dims, rng=0)
>>> ChannelSpec.coerce('rgb')._demo_item(dims, rng=0)
>>> ChannelSpec.coerce('gray')._demo_item(dims, rng=0)
- encode(self, item, axis=0, mode=1)
Given a dictionary containing preloaded components of the network inputs, build a concatenated (fused) network representation of each input stream.
- Parameters
item (Dict[str, Tensor]) – a batch item containing unfused parts. Each key should be a single-stream (optionally early fused) channel key.
axis (int, default=0) – concatenation dimension
- Returns
mapping between input stream and its early fused tensor input.
- Return type
Dict[str, Tensor]
Example
>>> from kwcoco.channel_spec import *  # NOQA
>>> import numpy as np
>>> import ubelt as ub
>>> dims = (4, 4)
>>> item = {
>>>     'rgb': np.random.rand(3, *dims),
>>>     'disparity': np.random.rand(1, *dims),
>>>     'flowx': np.random.rand(1, *dims),
>>>     'flowy': np.random.rand(1, *dims),
>>> }
>>> # Complex Case
>>> self = ChannelSpec('rgb,disparity,rgb|disparity|flowx|flowy,flowx|flowy')
>>> fused = self.encode(item)
>>> input_shapes = ub.map_vals(lambda x: x.shape, fused)
>>> print('input_shapes = {}'.format(ub.repr2(input_shapes, nl=1)))
>>> # Simpler case
>>> self = ChannelSpec('rgb|disparity')
>>> fused = self.encode(item)
>>> input_shapes = ub.map_vals(lambda x: x.shape, fused)
>>> print('input_shapes = {}'.format(ub.repr2(input_shapes, nl=1)))
Example
>>> # Case where we have to break up early fused data
>>> import numpy as np
>>> import ubelt as ub
>>> dims = (40, 40)
>>> item = {
>>>     'rgb|disparity': np.random.rand(4, *dims),
>>>     'flowx': np.random.rand(1, *dims),
>>>     'flowy': np.random.rand(1, *dims),
>>> }
>>> # Complex Case
>>> self = ChannelSpec('rgb,disparity,rgb|disparity,rgb|disparity|flowx|flowy,flowx|flowy,flowx,disparity')
>>> inputs = self.encode(item)
>>> input_shapes = ub.map_vals(lambda x: x.shape, inputs)
>>> print('input_shapes = {}'.format(ub.repr2(input_shapes, nl=1)))
>>> # xdoctest: +REQUIRES(--bench)
>>> #self = ChannelSpec('rgb|disparity,flowx|flowy')
>>> import timerit
>>> ti = timerit.Timerit(100, bestof=10, verbose=2)
>>> for timer in ti.reset('mode=simple'):
>>>     with timer:
>>>         inputs = self.encode(item, mode=0)
>>> for timer in ti.reset('mode=minimize-concat'):
>>>     with timer:
>>>         inputs = self.encode(item, mode=1)
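The core idea of encoding, concatenating components along the channel axis once per stream, can be sketched without any array library. In this hedged sketch plain lists of band planes stand in for tensors and `encode_sketch` is a hypothetical helper; it only covers the simple case where every code in the spec is a key of the item, not the case where an early fused item must be broken up.

```python
def encode_sketch(spec, item):
    """Concatenate per-component band lists into one band list per stream."""
    fused = {}
    for stream in spec.split(','):
        bands = []
        for code in stream.split('|'):
            # Appending the bands of each code in order mimics
            # concatenation along the channel axis (axis 0 here).
            bands.extend(item[code])
        fused[stream] = bands
    return fused


# Each component is a list of band planes; a plane is a 1x1 nested list.
item = {
    'rgb': [[[0.1]], [[0.2]], [[0.3]]],
    'disparity': [[[0.9]]],
}
fused = encode_sketch('rgb|disparity,disparity', item)
num_bands = {key: len(bands) for key, bands in fused.items()}
print(num_bands)
```

The result has one entry per late-fused stream, keyed by the stream's spec string.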
- decode(self, inputs, axis=1)
Break an early fused item into its components
- Parameters
inputs (Dict[str, Tensor]) – dictionary of components
axis (int, default=1) – channel dimension
Example
>>> from kwcoco.channel_spec import *  # NOQA
>>> import numpy as np
>>> dims = (4, 4)
>>> item_components = {
>>>     'rgb': np.random.rand(3, *dims),
>>>     'ir': np.random.rand(1, *dims),
>>> }
>>> self = ChannelSpec('rgb|ir')
>>> item_encoded = self.encode(item_components)
>>> batch = {k: np.concatenate([v[None, :], v[None, :]], axis=0)
...          for k, v in item_encoded.items()}
>>> components = self.decode(batch)
Example
>>> # xdoctest: +REQUIRES(module:netharn, module:torch)
>>> import torch
>>> import numpy as np
>>> import ubelt as ub
>>> dims = (4, 4)
>>> components = {
>>>     'rgb': np.random.rand(3, *dims),
>>>     'ir': np.random.rand(1, *dims),
>>> }
>>> components = ub.map_vals(torch.from_numpy, components)
>>> self = ChannelSpec('rgb|ir')
>>> encoded = self.encode(components)
>>> from netharn.data import data_containers
>>> item = {k: data_containers.ItemContainer(v, stack=True)
>>>         for k, v in encoded.items()}
>>> batch = data_containers.container_collate([item, item])
>>> components = self.decode(batch)
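Decoding is the inverse step: each fused stream is sliced along the channel axis to recover its components. The sketch below assumes a batch stored as nested lists (axis 0 indexes samples, axis 1 indexes bands) with band counts supplied by hand; `decode_sketch` is a hypothetical helper, not the library method.

```python
def decode_sketch(batch, sizes):
    """Split each fused batch into per-component batches along axis 1."""
    components = {}
    for stream, samples in batch.items():
        start = 0
        for code in stream.split('|'):
            stop = start + sizes[code]
            # Axis 0 indexes samples; axis 1 indexes bands within a sample.
            components[code] = [sample[start:stop] for sample in samples]
            start = stop
    return components


# Two samples, each with four bands (three for rgb, one for ir).
batch = {'rgb|ir': [['r0', 'g0', 'b0', 'i0'], ['r1', 'g1', 'b1', 'i1']]}
components = decode_sketch(batch, sizes={'rgb': 3, 'ir': 1})
print(components['ir'])
```

Slicing on axis 1 rather than axis 0 mirrors the default `axis=1` of `decode`, since a collated batch gains a leading sample dimension.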
- component_indices(self, axis=2)
Look up component indices within fused streams
Example
>>> import ubelt as ub
>>> dims = (4, 4)
>>> inputs = ['flowx', 'flowy', 'disparity']
>>> self = ChannelSpec('disparity,flowx|flowy')
>>> component_indices = self.component_indices()
>>> print('component_indices = {}'.format(ub.repr2(component_indices, nl=1)))
component_indices = {
    'disparity': ('disparity', (slice(None, None, None), slice(None, None, None), slice(0, 1, None))),
    'flowx': ('flowx|flowy', (slice(None, None, None), slice(None, None, None), slice(0, 1, None))),
    'flowy': ('flowx|flowy', (slice(None, None, None), slice(None, None, None), slice(1, 2, None))),
}
- kwcoco.channel_spec.subsequence_index(oset1, oset2)
Returns a slice into the first items indicating the position of the second items if they exist.
This is a variant of the substring problem.
- Returns
None | slice
Example
>>> import ubelt as ub
>>> oset1 = ub.oset([1, 2, 3, 4, 5, 6])
>>> oset2 = ub.oset([2, 3, 4])
>>> index = subsequence_index(oset1, oset2)
>>> assert index
>>> oset1 = ub.oset([1, 2, 3, 4, 5, 6])
>>> oset2 = ub.oset([2, 4, 3])
>>> index = subsequence_index(oset1, oset2)
>>> assert not index
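The behaviour described here, finding the second sequence as a contiguous run inside the first, can be sketched with a simple sliding-window search. This is a naive illustration and not necessarily how the library does it; `subsequence_index_sketch` is a hypothetical name, and returning None for an empty second sequence is an assumption.

```python
def subsequence_index_sketch(items1, items2):
    """Return a slice locating items2 as a contiguous run in items1, or None."""
    items1 = list(items1)
    items2 = list(items2)
    n = len(items2)
    if n == 0:
        # Assumption: an empty query is treated as "not found".
        return None
    for start in range(len(items1) - n + 1):
        if items1[start:start + n] == items2:
            return slice(start, start + n)
    return None


found = subsequence_index_sketch([1, 2, 3, 4, 5, 6], [2, 3, 4])
missing = subsequence_index_sketch([1, 2, 3, 4, 5, 6], [2, 4, 3])
print(found)
print(missing)
```

A slice object is always truthy in Python, which is why the doctests above can use `assert index` and `assert not index` to distinguish a match from None.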