Fix/update scheduler with GisafNG

This commit is contained in:
phil 2024-05-09 00:04:53 +02:00
parent 81d6d5ad83
commit 77adcceb18
5 changed files with 97 additions and 83 deletions

View file

@ -34,6 +34,7 @@ async def lifespan(app: FastAPI):
await shutdown_redis()
await map_tile_registry.shutdown()
app = FastAPI(
debug=False,
title=conf.gisaf.title,
@ -46,5 +47,5 @@ app.include_router(api, prefix="/api")
app.include_router(geoapi, prefix="/api/gj")
app.include_router(admin_api, prefix="/api/admin")
app.include_router(dashboard_api, prefix="/api/dashboard")
app.include_router(map_api, prefix='/api/map')
app.include_router(download_api, prefix='/api/download')
app.include_router(map_api, prefix="/api/map")
app.include_router(download_api, prefix="/api/download")

View file

@ -289,7 +289,7 @@ class Config(BaseSettings):
plugins: dict[str, dict[str, Any]] = {}
survey: Survey = Survey()
version: str = __version__
weather_station: dict[str, dict[str, Any]] = {}
weather_station: dict[str, list[dict[str, Any]] | dict[str, Any]] = {}
widgets: Widgets = Widgets()
@property

View file

@ -443,6 +443,8 @@ async def setup_redis_cache():
async def shutdown_redis():
if not hasattr(self, 'asyncpg_conn'):
return
global store
await store._close_permanant_db_connection()

View file

@ -3,7 +3,9 @@
Gisaf task scheduler, orchestrating the background tasks
like remote device data collection, etc.
"""
import os
import psutil
import logging
import sys
import asyncio
@ -24,18 +26,17 @@ from apscheduler.triggers.date import DateTrigger
# from gisaf.ipynb_tools import Gisaf
formatter = logging.Formatter(
"%(asctime)s:%(levelname)s:%(name)s:%(message)s",
"%Y-%m-%d %H:%M:%S"
"%(asctime)s:%(levelname)s:%(name)s:%(message)s", "%Y-%m-%d %H:%M:%S"
)
for handler in logging.root.handlers:
handler.setFormatter(formatter)
logger = logging.getLogger('gisaf.scheduler')
logger = logging.getLogger("gisaf.scheduler")
class Settings(BaseSettings):
model_config = SettingsConfigDict(env_prefix='gisaf_scheduler_')
app_name: str = 'Gisaf scheduler'
model_config = SettingsConfigDict(env_prefix="gisaf_scheduler_")
app_name: str = "Gisaf scheduler"
job_names: List[str] = []
exclude_job_names: List[str] = []
list: bool = False
@ -45,14 +46,16 @@ class JobBaseClass:
"""
Base class for all the jobs.
"""
task_id = None
interval = None
cron = None
enabled = True
type = '' ## interval, cron or longrun
sched_params = ''
name = '<unnammed task>'
type = "" ## interval, cron or longrun
sched_params = ""
name = "<unnammed task>"
features = None
def __init__(self):
self.last_run = None
self.current_run = None
@ -69,20 +72,21 @@ class JobBaseClass:
"""
Subclasses should define a run async function to run
"""
logger.info(f'Noop defined for {self.name}')
logger.info(f"Noop defined for {self.name}")
class JobScheduler:
# gs: Gisaf
jobs: dict[str, Any]
tasks: dict[str, Any]
# tasks: dict[str, Any]
wss: dict[str, Any]
subscribers: set[Any]
scheduler: AsyncIOScheduler
def __init__(self):
#self.redis_store = gs.app['store']
# self.redis_store = gs.app['store']
self.jobs = {}
self.tasks = {}
# self.tasks = {}
self.wss = {}
self.subscribers = set()
self.scheduler = AsyncIOScheduler()
@ -90,21 +94,24 @@ class JobScheduler:
def start(self):
self.scheduler.start()
def scheduler_event_listener(self, event):
asyncio.create_task(self.scheduler_event_alistener(event))
# def scheduler_event_listener(self, event):
# asyncio.create_task(self.scheduler_event_alistener(event))
async def scheduler_event_alistener(self, event):
if isinstance(event, SchedulerStarted):
pid = os.getpid()
logger.debug(f'Scheduler started, pid={pid}')
#await self.gs.app['store'].pub.set('_scheduler/pid', pid)
# async def scheduler_event_alistener(self, event):
# if isinstance(event, SchedulerStarted):
# pid = os.getpid()
# logger.debug(f'Scheduler started, pid={pid}')
# #await self.gs.app['store'].pub.set('_scheduler/pid', pid)
async def job_event_added(self, event):
task = await self.scheduler.data_store.get_task(event.task_id)
schedules = [ss for ss in await self.scheduler.get_schedules()
if ss.task_id == event.task_id]
schedules = [
ss
for ss in await self.scheduler.get_schedules()
if ss.task_id == event.task_id
]
if len(schedules) > 1:
logger.warning(f'More than 1 schedule matching task {event.task_id}')
logger.warning(f"More than 1 schedule matching task {event.task_id}")
return
else:
schedule = schedules[0]
@ -155,9 +162,8 @@ class JobScheduler:
Send to Redis store
"""
try:
self.gs.app['store'].pub.publish(
'admin:scheduler:json',
dumps({'msg': msg})
self.gs.app["store"].pub.publish(
"admin:scheduler:json", dumps({"msg": msg})
)
except Exception as err:
logger.warning(f'Cannot publish updates for "{job.name}" to Redis: {err}')
@ -168,11 +174,7 @@ class JobScheduler:
Send to all connected websockets
"""
for ws in self.wss.values():
asyncio.create_task(
ws.send_json({
'msg': msg
})
)
asyncio.create_task(ws.send_json({"msg": msg}))
def add_subscription(self, ws):
self.wss[id(ws)] = ws
@ -183,7 +185,7 @@ class JobScheduler:
def get_available_jobs(self):
return [
entry_point.name
for entry_point in entry_points().select(group='gisaf_jobs')
for entry_point in entry_points().select(group="gisaf_jobs")
]
async def setup(self, job_names=None, exclude_job_names=None):
@ -193,24 +195,25 @@ class JobScheduler:
exclude_job_names = []
## Go through entry points and define the tasks
for entry_point in entry_points().select(group='gisaf_jobs'):
for entry_point in entry_points().select(group="gisaf_jobs"):
## Possibly skip this task, depending on the command-line arguments
if (entry_point.name in exclude_job_names) \
or ((len(job_names) > 0) and entry_point.name not in job_names):
logger.info(f'Skip task {entry_point.name}')
if (entry_point.name in exclude_job_names) or (
(len(job_names) > 0) and entry_point.name not in job_names
):
logger.info(f"Skip task {entry_point.name}")
continue
try:
task_class = entry_point.load()
except Exception as err:
logger.error(f'Task {entry_point.name} skipped cannot be loaded: {err}')
logger.error(f"Task {entry_point.name} skipped cannot be loaded: {err}")
continue
## Create the task instance
try:
task = task_class(self.gs)
task = task_class()
except Exception as err:
logger.error(f'Task {entry_point.name} cannot be instanciated: {err}')
logger.error(f"Task {entry_point.name} cannot be instanciated: {err}")
continue
task.name = entry_point.name
@ -219,43 +222,48 @@ class JobScheduler:
continue
logger.debug(f'Add task "{entry_point.name}"')
if not hasattr(task, 'run'):
logger.error(f'Task {entry_point.name} skipped: no run method')
if not hasattr(task, "run"):
logger.error(f"Task {entry_point.name} skipped: no run method")
continue
task.features = await task.get_feature_ids()
kwargs: dict[str: Any] = {
kwargs: dict[str, Any] = {
# 'tags': [entry_point.name],
}
if isinstance(task.interval, dict):
kwargs['trigger'] = IntervalTrigger(**task.interval)
task.type = 'interval'
kwargs["trigger"] = IntervalTrigger(**task.interval)
kwargs["next_run_time"] = datetime.now()
task.type = "interval"
## TODO: format a user-friendly description of the interval
task.sched_params = get_pretty_format_interval(task.interval)
elif isinstance(task.cron, dict):
## FIXME: CronTrigger
kwargs['trigger'] = CronTrigger(**task.cron)
kwargs["trigger"] = CronTrigger(**task.cron)
kwargs.update(task.cron)
task.type = 'cron'
task.type = "cron"
## TODO: format a user-friendly description of the cron schedule
task.sched_params = get_pretty_format_cron(task.cron)
else:
task.type = 'longrun'
task.sched_params = 'always running'
kwargs['trigger'] = DateTrigger(datetime.now())
task.type = "longrun"
task.sched_params = "always running"
kwargs["trigger"] = DateTrigger(datetime.now())
# task.task_id = await self.scheduler.add_job(task.run, **kwargs)
# self.tasks[task.task_id] = task
# continue
## Create the APScheduler task
try:
task.task_id = await self.scheduler.add_schedule(task.run, **kwargs)
task.task_id = self.scheduler.add_job(
task.run,
name=entry_point.name,
**kwargs,
)
except Exception as err:
logger.warning(f'Cannot add task {entry_point.name}: {err}')
logger.warning(f"Cannot add task {entry_point.name}: {err}")
logger.exception(err)
else:
logger.info(f'Job "{entry_point.name}" added ({task.task_id})')
self.tasks[task.task_id] = task
logger.info(f'Job "{entry_point.name}" added ({task.task_id.id})')
# self.tasks[task.task_id] = task
## Subscribe to all events
# self.scheduler.subscribe(self.job_acquired, JobAcquired)
@ -269,15 +277,15 @@ class GSFastAPI(FastAPI):
js: JobScheduler
allowed_interval_params = set(('seconds', 'minutes', 'hours', 'days', 'weeks'))
allowed_interval_params = set(("seconds", "minutes", "hours", "days", "weeks"))
def get_pretty_format_interval(params):
"""
Return a format for describing interval
"""
return str({
k: v for k, v in params.items()
if k in allowed_interval_params
})
return str({k: v for k, v in params.items() if k in allowed_interval_params})
def get_pretty_format_cron(params):
"""
@ -290,8 +298,15 @@ async def startup(settings):
if settings.list:
## Just print avalable jobs and exit
jobs = js.get_available_jobs()
print(' '.join(jobs))
sys.exit(0)
print(" ".join(jobs))
# sys.exit(0)
parent_pid = os.getpid()
parent = psutil.Process(parent_pid)
for child in parent.children(
recursive=True
): # or parent.children() for recursive=False
child.kill()
parent.kill()
# try:
# await js.gs.setup()
# await js.gs.make_models()
@ -300,11 +315,13 @@ async def startup(settings):
# logger.exception(err)
# sys.exit(1)
try:
await js.setup(job_names=settings.job_names,
exclude_job_names=settings.exclude_job_names)
await js.setup(
job_names=settings.job_names, exclude_job_names=settings.exclude_job_names
)
except Exception as err:
logger.error('Cannot setup scheduler')
logger.error("Cannot setup scheduler")
logger.exception(err)
sys.exit(1)
js = JobScheduler()
js = JobScheduler()

View file

@ -3,43 +3,37 @@
Gisaf job scheduler, orchestrating the background tasks
like remote device data collection, etc.
"""
import logging
from contextlib import asynccontextmanager
from starlette.routing import Mount
from fastapi.middleware.cors import CORSMiddleware
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from gisaf.config import conf
from gisaf.ipynb_tools import gisaf
from gisaf.registry import registry
from gisaf.redis_tools import setup_redis, shutdown_redis
from gisaf.redis_tools import store, shutdown_redis
from gisaf.scheduler import GSFastAPI, js, startup, Settings
from gisaf.scheduler_web import app as sched_app
formatter = logging.Formatter(
"%(asctime)s:%(levelname)s:%(name)s:%(message)s",
"%Y-%m-%d %H:%M:%S"
"%(asctime)s:%(levelname)s:%(name)s:%(message)s", "%Y-%m-%d %H:%M:%S"
)
for handler in logging.root.handlers:
handler.setFormatter(formatter)
logging.basicConfig(level=conf.gisaf.debugLevel)
logger = logging.getLogger('gisaf.scheduler_application')
logger = logging.getLogger("gisaf.scheduler_application")
@asynccontextmanager
async def lifespan(app: GSFastAPI):
'''
"""
Handle startup and shutdown: setup scheduler, etc
'''
"""
## Startup
await registry.make_registry()
await setup_redis()
await gisaf.setup()
await startup(settings)
js.start()
await startup(settings)
await store.setup(with_registry=False)
yield
await shutdown_redis()
## Shutdown
@ -53,10 +47,10 @@ app = GSFastAPI(
)
app.add_middleware(
CORSMiddleware,
allow_origins=['*'],
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.mount('/_sched', sched_app)
app.mount('/sched', sched_app)
app.mount("/_sched", sched_app)
app.mount("/sched", sched_app)