Remove relative imports
Fix primary keys (optional) Add baskets, importers, plugins, reactor Add fake replacement fro graphql defs (to_migrate) Add typing marker (py.typed)
This commit is contained in:
parent
a974eea3d3
commit
741050db89
35 changed files with 2097 additions and 152 deletions
193
src/gisaf/reactor.py
Executable file
193
src/gisaf/reactor.py
Executable file
|
@ -0,0 +1,193 @@
|
|||
#!/usr/bin/env python
|
||||
"""
|
||||
Gisaf reactor, deals with message processing (mqtt, etc)
|
||||
"""
|
||||
import asyncio
|
||||
import re
|
||||
import sys
|
||||
import logging
|
||||
from typing import Any
|
||||
from importlib.metadata import entry_points
|
||||
from collections import OrderedDict
|
||||
|
||||
from aiomqtt import Client, Message
|
||||
|
||||
from gisaf.ipynb_tools import gisaf
|
||||
from gisaf.config import conf
|
||||
|
||||
logger = logging.getLogger('gisaf.reactor')
|
||||
|
||||
|
||||
class Reactor:
|
||||
def __init__(self):
|
||||
self.processors = {}
|
||||
|
||||
async def setup(self, exclude_processor_names=None):
|
||||
if exclude_processor_names == None:
|
||||
exclude_processor_names = []
|
||||
for entry_point in entry_points().select(group='gisaf_message_processors'):
|
||||
logger.debug(
|
||||
f'Processing message processor module {entry_point.name}'
|
||||
)
|
||||
try:
|
||||
message_processor_class = entry_point.load()
|
||||
except Exception as err:
|
||||
logger.error(f'Skip message processor module ' \
|
||||
f'{entry_point.name}: {err}')
|
||||
continue
|
||||
try:
|
||||
message_processor = message_processor_class()
|
||||
except Exception as err:
|
||||
logger.error(f'Skip message processor module ' \
|
||||
f'{entry_point.name} (cannot instanciate): {err}')
|
||||
continue
|
||||
if not message_processor.enabled:
|
||||
continue
|
||||
message_processor.name = entry_point.name
|
||||
|
||||
## Eventually skip processor according to arguments of the command line
|
||||
if message_processor.name in exclude_processor_names:
|
||||
continue
|
||||
|
||||
await message_processor.setup()
|
||||
self.add_processor(message_processor)
|
||||
logger.info(f'Added message processor "{entry_point.name}"')
|
||||
|
||||
def get_available_processors(self):
|
||||
return [
|
||||
entry_point.name
|
||||
for entry_point in entry_points().select(group='gisaf_message_processors')
|
||||
]
|
||||
|
||||
def add_processor(self, processor):
|
||||
try:
|
||||
processor.topic_re = re.compile(processor.topic)
|
||||
except Exception as err:
|
||||
logger.warning(f'Cannot treat topic "{processor.topic}" of '\
|
||||
f'"{processor.name}" as reg exp: {err}')
|
||||
processor.topic_re = None
|
||||
self.processors[processor.name] = processor
|
||||
|
||||
async def process_unfiltered_messages(self, messages):
|
||||
async for message in messages:
|
||||
await self.process_unfiltered_message(message)
|
||||
|
||||
async def process_unfiltered_message(self, message):
|
||||
## Log
|
||||
if len(message.payload)>50:
|
||||
msg = message.payload[:50].decode()
|
||||
else:
|
||||
msg = message.payload.decode()
|
||||
logger.debug(
|
||||
f'Got unfiltered message on "{message.topic}" '\
|
||||
f'({len(message.payload)} bytes): {msg}'
|
||||
)
|
||||
tasks = OrderedDict()
|
||||
for name, processor in self.processors.items():
|
||||
if processor.topic == message.topic:
|
||||
match = True
|
||||
else:
|
||||
if processor.topic_re:
|
||||
match = processor.topic_re.fullmatch(message.topic)
|
||||
else:
|
||||
match = False
|
||||
if match:
|
||||
tasks[processor.name] = processor.process(message)
|
||||
results = await asyncio.gather(*tasks.values(), return_exceptions=True)
|
||||
for task_number, task in enumerate(tasks.items()):
|
||||
result = results[task_number]
|
||||
if isinstance(result, Exception):
|
||||
logger.warning(f'Error executing task "{task[0]}" ' \
|
||||
f'for topic "{message.topic}": {result}')
|
||||
|
||||
|
||||
class MessageProcessorBaseClass:
|
||||
"""
|
||||
Base class for all message processors.
|
||||
Subclasses can set the attribute "context"
|
||||
(an async context processor) in setup():
|
||||
all the contexts will be used when the reactor runs.
|
||||
"""
|
||||
enabled: bool = True
|
||||
topic: str = ''
|
||||
context: Any = None
|
||||
|
||||
async def setup(self, *args, **kwargs) -> None:
|
||||
pass
|
||||
|
||||
async def process(self, message) -> None:
|
||||
pass
|
||||
|
||||
|
||||
async def cancel_tasks(tasks):
|
||||
for task in tasks:
|
||||
if task.done():
|
||||
continue
|
||||
task.cancel()
|
||||
try:
|
||||
await task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
|
||||
|
||||
async def main(list=None, exclude_processor_names=None) -> None:
|
||||
if list:
|
||||
reactor = Reactor()
|
||||
jobs = reactor.get_available_processors()
|
||||
print(' '.join(jobs))
|
||||
sys.exit(0)
|
||||
await gisaf.setup()
|
||||
await gisaf.make_models()
|
||||
reactor = Reactor()
|
||||
await reactor.setup(exclude_processor_names=exclude_processor_names)
|
||||
|
||||
async with Client(
|
||||
hostname=conf.gisaf_live.mqtt.broker,
|
||||
port=conf.gisaf_live.mqtt.port
|
||||
) as client:
|
||||
async with client.messages() as messages:
|
||||
for name, processor in reactor.processors.items():
|
||||
await client.subscribe(processor.topic)
|
||||
message: Message
|
||||
async for message in messages:
|
||||
for name, processor in reactor.processors.items():
|
||||
if message.topic.matches(processor.topic):
|
||||
try:
|
||||
await processor.process(message)
|
||||
except Exception as err:
|
||||
logger.warning(
|
||||
'Error while processing message '
|
||||
f'{message.topic} by {processor.name}. '
|
||||
'See below for the full trace.'
|
||||
)
|
||||
logger.exception(err)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
import argparse
|
||||
parser = argparse.ArgumentParser(description=__doc__)
|
||||
parser.add_argument('--debug', '-d', action='store_true',
|
||||
help='Print debug messages')
|
||||
parser.add_argument('--exclude-processor-name', '-e', action='append',
|
||||
help='Do not run the processor with a given name')
|
||||
parser.add_argument('--list', '-l', action='store_true',
|
||||
help='Only list available processors and exit')
|
||||
|
||||
args = parser.parse_args()
|
||||
if args.debug:
|
||||
logging.root.setLevel(logging.DEBUG)
|
||||
logger.setLevel(logging.DEBUG)
|
||||
else:
|
||||
logging.root.setLevel(logging.INFO)
|
||||
logger.setLevel(logging.INFO)
|
||||
logging.getLogger('websockets').setLevel(logging.WARNING)
|
||||
logging.getLogger('engineio.client').setLevel(logging.WARNING)
|
||||
logging.getLogger('socketio.client').setLevel(logging.WARNING)
|
||||
logging.getLogger('aiosasl').setLevel(logging.WARNING)
|
||||
|
||||
asyncio.run(
|
||||
main(
|
||||
list=args.list,
|
||||
exclude_processor_names=args.exclude_processor_name,
|
||||
)
|
||||
)
|
Loading…
Add table
Add a link
Reference in a new issue