Fix reactor

Cleanup some confusion in redis_tools with mqtt
This commit is contained in:
phil 2024-05-22 03:06:01 +02:00
parent 46b524636b
commit 1fd347d8df
5 changed files with 54 additions and 32 deletions

View file

@ -1 +1 @@
__version__: str = '0.1.dev85+g41e92fa.d20240509' __version__: str = '2023.4.dev95+g46b5246.d20240520'

View file

@ -9,6 +9,7 @@ from sqlalchemy.sql.selectable import Select
from sqlmodel import SQLModel, select from sqlmodel import SQLModel, select
from sqlmodel.ext.asyncio.session import AsyncSession from sqlmodel.ext.asyncio.session import AsyncSession
from fastapi import Depends from fastapi import Depends
# from geoalchemy2.functions import ST_SimplifyPreserveTopology # from geoalchemy2.functions import ST_SimplifyPreserveTopology
import pandas as pd import pandas as pd
import geopandas as gpd # type: ignore import geopandas as gpd # type: ignore
@ -29,21 +30,30 @@ sync_engine = create_engine(
max_overflow=conf.db.max_overflow, max_overflow=conf.db.max_overflow,
) )
async def get_db_session() -> AsyncGenerator[AsyncSession]: async def get_db_session() -> AsyncGenerator[AsyncSession]:
async with AsyncSession(engine) as session: async with AsyncSession(engine) as session:
yield session yield session
@asynccontextmanager @asynccontextmanager
async def db_session() -> AsyncGenerator[AsyncSession]: async def db_session() -> AsyncGenerator[AsyncSession]:
async with AsyncSession(engine) as session: async with AsyncSession(engine) as session:
yield session yield session
def pandas_query(session, query):
def pandas_query(session, query, cast=False):
return pd.read_sql_query(query, session.connection()) return pd.read_sql_query(query, session.connection())
def geopandas_query(session, query: Select, model, *,
def geopandas_query(
session,
query: Select,
model,
*,
# simplify_tolerance: float|None=None, # simplify_tolerance: float|None=None,
crs=None, cast=True, crs=None,
cast=True,
): ):
## XXX: I could not get the add_columns work without creating a subquery, ## XXX: I could not get the add_columns work without creating a subquery,
## so moving the simplification to geopandas - see in _get_df ## so moving the simplification to geopandas - see in _get_df
@ -55,9 +65,10 @@ def geopandas_query(session, query: Select, model, *,
# query = query.add_columns(new_column) # query = query.add_columns(new_column)
return gpd.GeoDataFrame.from_postgis(query, session.connection(), crs=crs) return gpd.GeoDataFrame.from_postgis(query, session.connection(), crs=crs)
class BaseModel(SQLModel): class BaseModel(SQLModel):
@classmethod @classmethod
def selectinload(cls) -> list[Literal['*'] | QueryableAttribute[Any]]: def selectinload(cls) -> list[Literal["*"] | QueryableAttribute[Any]]:
return [] return []
@classmethod @classmethod
@ -69,11 +80,17 @@ class BaseModel(SQLModel):
return await cls._get_df(geopandas_query, model=cls, **kwargs) # type: ignore return await cls._get_df(geopandas_query, model=cls, **kwargs) # type: ignore
@classmethod @classmethod
async def _get_df(cls, method, *, async def _get_df(
where=None, with_related=True, with_only_columns=[], cls,
method,
*,
where=None,
with_related=True,
with_only_columns=[],
simplify_tolerance: float | None = None, simplify_tolerance: float | None = None,
preserve_topology: bool | None = None, preserve_topology: bool | None = None,
**kwargs) -> pd.DataFrame | gpd.GeoDataFrame: **kwargs,
) -> pd.DataFrame | gpd.GeoDataFrame:
async with db_session() as session: async with db_session() as session:
if len(with_only_columns) == 0: if len(with_only_columns) == 0:
query = select(cls) query = select(cls)
@ -97,11 +114,13 @@ class BaseModel(SQLModel):
# pass # pass
df = await session.run_sync(method, query, **kwargs) df = await session.run_sync(method, query, **kwargs)
if method is geopandas_query and simplify_tolerance is not None: if method is geopandas_query and simplify_tolerance is not None:
df['geom'] = df['geom'].simplify( df["geom"] = df["geom"].simplify(
simplify_tolerance / conf.geo.simplify_geom_factor, simplify_tolerance / conf.geo.simplify_geom_factor,
preserve_topology=(conf.geo.simplify_preserve_topology preserve_topology=(
conf.geo.simplify_preserve_topology
if preserve_topology is None if preserve_topology is None
else preserve_topology) else preserve_topology
),
) )
## Chamge column names to reflect the joined tables ## Chamge column names to reflect the joined tables
## Leave the first columns unchanged, as their names come straight ## Leave the first columns unchanged, as their names come straight
@ -119,11 +138,15 @@ class BaseModel(SQLModel):
target = joined_table.property.target # type: ignore target = joined_table.property.target # type: ignore
for col in target.columns: for col in target.columns:
## Pop the column from the colujmn list and make a new name ## Pop the column from the colujmn list and make a new name
renames[joined_columns.pop(0)] = f'{target.schema}_{target.name}_{col.name}' renames[joined_columns.pop(0)] = (
f"{target.schema}_{target.name}_{col.name}"
)
df.rename(columns=renames, inplace=True) df.rename(columns=renames, inplace=True)
## Finally, set the index of the df as the index of cls ## Finally, set the index of the df as the index of cls
df.set_index([c.name for c in cls.__table__.primary_key.columns], # type: ignore df.set_index(
inplace=True) [c.name for c in cls.__table__.primary_key.columns], # type: ignore
inplace=True,
)
return df return df

View file

@ -2,7 +2,7 @@ import asyncio
import logging import logging
from collections import defaultdict from collections import defaultdict
from fastapi import FastAPI, WebSocket, WebSocketDisconnect from fastapi import WebSocket
# from .config import conf # from .config import conf
from gisaf.redis_tools import store from gisaf.redis_tools import store
@ -14,12 +14,11 @@ class LiveServer:
def __init__(self): def __init__(self):
self.ws_clients = defaultdict(set) self.ws_clients = defaultdict(set)
async def setup(self, listen_to_redis=False, with_mqtt=False): async def setup(self, listen_to_redis=False):
""" """
Setup for the live server Setup for the live server
""" """
if with_mqtt: await store.setup(with_registry=False)
logger.warning("Gisaf LiveServer does not support with_mqtt: ignoring")
if listen_to_redis: if listen_to_redis:
self.pub = store.redis.pubsub() self.pub = store.redis.pubsub()
await self.pub.psubscribe("live:*:json") await self.pub.psubscribe("live:*:json")

View file

@ -14,6 +14,7 @@ from collections import OrderedDict
from aiomqtt import Client, Message from aiomqtt import Client, Message
from gisaf.config import conf from gisaf.config import conf
from gisaf.live import live_server
logger = logging.getLogger("gisaf.reactor") logger = logging.getLogger("gisaf.reactor")
@ -23,7 +24,7 @@ class Reactor:
self.processors = {} self.processors = {}
async def setup(self, exclude_processor_names=None): async def setup(self, exclude_processor_names=None):
if exclude_processor_names == None: if exclude_processor_names is None:
exclude_processor_names = [] exclude_processor_names = []
for entry_point in entry_points().select(group="gisaf_message_processors"): for entry_point in entry_points().select(group="gisaf_message_processors"):
logger.debug(f"Processing message processor module {entry_point.name}") logger.debug(f"Processing message processor module {entry_point.name}")
@ -53,6 +54,7 @@ class Reactor:
await message_processor.setup() await message_processor.setup()
self.add_processor(message_processor) self.add_processor(message_processor)
logger.info(f'Added message processor "{entry_point.name}"') logger.info(f'Added message processor "{entry_point.name}"')
await live_server.setup()
def get_available_processors(self): def get_available_processors(self):
return [ return [
@ -139,6 +141,7 @@ async def cancel_tasks(tasks):
async def main(list=None, exclude_processor_names=None) -> None: async def main(list=None, exclude_processor_names=None) -> None:
if list: if list:
reactor = Reactor() reactor = Reactor()
await reactor.setup()
jobs = reactor.get_available_processors() jobs = reactor.get_available_processors()
print(" ".join(jobs)) print(" ".join(jobs))
sys.exit(0) sys.exit(0)

View file

@ -23,8 +23,6 @@ from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.date import DateTrigger from apscheduler.triggers.date import DateTrigger
# from gisaf.ipynb_tools import Gisaf
formatter = logging.Formatter( formatter = logging.Formatter(
"%(asctime)s:%(levelname)s:%(name)s:%(message)s", "%Y-%m-%d %H:%M:%S" "%(asctime)s:%(levelname)s:%(name)s:%(message)s", "%Y-%m-%d %H:%M:%S"
) )
@ -76,7 +74,6 @@ class JobBaseClass:
class JobScheduler: class JobScheduler:
# gs: Gisaf
jobs: dict[str, Any] jobs: dict[str, Any]
# tasks: dict[str, Any] # tasks: dict[str, Any]
wss: dict[str, Any] wss: dict[str, Any]