From 5c9d82f5076fcb13fe341bbf53ab25ed56e18035 Mon Sep 17 00:00:00 2001 From: phil Date: Sat, 23 Dec 2023 15:08:42 +0530 Subject: [PATCH] Setup redis cache (ttags) --- src/gisaf/application.py | 20 ++----- src/gisaf/config.py | 9 +++ src/gisaf/database.py | 14 ++--- src/gisaf/redis_tools.py | 93 ++++++++++++++++++------------ src/gisaf/registry.py | 3 +- src/gisaf/scheduler_application.py | 5 ++ 6 files changed, 82 insertions(+), 62 deletions(-) diff --git a/src/gisaf/application.py b/src/gisaf/application.py index dad3345..3e6861a 100644 --- a/src/gisaf/application.py +++ b/src/gisaf/application.py @@ -1,36 +1,24 @@ -from contextlib import asynccontextmanager import logging -from typing import Any - -#import colorama -#colorama.init() +from contextlib import asynccontextmanager from fastapi import FastAPI, responses from .api import api from .geoapi import api as geoapi from .config import conf -from .registry import registry, ModelRegistry -from .redis_tools import setup_redis, shutdown_redis, setup_redis_cache +from .registry import registry +from .redis_tools import setup_redis, setup_redis_cache, shutdown_redis from .live import setup_live logging.basicConfig(level=conf.gisaf.debugLevel) logger = logging.getLogger(__name__) -## Subclass FastAPI to add attributes to be used globally, ie. registry -class GisafExtra: - registry: ModelRegistry - #raw_survey_models: dict[str, Any] = {} - #survey_models: dict[str, Any] = {} - - -class GisafFastAPI(FastAPI): - gisaf_extra: GisafExtra @asynccontextmanager async def lifespan(app: FastAPI): await registry.make_registry() await setup_redis() + await setup_redis_cache() await setup_live() yield await shutdown_redis() diff --git a/src/gisaf/config.py b/src/gisaf/config.py index ed4e0f0..83e0499 100644 --- a/src/gisaf/config.py +++ b/src/gisaf/config.py @@ -91,6 +91,7 @@ class Crypto(BaseSettings): class DB(BaseSettings): uri: str host: str + port: int = 5432 user: str db: str password: str @@ -98,6 +99,14 @@ class DB(BaseSettings): info: bool pool_size: int = 10 max_overflow: int = 10 + echo: bool = False + + def get_sqla_url(self): + return f'postgresql+asyncpg://{self.user}:{self.password}@{self.host}:{self.port}/{self.db}' + + def get_pg_url(self): + return f'postgresql://{self.user}:{self.password}@{self.host}:{self.port}/{self.db}' + class Log(BaseSettings): level: str diff --git a/src/gisaf/database.py b/src/gisaf/database.py index 375db90..d6e05f6 100644 --- a/src/gisaf/database.py +++ b/src/gisaf/database.py @@ -1,5 +1,6 @@ from contextlib import asynccontextmanager -from typing import Annotated, AsyncContextManager +from typing import Annotated +from collections.abc import AsyncGenerator from sqlalchemy.ext.asyncio import create_async_engine from sqlmodel.ext.asyncio.session import AsyncSession @@ -8,22 +9,19 @@ import pandas as pd from .config import conf -echo = False -pg_url = "postgresql+asyncpg://avgis@localhost/avgis" - engine = create_async_engine( - pg_url, - echo=echo, + conf.db.get_sqla_url(), + echo=conf.db.echo, pool_size=conf.db.pool_size, max_overflow=conf.db.max_overflow, ) -async def get_db_session() -> AsyncSession: +async def get_db_session() -> AsyncGenerator[AsyncSession]: async with AsyncSession(engine) as session: yield session @asynccontextmanager -async def db_session() -> AsyncContextManager[AsyncSession]: +async def db_session() -> AsyncGenerator[AsyncSession]: async with AsyncSession(engine) as session: yield session diff --git a/src/gisaf/redis_tools.py b/src/gisaf/redis_tools.py index d2592de..aa421e8 100644 --- a/src/gisaf/redis_tools.py +++ b/src/gisaf/redis_tools.py @@ -9,9 +9,11 @@ import logging import pandas as pd import geopandas as gpd +from asyncpg import connect +from asyncpg.connection import Connection from asyncpg.exceptions import UndefinedTableError, InterfaceError +from sqlalchemy import text from redis import asyncio as aioredis -from pydantic import create_model from .config import conf # from gisaf.models.live import LiveModel @@ -20,6 +22,7 @@ from .utils import (SHAPELY_TYPE_TO_MAPBOX_TYPE, DEFAULT_MAPBOX_LAYOUT, from .registry import registry #from .models.geom import GeomGroup, GeomModel from .models.geo_models_base import LiveGeoModel +from .database import db_session logger = logging.getLogger(__name__) @@ -90,6 +93,8 @@ class Store: - redis: RedisConnection - pub (/sub) connections """ + asyncpg_conn: Connection + async def setup(self): """ Setup the live service for the main Gisaf application: @@ -328,7 +333,10 @@ class Store: Postgres/asyncpg listener for the trigger on data change. A task is created because this function is not asynchronous. """ - create_task(self.set_ttag(store_name, time())) + if store_name in registry.stores: + create_task(self.set_ttag(store_name, time())) + else: + logger.warn(f'Notify received for an unexisting store: {store_name}') async def get_ttag(self, store_name): """ @@ -348,7 +356,7 @@ class Store: await self.set_ttag(store_name, weak_now_hex) return weak_now_hex - async def delete_all_ttags(self): + async def delete_all_ttags(self) -> None: """ Delete all ttags in redis """ @@ -357,7 +365,7 @@ class Store: if keys: await self.redis.delete(*keys) - async def _setup_db_cache_system(self): + async def _setup_db_cache_system(self) -> None: """ Setup the caching system: - clear all Redis store at startup @@ -365,52 +373,63 @@ class Store: function are setup on the database server - listen to the DB event emitter: setup a callback function """ - ## Setup the function and triggers on tables - ## Keep the connection alive: don't use a "with" block - ## It needs to be closed correctly: see _close_permanant_db_connection - self._permanent_conn = await db.acquire() - self._permanent_raw_conn = await self._permanent_conn.get_raw_connection() - - ## Create the function in the database - await self._permanent_raw_conn.execute(ttag_function) - ## Delete all the ttags, for safety ## eg. the database was changed and Gisaf wasn't running, so the redis store wasn't updated await store.delete_all_ttags() - ## Create DB triggers on the tables of the models - all_triggers = await self._permanent_raw_conn.fetch(get_all_triggers) - stores_with_trigger = {t['trigger_table'] for t in all_triggers if t['tigger_name'] == 'gisaf_ttag'} - missing_triger_tables = set(registry.geom).difference(stores_with_trigger) - if len(missing_triger_tables) > 0: - logger.info(f'Create Postgres modification triggers for {len(missing_triger_tables)} tables') - for store_name in missing_triger_tables: - model = registry.geom[store_name] - try: - await self._permanent_raw_conn.execute(ttag_create_trigger.format( - schema=model.__table__.schema, table=model.__table__.name)) - except UndefinedTableError: - logger.warning(f'table {store_name} does not exist in the database: skip modification trigger') - ## Setup triggers on Category and Qml, for Mapbox layer styling - for schema, table in (('gisaf_map', 'qml'), ('gisaf_survey', 'category')): - triggers = [t for t in all_triggers - if t['tigger_name'] == 'gisaf_ttag' and t['trigger_table'] == f'{schema}.{table}'] - if len(triggers) == 0: - await self._permanent_raw_conn.execute(ttag_create_trigger.format(schema=schema, table=table)) + async with db_session() as session: + ## Create the function in the database + await session.exec(text(ttag_function)) - ## Listen: define the callback function - await self._permanent_raw_conn.add_listener('gisaf_ttag', store.create_task_store_ttag) + ## Create DB triggers on the tables of the models + all_triggers_resp = await session.exec(text(get_all_triggers)) + all_triggers = all_triggers_resp.mappings().all() + stores_with_trigger = {t['trigger_table'] + for t in all_triggers + if t['tigger_name'] == 'gisaf_ttag'} + missing_triger_tables = set(registry.geom).difference(stores_with_trigger) + + # model: SQLModel = registry.stores.loc[store_name, 'model'] + if len(missing_triger_tables) > 0: + logger.info('Create Postgres modification triggers for ' + f'{len(missing_triger_tables)} tables') + for store_name in missing_triger_tables: + ## XXX: TODO: See https://stackoverflow.com/questions/7888846/trigger-in-sqlachemy + model = registry.geom[store_name] + + try: + await session.exec(text( + ttag_create_trigger.format( + schema=model.metadata.schema, + table=model.__tablename__) + )) + except UndefinedTableError: + logger.warning(f'table {store_name} does not exist in ' + 'the database: skip modification trigger') + ## Setup triggers on Category and Qml, for Mapbox layer styling + for schema, table in (('gisaf_map', 'qml'), ('gisaf_survey', 'category')): + triggers = [t for t in all_triggers + if t['tigger_name'] == 'gisaf_ttag' + and t['trigger_table'] == f'{schema}.{table}'] + if len(triggers) == 0: + await session.exec(text( + ttag_create_trigger.format(schema=schema, table=table) + )) + + ## Listen: define the callback function + self.asyncpg_conn = await connect(conf.db.get_pg_url()) + await self.asyncpg_conn.add_listener('gisaf_ttag', store.create_task_store_ttag) async def _close_permanant_db_connection(self): """ Called at aiohttp server shutdown: remove the listener and close the connections """ try: - await self._permanent_raw_conn.remove_listener('gisaf_ttag', store.create_task_store_ttag) + await self.asyncpg_conn.remove_listener( + 'gisaf_ttag', store.create_task_store_ttag) except InterfaceError as err: logger.warning(f'Cannot remove asyncpg listener in _close_permanant_db_connection: {err}') - await self._permanent_raw_conn.close() - await self._permanent_conn.release() + await self.asyncpg_conn.close() async def setup_redis(): diff --git a/src/gisaf/registry.py b/src/gisaf/registry.py index 40308a4..98d3042 100644 --- a/src/gisaf/registry.py +++ b/src/gisaf/registry.py @@ -72,10 +72,11 @@ class ModelRegistry: """ stores: pd.DataFrame values: dict[str, PlottableModel] + geom: dict[str, GeoModel] geom_live: dict[str, LiveGeoModel] geom_live_defs: dict[str, dict[str, Any]] geom_custom: dict[str, GeoModel] - geom_custom_store: dict[str, Any] + geom_custom_store: dict[str, GeoModel] other: dict[str, SQLModel] misc: dict[str, SQLModel] raw_survey_models: dict[str, RawSurveyBaseModel] diff --git a/src/gisaf/scheduler_application.py b/src/gisaf/scheduler_application.py index 7ffa694..d25c16b 100755 --- a/src/gisaf/scheduler_application.py +++ b/src/gisaf/scheduler_application.py @@ -12,6 +12,8 @@ from apscheduler.schedulers.asyncio import AsyncIOScheduler from .config import conf from .ipynb_tools import gisaf +from .registry import registry +from .redis_tools import setup_redis, shutdown_redis from .scheduler import GSFastAPI, js, startup, Settings from .scheduler_web import app as sched_app @@ -33,10 +35,13 @@ async def lifespan(app: GSFastAPI): Handle startup and shutdown: setup scheduler, etc ''' ## Startup + await registry.make_registry() + await setup_redis() await gisaf.setup() await startup(settings) js.start() yield + await shutdown_redis() ## Shutdown pass