From 8244e3dc1f546caace08b46bc295568434a9c154 Mon Sep 17 00:00:00 2001 From: phil Date: Thu, 16 May 2024 11:58:02 +0200 Subject: [PATCH] Graph: fix resampling --- src/gisaf/api/main.py | 295 ++++++++++++++++++++++++------------------ 1 file changed, 168 insertions(+), 127 deletions(-) diff --git a/src/gisaf/api/main.py b/src/gisaf/api/main.py index 956f2da..05e30a8 100644 --- a/src/gisaf/api/main.py +++ b/src/gisaf/api/main.py @@ -9,26 +9,38 @@ from sqlalchemy.orm import selectinload, joinedload from sqlalchemy.orm.attributes import QueryableAttribute from fastapi.security import OAuth2PasswordRequestForm from sqlmodel import select +from pandas.tseries.frequencies import to_offset -from gisaf.models.authentication import (User, UserRead, Role, RoleRead) +from gisaf.models.authentication import User, UserRead, Role, RoleRead from gisaf.models.category import Category, CategoryRead from gisaf.models.geo_models_base import GeoModel, PlottableModel -from gisaf.models.info import (ActionParam, ActionsResults, ActionsStore, - FormFieldInput, LegendItem, - ModelAction, ModelInfo, - DataProvider, ModelValue, PlotParams, - TagActions) +from gisaf.models.info import ( + ActionParam, + ActionsResults, + ActionsStore, + FormFieldInput, + LegendItem, + ModelAction, + ModelInfo, + DataProvider, + ModelValue, + PlotParams, + TagActions, +) from gisaf.models.measures import MeasuresItem from gisaf.models.survey import Equipment, SurveyMeta, Surveyor from gisaf.config import conf from gisaf.models.bootstrap import BootstrapData from gisaf.models.store import Store, StoreNameOnly from gisaf.models.project import Project -from gisaf.models.authentication import UserRoleLink #, ACL +from gisaf.models.authentication import UserRoleLink # , ACL from gisaf.database import pandas_query, fastapi_db_session as db_session from gisaf.security import ( - Token, authenticate_user, get_current_active_user, create_access_token, - ) + Token, + authenticate_user, + get_current_active_user, + create_access_token, +) from gisaf.registry import registry, NotInRegistry from gisaf.custom_store_base import BaseStore from gisaf.redis_tools import store as redis_store @@ -45,81 +57,87 @@ api = APIRouter( # dependencies=[Depends(get_token_header)], responses={404: {"description": "Not found"}}, ) -#api.add_middleware(SessionMiddleware, secret_key=conf.crypto.secret) +# api.add_middleware(SessionMiddleware, secret_key=conf.crypto.secret) -@api.get('/bootstrap') + +@api.get("/bootstrap") async def bootstrap( - user: Annotated[UserRead, Depends(get_current_active_user)] - ) -> BootstrapData: + user: Annotated[UserRead, Depends(get_current_active_user)], +) -> BootstrapData: return BootstrapData(user=user) @api.post("/token") async def login_for_access_token( - form_data: OAuth2PasswordRequestForm = Depends() - ) -> Token: + form_data: OAuth2PasswordRequestForm = Depends(), +) -> Token: user = await authenticate_user(form_data.username, form_data.password) if not user: - logger.info(f'{form_data.username} failed attempt to get token') + logger.info(f"{form_data.username} failed attempt to get token") raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) access_token = create_access_token( - data={"sub": user.username}, - expires_delta=timedelta(seconds=conf.crypto.expire)) - logger.info(f'{user.username} ({user.id}) logged in') - return Token(access_token=access_token, token_type='bearer') + data={"sub": user.username}, expires_delta=timedelta(seconds=conf.crypto.expire) + ) + logger.info(f"{user.username} ({user.id}) logged in") + return Token(access_token=access_token, token_type="bearer") -@api.get('/logout') -async def logout( - user: Annotated[UserRead, Depends(get_current_active_user)]): +@api.get("/logout") +async def logout(user: Annotated[UserRead, Depends(get_current_active_user)]): if user is not None: - logger.info(f'{user.username} ({user.id}) logged out') + logger.info(f"{user.username} ({user.id}) logged out") @api.get("/users") async def get_users( db_session: db_session, - ) -> list[UserRead]: - query = select(User).options(selectinload(User.roles)) # type: ignore[arg-type] +) -> list[UserRead]: + query = select(User).options(selectinload(User.roles)) # type: ignore[arg-type] data = await db_session.exec(query) - return data.all() # type: ignore[return-value] + return data.all() # type: ignore[return-value] + @api.get("/roles") async def get_roles( db_session: db_session, - ) -> list[RoleRead]: - query = select(Role).options(selectinload(Role.users)) # type: ignore[arg-type] +) -> list[RoleRead]: + query = select(Role).options(selectinload(Role.users)) # type: ignore[arg-type] data = await db_session.exec(query) - return data.all() # type: ignore[return-value] + return data.all() # type: ignore[return-value] -@api.get('/acls') -async def get_acls(db_session: db_session, - user: Annotated[User, Depends(get_current_active_user)]) -> list[UserRoleLink]: + +@api.get("/acls") +async def get_acls( + db_session: db_session, user: Annotated[User, Depends(get_current_active_user)] +) -> list[UserRoleLink]: """New: ACLs returned as UserRoleLink""" - if user is not None or not user.has_role('manager'): + if user is not None or not user.has_role("manager"): raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED) data = await db_session.exec(select(UserRoleLink)) - return data.all() # type: ignore[return-value] + return data.all() # type: ignore[return-value] + @api.get("/categories") async def get_categories( db_session: db_session, - ) -> list[CategoryRead]: +) -> list[CategoryRead]: query = select(Category) data = await db_session.exec(query) - return data.all() # type: ignore[return-value] + return data.all() # type: ignore[return-value] + @api.get("/categories_pandas") async def get_categories_p( db_session: db_session, - ) -> list[CategoryRead]: +) -> list[CategoryRead]: query = select(Category) df = await db_session.run_sync(pandas_query, query) - return df.to_dict(orient="records") # type: ignore[return-value] + return df.to_dict(orient="records") # type: ignore[return-value] + # @api.get("/list") @api.get("/data-providers") @@ -132,14 +150,17 @@ async def list_data_providers() -> list[DataProvider]: DataProvider( store=model.get_store_name(), name=model.__name__, - values=[value.get_store_name() for value in values] - ) for model, values in registry.values_for_model.items()] + values=[value.get_store_name() for value in values], + ) + for model, values in registry.values_for_model.items() + ] + @api.get("/data-provider/{store}") async def get_model_list( store: str, db_session: db_session, - ) -> list[MeasuresItem]: +) -> list[MeasuresItem]: """ Json REST store API compatible with Flask Potion and Angular Get the list of items (used for making the list of items in measures) @@ -151,21 +172,23 @@ async def get_model_list( raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) model: type[PlottableModel] = store_record.model # FIXME: get only the first model of values - values_models = registry.values_for_model.get(model) # type: ignore + values_models = registry.values_for_model.get(model) # type: ignore if values_models is None or len(values_models) == 0: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) values_model = values_models[0] try: - ref_id_attr: QueryableAttribute = getattr(values_model, 'ref_id') + ref_id_attr: QueryableAttribute = getattr(values_model, "ref_id") except AttributeError: - raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - detail=f'No ref_id defined for {values_model.__name__}') + raise HTTPException( + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + detail=f"No ref_id defined for {values_model.__name__}", + ) data = await db_session.exec( select(ref_id_attr, func.count(ref_id_attr)).group_by(ref_id_attr) ) counts = dict(data.all()) - objs = await db_session.exec(select(model).options( - *(joinedload(jt) for jt in model.selectinload())) + objs = await db_session.exec( + select(model).options(*(joinedload(jt) for jt in model.selectinload())) ) resp = [ MeasuresItem( @@ -178,7 +201,8 @@ async def get_model_list( ] return resp -@api.get('/{store_name}/values/{value}') + +@api.get("/{store_name}/values/{value}") async def get_model_values( db_session: db_session, store_name: str, @@ -186,65 +210,68 @@ async def get_model_values( response: Response, where: str, resample: str | None = None, - ): +): """ Get values """ - comment = '' + comment = "" ## Get the request's args, i.e. the where clause of the DB query model_query = loads(where) # store_name = [k for k in model_query.keys()][0] model_id = model_query[store_name] model: GeoModel - model = registry.geom.get(store_name) # type: ignore + model = registry.geom.get(store_name) # type: ignore if model is None: raise HTTPException(status.HTTP_404_NOT_FOUND) values_model = registry.values_for_model.get(model)[0] ## Allow custom getter - getter = getattr(values_model, f'get_{value}', None) + getter = getattr(values_model, f"get_{value}", None) if getter: df = await getter(model_id) else: item = await db_session.get(model, model_id) - df = await values_model.get_as_dataframe(item=item, - with_only_columns=[value]) + df = await values_model.get_as_dataframe(item=item, with_only_columns=[value]) if len(df) == 0: return [] - if resample is not None and resample != '0': + if resample is not None and resample != "0": ## Model defines how to resample - value_defs = [v for v in values_model.values if v['name'] == value] - rule = request.query['resample'] + value_defs = [v for v in values_model.values if v["name"] == value] if len(value_defs) > 0: value_defs = value_defs[0] else: value_defs = {} - if hasattr(values_model, 'resampling_args') \ - and value in values_model.resampling_args \ - and rule in values_model.resampling_args[value]: - resampling_args = values_model.resampling_args[value][rule].copy() - comment = resampling_args.pop('comment', '') + if ( + hasattr(values_model, "resampling_args") + and value in values_model.resampling_args + and resample in values_model.resampling_args[value] + ): + resampling_args = values_model.resampling_args[value][resample].copy() + comment = resampling_args.pop("comment", "") else: resampling_args = {} - resampling_agg_method = value_defs.get('agg', 'mean') + resampling_agg_method = value_defs.get("agg", "mean") ## If the resampling method is sum, set the date as the end of each period - #if resampling_agg_method == 'sum': - #resampling_args['loffset'] = rule + # if resampling_agg_method == 'sum': + # resampling_args['loffset'] = resample ## loffset was deprecated in Pandas 1.1.0 - loffset = resampling_args.pop('loffset', None) - df = df.resample(rule, **resampling_args).agg(resampling_agg_method) + loffset = resampling_args.pop("loffset", None) + df = df.resample(resample, **resampling_args).agg(resampling_agg_method) if loffset is not None: df.index = df.index + to_offset(loffset) if len(df) > 0: df.reset_index(inplace=True) elif len(df) > conf.plot.maxDataSize: - msg ='Too much data to display in the graph, automatically switching to daily resampling. ' \ - 'Note that you can download raw data anyway as CSV in the "Tools" tab.', - raise HTTPException(status.HTTP_502_BAD_GATEWAY, # FIXME: 502 status code - detail=msg, - headers={'resampling': 'D'} + msg = ( + "Too much data to display in the graph, automatically switching to daily resampling. " + 'Note that you can download raw data anyway as CSV in the "Tools" tab.', + ) + raise HTTPException( + status.HTTP_500_INTERNAL_SERVER_ERROR, # FIXME: 500 status code + detail=msg, + headers={"resampling": "D"}, ) else: df.reset_index(inplace=True) @@ -252,53 +279,64 @@ async def get_model_values( df.dropna(inplace=True) ## Round values - values_dict = {value['name']: value for value in values_model.values} + values_dict = {value["name"]: value for value in values_model.values} for column in df.columns: if column in values_dict: ## XXX: workaround for https://github.com/pandas-dev/pandas/issues/38844: ## convert column to float. ## Revert back to the commented out line below when the ## bug fix is applied: in Pandas 1.3 - #df[column] = df[column].round(values_dict[column].get('round', 1)) - df[column] = df[column].astype(float).round(values_dict[column].get('round', 1)) + # df[column] = df[column].round(values_dict[column].get('round', 1)) + df[column] = ( + df[column].astype(float).round(values_dict[column].get("round", 1)) + ) response.headers["comment"] = comment - return df.to_json(orient='records', date_format='iso'), + return (df.to_json(orient="records", date_format="iso"),) + @api.get("/stores") async def get_stores() -> list[Store]: - df = registry.stores.reset_index().\ - drop(columns=['model', 'raw_model', 'base_gis_type']) - return df.to_dict(orient="records") # type: ignore[return-value] + df = registry.stores.reset_index().drop( + columns=["model", "raw_model", "base_gis_type"] + ) + return df.to_dict(orient="records") # type: ignore[return-value] + @api.get("/projects") async def get_projects( db_session: db_session, - ) -> list[Project]: +) -> list[Project]: query = select(Project) data = await db_session.exec(query) - return data.all() # type: ignore[return-value] + return data.all() # type: ignore[return-value] + @api.get("/survey_meta") async def get_survey_meta( db_session: db_session, - ) -> SurveyMeta: +) -> SurveyMeta: return SurveyMeta( - projects=(await db_session.exec(select(Project))).all(), # type: ignore[arg-type] - surveyors=(await db_session.exec(select(Surveyor))).all(), # type: ignore[arg-type] - equipments=(await db_session.exec(select(Equipment))).all(), # type: ignore[arg-type] + projects=(await db_session.exec(select(Project))).all(), # type: ignore[arg-type] + surveyors=(await db_session.exec(select(Surveyor))).all(), # type: ignore[arg-type] + equipments=(await db_session.exec(select(Equipment))).all(), # type: ignore[arg-type] statuses=conf.map.status, - stores_misc=[StoreNameOnly(name=name) - for name, model in registry.geom_custom.items()], - stores_line_work=[StoreNameOnly(name=name) - for name in registry.stores[registry.stores.is_line_work].index], - default=conf.admin.basket.default + stores_misc=[ + StoreNameOnly(name=name) for name, model in registry.geom_custom.items() + ], + stores_line_work=[ + StoreNameOnly(name=name) + for name in registry.stores[registry.stores.is_line_work].index + ], + default=conf.admin.basket.default, ) + @api.get("/feature-info/{store}/{id}") async def get_feature_info( - store: str, id: str, - ) -> FeatureInfo | None: + store: str, + id: str, +) -> FeatureInfo | None: if store not in registry.stores.index: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) store_record = registry.stores.loc[store] @@ -317,9 +355,7 @@ async def get_feature_info( @api.get("/model-info/{store}") -async def get_model_info( - store: str - ) -> ModelInfo: +async def get_model_info(store: str) -> ModelInfo: try: store_record = registry.stores.loc[store] except KeyError: @@ -327,34 +363,37 @@ async def get_model_info( if store_record.is_live: ## Get layer_defs from live redis and give symbol layer_def = await redis_store.get_layer_def(store) - return ModelInfo(modelName=layer_def.pop('name'), **layer_def) + return ModelInfo(modelName=layer_def.pop("name"), **layer_def) model = store_record.model model_info = { - 'store': store, - 'modelName': model.__name__, - 'symbol': model.symbol or gisTypeSymbolMap[model.base_gis_type], + "store": store, + "modelName": model.__name__, + "symbol": model.symbol or gisTypeSymbolMap[model.base_gis_type], } ## Add information about the legend - if hasattr(model, 'get_legend'): + if hasattr(model, "get_legend"): legend = await model.get_legend() - model_info['legend'] = [ - LegendItem(key=k, value=v) - for k, v in legend.items() - ] + model_info["legend"] = [LegendItem(key=k, value=v) for k, v in legend.items()] ## Add information about values values_model = registry.values_for_model.get(model) # FIXME: one the first values_model is managed - if values_model is not None and len(values_model) > 0 and hasattr(values_model[0], 'values'): - model_info['values'] = [ModelValue(**values) for values in values_model[0].values] + if ( + values_model is not None + and len(values_model) > 0 + and hasattr(values_model[0], "values") + ): + model_info["values"] = [ + ModelValue(**values) for values in values_model[0].values + ] ## Add information about tags ## TODO: add to plugin_manager a way to retrieve tag_store/tag_actions from a dict? - #tag_store = [tt for tt in plugin_manager.tagsStores.stores if tt.store==store][0] - model_info['tagActions'] = [ + # tag_store = [tt for tt in plugin_manager.tagsStores.stores if tt.store==store][0] + model_info["tagActions"] = [ TagActions(key=key, domain=domain, actions=actions) for domain, actions_keys in plugin_manager.tags_models[model].items() for key, actions in actions_keys.items() ] - model_info['actions'] = [ + model_info["actions"] = [ ModelAction( name=name, icon=action.icon, @@ -364,16 +403,15 @@ async def get_model_info( for name, actions in plugin_manager.actions_stores.get(store, {}).items() for action in actions ] - model_info['downloaders'] = plugin_manager.downloaders_stores[store] + model_info["downloaders"] = plugin_manager.downloaders_stores[store] return ModelInfo(**model_info) + @api.get("/plot-params/{store}") -async def get_plot_params( - store: str, id: str, value: str - ) -> PlotParams: +async def get_plot_params(store: str, id: str, value: str) -> PlotParams: model = registry.geom.get(store) ## Get additional plot params - if hasattr(model, 'get_plot_params'): + if hasattr(model, "get_plot_params"): plot_params: PlotParams = await model.get_plot_params(id, value) return plot_params else: @@ -387,29 +425,32 @@ async def get_plot_params( # roles = await db_session.exec(select(UserRoleLink)) # return roles.all() -@api.get('/actions') + +@api.get("/actions") async def get_actions() -> list[ActionsStore]: # actionsPlugins = List(ActionsStore) return plugin_manager.actionsStores -@api.post('/execTagActions') + +@api.post("/execTagActions") async def execute_tag_action( - user: Annotated[UserRead, Depends(get_current_active_user)], - stores: list[str], - ids: list[list[str]], - actionNames: list[str], - params: list[ActionParam | None], - formFields: list[FormFieldInput], - ) -> ActionsResults: + user: Annotated[UserRead, Depends(get_current_active_user)], + stores: list[str], + ids: list[list[str]], + actionNames: list[str], + params: list[ActionParam | None], + formFields: list[FormFieldInput], +) -> ActionsResults: features = dict(zip(stores, [[int(id) for id in _ids] for _ids in ids])) response = ActionsResults() - #formFields = {field['name']: field['value'] for field in formFields} + # formFields = {field['name']: field['value'] for field in formFields} if not params: params = [None] * len(actionNames) for name in actionNames: ## Give the request from context to execute action, along with the parameters ## FIXME: formFields/names? resp = await plugin_manager.execute_action( - user, features, name, params, form_fields=formFields) + user, features, name, params, form_fields=formFields + ) response.actionResults.append(resp) - return response \ No newline at end of file + return response