#!/usr/bin/env python
"""
Gisaf task scheduler, orchestrating the background tasks
like remote device data collection, etc.
"""

import os
import logging
import sys
import asyncio
from json import dumps
from datetime import datetime
from importlib.metadata import entry_points
from typing import Any, Mapping, List

from fastapi import FastAPI
from pydantic_settings import BaseSettings, SettingsConfigDict

from apscheduler import SchedulerStarted
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.date import DateTrigger

# from gisaf.ipynb_tools import Gisaf

formatter = logging.Formatter(
    "%(asctime)s:%(levelname)s:%(name)s:%(message)s",
    "%Y-%m-%d %H:%M:%S"
)
for handler in logging.root.handlers:
    handler.setFormatter(formatter)

logger = logging.getLogger('gisaf.scheduler')


class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_prefix='gisaf_scheduler_')
    app_name: str = 'Gisaf scheduler'
    job_names: List[str] = []
    exclude_job_names: List[str] = []
    list: bool = False


class JobBaseClass:
    """
    Base class for all the jobs.
    """
    task_id = None
    interval = None
    cron = None
    enabled = True
    type = ''  ## interval, cron or longrun
    sched_params = ''
    name = ''
    features = None

    def __init__(self):
        self.last_run = None
        self.current_run = None

    async def get_feature_ids(self):
        """
        Subclasses might define a get_features function to inform the
        front-ends about the map features they work on.
        The scheduler runs this on startup.
        """
        return []

    async def run(self):
        """
        Subclasses should define a run async function to run
        """
        logger.info(f'Noop defined for {self.name}')


class JobScheduler:
    # gs: Gisaf
    jobs: dict[str, Any]
    tasks: dict[str, Any]
    wss: dict[str, Any]
    subscribers: set[Any]
    scheduler: AsyncIOScheduler

    def __init__(self):
        #self.redis_store = gs.app['store']
        self.jobs = {}
        self.tasks = {}
        self.wss = {}
        self.subscribers = set()
        self.scheduler = AsyncIOScheduler()

    def start(self):
        self.scheduler.start()

    def scheduler_event_listener(self, event):
        asyncio.create_task(self.scheduler_event_alistener(event))

    async def scheduler_event_alistener(self, event):
        if isinstance(event, SchedulerStarted):
            pid = os.getpid()
            logger.debug(f'Scheduler started, pid={pid}')
            #await self.gs.app['store'].pub.set('_scheduler/pid', pid)

    async def job_event_added(self, event):
        task = await self.scheduler.data_store.get_task(event.task_id)
        schedules = [ss for ss in await self.scheduler.get_schedules()
                     if ss.task_id == event.task_id]
        if len(schedules) != 1:
            logger.warning(f'Expected exactly 1 schedule matching task '
                           f'{event.task_id}, found {len(schedules)}')
            return
        schedule = schedules[0]

    async def job_acquired(self, event):
        pass

    async def job_cancelled(self, event):
        pass

    async def job_released(self, event):
        pass
        # task = self.tasks.get(event.job_id)
        # if not task:
        #     breakpoint()
        #     logger.warning(f'Got an event {event} for unregistered task {event.task_id}')
        #     return
        # if isinstance(event, apscheduler.JobCancelled): #events.EVENT_JOB_ERROR:
        #     msg = f'"{task.name}" cancelled ({task.task_id})'
        #     task.last_run = event
        #     task.current_run = None
        #     logger.warning(msg)
        #     ## TODO: try to restart the task
        # elif isinstance(event, apscheduler.JobAcquired): #events.EVENT_JOB_SUBMITTED:
        #     ## XXX: should be task.last_run = None
        #     task.last_run = event
        #     task.current_run = event
        #     msg = f'"{task.name}" started ({task.task_id})'
        # elif isinstance(event, apscheduler.JobReleased): #events.EVENT_JOB_EXECUTED:
        #     task.last_run = event
        #     task.current_run = None
        #     msg = f'"{task.name}" worked ({task.task_id})'
        # else:
        #     logger.info(f'*********** Unhandled event: {event}')
        #     pass
        # #await self.send_to_redis_store(task, event, msg)
        # ## Send to notification graphql websockets subscribers
        # for queue in self.subscribers:
        #     queue.put_nowait((task, event))
        # ## Send raw messages through websockets
        # await self.send_to_websockets(task, event, msg)

    async def send_to_redis_store(self, job, event, msg):
        """
        Send to Redis store
        """
        try:
            self.gs.app['store'].pub.publish(
                'admin:scheduler:json',
                dumps({'msg': msg})
            )
        except Exception as err:
            logger.warning(f'Cannot publish updates for "{job.name}" to Redis: {err}')
            logger.exception(err)

    async def send_to_websockets(self, job, event, msg):
        """
        Send to all connected websockets
        """
        for ws in self.wss.values():
            asyncio.create_task(
                ws.send_json({
                    'msg': msg
                })
            )

    def add_subscription(self, ws):
        self.wss[id(ws)] = ws

    def delete_subscription(self, ws):
        del self.wss[id(ws)]

    def get_available_jobs(self):
        return [
            entry_point.name
            for entry_point in entry_points().select(group='gisaf_jobs')
        ]

    async def setup(self, job_names=None, exclude_job_names=None):
        if job_names is None:
            job_names = []
        if exclude_job_names is None:
            exclude_job_names = []
        ## Go through entry points and define the tasks
        for entry_point in entry_points().select(group='gisaf_jobs'):
            ## Optionally skip the task, according to the command line arguments
            if (entry_point.name in exclude_job_names) \
                    or ((len(job_names) > 0) and entry_point.name not in job_names):
                logger.info(f'Skip task {entry_point.name}')
                continue
            try:
                task_class = entry_point.load()
            except Exception as err:
                logger.error(f'Task {entry_point.name} skipped: cannot be loaded: {err}')
                continue

            ## Create the task instance
            try:
                task = task_class(self.gs)
            except Exception as err:
                logger.error(f'Task {entry_point.name} cannot be instantiated: {err}')
                continue

            task.name = entry_point.name
            if not task.enabled:
                logger.debug(f'Job "{entry_point.name}" disabled')
                continue

            logger.debug(f'Add task "{entry_point.name}"')
            if not hasattr(task, 'run'):
                logger.error(f'Task {entry_point.name} skipped: no run method')
                continue
            task.features = await task.get_feature_ids()
            kwargs: dict[str, Any] = {
                # 'tags': [entry_point.name],
            }
            if isinstance(task.interval, dict):
                kwargs['trigger'] = IntervalTrigger(**task.interval)
                task.type = 'interval'
                ## TODO: format user friendly text for interval
                task.sched_params = get_pretty_format_interval(task.interval)
            elif isinstance(task.cron, dict):
                ## FIXME: CronTrigger
                kwargs['trigger'] = CronTrigger(**task.cron)
                task.type = 'cron'
                ## TODO: format user friendly text for cron
                task.sched_params = get_pretty_format_cron(task.cron)
            else:
                ## Long running task: schedule a single immediate run
                task.type = 'longrun'
                task.sched_params = 'always running'
                kwargs['trigger'] = DateTrigger(datetime.now())
                # task.task_id = await self.scheduler.add_job(task.run, **kwargs)
                # self.tasks[task.task_id] = task
                # continue

            ## Create the APScheduler task
            try:
                task.task_id = await self.scheduler.add_schedule(task.run, **kwargs)
            except Exception as err:
                logger.warning(f'Cannot add task {entry_point.name}: {err}')
                logger.exception(err)
            else:
                logger.info(f'Job "{entry_point.name}" added ({task.task_id})')
                self.tasks[task.task_id] = task

        ## Subscribe to all events
        # self.scheduler.subscribe(self.job_acquired, JobAcquired)
        # self.scheduler.subscribe(self.job_cancelled, JobCancelled)
        # self.scheduler.subscribe(self.job_released, JobReleased)
        # self.scheduler.subscribe(self.job_event_added, JobAdded)
        # self.scheduler.subscribe(self.scheduler_event_listener, SchedulerEvent)
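

## Illustrative sketch (not part of Gisaf): a minimal job class that a package
## could expose through the 'gisaf_jobs' entry point group so that setup() above
## picks it up. The class name, interval and log message are assumptions made
## for documentation purposes only.
# class ExampleJob(JobBaseClass):
#     name = 'example'
#     interval = {'minutes': 30}  ## turned into an IntervalTrigger by setup()
#
#     def __init__(self, gs):
#         super().__init__()
#         self.gs = gs
#
#     async def run(self):
#         logger.info('ExampleJob: collecting data from remote devices')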

class GSFastAPI(FastAPI):
    js: JobScheduler


allowed_interval_params = set(('seconds', 'minutes', 'hours', 'days', 'weeks'))


def get_pretty_format_interval(params):
    """
    Return a format for describing interval
    """
    return str({
        k: v
        for k, v in params.items()
        if k in allowed_interval_params
    })


def get_pretty_format_cron(params):
    """
    Return a format for describing cron
    """
    return str(params)


async def startup(settings):
    if settings.list:
        ## Just print available jobs and exit
        jobs = js.get_available_jobs()
        print(' '.join(jobs))
        sys.exit(0)
    # try:
    #     await js.gs.setup()
    #     await js.gs.make_models()
    # except Exception as err:
    #     logger.error('Cannot setup Gisaf')
    #     logger.exception(err)
    #     sys.exit(1)
    try:
        await js.setup(job_names=settings.job_names,
                       exclude_job_names=settings.exclude_job_names)
    except Exception as err:
        logger.error('Cannot setup scheduler')
        logger.exception(err)
        sys.exit(1)


js = JobScheduler()
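

## Illustrative sketch (not part of this module): one way an application could
## mount the scheduler on a GSFastAPI instance. The lifespan wiring below is an
## assumption; Gisaf's actual application setup may differ.
# from contextlib import asynccontextmanager
#
# @asynccontextmanager
# async def lifespan(app: GSFastAPI):
#     await startup(Settings())
#     js.start()
#     yield
#
# app = GSFastAPI(lifespan=lifespan)
# app.js = js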