Source code for delayed_image.sensorchan_spec

"""
This is an extension of :mod:`delayed_image.channel_spec`, which augments channel
information with an associated sensor attribute. Eventually, this will entirely
replace the channel spec.

Example:
    >>> # xdoctest: +REQUIRES(module:lark)
    >>> # hack for 3.6
    >>> from delayed_image import sensorchan_spec
    >>> import delayed_image
    >>> delayed_image.SensorChanSpec = sensorchan_spec.SensorChanSpec
    >>> self = delayed_image.SensorChanSpec.coerce('sensor0:B1|B8|B8a|B10|B11,sensor1:B11|X.2|Y:2:6,sensor2:r|g|b|disparity|gauss|B8|B11,sensor3:r|g|b|flowx|flowy|distri|B10|B11')
    >>> self.normalize()
"""

import ubelt as ub
import itertools as it
import functools


try:
    cache = functools.cache
except AttributeError:
    cache = ub.memoize

try:
    from lark import Transformer
except ImportError:
    class FakeTransformer:
        pass
    # TODO: get xdev typetubs to ignore this
    # probably need some kind of directive.
    Transformer = FakeTransformer

SENSOR_CHAN_GRAMMAR = ub.codeblock(
    '''
    // SENSOR_CHAN_GRAMMAR
    ?start: stream

    // An identifier can contain spaces
    IDEN: ("_"|"*"|LETTER) ("_"|" "|"-"|"*"|LETTER|DIGIT)*

    chan_single : IDEN
    chan_getitem : IDEN "." INT
    chan_getslice_0b : IDEN ":" INT
    chan_getslice_ab : (IDEN "." INT ":" INT) | (IDEN ":" INT ":" INT)

    // A channel code can just be an ID, or it can have a getitem
    // style syntax with a scalar or slice as an argument
    chan_code : chan_single | chan_getslice_0b | chan_getslice_ab | chan_getitem

    // Fused channels are an ordered sequence of channel codes (without sensors)
    fused : chan_code ("|" chan_code)*

    // Channels can be specified in a sequence but must contain parens
    fused_seq : "(" fused ("," fused)* ")"

    // Sensors can be specified in a sequence but must contain parens
    sensor_seq : "(" IDEN ("," IDEN)* "):"

    sensor_lhs : (IDEN ":") | (sensor_seq)

    // A channel only part can be a fused channel or a sequence
    channel_rhs : fused | fused_seq

    sensor_chan : sensor_lhs channel_rhs?

    nosensor_chan : channel_rhs

    stream_item : sensor_chan | nosensor_chan

    // A stream is an unordered sequence of fused channels, that can
    // optionally contain sensor specifications.

    stream : stream_item ("," stream_item)*

    %import common.DIGIT
    %import common.LETTER
    %import common.INT
    ''')


"""
TODO: add the concept of an exclusive or operator with left hand priority. The
idea is that we can specify a code that will use the one fused channel spec if
it is available, but if it is not, it will fall back to the next one. E.G.


WV:((red|green|blue)^(pan))

(L8,S2,WV,WV1):((red|green|blue)^(pan))


Possible Production Rules:

    fused : chan_code ("|" chan_code)*
    fused_seq : "(" fused ("," fused)* ")"



Maybe also include that on the sensor side?

Use WV:r|g|b if we have it otherwise use S2:r|g|b

(WV^S2)(r|g|b)

"""


class SensorSpec(ub.NiceRepr):
    """
    A simple wrapper for sensors in case we want to do anything fancy with them
    later. For now they are just a string.
    """
    def __init__(self, spec):
        self.spec = spec

    def __nice__(self):
        return self.spec

    def __json__(self):
        return self.spec


[docs] class SensorChanSpec(ub.NiceRepr): """ The public facing API for the sensor / channel specification Example: >>> # xdoctest: +REQUIRES(module:lark) >>> from delayed_image.sensorchan_spec import SensorChanSpec >>> self = SensorChanSpec('(L8,S2):BGR,WV:BGR,S2:nir,L8:land.0:4') >>> s1 = self.normalize() >>> s2 = self.concise() >>> streams = self.streams() >>> print(s1) >>> print(s2) >>> print('streams = {}'.format(ub.urepr(streams, sv=1, nl=1))) L8:BGR,S2:BGR,WV:BGR,S2:nir,L8:land.0|land.1|land.2|land.3 (L8,S2,WV):BGR,L8:land:4,S2:nir streams = [ L8:BGR, S2:BGR, WV:BGR, S2:nir, L8:land.0|land.1|land.2|land.3, ] Example: >>> # Check with generic sensors >>> # xdoctest: +REQUIRES(module:lark) >>> from delayed_image.sensorchan_spec import SensorChanSpec >>> import delayed_image >>> self = SensorChanSpec('(*):BGR,*:BGR,*:nir,*:land.0:4') >>> self.concise().normalize() >>> s1 = self.normalize() >>> s2 = self.concise() >>> print(s1) >>> print(s2) *:BGR,*:BGR,*:nir,*:land.0|land.1|land.2|land.3 (*):BGR,*:(nir,land:4) >>> import delayed_image >>> c = delayed_image.ChannelSpec.coerce('BGR,BGR,nir,land.0:8') >>> c1 = c.normalize() >>> c2 = c.concise() >>> print(c1) >>> print(c2) Example: >>> # Check empty channels >>> # xdoctest: +REQUIRES(module:lark) >>> from delayed_image.sensorchan_spec import SensorChanSpec >>> import delayed_image >>> print(SensorChanSpec('*:').normalize()) *: >>> print(SensorChanSpec('sen:').normalize()) sen: >>> print(SensorChanSpec('sen:').normalize().concise()) sen: >>> print(SensorChanSpec('sen:').concise().normalize().concise()) sen: """ def __init__(self, spec: str): self.spec: str = spec def __nice__(self): return self.spec def __json__(self): return self.spec def __str__(self): return self.spec
[docs] @classmethod def coerce(cls, data): """ Attempt to interpret the data as a channel specification Returns: SensorChanSpec Example: >>> # xdoctest: +REQUIRES(module:lark) >>> from delayed_image.sensorchan_spec import * # NOQA >>> from delayed_image.sensorchan_spec import SensorChanSpec >>> data = SensorChanSpec.coerce(3) >>> assert SensorChanSpec.coerce(data).normalize().spec == '*:u0|u1|u2' >>> data = SensorChanSpec.coerce(3) >>> assert data.spec == 'u0|u1|u2' >>> assert SensorChanSpec.coerce(data).spec == 'u0|u1|u2' >>> data = SensorChanSpec.coerce('u:3') >>> assert data.normalize().spec == '*:u.0|u.1|u.2' """ import delayed_image if isinstance(data, cls): self = data return self elif isinstance(data, str): self = cls(data) return self elif isinstance(data, delayed_image.FusedChannelSpec): spec = data.spec self = cls(spec) return self elif isinstance(data, delayed_image.ChannelSpec): spec = data.spec self = cls(spec) return self else: chan = delayed_image.ChannelSpec.coerce(data) self = cls(chan.spec) return self
[docs] def normalize(self): new_spec = normalize_sensor_chan(self.spec) new = self.__class__(new_spec) return new
[docs] def concise(self): """ Example: >>> # xdoctest: +REQUIRES(module:lark) >>> from delayed_image import SensorChanSpec >>> a = SensorChanSpec.coerce('Cam1:(red,blue)') >>> b = SensorChanSpec.coerce('Cam2:(blue,green)') >>> c = (a + b).concise() >>> print(c) (Cam1,Cam2):blue,Cam1:red,Cam2:green >>> # Note the importance of parenthesis in the previous example >>> # otherwise channels will be assigned to `*` the generic sensor. >>> a = SensorChanSpec.coerce('Cam1:red,blue') >>> b = SensorChanSpec.coerce('Cam2:blue,green') >>> c = (a + b).concise() >>> print(c) (*,Cam2):blue,*:green,Cam1:red """ new_spec = concise_sensor_chan(self.spec) new = self.__class__(new_spec) return new
[docs] def streams(self): """ Returns: List[FusedSensorChanSpec]: List of sensor-names and fused channel specs """ parts = sensorchan_normalized_parts(self.spec) streams = [ FusedSensorChanSpec(SensorSpec(part.sensor), part.chan.data) for part in parts] return streams
[docs] def late_fuse(self, *others): """ Example: >>> # xdoctest: +REQUIRES(module:lark) >>> import delayed_image >>> from delayed_image import sensorchan_spec >>> import delayed_image >>> delayed_image.SensorChanSpec = sensorchan_spec.SensorChanSpec # hack for 3.6 >>> a = delayed_image.SensorChanSpec.coerce('A|B|C,edf') >>> b = delayed_image.SensorChanSpec.coerce('A12') >>> c = delayed_image.SensorChanSpec.coerce('') >>> d = delayed_image.SensorChanSpec.coerce('rgb') >>> print(a.late_fuse(b).spec) >>> print((a + b).spec) >>> print((b + a).spec) >>> print((a + b + c).spec) >>> print(sum([a, b, c, d]).spec) A|B|C,edf,A12 A|B|C,edf,A12 A12,A|B|C,edf A|B|C,edf,A12 A|B|C,edf,A12,rgb >>> import delayed_image >>> a = delayed_image.SensorChanSpec.coerce('A|B|C,edf').normalize() >>> b = delayed_image.SensorChanSpec.coerce('A12').normalize() >>> c = delayed_image.SensorChanSpec.coerce('').normalize() >>> d = delayed_image.SensorChanSpec.coerce('rgb').normalize() >>> print(a.late_fuse(b).spec) >>> print((a + b).spec) >>> print((b + a).spec) >>> print((a + b + c).spec) >>> print(sum([a, b, c, d]).spec) *:A|B|C,*:edf,*:A12 *:A|B|C,*:edf,*:A12 *:A12,*:A|B|C,*:edf *:A|B|C,*:edf,*:A12,*: *:A|B|C,*:edf,*:A12,*:,*:rgb >>> print((a.late_fuse(b)).concise()) >>> print(((a + b)).concise()) >>> print(((b + a)).concise()) >>> print(((a + b + c)).concise()) >>> print((sum([a, b, c, d])).concise()) *:(A|B|C,edf,A12) *:(A|B|C,edf,A12) *:(A12,A|B|C,edf) *:(A|B|C,edf,A12,) *:(A|B|C,edf,A12,,r|g|b) Example: >>> # Test multi-arg case >>> import delayed_image >>> a = delayed_image.SensorChanSpec.coerce('A|B|C,edf') >>> b = delayed_image.SensorChanSpec.coerce('A12') >>> c = delayed_image.SensorChanSpec.coerce('') >>> d = delayed_image.SensorChanSpec.coerce('rgb') >>> others = [b, c, d] >>> print(a.late_fuse(*others).spec) >>> print(delayed_image.SensorChanSpec.late_fuse(a, b, c, d).spec) A|B|C,edf,A12,rgb A|B|C,edf,A12,rgb """ import itertools as it args = it.chain([self], others) specs = [s.spec for s in args if s.spec] new_spec = ','.join(specs) return SensorChanSpec.coerce(new_spec)
def __add__(self, other): """ Late fusion combination """ return self.late_fuse(other) def __radd__(self, other): """ Late fusion combination """ if other == 0: return self return other.late_fuse(self)
[docs] def matching_sensor(self, sensor): """ Get the components corresponding to a specific sensor Args: sensor (str): the name of the sensor to match Example: >>> # xdoctest: +REQUIRES(module:lark) >>> import delayed_image >>> self = delayed_image.SensorChanSpec.coerce('(S1,S2):(a|b|c),S2:c|d|e') >>> sensor = 'S2' >>> new = self.matching_sensor(sensor) >>> print(f'new={new}') new=S2:a|b|c,S2:c|d|e >>> print(self.matching_sensor('S1')) S1:a|b|c >>> print(self.matching_sensor('S3')) S3: """ matching_streams = [] for s in self.streams(): if s.sensor.spec == sensor: matching_streams.append(s) new = sum(matching_streams) if new == 0: import delayed_image new = FusedSensorChanSpec(SensorSpec(sensor), delayed_image.FusedChannelSpec.coerce('')) return new
@property def chans(self): """ Returns the channel-only spec, ONLY if all of the sensors are the same Example: >>> # xdoctest: +REQUIRES(module:lark) >>> import delayed_image >>> self = delayed_image.SensorChanSpec.coerce('(S1,S2):(a|b|c),S2:c|d|e') >>> import pytest >>> with pytest.raises(Exception): >>> self.chans >>> print(self.matching_sensor('S1').chans.spec) >>> print(self.matching_sensor('S2').chans.spec) a|b|c a|b|c,c|d|e """ channel_specs = [] sensor_specs = [] for s in self.streams(): sensor_specs.append(s.sensor.spec) channel_specs.append(s.chans) if not ub.allsame(sensor_specs): raise Exception('Can only take pure channel specs when all sensors are the same') return sum(channel_specs)
class FusedSensorChanSpec(SensorChanSpec): """ A single sensor a corresponding fused channels. """ def __init__(self, sensor, chans): self.sensor = sensor self._chans = chans @property def chans(self): return self._chans @property def spec(self): return '{}:{}'.format(self.sensor.spec, self.chans.spec) def __json__(self): return self.spec class SensorChanNode: """ TODO: just replace this with the spec class itself? """ def __init__(self, sensor, chan): self.sensor = sensor self.chan = chan @property def spec(self): return f"{self.sensor}:{self.chan}" def __repr__(self): return self.spec def __str__(self): return self.spec class FusedChanNode: """ TODO: just replace this with the spec class itself? Example: s = FusedChanNode('a|b|c.0|c.1|c.2') c = s.concise() print(s) print(c) """ def __init__(self, chan): import delayed_image self.data = delayed_image.FusedChannelSpec.coerce(chan) @property def spec(self): return self.data.spec def concise(self): return self.__class__(self.data.concise()) def __repr__(self): return self.data.spec def __str__(self): return self.data.spec class SensorChanTransformer(Transformer): """ Given a parsed tree for a sensor-chan spec, can transform it into useful forms. TODO: Make the classes that hold the underlying data more robust such that they either use the existing channel spec or entirely replace it. (probably the former). Also need to add either a FusedSensorChan node that is restircted to only a single sensor and group of fused channels. Ignore: cases = [ 'S1:b:3', 'S1:b:3,S2:b:3', 'S1:b:3,S2:(b.0,b.1,b.2)', ] basis = { 'concise_channels': [0, 1], 'concise_sensors': [0, 1], } for spec in cases: print('') print('=====') print('spec = {}'.format(ub.urepr(spec, nl=1))) print('-----') for kwargs in ub.named_product(basis): sensor_channel_parser = _global_sensor_chan_parser() tree = sensor_channel_parser.parse(spec) transformed = SensorChanTransformer(**kwargs).transform(tree) print('') print('kwargs = {}'.format(ub.urepr(kwargs, nl=0))) print(f'transformed={transformed}') print('') print('=====') """ def __init__(self, concise_channels=1, concise_sensors=1): self.consise_channels = concise_channels self.concise_sensors = concise_sensors def chan_id(self, items): code, = items return code.value def chan_single(self, items): code, = items return [code.value] def chan_getitem(self, items): code, index = items return [f'{code}.{index.value}'] def chan_getslice_0b(self, items): code, btok = items return ['{}.{}'.format(code, index) for index in range(int(btok.value))] def chan_getslice_ab(self, items): code, atok, btok = items return ['{}.{}'.format(code, index) for index in range(int(atok.value), int(btok.value))] def chan_code(self, items): return items[0] def sensor_seq(self, items): return [s.value for s in items] def fused_seq(self, items): s = list(items) return s def fused(self, items): ret = FusedChanNode(list(ub.flatten(items))) if self.consise_channels: ret = ret.concise() return ret def channel_rhs(self, items): flat = [] for item in items: if ub.iterable(item): flat.extend(item) else: flat.append(item) return flat def sensor_lhs(self, items): flat = [] for item in items: if ub.iterable(item): flat.extend(item) else: flat.append(item.value) return flat def nosensor_chan(self, items): item, = items return [SensorChanNode('*', c) for c in item] def sensor_chan(self, items): if len(items) == 1: # handle empty channels items = [items[0], ['']] assert len(items) == 2 lhs, rhs = items new = [] for a, b in it.product(lhs, rhs): new.append(SensorChanNode(a, b)) return new def stream_item(self, items): item, = items return item def stream(self, items): flat_items = list(ub.flatten(items)) # TODO: can probably improve this if self.concise_sensors: flat_sensors = [str(f.sensor) for f in flat_items] flat_chans = [str(f.chan) for f in flat_items] chan_to_sensors = ub.group_items(flat_sensors, flat_chans) pass1_sensors = [] pass1_chans = [] for chan, sensors in chan_to_sensors.items(): sense_part = ','.join(sorted(ub.unique(sensors))) if len(sensors) > 1: sense_part = '({})'.format(sense_part) pass1_sensors.append(sense_part) pass1_chans.append(str(chan)) pass2_parts = [] sensor_to_chan = ub.group_items(pass1_chans, pass1_sensors) for sensor, chans in sensor_to_chan.items(): chan_part = ','.join(chans) if len(chans) > 1: chan_part = '({})'.format(chan_part) pass2_parts.append('{}:{}'.format(sensor, chan_part)) parts = pass2_parts parts = sorted(parts) else: parts = flat_items return parts @cache def _global_sensor_chan_parser(): # https://github.com/lark-parser/lark/blob/master/docs/_static/lark_cheatsheet.pdf import lark try: import lark_cython sensor_channel_parser = lark.Lark(SENSOR_CHAN_GRAMMAR, start='start', parser='lalr', _plugins=lark_cython.plugins) except ImportError: sensor_channel_parser = lark.Lark(SENSOR_CHAN_GRAMMAR, start='start', parser='lalr') return sensor_channel_parser @cache def normalize_sensor_chan(spec: str): """ Example: >>> # xdoctest: +REQUIRES(module:lark) >>> from delayed_image.sensorchan_spec import * # NOQA >>> spec = 'L8:mat:4,L8:red,S2:red,S2:forest|brush,S2:mat.0|mat.1|mat.2|mat.3' >>> r1 = normalize_sensor_chan(spec) >>> spec = 'L8:r|g|b,L8:r|g|b' >>> r2 = normalize_sensor_chan(spec) >>> print(f'r1={r1}') >>> print(f'r2={r2}') r1=L8:mat.0|mat.1|mat.2|mat.3,L8:red,S2:red,S2:forest|brush,S2:mat.0|mat.1|mat.2|mat.3 r2=L8:r|g|b,L8:r|g|b Ignore: >>> # TODO: fix bug or disallow behavior >>> from delayed_image.sensorchan_spec import * # NOQA >>> spec = '*:(rgb,,cde)' >>> concise_spec = normalize_sensor_chan(spec) """ if spec == '': spec = '*:' transformed = sensorchan_normalized_parts(spec) new_spec = ','.join([n.spec for n in transformed]) return new_spec @cache def concise_sensor_chan(spec: str): """ Example: >>> # xdoctest: +REQUIRES(module:lark) >>> from delayed_image.sensorchan_spec import * # NOQA >>> spec = 'L8:mat.0|mat.1|mat.2|mat.3,L8:red,S2:red,S2:forest|brush,S2:mat.0|mat.1|mat.2|mat.3' >>> concise_spec = concise_sensor_chan(spec) >>> normed_spec = normalize_sensor_chan(concise_spec) >>> concise_spec2 = concise_sensor_chan(normed_spec) >>> assert concise_spec2 == concise_spec >>> print(concise_spec) (L8,S2):(mat:4,red),S2:forest|brush """ transformed = sensorchan_concise_parts(spec) new_spec = ','.join([str(n) for n in transformed]) return new_spec # @cache def sensorchan_concise_parts(spec: str): """ Ignore: >>> # xdoctest: +REQUIRES(module:lark) >>> spec = 'L8:mat.0|mat.1|mat.2|mat.3,L8:red,(MODIS,S2):a|b|c,S2:red,S2:forest|brush|bare_ground,S2:mat.0|mat.1|mat.2|mat.3' >>> parts = sensorchan_concise_parts(spec) """ try: sensor_channel_parser = _global_sensor_chan_parser() tree = sensor_channel_parser.parse(spec) transformed = SensorChanTransformer(concise_sensors=1, concise_channels=1).transform(tree) except Exception: print(f'ERROR: Failed to condense spec={spec}') raise return transformed def sensorchan_normalized_parts(spec: str): """ Ignore: >>> # xdoctest: +REQUIRES(module:lark) >>> spec = 'L8:mat.0|mat.1|mat.2|mat.3,L8:red,(MODIS,S2):a|b|c,S2:red,S2:forest|brush|bare_ground|built_up|cropland|wetland|water|snow_or_ice_field,S2:mat.0|mat.1|mat.2|mat.3' >>> parts = sensorchan_normalized_parts(spec) """ try: sensor_channel_parser = _global_sensor_chan_parser() tree = sensor_channel_parser.parse(spec) transformed = SensorChanTransformer(concise_sensors=0, concise_channels=0).transform(tree) except Exception: print(f'ERROR: Failed to normalize spec={spec}') raise return transformed