diff --git a/src/gisaf/reactor.py b/src/gisaf/reactor.py index 38918bc..9ce962e 100755 --- a/src/gisaf/reactor.py +++ b/src/gisaf/reactor.py @@ -2,6 +2,7 @@ """ Gisaf reactor, deals with message processing (mqtt, etc) """ + import asyncio import re import sys @@ -12,10 +13,9 @@ from collections import OrderedDict from aiomqtt import Client, Message -from gisaf.ipynb_tools import gisaf from gisaf.config import conf -logger = logging.getLogger('gisaf.reactor') +logger = logging.getLogger("gisaf.reactor") class Reactor: @@ -25,21 +25,22 @@ class Reactor: async def setup(self, exclude_processor_names=None): if exclude_processor_names == None: exclude_processor_names = [] - for entry_point in entry_points().select(group='gisaf_message_processors'): - logger.debug( - f'Processing message processor module {entry_point.name}' - ) + for entry_point in entry_points().select(group="gisaf_message_processors"): + logger.debug(f"Processing message processor module {entry_point.name}") try: message_processor_class = entry_point.load() except Exception as err: - logger.error(f'Skip message processor module ' \ - f'{entry_point.name}: {err}') + logger.error( + f"Skip message processor module " f"{entry_point.name}: {err}" + ) continue try: message_processor = message_processor_class() except Exception as err: - logger.error(f'Skip message processor module ' \ - f'{entry_point.name} (cannot instanciate): {err}') + logger.error( + f"Skip message processor module " + f"{entry_point.name} (cannot instanciate): {err}" + ) continue if not message_processor.enabled: continue @@ -56,15 +57,17 @@ class Reactor: def get_available_processors(self): return [ entry_point.name - for entry_point in entry_points().select(group='gisaf_message_processors') + for entry_point in entry_points().select(group="gisaf_message_processors") ] def add_processor(self, processor): try: processor.topic_re = re.compile(processor.topic) except Exception as err: - logger.warning(f'Cannot treat topic "{processor.topic}" of '\ - f'"{processor.name}" as reg exp: {err}') + logger.warning( + f'Cannot treat topic "{processor.topic}" of ' + f'"{processor.name}" as reg exp: {err}' + ) processor.topic_re = None self.processors[processor.name] = processor @@ -74,13 +77,13 @@ class Reactor: async def process_unfiltered_message(self, message): ## Log - if len(message.payload)>50: + if len(message.payload) > 50: msg = message.payload[:50].decode() else: msg = message.payload.decode() logger.debug( - f'Got unfiltered message on "{message.topic}" '\ - f'({len(message.payload)} bytes): {msg}' + f'Got unfiltered message on "{message.topic}" ' + f"({len(message.payload)} bytes): {msg}" ) tasks = OrderedDict() for name, processor in self.processors.items(): @@ -97,8 +100,10 @@ class Reactor: for task_number, task in enumerate(tasks.items()): result = results[task_number] if isinstance(result, Exception): - logger.warning(f'Error executing task "{task[0]}" ' \ - f'for topic "{message.topic}": {result}') + logger.warning( + f'Error executing task "{task[0]}" ' + f'for topic "{message.topic}": {result}' + ) class MessageProcessorBaseClass: @@ -108,8 +113,9 @@ class MessageProcessorBaseClass: (an async context processor) in setup(): all the contexts will be used when the reactor runs. """ + enabled: bool = True - topic: str = '' + topic: str = "" context: Any = None async def setup(self, *args, **kwargs) -> None: @@ -134,44 +140,49 @@ async def main(list=None, exclude_processor_names=None) -> None: if list: reactor = Reactor() jobs = reactor.get_available_processors() - print(' '.join(jobs)) + print(" ".join(jobs)) sys.exit(0) - await gisaf.setup() - await gisaf.make_models() reactor = Reactor() await reactor.setup(exclude_processor_names=exclude_processor_names) async with Client( - hostname=conf.gisaf_live.mqtt.broker, - port=conf.gisaf_live.mqtt.port + hostname=conf.gisaf_live.mqtt.broker, port=conf.gisaf_live.mqtt.port ) as client: - async with client.messages() as messages: + for name, processor in reactor.processors.items(): + await client.subscribe(processor.topic) + async for message in client.messages: for name, processor in reactor.processors.items(): - await client.subscribe(processor.topic) - message: Message - async for message in messages: - for name, processor in reactor.processors.items(): - if message.topic.matches(processor.topic): - try: - await processor.process(message) - except Exception as err: - logger.warning( - 'Error while processing message ' - f'{message.topic} by {processor.name}. ' - 'See below for the full trace.' - ) - logger.exception(err) + if message.topic.matches(processor.topic): + try: + await processor.process(message) + except Exception as err: + logger.warning( + "Error while processing message " + f"{message.topic} by {processor.name}. " + "See below for the full trace." + ) + logger.exception(err) -if __name__ == '__main__': +if __name__ == "__main__": import argparse + parser = argparse.ArgumentParser(description=__doc__) - parser.add_argument('--debug', '-d', action='store_true', - help='Print debug messages') - parser.add_argument('--exclude-processor-name', '-e', action='append', - help='Do not run the processor with a given name') - parser.add_argument('--list', '-l', action='store_true', - help='Only list available processors and exit') + parser.add_argument( + "--debug", "-d", action="store_true", help="Print debug messages" + ) + parser.add_argument( + "--exclude-processor-name", + "-e", + action="append", + help="Do not run the processor with a given name", + ) + parser.add_argument( + "--list", + "-l", + action="store_true", + help="Only list available processors and exit", + ) args = parser.parse_args() if args.debug: @@ -180,10 +191,10 @@ if __name__ == '__main__': else: logging.root.setLevel(logging.INFO) logger.setLevel(logging.INFO) - logging.getLogger('websockets').setLevel(logging.WARNING) - logging.getLogger('engineio.client').setLevel(logging.WARNING) - logging.getLogger('socketio.client').setLevel(logging.WARNING) - logging.getLogger('aiosasl').setLevel(logging.WARNING) + logging.getLogger("websockets").setLevel(logging.WARNING) + logging.getLogger("engineio.client").setLevel(logging.WARNING) + logging.getLogger("socketio.client").setLevel(logging.WARNING) + logging.getLogger("aiosasl").setLevel(logging.WARNING) asyncio.run( main(