Reactor: fix import
This commit is contained in:
parent
8244e3dc1f
commit
ba0d09ef64
1 changed files with 61 additions and 50 deletions
|
@ -2,6 +2,7 @@
|
||||||
"""
|
"""
|
||||||
Gisaf reactor, deals with message processing (mqtt, etc)
|
Gisaf reactor, deals with message processing (mqtt, etc)
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import re
|
import re
|
||||||
import sys
|
import sys
|
||||||
|
@ -12,10 +13,9 @@ from collections import OrderedDict
|
||||||
|
|
||||||
from aiomqtt import Client, Message
|
from aiomqtt import Client, Message
|
||||||
|
|
||||||
from gisaf.ipynb_tools import gisaf
|
|
||||||
from gisaf.config import conf
|
from gisaf.config import conf
|
||||||
|
|
||||||
logger = logging.getLogger('gisaf.reactor')
|
logger = logging.getLogger("gisaf.reactor")
|
||||||
|
|
||||||
|
|
||||||
class Reactor:
|
class Reactor:
|
||||||
|
@ -25,21 +25,22 @@ class Reactor:
|
||||||
async def setup(self, exclude_processor_names=None):
|
async def setup(self, exclude_processor_names=None):
|
||||||
if exclude_processor_names == None:
|
if exclude_processor_names == None:
|
||||||
exclude_processor_names = []
|
exclude_processor_names = []
|
||||||
for entry_point in entry_points().select(group='gisaf_message_processors'):
|
for entry_point in entry_points().select(group="gisaf_message_processors"):
|
||||||
logger.debug(
|
logger.debug(f"Processing message processor module {entry_point.name}")
|
||||||
f'Processing message processor module {entry_point.name}'
|
|
||||||
)
|
|
||||||
try:
|
try:
|
||||||
message_processor_class = entry_point.load()
|
message_processor_class = entry_point.load()
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
logger.error(f'Skip message processor module ' \
|
logger.error(
|
||||||
f'{entry_point.name}: {err}')
|
f"Skip message processor module " f"{entry_point.name}: {err}"
|
||||||
|
)
|
||||||
continue
|
continue
|
||||||
try:
|
try:
|
||||||
message_processor = message_processor_class()
|
message_processor = message_processor_class()
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
logger.error(f'Skip message processor module ' \
|
logger.error(
|
||||||
f'{entry_point.name} (cannot instanciate): {err}')
|
f"Skip message processor module "
|
||||||
|
f"{entry_point.name} (cannot instanciate): {err}"
|
||||||
|
)
|
||||||
continue
|
continue
|
||||||
if not message_processor.enabled:
|
if not message_processor.enabled:
|
||||||
continue
|
continue
|
||||||
|
@ -56,15 +57,17 @@ class Reactor:
|
||||||
def get_available_processors(self):
|
def get_available_processors(self):
|
||||||
return [
|
return [
|
||||||
entry_point.name
|
entry_point.name
|
||||||
for entry_point in entry_points().select(group='gisaf_message_processors')
|
for entry_point in entry_points().select(group="gisaf_message_processors")
|
||||||
]
|
]
|
||||||
|
|
||||||
def add_processor(self, processor):
|
def add_processor(self, processor):
|
||||||
try:
|
try:
|
||||||
processor.topic_re = re.compile(processor.topic)
|
processor.topic_re = re.compile(processor.topic)
|
||||||
except Exception as err:
|
except Exception as err:
|
||||||
logger.warning(f'Cannot treat topic "{processor.topic}" of '\
|
logger.warning(
|
||||||
f'"{processor.name}" as reg exp: {err}')
|
f'Cannot treat topic "{processor.topic}" of '
|
||||||
|
f'"{processor.name}" as reg exp: {err}'
|
||||||
|
)
|
||||||
processor.topic_re = None
|
processor.topic_re = None
|
||||||
self.processors[processor.name] = processor
|
self.processors[processor.name] = processor
|
||||||
|
|
||||||
|
@ -74,13 +77,13 @@ class Reactor:
|
||||||
|
|
||||||
async def process_unfiltered_message(self, message):
|
async def process_unfiltered_message(self, message):
|
||||||
## Log
|
## Log
|
||||||
if len(message.payload)>50:
|
if len(message.payload) > 50:
|
||||||
msg = message.payload[:50].decode()
|
msg = message.payload[:50].decode()
|
||||||
else:
|
else:
|
||||||
msg = message.payload.decode()
|
msg = message.payload.decode()
|
||||||
logger.debug(
|
logger.debug(
|
||||||
f'Got unfiltered message on "{message.topic}" '\
|
f'Got unfiltered message on "{message.topic}" '
|
||||||
f'({len(message.payload)} bytes): {msg}'
|
f"({len(message.payload)} bytes): {msg}"
|
||||||
)
|
)
|
||||||
tasks = OrderedDict()
|
tasks = OrderedDict()
|
||||||
for name, processor in self.processors.items():
|
for name, processor in self.processors.items():
|
||||||
|
@ -97,8 +100,10 @@ class Reactor:
|
||||||
for task_number, task in enumerate(tasks.items()):
|
for task_number, task in enumerate(tasks.items()):
|
||||||
result = results[task_number]
|
result = results[task_number]
|
||||||
if isinstance(result, Exception):
|
if isinstance(result, Exception):
|
||||||
logger.warning(f'Error executing task "{task[0]}" ' \
|
logger.warning(
|
||||||
f'for topic "{message.topic}": {result}')
|
f'Error executing task "{task[0]}" '
|
||||||
|
f'for topic "{message.topic}": {result}'
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class MessageProcessorBaseClass:
|
class MessageProcessorBaseClass:
|
||||||
|
@ -108,8 +113,9 @@ class MessageProcessorBaseClass:
|
||||||
(an async context processor) in setup():
|
(an async context processor) in setup():
|
||||||
all the contexts will be used when the reactor runs.
|
all the contexts will be used when the reactor runs.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
enabled: bool = True
|
enabled: bool = True
|
||||||
topic: str = ''
|
topic: str = ""
|
||||||
context: Any = None
|
context: Any = None
|
||||||
|
|
||||||
async def setup(self, *args, **kwargs) -> None:
|
async def setup(self, *args, **kwargs) -> None:
|
||||||
|
@ -134,44 +140,49 @@ async def main(list=None, exclude_processor_names=None) -> None:
|
||||||
if list:
|
if list:
|
||||||
reactor = Reactor()
|
reactor = Reactor()
|
||||||
jobs = reactor.get_available_processors()
|
jobs = reactor.get_available_processors()
|
||||||
print(' '.join(jobs))
|
print(" ".join(jobs))
|
||||||
sys.exit(0)
|
sys.exit(0)
|
||||||
await gisaf.setup()
|
|
||||||
await gisaf.make_models()
|
|
||||||
reactor = Reactor()
|
reactor = Reactor()
|
||||||
await reactor.setup(exclude_processor_names=exclude_processor_names)
|
await reactor.setup(exclude_processor_names=exclude_processor_names)
|
||||||
|
|
||||||
async with Client(
|
async with Client(
|
||||||
hostname=conf.gisaf_live.mqtt.broker,
|
hostname=conf.gisaf_live.mqtt.broker, port=conf.gisaf_live.mqtt.port
|
||||||
port=conf.gisaf_live.mqtt.port
|
|
||||||
) as client:
|
) as client:
|
||||||
async with client.messages() as messages:
|
for name, processor in reactor.processors.items():
|
||||||
|
await client.subscribe(processor.topic)
|
||||||
|
async for message in client.messages:
|
||||||
for name, processor in reactor.processors.items():
|
for name, processor in reactor.processors.items():
|
||||||
await client.subscribe(processor.topic)
|
if message.topic.matches(processor.topic):
|
||||||
message: Message
|
try:
|
||||||
async for message in messages:
|
await processor.process(message)
|
||||||
for name, processor in reactor.processors.items():
|
except Exception as err:
|
||||||
if message.topic.matches(processor.topic):
|
logger.warning(
|
||||||
try:
|
"Error while processing message "
|
||||||
await processor.process(message)
|
f"{message.topic} by {processor.name}. "
|
||||||
except Exception as err:
|
"See below for the full trace."
|
||||||
logger.warning(
|
)
|
||||||
'Error while processing message '
|
logger.exception(err)
|
||||||
f'{message.topic} by {processor.name}. '
|
|
||||||
'See below for the full trace.'
|
|
||||||
)
|
|
||||||
logger.exception(err)
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == "__main__":
|
||||||
import argparse
|
import argparse
|
||||||
|
|
||||||
parser = argparse.ArgumentParser(description=__doc__)
|
parser = argparse.ArgumentParser(description=__doc__)
|
||||||
parser.add_argument('--debug', '-d', action='store_true',
|
parser.add_argument(
|
||||||
help='Print debug messages')
|
"--debug", "-d", action="store_true", help="Print debug messages"
|
||||||
parser.add_argument('--exclude-processor-name', '-e', action='append',
|
)
|
||||||
help='Do not run the processor with a given name')
|
parser.add_argument(
|
||||||
parser.add_argument('--list', '-l', action='store_true',
|
"--exclude-processor-name",
|
||||||
help='Only list available processors and exit')
|
"-e",
|
||||||
|
action="append",
|
||||||
|
help="Do not run the processor with a given name",
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--list",
|
||||||
|
"-l",
|
||||||
|
action="store_true",
|
||||||
|
help="Only list available processors and exit",
|
||||||
|
)
|
||||||
|
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
if args.debug:
|
if args.debug:
|
||||||
|
@ -180,10 +191,10 @@ if __name__ == '__main__':
|
||||||
else:
|
else:
|
||||||
logging.root.setLevel(logging.INFO)
|
logging.root.setLevel(logging.INFO)
|
||||||
logger.setLevel(logging.INFO)
|
logger.setLevel(logging.INFO)
|
||||||
logging.getLogger('websockets').setLevel(logging.WARNING)
|
logging.getLogger("websockets").setLevel(logging.WARNING)
|
||||||
logging.getLogger('engineio.client').setLevel(logging.WARNING)
|
logging.getLogger("engineio.client").setLevel(logging.WARNING)
|
||||||
logging.getLogger('socketio.client').setLevel(logging.WARNING)
|
logging.getLogger("socketio.client").setLevel(logging.WARNING)
|
||||||
logging.getLogger('aiosasl').setLevel(logging.WARNING)
|
logging.getLogger("aiosasl").setLevel(logging.WARNING)
|
||||||
|
|
||||||
asyncio.run(
|
asyncio.run(
|
||||||
main(
|
main(
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue