gisaf-backend/src/gisaf/api/main.py

363 lines
No EOL
14 KiB
Python

import logging
from datetime import timedelta
from typing import Annotated
from json import loads
from fastapi import Depends, APIRouter, HTTPException, status, Response
from sqlalchemy import func
from sqlalchemy.orm import selectinload, joinedload
from sqlalchemy.orm.attributes import QueryableAttribute
from fastapi.security import OAuth2PasswordRequestForm
from sqlmodel import select
from gisaf.models.authentication import (
User, UserRead,
Role, RoleRead,
)
from gisaf.models.category import Category, CategoryRead
from gisaf.models.geo_models_base import GeoModel, PlottableModel
from gisaf.models.info import LegendItem, ModelAction, ModelInfo, DataProvider, ModelValue, TagActions
from gisaf.models.measures import MeasuresItem
from gisaf.models.survey import Equipment, SurveyMeta, Surveyor
from gisaf.config import Survey, conf
from gisaf.models.bootstrap import BootstrapData
from gisaf.models.store import Store, StoreNameOnly
from gisaf.models.project import Project
from gisaf.models.authentication import UserRoleLink #, ACL
from gisaf.database import pandas_query, fastapi_db_session as db_session
from gisaf.security import (
Token,
authenticate_user, get_current_user, create_access_token,
)
from gisaf.registry import registry, NotInRegistry
from gisaf.custom_store_base import BaseStore
from gisaf.redis_tools import store as redis_store
from gisaf.models.info import (
FeatureInfo, InfoItem, Attachment, InfoCategory
)
from gisaf.live_utils import get_live_feature_info
from gisaf.plugins import manager as plugin_manager, NoSuchAction
from gisaf.utils import gisTypeSymbolMap
logger = logging.getLogger(__name__)
api = APIRouter(
tags=["api"],
# dependencies=[Depends(get_token_header)],
responses={404: {"description": "Not found"}},
)
#api.add_middleware(SessionMiddleware, secret_key=conf.crypto.secret)
@api.get('/bootstrap')
async def bootstrap(
user: Annotated[UserRead, Depends(get_current_user)]) -> BootstrapData:
return BootstrapData(user=user)
@api.post("/token")
async def login_for_access_token(
form_data: OAuth2PasswordRequestForm = Depends()
) -> Token:
user = await authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token = create_access_token(
data={"sub": user.username},
expires_delta=timedelta(seconds=conf.crypto.expire))
return Token(access_token=access_token, token_type='bearer')
@api.get("/users")
async def get_users(
db_session: db_session,
) -> list[UserRead]:
query = select(User).options(selectinload(User.roles)) # type: ignore[arg-type]
data = await db_session.exec(query)
return data.all() # type: ignore[return-value]
@api.get("/roles")
async def get_roles(
db_session: db_session,
) -> list[RoleRead]:
query = select(Role).options(selectinload(Role.users)) # type: ignore[arg-type]
data = await db_session.exec(query)
return data.all() # type: ignore[return-value]
@api.get('/acls')
async def get_acls(db_session: db_session,
user: Annotated[User, Depends(get_current_user)]) -> list[UserRoleLink]:
"""New: ACLs returned as UserRoleLink"""
if not user or not user.has_role('manager'):
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED)
data = await db_session.exec(select(UserRoleLink))
return data.all() # type: ignore[return-value]
@api.get("/categories")
async def get_categories(
db_session: db_session,
) -> list[CategoryRead]:
query = select(Category)
data = await db_session.exec(query)
return data.all() # type: ignore[return-value]
@api.get("/categories_pandas")
async def get_categories_p(
db_session: db_session,
) -> list[CategoryRead]:
query = select(Category)
df = await db_session.run_sync(pandas_query, query)
return df.to_dict(orient="records") # type: ignore[return-value]
# @api.get("/list")
@api.get("/data-providers")
async def list_data_providers() -> list[DataProvider]:
"""
Return a list of data providers, for use with the api (graphs, etc)
:return:
"""
return [
DataProvider(
store=model.get_store_name(),
name=model.__name__,
values=[value.get_store_name() for value in values]
) for model, values in registry.values_for_model.items()]
@api.get("/data-provider/{store}")
async def get_model_list(
store: str,
db_session: db_session,
) -> list[MeasuresItem]:
"""
Json REST store API compatible with Flask Potion and Angular
Get the list of items (used for making the list of items in measures)
Filter only items with at least one measure
"""
try:
store_record = registry.stores.loc[store]
except KeyError:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
model: type[PlottableModel] = store_record.model
# FIXME: get only the first model of values
values_models = registry.values_for_model.get(model) # type: ignore
if values_models is None or len(values_models) == 0:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
values_model = values_models[0]
try:
ref_id_attr: QueryableAttribute = getattr(values_model, 'ref_id')
except AttributeError:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f'No ref_id defined for {values_model.__name__}')
data = await db_session.exec(
select(ref_id_attr, func.count(ref_id_attr)).group_by(ref_id_attr)
)
counts = dict(data.all())
objs = await db_session.exec(select(model).options(
*(joinedload(jt) for jt in model.selectinload()))
)
resp = [
MeasuresItem(
# uri=f'/data-provider/{store}/{obj.id}',
id=obj.id,
name=obj.caption,
)
for obj in objs.all()
if obj.id in counts
]
return resp
@api.get('/{store_name}/values/{value}')
async def get_model_values(store_name: str, value: str,
response: Response,
where: str,
resample: str | None = None,
):
"""
Get values
"""
comment = ''
## Get the request's args, i.e. the where clause of the DB query
model_query = loads(where)
# store_name = [k for k in model_query.keys()][0]
model_id = model_query[store_name]
model: GeoModel
model = registry.geom.get(store_name) # type: ignore
if model is None:
raise HTTPException(status.HTTP_404_NOT_FOUND)
values_model = registry.values_for_model.get(model)[0]
## Allow custom getter
getter = getattr(values_model, f'get_{value}', None)
if getter:
df = await getter(model_id)
else:
df = await values_model.get_as_dataframe(model_id=model_id,
with_only_columns=[value])
if len(df) == 0:
return []
if resample is not None and resample != '0':
## Model defines how to resample
value_defs = [v for v in values_model.values if v['name'] == value]
rule = request.query['resample']
if len(value_defs) > 0:
value_defs = value_defs[0]
else:
value_defs = {}
if hasattr(values_model, 'resampling_args') \
and value in values_model.resampling_args \
and rule in values_model.resampling_args[value]:
resampling_args = values_model.resampling_args[value][rule].copy()
comment = resampling_args.pop('comment', '')
else:
resampling_args = {}
resampling_agg_method = value_defs.get('agg', 'mean')
## If the resampling method is sum, set the date as the end of each period
#if resampling_agg_method == 'sum':
#resampling_args['loffset'] = rule
## loffset was deprecated in Pandas 1.1.0
loffset = resampling_args.pop('loffset', None)
df = df.resample(rule, **resampling_args).agg(resampling_agg_method)
if loffset is not None:
df.index = df.index + to_offset(loffset)
if len(df) > 0:
df.reset_index(inplace=True)
elif len(df) > conf.plot.maxDataSize:
msg ='Too much data to display in the graph, automatically switching to daily resampling. ' \
'Note that you can download raw data anyway as CSV in the "Tools" tab.',
raise HTTPException(status.HTTP_502_BAD_GATEWAY, # FIXME: 502 status code
detail=msg,
headers={'resampling': 'D'}
)
else:
df.reset_index(inplace=True)
df.dropna(inplace=True)
## Round values
values_dict = {value['name']: value for value in values_model.values}
for column in df.columns:
if column in values_dict:
## XXX: workaround for https://github.com/pandas-dev/pandas/issues/38844:
## convert column to float.
## Revert back to the commented out line below when the
## bug fix is applied: in Pandas 1.3
#df[column] = df[column].round(values_dict[column].get('round', 1))
df[column] = df[column].astype(float).round(values_dict[column].get('round', 1))
response.headers["comment"] = comment
return df.to_json(orient='records', date_format='iso'),
@api.get("/stores")
async def get_stores() -> list[Store]:
df = registry.stores.reset_index().\
drop(columns=['model', 'raw_model', 'base_gis_type'])
return df.to_dict(orient="records") # type: ignore[return-value]
@api.get("/projects")
async def get_projects(
db_session: db_session,
) -> list[Project]:
query = select(Project)
data = await db_session.exec(query)
return data.all() # type: ignore[return-value]
@api.get("/survey_meta")
async def get_survey_meta(
db_session: db_session,
) -> SurveyMeta:
return SurveyMeta(
projects=(await db_session.exec(select(Project))).all(), # type: ignore[arg-type]
surveyors=(await db_session.exec(select(Surveyor))).all(), # type: ignore[arg-type]
equipments=(await db_session.exec(select(Equipment))).all(), # type: ignore[arg-type]
statuses=conf.map.status,
stores_misc=[StoreNameOnly(name=name)
for name, model in registry.geom_custom.items()],
stores_line_work=[StoreNameOnly(name=name)
for name in registry.stores[registry.stores.is_line_work].index],
default=conf.admin.basket.default
)
@api.get("/feature-info/{store}/{id}")
async def get_feature_info(
store: str, id: str,
) -> FeatureInfo | None:
if store not in registry.stores.index:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
store_record = registry.stores.loc[store]
model: type[GeoModel] = store_record.model
if store_record.is_live:
feature_info = await get_live_feature_info(store, id)
elif issubclass(model, BaseStore):
feature_info = await model.get_item_params(id)
else:
## A layer in the database
try:
feature_info = await registry.get_model_id_params(model, int(id))
except NotInRegistry:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
return feature_info
@api.get("/model-info/{store}")
async def get_model_info(
store: str
) -> ModelInfo:
try:
store_record = registry.stores.loc[store]
except KeyError:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
if store_record.is_live:
## Get layer_defs from live redis and give symbol
layer_def = await redis_store.get_layer_def(store)
return ModelInfo(modelName=layer_def.pop('name'), **layer_def)
model = store_record.model
model_info = {
'store': store,
'modelName': model.__name__,
'symbol': model.symbol or gisTypeSymbolMap[model.base_gis_type],
}
## Add information about the legend
if hasattr(model, 'get_legend'):
legend = await model.get_legend()
model_info['legend'] = [
LegendItem(key=k, value=v)
for k, v in legend.items()
]
## Add information about values
values_model = registry.values_for_model.get(model)
assert values_model is not None
# FIXME: one the first values_model is managed
if len(values_model) > 0 and hasattr(values_model[0], 'values'):
model_info['values'] = [ModelValue(**values) for values in values_model[0].values]
## Add information about tags
## TODO: add to plugin_manager a way to retrieve tag_store/tag_actions from a dict?
#tag_store = [tt for tt in plugin_manager.tagsStores.stores if tt.store==store][0]
model_info['tagActions'] = [
TagActions(key=key, domain=domain, actions=actions)
for domain, actions_keys in plugin_manager.tags_models[model].items()
for key, actions in actions_keys.items()
]
model_info['actions'] = [
ModelAction(
name=name,
icon=action.icon,
formFields=action.formFields,
)
for name, actions in plugin_manager.actions_stores.get(store, {}).items()
for action in actions
]
model_info['downloaders'] = plugin_manager.downloaders_stores[store]
return ModelInfo(**model_info)
# @api.get("/user-role")
# async def get_user_role_relation(
# *, db_session: AsyncSession = Depends(get_db_session)
# ) -> list[UserRoleLink]:
# roles = await db_session.exec(select(UserRoleLink))
# return roles.all()