Graph: fix resampling

This commit is contained in:
phil 2024-05-16 11:58:02 +02:00
parent 5a63892640
commit 8244e3dc1f

View file

@ -9,26 +9,38 @@ from sqlalchemy.orm import selectinload, joinedload
from sqlalchemy.orm.attributes import QueryableAttribute
from fastapi.security import OAuth2PasswordRequestForm
from sqlmodel import select
from pandas.tseries.frequencies import to_offset
from gisaf.models.authentication import (User, UserRead, Role, RoleRead)
from gisaf.models.authentication import User, UserRead, Role, RoleRead
from gisaf.models.category import Category, CategoryRead
from gisaf.models.geo_models_base import GeoModel, PlottableModel
from gisaf.models.info import (ActionParam, ActionsResults, ActionsStore,
FormFieldInput, LegendItem,
ModelAction, ModelInfo,
DataProvider, ModelValue, PlotParams,
TagActions)
from gisaf.models.info import (
ActionParam,
ActionsResults,
ActionsStore,
FormFieldInput,
LegendItem,
ModelAction,
ModelInfo,
DataProvider,
ModelValue,
PlotParams,
TagActions,
)
from gisaf.models.measures import MeasuresItem
from gisaf.models.survey import Equipment, SurveyMeta, Surveyor
from gisaf.config import conf
from gisaf.models.bootstrap import BootstrapData
from gisaf.models.store import Store, StoreNameOnly
from gisaf.models.project import Project
from gisaf.models.authentication import UserRoleLink #, ACL
from gisaf.models.authentication import UserRoleLink # , ACL
from gisaf.database import pandas_query, fastapi_db_session as db_session
from gisaf.security import (
Token, authenticate_user, get_current_active_user, create_access_token,
)
Token,
authenticate_user,
get_current_active_user,
create_access_token,
)
from gisaf.registry import registry, NotInRegistry
from gisaf.custom_store_base import BaseStore
from gisaf.redis_tools import store as redis_store
@ -45,81 +57,87 @@ api = APIRouter(
# dependencies=[Depends(get_token_header)],
responses={404: {"description": "Not found"}},
)
#api.add_middleware(SessionMiddleware, secret_key=conf.crypto.secret)
# api.add_middleware(SessionMiddleware, secret_key=conf.crypto.secret)
@api.get('/bootstrap')
@api.get("/bootstrap")
async def bootstrap(
user: Annotated[UserRead, Depends(get_current_active_user)]
) -> BootstrapData:
user: Annotated[UserRead, Depends(get_current_active_user)],
) -> BootstrapData:
return BootstrapData(user=user)
@api.post("/token")
async def login_for_access_token(
form_data: OAuth2PasswordRequestForm = Depends()
) -> Token:
form_data: OAuth2PasswordRequestForm = Depends(),
) -> Token:
user = await authenticate_user(form_data.username, form_data.password)
if not user:
logger.info(f'{form_data.username} failed attempt to get token')
logger.info(f"{form_data.username} failed attempt to get token")
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token = create_access_token(
data={"sub": user.username},
expires_delta=timedelta(seconds=conf.crypto.expire))
logger.info(f'{user.username} ({user.id}) logged in')
return Token(access_token=access_token, token_type='bearer')
data={"sub": user.username}, expires_delta=timedelta(seconds=conf.crypto.expire)
)
logger.info(f"{user.username} ({user.id}) logged in")
return Token(access_token=access_token, token_type="bearer")
@api.get('/logout')
async def logout(
user: Annotated[UserRead, Depends(get_current_active_user)]):
@api.get("/logout")
async def logout(user: Annotated[UserRead, Depends(get_current_active_user)]):
if user is not None:
logger.info(f'{user.username} ({user.id}) logged out')
logger.info(f"{user.username} ({user.id}) logged out")
@api.get("/users")
async def get_users(
db_session: db_session,
) -> list[UserRead]:
query = select(User).options(selectinload(User.roles)) # type: ignore[arg-type]
) -> list[UserRead]:
query = select(User).options(selectinload(User.roles)) # type: ignore[arg-type]
data = await db_session.exec(query)
return data.all() # type: ignore[return-value]
return data.all() # type: ignore[return-value]
@api.get("/roles")
async def get_roles(
db_session: db_session,
) -> list[RoleRead]:
query = select(Role).options(selectinload(Role.users)) # type: ignore[arg-type]
) -> list[RoleRead]:
query = select(Role).options(selectinload(Role.users)) # type: ignore[arg-type]
data = await db_session.exec(query)
return data.all() # type: ignore[return-value]
return data.all() # type: ignore[return-value]
@api.get('/acls')
async def get_acls(db_session: db_session,
user: Annotated[User, Depends(get_current_active_user)]) -> list[UserRoleLink]:
@api.get("/acls")
async def get_acls(
db_session: db_session, user: Annotated[User, Depends(get_current_active_user)]
) -> list[UserRoleLink]:
"""New: ACLs returned as UserRoleLink"""
if user is not None or not user.has_role('manager'):
if user is not None or not user.has_role("manager"):
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
data = await db_session.exec(select(UserRoleLink))
return data.all() # type: ignore[return-value]
return data.all() # type: ignore[return-value]
@api.get("/categories")
async def get_categories(
db_session: db_session,
) -> list[CategoryRead]:
) -> list[CategoryRead]:
query = select(Category)
data = await db_session.exec(query)
return data.all() # type: ignore[return-value]
return data.all() # type: ignore[return-value]
@api.get("/categories_pandas")
async def get_categories_p(
db_session: db_session,
) -> list[CategoryRead]:
) -> list[CategoryRead]:
query = select(Category)
df = await db_session.run_sync(pandas_query, query)
return df.to_dict(orient="records") # type: ignore[return-value]
return df.to_dict(orient="records") # type: ignore[return-value]
# @api.get("/list")
@api.get("/data-providers")
@ -132,14 +150,17 @@ async def list_data_providers() -> list[DataProvider]:
DataProvider(
store=model.get_store_name(),
name=model.__name__,
values=[value.get_store_name() for value in values]
) for model, values in registry.values_for_model.items()]
values=[value.get_store_name() for value in values],
)
for model, values in registry.values_for_model.items()
]
@api.get("/data-provider/{store}")
async def get_model_list(
store: str,
db_session: db_session,
) -> list[MeasuresItem]:
) -> list[MeasuresItem]:
"""
Json REST store API compatible with Flask Potion and Angular
Get the list of items (used for making the list of items in measures)
@ -151,21 +172,23 @@ async def get_model_list(
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
model: type[PlottableModel] = store_record.model
# FIXME: get only the first model of values
values_models = registry.values_for_model.get(model) # type: ignore
values_models = registry.values_for_model.get(model) # type: ignore
if values_models is None or len(values_models) == 0:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
values_model = values_models[0]
try:
ref_id_attr: QueryableAttribute = getattr(values_model, 'ref_id')
ref_id_attr: QueryableAttribute = getattr(values_model, "ref_id")
except AttributeError:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f'No ref_id defined for {values_model.__name__}')
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"No ref_id defined for {values_model.__name__}",
)
data = await db_session.exec(
select(ref_id_attr, func.count(ref_id_attr)).group_by(ref_id_attr)
)
counts = dict(data.all())
objs = await db_session.exec(select(model).options(
*(joinedload(jt) for jt in model.selectinload()))
objs = await db_session.exec(
select(model).options(*(joinedload(jt) for jt in model.selectinload()))
)
resp = [
MeasuresItem(
@ -178,7 +201,8 @@ async def get_model_list(
]
return resp
@api.get('/{store_name}/values/{value}')
@api.get("/{store_name}/values/{value}")
async def get_model_values(
db_session: db_session,
store_name: str,
@ -186,65 +210,68 @@ async def get_model_values(
response: Response,
where: str,
resample: str | None = None,
):
):
"""
Get values
"""
comment = ''
comment = ""
## Get the request's args, i.e. the where clause of the DB query
model_query = loads(where)
# store_name = [k for k in model_query.keys()][0]
model_id = model_query[store_name]
model: GeoModel
model = registry.geom.get(store_name) # type: ignore
model = registry.geom.get(store_name) # type: ignore
if model is None:
raise HTTPException(status.HTTP_404_NOT_FOUND)
values_model = registry.values_for_model.get(model)[0]
## Allow custom getter
getter = getattr(values_model, f'get_{value}', None)
getter = getattr(values_model, f"get_{value}", None)
if getter:
df = await getter(model_id)
else:
item = await db_session.get(model, model_id)
df = await values_model.get_as_dataframe(item=item,
with_only_columns=[value])
df = await values_model.get_as_dataframe(item=item, with_only_columns=[value])
if len(df) == 0:
return []
if resample is not None and resample != '0':
if resample is not None and resample != "0":
## Model defines how to resample
value_defs = [v for v in values_model.values if v['name'] == value]
rule = request.query['resample']
value_defs = [v for v in values_model.values if v["name"] == value]
if len(value_defs) > 0:
value_defs = value_defs[0]
else:
value_defs = {}
if hasattr(values_model, 'resampling_args') \
and value in values_model.resampling_args \
and rule in values_model.resampling_args[value]:
resampling_args = values_model.resampling_args[value][rule].copy()
comment = resampling_args.pop('comment', '')
if (
hasattr(values_model, "resampling_args")
and value in values_model.resampling_args
and resample in values_model.resampling_args[value]
):
resampling_args = values_model.resampling_args[value][resample].copy()
comment = resampling_args.pop("comment", "")
else:
resampling_args = {}
resampling_agg_method = value_defs.get('agg', 'mean')
resampling_agg_method = value_defs.get("agg", "mean")
## If the resampling method is sum, set the date as the end of each period
#if resampling_agg_method == 'sum':
#resampling_args['loffset'] = rule
# if resampling_agg_method == 'sum':
# resampling_args['loffset'] = resample
## loffset was deprecated in Pandas 1.1.0
loffset = resampling_args.pop('loffset', None)
df = df.resample(rule, **resampling_args).agg(resampling_agg_method)
loffset = resampling_args.pop("loffset", None)
df = df.resample(resample, **resampling_args).agg(resampling_agg_method)
if loffset is not None:
df.index = df.index + to_offset(loffset)
if len(df) > 0:
df.reset_index(inplace=True)
elif len(df) > conf.plot.maxDataSize:
msg ='Too much data to display in the graph, automatically switching to daily resampling. ' \
'Note that you can download raw data anyway as CSV in the "Tools" tab.',
raise HTTPException(status.HTTP_502_BAD_GATEWAY, # FIXME: 502 status code
detail=msg,
headers={'resampling': 'D'}
msg = (
"Too much data to display in the graph, automatically switching to daily resampling. "
'Note that you can download raw data anyway as CSV in the "Tools" tab.',
)
raise HTTPException(
status.HTTP_500_INTERNAL_SERVER_ERROR, # FIXME: 500 status code
detail=msg,
headers={"resampling": "D"},
)
else:
df.reset_index(inplace=True)
@ -252,53 +279,64 @@ async def get_model_values(
df.dropna(inplace=True)
## Round values
values_dict = {value['name']: value for value in values_model.values}
values_dict = {value["name"]: value for value in values_model.values}
for column in df.columns:
if column in values_dict:
## XXX: workaround for https://github.com/pandas-dev/pandas/issues/38844:
## convert column to float.
## Revert back to the commented out line below when the
## bug fix is applied: in Pandas 1.3
#df[column] = df[column].round(values_dict[column].get('round', 1))
df[column] = df[column].astype(float).round(values_dict[column].get('round', 1))
# df[column] = df[column].round(values_dict[column].get('round', 1))
df[column] = (
df[column].astype(float).round(values_dict[column].get("round", 1))
)
response.headers["comment"] = comment
return df.to_json(orient='records', date_format='iso'),
return (df.to_json(orient="records", date_format="iso"),)
@api.get("/stores")
async def get_stores() -> list[Store]:
df = registry.stores.reset_index().\
drop(columns=['model', 'raw_model', 'base_gis_type'])
return df.to_dict(orient="records") # type: ignore[return-value]
df = registry.stores.reset_index().drop(
columns=["model", "raw_model", "base_gis_type"]
)
return df.to_dict(orient="records") # type: ignore[return-value]
@api.get("/projects")
async def get_projects(
db_session: db_session,
) -> list[Project]:
) -> list[Project]:
query = select(Project)
data = await db_session.exec(query)
return data.all() # type: ignore[return-value]
return data.all() # type: ignore[return-value]
@api.get("/survey_meta")
async def get_survey_meta(
db_session: db_session,
) -> SurveyMeta:
) -> SurveyMeta:
return SurveyMeta(
projects=(await db_session.exec(select(Project))).all(), # type: ignore[arg-type]
surveyors=(await db_session.exec(select(Surveyor))).all(), # type: ignore[arg-type]
equipments=(await db_session.exec(select(Equipment))).all(), # type: ignore[arg-type]
projects=(await db_session.exec(select(Project))).all(), # type: ignore[arg-type]
surveyors=(await db_session.exec(select(Surveyor))).all(), # type: ignore[arg-type]
equipments=(await db_session.exec(select(Equipment))).all(), # type: ignore[arg-type]
statuses=conf.map.status,
stores_misc=[StoreNameOnly(name=name)
for name, model in registry.geom_custom.items()],
stores_line_work=[StoreNameOnly(name=name)
for name in registry.stores[registry.stores.is_line_work].index],
default=conf.admin.basket.default
stores_misc=[
StoreNameOnly(name=name) for name, model in registry.geom_custom.items()
],
stores_line_work=[
StoreNameOnly(name=name)
for name in registry.stores[registry.stores.is_line_work].index
],
default=conf.admin.basket.default,
)
@api.get("/feature-info/{store}/{id}")
async def get_feature_info(
store: str, id: str,
) -> FeatureInfo | None:
store: str,
id: str,
) -> FeatureInfo | None:
if store not in registry.stores.index:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
store_record = registry.stores.loc[store]
@ -317,9 +355,7 @@ async def get_feature_info(
@api.get("/model-info/{store}")
async def get_model_info(
store: str
) -> ModelInfo:
async def get_model_info(store: str) -> ModelInfo:
try:
store_record = registry.stores.loc[store]
except KeyError:
@ -327,34 +363,37 @@ async def get_model_info(
if store_record.is_live:
## Get layer_defs from live redis and give symbol
layer_def = await redis_store.get_layer_def(store)
return ModelInfo(modelName=layer_def.pop('name'), **layer_def)
return ModelInfo(modelName=layer_def.pop("name"), **layer_def)
model = store_record.model
model_info = {
'store': store,
'modelName': model.__name__,
'symbol': model.symbol or gisTypeSymbolMap[model.base_gis_type],
"store": store,
"modelName": model.__name__,
"symbol": model.symbol or gisTypeSymbolMap[model.base_gis_type],
}
## Add information about the legend
if hasattr(model, 'get_legend'):
if hasattr(model, "get_legend"):
legend = await model.get_legend()
model_info['legend'] = [
LegendItem(key=k, value=v)
for k, v in legend.items()
]
model_info["legend"] = [LegendItem(key=k, value=v) for k, v in legend.items()]
## Add information about values
values_model = registry.values_for_model.get(model)
# FIXME: one the first values_model is managed
if values_model is not None and len(values_model) > 0 and hasattr(values_model[0], 'values'):
model_info['values'] = [ModelValue(**values) for values in values_model[0].values]
if (
values_model is not None
and len(values_model) > 0
and hasattr(values_model[0], "values")
):
model_info["values"] = [
ModelValue(**values) for values in values_model[0].values
]
## Add information about tags
## TODO: add to plugin_manager a way to retrieve tag_store/tag_actions from a dict?
#tag_store = [tt for tt in plugin_manager.tagsStores.stores if tt.store==store][0]
model_info['tagActions'] = [
# tag_store = [tt for tt in plugin_manager.tagsStores.stores if tt.store==store][0]
model_info["tagActions"] = [
TagActions(key=key, domain=domain, actions=actions)
for domain, actions_keys in plugin_manager.tags_models[model].items()
for key, actions in actions_keys.items()
]
model_info['actions'] = [
model_info["actions"] = [
ModelAction(
name=name,
icon=action.icon,
@ -364,16 +403,15 @@ async def get_model_info(
for name, actions in plugin_manager.actions_stores.get(store, {}).items()
for action in actions
]
model_info['downloaders'] = plugin_manager.downloaders_stores[store]
model_info["downloaders"] = plugin_manager.downloaders_stores[store]
return ModelInfo(**model_info)
@api.get("/plot-params/{store}")
async def get_plot_params(
store: str, id: str, value: str
) -> PlotParams:
async def get_plot_params(store: str, id: str, value: str) -> PlotParams:
model = registry.geom.get(store)
## Get additional plot params
if hasattr(model, 'get_plot_params'):
if hasattr(model, "get_plot_params"):
plot_params: PlotParams = await model.get_plot_params(id, value)
return plot_params
else:
@ -387,29 +425,32 @@ async def get_plot_params(
# roles = await db_session.exec(select(UserRoleLink))
# return roles.all()
@api.get('/actions')
@api.get("/actions")
async def get_actions() -> list[ActionsStore]:
# actionsPlugins = List(ActionsStore)
return plugin_manager.actionsStores
@api.post('/execTagActions')
@api.post("/execTagActions")
async def execute_tag_action(
user: Annotated[UserRead, Depends(get_current_active_user)],
stores: list[str],
ids: list[list[str]],
actionNames: list[str],
params: list[ActionParam | None],
formFields: list[FormFieldInput],
) -> ActionsResults:
user: Annotated[UserRead, Depends(get_current_active_user)],
stores: list[str],
ids: list[list[str]],
actionNames: list[str],
params: list[ActionParam | None],
formFields: list[FormFieldInput],
) -> ActionsResults:
features = dict(zip(stores, [[int(id) for id in _ids] for _ids in ids]))
response = ActionsResults()
#formFields = {field['name']: field['value'] for field in formFields}
# formFields = {field['name']: field['value'] for field in formFields}
if not params:
params = [None] * len(actionNames)
for name in actionNames:
## Give the request from context to execute action, along with the parameters
## FIXME: formFields/names?
resp = await plugin_manager.execute_action(
user, features, name, params, form_fields=formFields)
user, features, name, params, form_fields=formFields
)
response.actionResults.append(resp)
return response
return response