Setup redis cache (ttags)

This commit is contained in:
phil 2023-12-23 15:08:42 +05:30
parent f47e018e65
commit 5c9d82f507
6 changed files with 82 additions and 62 deletions

View file

@@ -1,36 +1,24 @@
from contextlib import asynccontextmanager
import logging
from typing import Any
#import colorama
#colorama.init()
from contextlib import asynccontextmanager
from fastapi import FastAPI, responses
from .api import api
from .geoapi import api as geoapi
from .config import conf
from .registry import registry, ModelRegistry
from .redis_tools import setup_redis, shutdown_redis, setup_redis_cache
from .registry import registry
from .redis_tools import setup_redis, setup_redis_cache, shutdown_redis
from .live import setup_live
logging.basicConfig(level=conf.gisaf.debugLevel)
logger = logging.getLogger(__name__)
## Subclass FastAPI to add attributes to be used globally, ie. registry
class GisafExtra:
registry: ModelRegistry
#raw_survey_models: dict[str, Any] = {}
#survey_models: dict[str, Any] = {}
class GisafFastAPI(FastAPI):
gisaf_extra: GisafExtra
@asynccontextmanager
async def lifespan(app: FastAPI):
await registry.make_registry()
await setup_redis()
await setup_redis_cache()
await setup_live()
yield
await shutdown_redis()

View file

@@ -91,6 +91,7 @@ class Crypto(BaseSettings):
class DB(BaseSettings):
uri: str
host: str
port: int = 5432
user: str
db: str
password: str
@@ -98,6 +99,14 @@ class DB(BaseSettings):
info: bool
pool_size: int = 10
max_overflow: int = 10
echo: bool = False
def get_sqla_url(self):
return f'postgresql+asyncpg://{self.user}:{self.password}@{self.host}:{self.port}/{self.db}'
def get_pg_url(self):
return f'postgresql://{self.user}:{self.password}@{self.host}:{self.port}/{self.db}'
class Log(BaseSettings):
level: str

View file

@@ -1,5 +1,6 @@
from contextlib import asynccontextmanager
from typing import Annotated, AsyncContextManager
from typing import Annotated
from collections.abc import AsyncGenerator
from sqlalchemy.ext.asyncio import create_async_engine
from sqlmodel.ext.asyncio.session import AsyncSession
@@ -8,22 +9,19 @@ import pandas as pd
from .config import conf
echo = False
pg_url = "postgresql+asyncpg://avgis@localhost/avgis"
engine = create_async_engine(
pg_url,
echo=echo,
conf.db.get_sqla_url(),
echo=conf.db.echo,
pool_size=conf.db.pool_size,
max_overflow=conf.db.max_overflow,
)
async def get_db_session() -> AsyncSession:
async def get_db_session() -> AsyncGenerator[AsyncSession]:
async with AsyncSession(engine) as session:
yield session
@asynccontextmanager
async def db_session() -> AsyncContextManager[AsyncSession]:
async def db_session() -> AsyncGenerator[AsyncSession]:
async with AsyncSession(engine) as session:
yield session

View file

@@ -9,9 +9,11 @@ import logging
import pandas as pd
import geopandas as gpd
from asyncpg import connect
from asyncpg.connection import Connection
from asyncpg.exceptions import UndefinedTableError, InterfaceError
from sqlalchemy import text
from redis import asyncio as aioredis
from pydantic import create_model
from .config import conf
# from gisaf.models.live import LiveModel
@@ -20,6 +22,7 @@ from .utils import (SHAPELY_TYPE_TO_MAPBOX_TYPE, DEFAULT_MAPBOX_LAYOUT,
from .registry import registry
#from .models.geom import GeomGroup, GeomModel
from .models.geo_models_base import LiveGeoModel
from .database import db_session
logger = logging.getLogger(__name__)
@@ -90,6 +93,8 @@ class Store:
- redis: RedisConnection
- pub (/sub) connections
"""
asyncpg_conn: Connection
async def setup(self):
"""
Setup the live service for the main Gisaf application:
@@ -328,7 +333,10 @@ class Store:
Postgres/asyncpg listener for the trigger on data change.
A task is created because this function is not asynchronous.
"""
create_task(self.set_ttag(store_name, time()))
if store_name in registry.stores:
create_task(self.set_ttag(store_name, time()))
else:
logger.warn(f'Notify received for an unexisting store: {store_name}')
async def get_ttag(self, store_name):
"""
@@ -348,7 +356,7 @@ class Store:
await self.set_ttag(store_name, weak_now_hex)
return weak_now_hex
async def delete_all_ttags(self):
async def delete_all_ttags(self) -> None:
"""
Delete all ttags in redis
"""
@@ -357,7 +365,7 @@ class Store:
if keys:
await self.redis.delete(*keys)
async def _setup_db_cache_system(self):
async def _setup_db_cache_system(self) -> None:
"""
Setup the caching system:
- clear all Redis store at startup
@@ -365,52 +373,63 @@ class Store:
function are setup on the database server
- listen to the DB event emitter: setup a callback function
"""
## Setup the function and triggers on tables
## Keep the connection alive: don't use a "with" block
## It needs to be closed correctly: see _close_permanant_db_connection
self._permanent_conn = await db.acquire()
self._permanent_raw_conn = await self._permanent_conn.get_raw_connection()
## Create the function in the database
await self._permanent_raw_conn.execute(ttag_function)
## Delete all the ttags, for safety
## eg. the database was changed and Gisaf wasn't running, so the redis store wasn't updated
await store.delete_all_ttags()
## Create DB triggers on the tables of the models
all_triggers = await self._permanent_raw_conn.fetch(get_all_triggers)
stores_with_trigger = {t['trigger_table'] for t in all_triggers if t['tigger_name'] == 'gisaf_ttag'}
missing_triger_tables = set(registry.geom).difference(stores_with_trigger)
if len(missing_triger_tables) > 0:
logger.info(f'Create Postgres modification triggers for {len(missing_triger_tables)} tables')
for store_name in missing_triger_tables:
model = registry.geom[store_name]
try:
await self._permanent_raw_conn.execute(ttag_create_trigger.format(
schema=model.__table__.schema, table=model.__table__.name))
except UndefinedTableError:
logger.warning(f'table {store_name} does not exist in the database: skip modification trigger')
## Setup triggers on Category and Qml, for Mapbox layer styling
for schema, table in (('gisaf_map', 'qml'), ('gisaf_survey', 'category')):
triggers = [t for t in all_triggers
if t['tigger_name'] == 'gisaf_ttag' and t['trigger_table'] == f'{schema}.{table}']
if len(triggers) == 0:
await self._permanent_raw_conn.execute(ttag_create_trigger.format(schema=schema, table=table))
async with db_session() as session:
## Create the function in the database
await session.exec(text(ttag_function))
## Listen: define the callback function
await self._permanent_raw_conn.add_listener('gisaf_ttag', store.create_task_store_ttag)
## Create DB triggers on the tables of the models
all_triggers_resp = await session.exec(text(get_all_triggers))
all_triggers = all_triggers_resp.mappings().all()
stores_with_trigger = {t['trigger_table']
for t in all_triggers
if t['tigger_name'] == 'gisaf_ttag'}
missing_triger_tables = set(registry.geom).difference(stores_with_trigger)
# model: SQLModel = registry.stores.loc[store_name, 'model']
if len(missing_triger_tables) > 0:
logger.info('Create Postgres modification triggers for '
f'{len(missing_triger_tables)} tables')
for store_name in missing_triger_tables:
## XXX: TODO: See https://stackoverflow.com/questions/7888846/trigger-in-sqlachemy
model = registry.geom[store_name]
try:
await session.exec(text(
ttag_create_trigger.format(
schema=model.metadata.schema,
table=model.__tablename__)
))
except UndefinedTableError:
logger.warning(f'table {store_name} does not exist in '
'the database: skip modification trigger')
## Setup triggers on Category and Qml, for Mapbox layer styling
for schema, table in (('gisaf_map', 'qml'), ('gisaf_survey', 'category')):
triggers = [t for t in all_triggers
if t['tigger_name'] == 'gisaf_ttag'
and t['trigger_table'] == f'{schema}.{table}']
if len(triggers) == 0:
await session.exec(text(
ttag_create_trigger.format(schema=schema, table=table)
))
## Listen: define the callback function
self.asyncpg_conn = await connect(conf.db.get_pg_url())
await self.asyncpg_conn.add_listener('gisaf_ttag', store.create_task_store_ttag)
async def _close_permanant_db_connection(self):
"""
Called at aiohttp server shutdown: remove the listener and close the connections
"""
try:
await self._permanent_raw_conn.remove_listener('gisaf_ttag', store.create_task_store_ttag)
await self.asyncpg_conn.remove_listener(
'gisaf_ttag', store.create_task_store_ttag)
except InterfaceError as err:
logger.warning(f'Cannot remove asyncpg listener in _close_permanant_db_connection: {err}')
await self._permanent_raw_conn.close()
await self._permanent_conn.release()
await self.asyncpg_conn.close()
async def setup_redis():

View file

@@ -72,10 +72,11 @@ class ModelRegistry:
"""
stores: pd.DataFrame
values: dict[str, PlottableModel]
geom: dict[str, GeoModel]
geom_live: dict[str, LiveGeoModel]
geom_live_defs: dict[str, dict[str, Any]]
geom_custom: dict[str, GeoModel]
geom_custom_store: dict[str, Any]
geom_custom_store: dict[str, GeoModel]
other: dict[str, SQLModel]
misc: dict[str, SQLModel]
raw_survey_models: dict[str, RawSurveyBaseModel]

View file

@@ -12,6 +12,8 @@ from apscheduler.schedulers.asyncio import AsyncIOScheduler
from .config import conf
from .ipynb_tools import gisaf
from .registry import registry
from .redis_tools import setup_redis, shutdown_redis
from .scheduler import GSFastAPI, js, startup, Settings
from .scheduler_web import app as sched_app
@@ -33,10 +35,13 @@ async def lifespan(app: GSFastAPI):
Handle startup and shutdown: setup scheduler, etc
'''
## Startup
await registry.make_registry()
await setup_redis()
await gisaf.setup()
await startup(settings)
js.start()
yield
await shutdown_redis()
## Shutdown
pass