Restructure apis, to the dedicated directory

Implement dashboards
This commit is contained in:
phil 2024-01-09 17:46:18 +05:30
parent aed84e0f36
commit 581598c208
7 changed files with 456 additions and 20 deletions

137
src/gisaf/api/dashboard.py Normal file
View file

@ -0,0 +1,137 @@
import logging
from pathlib import Path
from fastapi import Depends, FastAPI, HTTPException, status, responses
from sqlalchemy.orm import selectinload
from sqlmodel import select
from gisaf.config import conf
from gisaf.database import pandas_query, fastapi_db_session as db_session
from gisaf.models.authentication import User
from gisaf.models.dashboard import (
DashboardPage, DashboardPageSection,
DashboadPageSectionType, DashboardPage_,
DashboardGroup, DashboardHome,
)
from gisaf.models.misc import NotADataframeError
from gisaf.security import get_current_active_user
logger = logging.getLogger(__name__)
api = FastAPI(
default_response_class=responses.ORJSONResponse,
)
@api.get('/groups')
async def get_groups(
db_session: db_session,
) -> list[DashboardGroup]:
query = select(DashboardPage)
data = await db_session.exec(query)
groups: dict[str, DashboardPage_] = {}
for page in data.all():
page_field = DashboardPage_(
name=page.name,
group=page.group,
description=page.description,
)
group = groups.get(page.group)
if group is None:
group = DashboardGroup(
name=page.group,
pages=[page_field]
)
groups[page.group] = group
else:
group.pages.append(page_field)
return groups.values()
@api.get('/home')
async def get_home() -> DashboardHome:
content_path = Path(conf.gisaf.dashboard_home.content_file).expanduser()
footer_path = Path(conf.gisaf.dashboard_home.footer_file).expanduser()
if content_path.is_file():
content = content_path.read_text()
else:
content = 'Gisaf is free, open source software for geomatics and GIS: <a href="http://redmine.auroville.org.in/projects/gisaf">Gisaf</a>.'
if footer_path.is_file():
footer = footer_path.read_text()
else:
footer = '<a rel="license" href="https://www.gnu.org/licenses/gpl.html"><img alt="GNU GPL v3 license"style="border-width:0" src="/static/icons/gplv3-88x31.png" title="GPL Open Source license"/></a>'
return DashboardHome(
title=conf.gisaf.dashboard_home.title,
content=content,
footer=footer,
)
@api.get('/page/{group}/{name}')
async def get_dashboard_page(group: str, name: str,
db_session: db_session,
user: User = Depends(get_current_active_user),
) -> DashboardPage_:
query1 = select(DashboardPage).where((DashboardPage.name==name)
& (DashboardPage.group==group))
data1 = await db_session.exec(query1)
page = data1.one_or_none()
if not page:
raise HTTPException(status.HTTP_404_NOT_FOUND)
query2 = select(DashboardPageSection)\
.where(DashboardPageSection.dashboard_page_id==page.id)\
.options(selectinload(DashboardPageSection.dashboard_page))\
.order_by(DashboardPageSection.name)
data2 = await db_session.exec(query2)
sections = data2.all()
if page.viewable_role:
if not(user and user.has_role(page.viewable_role)):
username = user.username if user is not None else "Anonymous"
logger.info(f'{username} tried to access dashboard page {name}')
raise HTTPException(status.HTTP_401_UNAUTHORIZED)
dashboard_page = DashboardPage_(
name=page.name,
group=page.group,
description=page.description,
html=page.html,
time=page.time,
notebook=page.get_notebook_url(),
attachment=page.get_attachment_url(),
expandedPanes=[
p.strip()
for p in page.expanded_panes.split(',')
] if page.expanded_panes else [],
sections=[
DashboadPageSectionType(
name=dps.name,
plot=dps.get_plot_url()
)
for dps in sections
]
)
try:
df = page.get_page_df()
if df is not None:
## TODO: plot as external file, like for sections
## Convert Geopandas dataframe to Pandas
if isinstance(df, gpd.GeoDataFrame):
gdf = pd.DataFrame(df.drop(columns=['geometry']))
df = gdf
dashboard_page.dfData = df.to_json(orient='table', double_precision=2)
except NotADataframeError:
logger.warning(f'Dashboard: cannot read dataframe for page {page.name}')
except Exception as err:
logger.warning(f'Dashboard: cannot add dataframe for page {page.name}, see debug message')
logger.exception(err)
if page.plot:
try:
plot = page.get_plot()
plotData = {
'data': [d.to_plotly_json() for d in plot.data],
'layout': plot.layout.to_plotly_json(),
}
except Exception as err:
logger.warning(f'Dashboard: cannot add plot for page {page.name}, see debug message')
logger.exception(err)
else:
dashboard_page.plotData = dumps(plotData, cls=NumpyEncoder)
return dashboard_page

177
src/gisaf/api/geoapi.py Normal file
View file

@ -0,0 +1,177 @@
"""
Geographical json stores, served under /gj
Used for displaying features on maps
"""
from json import JSONDecodeError
import logging
from typing import Annotated
from asyncio import CancelledError
from fastapi import (Depends, FastAPI, HTTPException, Response, Header,
WebSocket, WebSocketDisconnect,
status, responses)
from gisaf.models.authentication import User
from gisaf.redis_tools import store as redis_store
from gisaf.live import live_server
from gisaf.registry import registry
from gisaf.security import get_current_active_user
logger = logging.getLogger(__name__)
api = FastAPI(
default_response_class=responses.ORJSONResponse,
)
class ConnectionManager:
active_connections: list[WebSocket]
def __init__(self):
self.active_connections = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def send_personal_message(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)
manager = ConnectionManager()
@api.websocket('/live/{store}')
async def live_layer(store: str, websocket: WebSocket):
"""
Websocket for live layer updates
"""
await websocket.accept()
try:
while True:
try:
msg_data = await websocket.receive_json()
except JSONDecodeError:
msg_text = await websocket.receive_text()
if msg_text == 'close':
await websocket.close()
continue
# else:
if 'message' in msg_data:
if msg_data['message'] == 'subscribeLiveLayer':
live_server.add_subscription(websocket, store)
elif msg_data['message'] == 'unsubscribeLiveLayer':
live_server.remove_subscription(websocket, store)
else:
logger.warning(f'Got websocket message with no message field: {msg_data}')
except WebSocketDisconnect:
logger.debug('Websocket disconnected')
# logger.debug('websocket connection closed')
@api.get('/{store_name}')
async def get_geojson(store_name,
user: User = Depends(get_current_active_user),
If_None_Match: Annotated[str | None, Header()] = None,
simplify: Annotated[float | None, Header()] = None,
preserveTopology: Annotated[bool|None, Header()] = None,
):
"""
Some REST stores coded manually (route prefixed with "gj": geojson).
:param store_name: name of the model
:return: json
"""
use_cache = False
try:
model = registry.stores.loc[store_name].model
except KeyError:
raise HTTPException(status.HTTP_404_NOT_FOUND)
if getattr(model, 'viewable_role', None):
if not(user and user.can_view(model)):
username = user.username if user else "Anonymous"
logger.info(f'{username} tried to access {model}')
raise HTTPException(status.HTTP_401_UNAUTHORIZED)
if await redis_store.has_channel(store_name):
## Live layers
data = await redis_store.get_layer_as_json(store_name)
return Response(content=data.decode(),
media_type="application/json")
if model.cache_enabled:
ttag = await redis_store.get_ttag(store_name)
if ttag and If_None_Match == ttag:
return status.HTTP_304_NOT_MODIFIED
if hasattr(model, 'get_geojson'):
geojson = await model.get_geojson(simplify_tolerance=simplify,
preserve_topology=preserveTopology,
registry=registry)
## Store to redis for caching
if use_cache:
await redis_store.store_json(model, geojson)
resp = geojson
elif model.can_get_features_as_df:
## Get the GeoDataframe (gdf) with GeoPandas
## get_popup and get_propertites get the gdf as argument
## and can use vectorised operations
try:
gdf = await model.get_gdf(cast=True, with_related=True,
# filter_columns=True,
preserve_topology=preserveTopology,
simplify_tolerance=simplify)
except CancelledError as err:
logger.debug(f'Getting {store_name} cancelled while getting gdf')
raise err
except Exception as err:
logger.exception(err)
raise HTTPException(status.HTTP_500_INTERNAL_SERVER_ERROR)
## The query of category defined models gets the status
## (not sure how and this could be skipped)
## Other models do not have: just add it manually from the model itself
if 'status' not in gdf.columns:
gdf['status'] = model.status
if 'popup' not in gdf.columns:
gdf['popup'] = await model.get_popup(gdf)
# Add properties
properties = await model.get_properties(gdf)
columns = ['geom', 'status', 'popup']
for property, values in properties.items():
columns.append(property)
gdf[property] = values
geojson = gdf[columns].to_json(separators=(',', ':'),
check_circular=False)
## Store to redis for caching
if use_cache:
await redis_store.store_json(model, geojson)
resp = geojson
else:
raise HTTPException(status.HTTP_500_INTERNAL_SERVER_ERROR,
detail='Gino is for: Gino Is No Option')
# logger.warn(f"{model} doesn't allow using dataframe for generating json!")
# attrs, features_kwargs = await model.get_features_attrs(simplify)
# ## Using gino: allows OO model (get_info, etc)
# try:
# attrs['features'] = await model.get_features_in_bulk_gino(**features_kwargs)
# except Exception as err:
# logger.exception(err)
# raise status.HTTP_500_INTERNAL_SERVER_ERROR
# resp = attrs
headers = {}
if model.cache_enabled and ttag:
headers['ETag'] = ttag
return Response(content=resp,
media_type="application/json", headers=headers)
# @api.get('/gj/{store_name}/popup/{id}')
# async def gj_popup(store_name: str, id: int):
# model = registry.geom.get(store_name)
# if not hasattr(model, 'get_popup_dynamic'):
# return ''
# obj = await model.get(id)
# ## Escape characters for json
# popup_more = obj.get_popup_dynamic().replace('"', '\\"').replace('\n', '\\n')
# return {"text": popup_more}

150
src/gisaf/api/v2.py Normal file
View file

@ -0,0 +1,150 @@
import logging
from datetime import timedelta
from typing import Annotated
from fastapi import Depends, FastAPI, HTTPException, status, responses
from sqlalchemy.orm import selectinload
from fastapi.security import OAuth2PasswordRequestForm
from sqlmodel import select
from gisaf.models.authentication import (
User, UserRead,
Role, RoleRead,
)
from gisaf.models.category import Category, CategoryRead
from gisaf.models.to_migrate import DataProvider
from gisaf.config import conf
from gisaf.models.bootstrap import BootstrapData
from gisaf.models.store import Store
from gisaf.models.project import Project
from gisaf.database import pandas_query, fastapi_db_session as db_session
from gisaf.security import (
Token,
authenticate_user, get_current_user, create_access_token,
)
from gisaf.registry import registry, NotInRegistry
from gisaf.custom_store_base import BaseStore
from gisaf.models.to_migrate import (
FeatureInfo, InfoItem, Attachment, InfoCategory
)
from gisaf.live_utils import get_live_feature_info
from gisaf.api.dashboard import api as dashboard_api
logger = logging.getLogger(__name__)
api = FastAPI(
default_response_class=responses.ORJSONResponse,
)
#api.add_middleware(SessionMiddleware, secret_key=conf.crypto.secret)
api.mount('/dashboard', dashboard_api)
@api.get('/bootstrap')
async def bootstrap(
user: Annotated[UserRead, Depends(get_current_user)]) -> BootstrapData:
return BootstrapData(user=user)
@api.post("/token")
async def login_for_access_token(
form_data: OAuth2PasswordRequestForm = Depends()
) -> Token:
user = await authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token = create_access_token(
data={"sub": user.username},
expires_delta=timedelta(seconds=conf.crypto.expire))
return Token(access_token=access_token, token_type='bearer')
@api.get("/users")
async def get_users(
db_session: db_session,
) -> list[UserRead]:
query = select(User).options(selectinload(User.roles))
data = await db_session.exec(query)
return data.all()
@api.get("/roles")
async def get_roles(
db_session: db_session,
) -> list[RoleRead]:
query = select(Role).options(selectinload(Role.users))
data = await db_session.exec(query)
return data.all()
@api.get("/categories")
async def get_categories(
db_session: db_session,
) -> list[CategoryRead]:
query = select(Category)
data = await db_session.exec(query)
return data.all()
@api.get("/categories_pandas")
async def get_categories_p(
db_session: db_session,
) -> list[CategoryRead]:
query = select(Category)
df = await db_session.run_sync(pandas_query, query)
return df.to_dict(orient="records")
# @api.get("/list")
@api.get("/data-providers")
async def list_data_providers() -> list[DataProvider]:
"""
Return a list of data providers, for use with the api (graphs, etc)
:return:
"""
return [
DataProvider(
name=model.get_store_name(),
values=[value.get_store_name() for value in values]
) for model, values in registry.values_for_model.items()]
@api.get("/stores")
async def get_stores() -> list[Store]:
df = registry.stores.reset_index().\
drop(columns=['model', 'raw_model', 'base_gis_type'])
return df.to_dict(orient="records")
@api.get("/projects")
async def get_projects(
db_session: db_session,
) -> list[Project]:
query = select(Project)
df = await db_session.run_sync(pandas_query, query)
return df.to_dict(orient="records")
@api.get("/feature-info/{store}/{id}")
async def get_feature_info(
store: str, id: str,
) -> FeatureInfo:
if store not in registry.stores.index:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
store_record = registry.stores.loc[store]
model = store_record.model
if store_record.is_live:
feature_info = await get_live_feature_info(store, id)
elif issubclass(model, BaseStore):
feature_info = await model.get_item_params(id)
else:
## A layer in the database
try:
feature_info = await registry.get_model_id_params(model, int(id))
except NotInRegistry:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
return feature_info
# @api.get("/user-role")
# async def get_user_role_relation(
# *, db_session: AsyncSession = Depends(get_db_session)
# ) -> list[UserRoleLink]:
# roles = await db_session.exec(select(UserRoleLink))
# return roles.all()