diff --git a/src/gisaf/application.py b/src/gisaf/application.py index e4eabff..524822f 100644 --- a/src/gisaf/application.py +++ b/src/gisaf/application.py @@ -34,6 +34,7 @@ async def lifespan(app: FastAPI): await shutdown_redis() await map_tile_registry.shutdown() + app = FastAPI( debug=False, title=conf.gisaf.title, @@ -46,5 +47,5 @@ app.include_router(api, prefix="/api") app.include_router(geoapi, prefix="/api/gj") app.include_router(admin_api, prefix="/api/admin") app.include_router(dashboard_api, prefix="/api/dashboard") -app.include_router(map_api, prefix='/api/map') -app.include_router(download_api, prefix='/api/download') \ No newline at end of file +app.include_router(map_api, prefix="/api/map") +app.include_router(download_api, prefix="/api/download") diff --git a/src/gisaf/config.py b/src/gisaf/config.py index 89c2032..a4c68d3 100644 --- a/src/gisaf/config.py +++ b/src/gisaf/config.py @@ -289,7 +289,7 @@ class Config(BaseSettings): plugins: dict[str, dict[str, Any]] = {} survey: Survey = Survey() version: str = __version__ - weather_station: dict[str, dict[str, Any]] = {} + weather_station: dict[str, list[dict[str, Any]] | dict[str, Any]] = {} widgets: Widgets = Widgets() @property diff --git a/src/gisaf/redis_tools.py b/src/gisaf/redis_tools.py index cd451e2..72e3bf1 100644 --- a/src/gisaf/redis_tools.py +++ b/src/gisaf/redis_tools.py @@ -443,6 +443,8 @@ async def setup_redis_cache(): async def shutdown_redis(): + if not hasattr(self, 'asyncpg_conn'): + return global store await store._close_permanant_db_connection() diff --git a/src/gisaf/scheduler.py b/src/gisaf/scheduler.py index 3c43b83..2317d28 100755 --- a/src/gisaf/scheduler.py +++ b/src/gisaf/scheduler.py @@ -3,7 +3,9 @@ Gisaf task scheduler, orchestrating the background tasks like remote device data collection, etc. """ + import os +import psutil import logging import sys import asyncio @@ -24,18 +26,17 @@ from apscheduler.triggers.date import DateTrigger # from gisaf.ipynb_tools import Gisaf formatter = logging.Formatter( - "%(asctime)s:%(levelname)s:%(name)s:%(message)s", - "%Y-%m-%d %H:%M:%S" + "%(asctime)s:%(levelname)s:%(name)s:%(message)s", "%Y-%m-%d %H:%M:%S" ) for handler in logging.root.handlers: handler.setFormatter(formatter) -logger = logging.getLogger('gisaf.scheduler') +logger = logging.getLogger("gisaf.scheduler") class Settings(BaseSettings): - model_config = SettingsConfigDict(env_prefix='gisaf_scheduler_') - app_name: str = 'Gisaf scheduler' + model_config = SettingsConfigDict(env_prefix="gisaf_scheduler_") + app_name: str = "Gisaf scheduler" job_names: List[str] = [] exclude_job_names: List[str] = [] list: bool = False @@ -45,14 +46,16 @@ class JobBaseClass: """ Base class for all the jobs. """ + task_id = None interval = None cron = None enabled = True - type = '' ## interval, cron or longrun - sched_params = '' - name = '' + type = "" ## interval, cron or longrun + sched_params = "" + name = "" features = None + def __init__(self): self.last_run = None self.current_run = None @@ -69,20 +72,21 @@ class JobBaseClass: """ Subclasses should define a run async function to run """ - logger.info(f'Noop defined for {self.name}') + logger.info(f"Noop defined for {self.name}") class JobScheduler: # gs: Gisaf jobs: dict[str, Any] - tasks: dict[str, Any] + # tasks: dict[str, Any] wss: dict[str, Any] subscribers: set[Any] scheduler: AsyncIOScheduler + def __init__(self): - #self.redis_store = gs.app['store'] + # self.redis_store = gs.app['store'] self.jobs = {} - self.tasks = {} + # self.tasks = {} self.wss = {} self.subscribers = set() self.scheduler = AsyncIOScheduler() @@ -90,21 +94,24 @@ class JobScheduler: def start(self): self.scheduler.start() - def scheduler_event_listener(self, event): - asyncio.create_task(self.scheduler_event_alistener(event)) + # def scheduler_event_listener(self, event): + # asyncio.create_task(self.scheduler_event_alistener(event)) - async def scheduler_event_alistener(self, event): - if isinstance(event, SchedulerStarted): - pid = os.getpid() - logger.debug(f'Scheduler started, pid={pid}') - #await self.gs.app['store'].pub.set('_scheduler/pid', pid) + # async def scheduler_event_alistener(self, event): + # if isinstance(event, SchedulerStarted): + # pid = os.getpid() + # logger.debug(f'Scheduler started, pid={pid}') + # #await self.gs.app['store'].pub.set('_scheduler/pid', pid) async def job_event_added(self, event): task = await self.scheduler.data_store.get_task(event.task_id) - schedules = [ss for ss in await self.scheduler.get_schedules() - if ss.task_id == event.task_id] + schedules = [ + ss + for ss in await self.scheduler.get_schedules() + if ss.task_id == event.task_id + ] if len(schedules) > 1: - logger.warning(f'More than 1 schedule matching task {event.task_id}') + logger.warning(f"More than 1 schedule matching task {event.task_id}") return else: schedule = schedules[0] @@ -155,9 +162,8 @@ class JobScheduler: Send to Redis store """ try: - self.gs.app['store'].pub.publish( - 'admin:scheduler:json', - dumps({'msg': msg}) + self.gs.app["store"].pub.publish( + "admin:scheduler:json", dumps({"msg": msg}) ) except Exception as err: logger.warning(f'Cannot publish updates for "{job.name}" to Redis: {err}') @@ -168,11 +174,7 @@ class JobScheduler: Send to all connected websockets """ for ws in self.wss.values(): - asyncio.create_task( - ws.send_json({ - 'msg': msg - }) - ) + asyncio.create_task(ws.send_json({"msg": msg})) def add_subscription(self, ws): self.wss[id(ws)] = ws @@ -183,7 +185,7 @@ class JobScheduler: def get_available_jobs(self): return [ entry_point.name - for entry_point in entry_points().select(group='gisaf_jobs') + for entry_point in entry_points().select(group="gisaf_jobs") ] async def setup(self, job_names=None, exclude_job_names=None): @@ -193,24 +195,25 @@ class JobScheduler: exclude_job_names = [] ## Go through entry points and define the tasks - for entry_point in entry_points().select(group='gisaf_jobs'): + for entry_point in entry_points().select(group="gisaf_jobs"): ## Eventually skip task according to arguments of the command line - if (entry_point.name in exclude_job_names) \ - or ((len(job_names) > 0) and entry_point.name not in job_names): - logger.info(f'Skip task {entry_point.name}') + if (entry_point.name in exclude_job_names) or ( + (len(job_names) > 0) and entry_point.name not in job_names + ): + logger.info(f"Skip task {entry_point.name}") continue try: task_class = entry_point.load() except Exception as err: - logger.error(f'Task {entry_point.name} skipped cannot be loaded: {err}') + logger.error(f"Task {entry_point.name} skipped cannot be loaded: {err}") continue ## Create the task instance try: - task = task_class(self.gs) + task = task_class() except Exception as err: - logger.error(f'Task {entry_point.name} cannot be instanciated: {err}') + logger.error(f"Task {entry_point.name} cannot be instanciated: {err}") continue task.name = entry_point.name @@ -219,43 +222,48 @@ class JobScheduler: continue logger.debug(f'Add task "{entry_point.name}"') - if not hasattr(task, 'run'): - logger.error(f'Task {entry_point.name} skipped: no run method') + if not hasattr(task, "run"): + logger.error(f"Task {entry_point.name} skipped: no run method") continue task.features = await task.get_feature_ids() - kwargs: dict[str: Any] = { + kwargs: dict[str, Any] = { # 'tags': [entry_point.name], } if isinstance(task.interval, dict): - kwargs['trigger'] = IntervalTrigger(**task.interval) - task.type = 'interval' + kwargs["trigger"] = IntervalTrigger(**task.interval) + kwargs["next_run_time"] = datetime.now() + task.type = "interval" ## TODO: format user friendly text for interval task.sched_params = get_pretty_format_interval(task.interval) elif isinstance(task.cron, dict): ## FIXME: CronTrigger - kwargs['trigger'] = CronTrigger(**task.cron) + kwargs["trigger"] = CronTrigger(**task.cron) kwargs.update(task.cron) - task.type = 'cron' + task.type = "cron" ## TODO: format user friendly text for cron task.sched_params = get_pretty_format_cron(task.cron) else: - task.type = 'longrun' - task.sched_params = 'always running' - kwargs['trigger'] = DateTrigger(datetime.now()) + task.type = "longrun" + task.sched_params = "always running" + kwargs["trigger"] = DateTrigger(datetime.now()) # task.task_id = await self.scheduler.add_job(task.run, **kwargs) # self.tasks[task.task_id] = task # continue ## Create the APScheduler task try: - task.task_id = await self.scheduler.add_schedule(task.run, **kwargs) + task.task_id = self.scheduler.add_job( + task.run, + name=entry_point.name, + **kwargs, + ) except Exception as err: - logger.warning(f'Cannot add task {entry_point.name}: {err}') + logger.warning(f"Cannot add task {entry_point.name}: {err}") logger.exception(err) else: - logger.info(f'Job "{entry_point.name}" added ({task.task_id})') - self.tasks[task.task_id] = task + logger.info(f'Job "{entry_point.name}" added ({task.task_id.id})') + # self.tasks[task.task_id] = task ## Subscribe to all events # self.scheduler.subscribe(self.job_acquired, JobAcquired) @@ -269,15 +277,15 @@ class GSFastAPI(FastAPI): js: JobScheduler -allowed_interval_params = set(('seconds', 'minutes', 'hours', 'days', 'weeks')) +allowed_interval_params = set(("seconds", "minutes", "hours", "days", "weeks")) + + def get_pretty_format_interval(params): """ Return a format for describing interval """ - return str({ - k: v for k, v in params.items() - if k in allowed_interval_params - }) + return str({k: v for k, v in params.items() if k in allowed_interval_params}) + def get_pretty_format_cron(params): """ @@ -290,8 +298,15 @@ async def startup(settings): if settings.list: ## Just print avalable jobs and exit jobs = js.get_available_jobs() - print(' '.join(jobs)) - sys.exit(0) + print(" ".join(jobs)) + # sys.exit(0) + parent_pid = os.getpid() + parent = psutil.Process(parent_pid) + for child in parent.children( + recursive=True + ): # or parent.children() for recursive=False + child.kill() + parent.kill() # try: # await js.gs.setup() # await js.gs.make_models() @@ -300,11 +315,13 @@ async def startup(settings): # logger.exception(err) # sys.exit(1) try: - await js.setup(job_names=settings.job_names, - exclude_job_names=settings.exclude_job_names) + await js.setup( + job_names=settings.job_names, exclude_job_names=settings.exclude_job_names + ) except Exception as err: - logger.error('Cannot setup scheduler') + logger.error("Cannot setup scheduler") logger.exception(err) sys.exit(1) -js = JobScheduler() \ No newline at end of file + +js = JobScheduler() diff --git a/src/gisaf/scheduler_application.py b/src/gisaf/scheduler_application.py index 6295d57..3d54888 100755 --- a/src/gisaf/scheduler_application.py +++ b/src/gisaf/scheduler_application.py @@ -3,43 +3,37 @@ Gisaf job scheduler, orchestrating the background tasks like remote device data collection, etc. """ + import logging from contextlib import asynccontextmanager -from starlette.routing import Mount from fastapi.middleware.cors import CORSMiddleware -from apscheduler.schedulers.asyncio import AsyncIOScheduler from gisaf.config import conf -from gisaf.ipynb_tools import gisaf -from gisaf.registry import registry -from gisaf.redis_tools import setup_redis, shutdown_redis +from gisaf.redis_tools import store, shutdown_redis from gisaf.scheduler import GSFastAPI, js, startup, Settings from gisaf.scheduler_web import app as sched_app formatter = logging.Formatter( - "%(asctime)s:%(levelname)s:%(name)s:%(message)s", - "%Y-%m-%d %H:%M:%S" + "%(asctime)s:%(levelname)s:%(name)s:%(message)s", "%Y-%m-%d %H:%M:%S" ) for handler in logging.root.handlers: handler.setFormatter(formatter) logging.basicConfig(level=conf.gisaf.debugLevel) -logger = logging.getLogger('gisaf.scheduler_application') +logger = logging.getLogger("gisaf.scheduler_application") @asynccontextmanager async def lifespan(app: GSFastAPI): - ''' + """ Handle startup and shutdown: setup scheduler, etc - ''' + """ ## Startup - await registry.make_registry() - await setup_redis() - await gisaf.setup() - await startup(settings) js.start() + await startup(settings) + await store.setup(with_registry=False) yield await shutdown_redis() ## Shutdown @@ -53,10 +47,10 @@ app = GSFastAPI( ) app.add_middleware( CORSMiddleware, - allow_origins=['*'], + allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) -app.mount('/_sched', sched_app) -app.mount('/sched', sched_app) +app.mount("/_sched", sched_app) +app.mount("/sched", sched_app)