gisaf-backend/src/gisaf/reactor.py
phil 741050db89 Remove relative imports
Fix primary keys (optional)
Add baskets, importers, plugins, reactor
Add fake replacement fro graphql defs (to_migrate)
Add typing marker (py.typed)
2023-12-25 15:50:45 +05:30

193 lines
6.7 KiB
Python
Executable file

#!/usr/bin/env python
"""
Gisaf reactor, deals with message processing (mqtt, etc)
"""
import asyncio
import re
import sys
import logging
from typing import Any
from importlib.metadata import entry_points
from collections import OrderedDict
from aiomqtt import Client, Message
from gisaf.ipynb_tools import gisaf
from gisaf.config import conf
logger = logging.getLogger('gisaf.reactor')
class Reactor:
def __init__(self):
self.processors = {}
async def setup(self, exclude_processor_names=None):
if exclude_processor_names == None:
exclude_processor_names = []
for entry_point in entry_points().select(group='gisaf_message_processors'):
logger.debug(
f'Processing message processor module {entry_point.name}'
)
try:
message_processor_class = entry_point.load()
except Exception as err:
logger.error(f'Skip message processor module ' \
f'{entry_point.name}: {err}')
continue
try:
message_processor = message_processor_class()
except Exception as err:
logger.error(f'Skip message processor module ' \
f'{entry_point.name} (cannot instanciate): {err}')
continue
if not message_processor.enabled:
continue
message_processor.name = entry_point.name
## Eventually skip processor according to arguments of the command line
if message_processor.name in exclude_processor_names:
continue
await message_processor.setup()
self.add_processor(message_processor)
logger.info(f'Added message processor "{entry_point.name}"')
def get_available_processors(self):
return [
entry_point.name
for entry_point in entry_points().select(group='gisaf_message_processors')
]
def add_processor(self, processor):
try:
processor.topic_re = re.compile(processor.topic)
except Exception as err:
logger.warning(f'Cannot treat topic "{processor.topic}" of '\
f'"{processor.name}" as reg exp: {err}')
processor.topic_re = None
self.processors[processor.name] = processor
async def process_unfiltered_messages(self, messages):
async for message in messages:
await self.process_unfiltered_message(message)
async def process_unfiltered_message(self, message):
## Log
if len(message.payload)>50:
msg = message.payload[:50].decode()
else:
msg = message.payload.decode()
logger.debug(
f'Got unfiltered message on "{message.topic}" '\
f'({len(message.payload)} bytes): {msg}'
)
tasks = OrderedDict()
for name, processor in self.processors.items():
if processor.topic == message.topic:
match = True
else:
if processor.topic_re:
match = processor.topic_re.fullmatch(message.topic)
else:
match = False
if match:
tasks[processor.name] = processor.process(message)
results = await asyncio.gather(*tasks.values(), return_exceptions=True)
for task_number, task in enumerate(tasks.items()):
result = results[task_number]
if isinstance(result, Exception):
logger.warning(f'Error executing task "{task[0]}" ' \
f'for topic "{message.topic}": {result}')
class MessageProcessorBaseClass:
"""
Base class for all message processors.
Subclasses can set the attribute "context"
(an async context processor) in setup():
all the contexts will be used when the reactor runs.
"""
enabled: bool = True
topic: str = ''
context: Any = None
async def setup(self, *args, **kwargs) -> None:
pass
async def process(self, message) -> None:
pass
async def cancel_tasks(tasks):
for task in tasks:
if task.done():
continue
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
async def main(list=None, exclude_processor_names=None) -> None:
if list:
reactor = Reactor()
jobs = reactor.get_available_processors()
print(' '.join(jobs))
sys.exit(0)
await gisaf.setup()
await gisaf.make_models()
reactor = Reactor()
await reactor.setup(exclude_processor_names=exclude_processor_names)
async with Client(
hostname=conf.gisaf_live.mqtt.broker,
port=conf.gisaf_live.mqtt.port
) as client:
async with client.messages() as messages:
for name, processor in reactor.processors.items():
await client.subscribe(processor.topic)
message: Message
async for message in messages:
for name, processor in reactor.processors.items():
if message.topic.matches(processor.topic):
try:
await processor.process(message)
except Exception as err:
logger.warning(
'Error while processing message '
f'{message.topic} by {processor.name}. '
'See below for the full trace.'
)
logger.exception(err)
if __name__ == '__main__':
import argparse
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument('--debug', '-d', action='store_true',
help='Print debug messages')
parser.add_argument('--exclude-processor-name', '-e', action='append',
help='Do not run the processor with a given name')
parser.add_argument('--list', '-l', action='store_true',
help='Only list available processors and exit')
args = parser.parse_args()
if args.debug:
logging.root.setLevel(logging.DEBUG)
logger.setLevel(logging.DEBUG)
else:
logging.root.setLevel(logging.INFO)
logger.setLevel(logging.INFO)
logging.getLogger('websockets').setLevel(logging.WARNING)
logging.getLogger('engineio.client').setLevel(logging.WARNING)
logging.getLogger('socketio.client').setLevel(logging.WARNING)
logging.getLogger('aiosasl').setLevel(logging.WARNING)
asyncio.run(
main(
list=args.list,
exclude_processor_names=args.exclude_processor_name,
)
)