gisaf-backend/src/gisaf/scheduler.py
#!/usr/bin/env python
"""
Gisaf task scheduler, orchestrating the background tasks
like remote device data collection, etc.
"""
import os
import psutil
import logging
import sys
import asyncio
from json import dumps
from datetime import datetime
from importlib.metadata import entry_points
from typing import Any, List
from fastapi import FastAPI
from pydantic_settings import BaseSettings, SettingsConfigDict
# from apscheduler import SchedulerStarted
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.date import DateTrigger
# from gisaf.ipynb_tools import Gisaf
formatter = logging.Formatter(
"%(asctime)s:%(levelname)s:%(name)s:%(message)s", "%Y-%m-%d %H:%M:%S"
)
for handler in logging.root.handlers:
handler.setFormatter(formatter)
logger = logging.getLogger("gisaf.scheduler")


class Settings(BaseSettings):
model_config = SettingsConfigDict(env_prefix="gisaf_scheduler_")
app_name: str = "Gisaf scheduler"
job_names: List[str] = []
exclude_job_names: List[str] = []
list: bool = False
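
## Settings are read from the environment with the "gisaf_scheduler_" prefix.
## For instance (hypothetical job names; pydantic-settings parses complex
## fields such as lists from JSON):
##   GISAF_SCHEDULER_LIST=true
##   GISAF_SCHEDULER_JOB_NAMES='["weather_import", "gps_collect"]'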


class JobBaseClass:
"""
Base class for all the jobs.
"""
task_id = None
interval = None
cron = None
enabled = True
type = "" ## interval, cron or longrun
sched_params = ""
name = "<unnammed task>"
features = None
def __init__(self):
self.last_run = None
self.current_run = None
    async def get_feature_ids(self):
        """
        Subclasses might override get_feature_ids to inform the
        front-ends about the map features the job works on.
        The scheduler runs this on startup.
        """
return []
async def run(self):
"""
Subclasses should define a run async function to run
"""
logger.info(f"Noop defined for {self.name}")


class JobScheduler:
# gs: Gisaf
jobs: dict[str, Any]
# tasks: dict[str, Any]
wss: dict[str, Any]
subscribers: set[Any]
scheduler: AsyncIOScheduler
def __init__(self):
# self.redis_store = gs.app['store']
self.jobs = {}
# self.tasks = {}
self.wss = {}
self.subscribers = set()
self.scheduler = AsyncIOScheduler()
def start(self):
self.scheduler.start()
# def scheduler_event_listener(self, event):
# asyncio.create_task(self.scheduler_event_alistener(event))
# async def scheduler_event_alistener(self, event):
# if isinstance(event, SchedulerStarted):
# pid = os.getpid()
# logger.debug(f'Scheduler started, pid={pid}')
# #await self.gs.app['store'].pub.set('_scheduler/pid', pid)
    async def job_event_added(self, event):
        ## NOTE: data_store and get_schedules come from the APScheduler 4
        ## style API; they are not available on the 3.x AsyncIOScheduler
        ## created in __init__
        task = await self.scheduler.data_store.get_task(event.task_id)
        schedules = [
            ss
            for ss in await self.scheduler.get_schedules()
            if ss.task_id == event.task_id
        ]
        if len(schedules) != 1:
            logger.warning(
                f"Expected 1 schedule matching task {event.task_id}, "
                f"found {len(schedules)}"
            )
            return
        schedule = schedules[0]
async def job_acquired(self, event):
pass
async def job_cancelled(self, event):
pass
async def job_released(self, event):
pass
# task = self.tasks.get(event.job_id)
# if not task:
# breakpoint()
# logger.warning(f'Got an event {event} for unregistered task {event.task_id}')
# return
# if isinstance(event, apscheduler.JobCancelled): #events.EVENT_JOB_ERROR:
# msg = f'"{task.name}" cancelled ({task.task_id})'
# task.last_run = event
# task.current_run = None
# logger.warning(msg)
# ## TODO: try to restart the task
# elif isinstance(event, apscheduler.JobAcquired): #events.EVENT_JOB_SUBMITTED:
# ## XXX: should be task.last_run = None
# task.last_run = event
# task.current_run = event
# msg = f'"{task.name}" started ({task.task_id})'
# elif isinstance(event, apscheduler.JobReleased): #events.EVENT_JOB_EXECUTED:
# task.last_run = event
# task.current_run = None
# msg = f'"{task.name}" worked ({task.task_id})'
# else:
# logger.info(f'*********** Unhandled event: {event}')
# pass
# #await self.send_to_redis_store(task, event, msg)
# ## Send to notification graphql websockets subscribers
# for queue in self.subscribers:
# queue.put_nowait((task, event))
# ## Send raw messages through websockets
# await self.send_to_websockets(task, event, msg)
async def send_to_redis_store(self, job, event, msg):
"""
Send to Redis store
"""
try:
self.gs.app["store"].pub.publish(
"admin:scheduler:json", dumps({"msg": msg})
)
except Exception as err:
logger.warning(f'Cannot publish updates for "{job.name}" to Redis: {err}')
logger.exception(err)
async def send_to_websockets(self, job, event, msg):
"""
Send to all connected websockets
"""
for ws in self.wss.values():
asyncio.create_task(ws.send_json({"msg": msg}))
def add_subscription(self, ws):
self.wss[id(ws)] = ws
def delete_subscription(self, ws):
del self.wss[id(ws)]
def get_available_jobs(self):
return [
entry_point.name
for entry_point in entry_points().select(group="gisaf_jobs")
]
async def setup(self, job_names=None, exclude_job_names=None):
if job_names is None:
job_names = []
if exclude_job_names is None:
exclude_job_names = []
## Go through entry points and define the tasks
for entry_point in entry_points().select(group="gisaf_jobs"):
            ## Possibly skip the task, according to the command line arguments
if (entry_point.name in exclude_job_names) or (
(len(job_names) > 0) and entry_point.name not in job_names
):
logger.info(f"Skip task {entry_point.name}")
continue
try:
task_class = entry_point.load()
except Exception as err:
logger.error(f"Task {entry_point.name} skipped cannot be loaded: {err}")
continue
## Create the task instance
try:
task = task_class()
except Exception as err:
logger.error(f"Task {entry_point.name} cannot be instanciated: {err}")
continue
task.name = entry_point.name
if not task.enabled:
logger.debug(f'Job "{entry_point.name}" disabled')
continue
logger.debug(f'Add task "{entry_point.name}"')
if not hasattr(task, "run"):
logger.error(f"Task {entry_point.name} skipped: no run method")
continue
task.features = await task.get_feature_ids()
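            ## Choose the trigger from the job class attributes:
            ## - an interval dict, e.g. {"minutes": 10} -> IntervalTrigger
            ## - a cron dict, e.g. {"hour": 2, "minute": 30} -> CronTrigger
            ## - neither -> "longrun": fired once, immediately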
kwargs: dict[str, Any] = {
# 'tags': [entry_point.name],
}
if isinstance(task.interval, dict):
kwargs["trigger"] = IntervalTrigger(**task.interval)
kwargs["next_run_time"] = datetime.now()
task.type = "interval"
## TODO: format user friendly text for interval
task.sched_params = get_pretty_format_interval(task.interval)
            elif isinstance(task.cron, dict):
                kwargs["trigger"] = CronTrigger(**task.cron)
                task.type = "cron"
                ## TODO: format user friendly text for cron
                task.sched_params = get_pretty_format_cron(task.cron)
else:
task.type = "longrun"
task.sched_params = "always running"
kwargs["trigger"] = DateTrigger(datetime.now())
# task.task_id = await self.scheduler.add_job(task.run, **kwargs)
# self.tasks[task.task_id] = task
# continue
## Create the APScheduler task
try:
task.task_id = self.scheduler.add_job(
task.run,
name=entry_point.name,
**kwargs,
)
except Exception as err:
logger.warning(f"Cannot add task {entry_point.name}: {err}")
logger.exception(err)
else:
logger.info(f'Job "{entry_point.name}" added ({task.task_id.id})')
# self.tasks[task.task_id] = task
## Subscribe to all events
# self.scheduler.subscribe(self.job_acquired, JobAcquired)
# self.scheduler.subscribe(self.job_cancelled, JobCancelled)
# self.scheduler.subscribe(self.job_released, JobReleased)
# self.scheduler.subscribe(self.job_event_added, JobAdded)
# self.scheduler.subscribe(self.scheduler_event_listener, SchedulerEvent)


class GSFastAPI(FastAPI):
js: JobScheduler


allowed_interval_params = {"seconds", "minutes", "hours", "days", "weeks"}


def get_pretty_format_interval(params):
    """
    Return a user-friendly description of an interval definition
    """
    return str({k: v for k, v in params.items() if k in allowed_interval_params})
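## e.g. get_pretty_format_interval({"minutes": 5, "jitter": 3})
## returns "{'minutes': 5}"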


def get_pretty_format_cron(params):
    """
    Return a user-friendly description of a cron definition
    """
    return str(params)
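## e.g. get_pretty_format_cron({"hour": 2, "minute": 30})
## returns "{'hour': 2, 'minute': 30}"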


async def startup(settings):
    if settings.list:
        ## Just print the available jobs and exit
        jobs = js.get_available_jobs()
        print(" ".join(jobs))
        # sys.exit(0)
        ## Kill the whole process tree, as a plain sys.exit could leave
        ## children alive
        parent_pid = os.getpid()
        parent = psutil.Process(parent_pid)
        for child in parent.children(recursive=True):
            child.kill()
        parent.kill()
# try:
# await js.gs.setup()
# await js.gs.make_models()
# except Exception as err:
# logger.error('Cannot setup Gisaf')
# logger.exception(err)
# sys.exit(1)
try:
await js.setup(
job_names=settings.job_names, exclude_job_names=settings.exclude_job_names
)
except Exception as err:
logger.error("Cannot setup scheduler")
logger.exception(err)
sys.exit(1)


js = JobScheduler()
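

## Illustrative wiring (an assumption, not defined in this module): a server
## entry point could attach the module-level scheduler to a GSFastAPI app:
##
##     from contextlib import asynccontextmanager
##
##     @asynccontextmanager
##     async def lifespan(app: GSFastAPI):
##         await startup(Settings())
##         js.start()
##         yield
##
##     app = GSFastAPI(lifespan=lifespan)
##     app.js = js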