# gisaf-backend/src/gisaf/geoapi.py


"""
Geographical JSON (GeoJSON) stores, served under /gj.
Used for displaying features on maps.
"""
from json import JSONDecodeError
import logging
from typing import Annotated
from asyncio import CancelledError
from fastapi import (Depends, FastAPI, HTTPException, Response, Header,
                     WebSocket, WebSocketDisconnect,
                     status, responses)
from gisaf.models.authentication import User
from gisaf.redis_tools import store as redis_store
from gisaf.live import live_server
from gisaf.registry import registry
from gisaf.security import get_current_active_user

logger = logging.getLogger(__name__)

api = FastAPI(
    default_response_class=responses.ORJSONResponse,
)
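

# Usage sketch (not part of this module): the module docstring says these routes
# are served under /gj, so the main Gisaf application is expected to mount or
# include this sub-application under that prefix. The exact wiring lives in the
# main application; the snippet below is only an illustration, and the name
# `app` and the '/gj' path are assumptions:
#
#     from fastapi import FastAPI
#     from gisaf.geoapi import api as geoapi
#
#     app = FastAPI()
#     app.mount('/gj', geoapi)  # hypothetical mounting of this sub-API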


class ConnectionManager:
    active_connections: list[WebSocket]

    def __init__(self):
        self.active_connections = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)


manager = ConnectionManager()
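
# ConnectionManager/manager does not appear to be referenced elsewhere in this
# file (the live subscriptions below go through live_server). A minimal usage
# sketch following the standard FastAPI websocket pattern; the '/echo' route is
# hypothetical and not part of Gisaf:
#
#     @api.websocket('/echo')
#     async def echo(websocket: WebSocket):
#         await manager.connect(websocket)
#         try:
#             while True:
#                 text = await websocket.receive_text()
#                 await manager.broadcast(text)
#         except WebSocketDisconnect:
#             manager.disconnect(websocket)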


@api.websocket('/live/{store}')
async def live_layer(store: str, websocket: WebSocket):
    """
    Websocket for live layer updates
    """
    await websocket.accept()
    try:
        while True:
            try:
                msg_data = await websocket.receive_json()
            except JSONDecodeError:
                msg_text = await websocket.receive_text()
                if msg_text == 'close':
                    await websocket.close()
                continue
            # else:
            if 'message' in msg_data:
                if msg_data['message'] == 'subscribeLiveLayer':
                    live_server.add_subscription(websocket, store)
                elif msg_data['message'] == 'unsubscribeLiveLayer':
                    live_server.remove_subscription(websocket, store)
            else:
                logger.warning(f'Got websocket message with no message field: {msg_data}')
    except WebSocketDisconnect:
        logger.debug('Websocket disconnected')
        # logger.debug('websocket connection closed')
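
# Client-side sketch for the live layer websocket above. The handler accepts a
# JSON message whose "message" field is 'subscribeLiveLayer' or
# 'unsubscribeLiveLayer', or the plain text 'close'. Assumptions: the sub-app is
# mounted under /gj, host/port and store name are examples, and the third-party
# `websockets` package is used as a generic client:
#
#     import asyncio
#     import json
#     import websockets
#
#     async def follow_live_layer(store: str):
#         uri = f'ws://localhost:8080/gj/live/{store}'
#         async with websockets.connect(uri) as ws:
#             await ws.send(json.dumps({'message': 'subscribeLiveLayer'}))
#             async for update in ws:   # updates pushed by the live server
#                 print(update)
#
#     # asyncio.run(follow_live_layer('some_live_store'))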


@api.get('/{store_name}')
async def get_geojson(store_name: str,
                      user: User = Depends(get_current_active_user),
                      If_None_Match: Annotated[str | None, Header()] = None,
                      simplify: Annotated[float | None, Header()] = None,
                      preserveTopology: Annotated[bool | None, Header()] = None,
                      ):
"""
Some REST stores coded manually (route prefixed with "gj": geojson).
:param store_name: name of the model
:return: json
"""
    use_cache = False
    try:
        model = registry.stores.loc[store_name].model
    except KeyError:
        raise HTTPException(status.HTTP_404_NOT_FOUND)
    if hasattr(model, 'viewable_role'):
        if not (user and user.can_view(model)):
            username = user.username if user else "Anonymous"
            logger.info(f'{username} tried to access {model}')
            raise HTTPException(status.HTTP_401_UNAUTHORIZED)
    if await redis_store.has_channel(store_name):
        ## Live layers
        data = await redis_store.get_layer_as_json(store_name)
        return Response(content=data.decode(),
                        media_type="application/json")
    if model.cache_enabled:
        ttag = await redis_store.get_ttag(store_name)
        # Nothing changed since the client's cached version: return 304 Not Modified
        if ttag and If_None_Match == ttag:
            return Response(status_code=status.HTTP_304_NOT_MODIFIED)
    if hasattr(model, 'get_geojson'):
        geojson = await model.get_geojson(simplify_tolerance=simplify,
                                          preserve_topology=preserveTopology,
                                          registry=registry)
        ## Store to redis for caching
        if use_cache:
            await redis_store.store_json(model, geojson)
        resp = geojson
    elif model.can_get_features_as_df:
        ## Get the GeoDataframe (gdf) with GeoPandas;
        ## get_popup and get_properties get the gdf as argument
        ## and can use vectorised operations
        try:
            gdf = await model.get_gdf(cast=True, with_related=True,
                                      # filter_columns=True,
                                      preserve_topology=preserveTopology,
                                      simplify_tolerance=simplify)
        except CancelledError as err:
            logger.debug(f'Getting {store_name} cancelled while getting gdf')
            raise err
        except Exception as err:
            logger.exception(err)
            raise HTTPException(status.HTTP_500_INTERNAL_SERVER_ERROR)
        ## The query of category-defined models gets the status
        ## (not sure how, and this could be skipped);
        ## other models do not have it: just add it manually from the model itself
        if 'status' not in gdf.columns:
            gdf['status'] = model.status
        if 'popup' not in gdf.columns:
            gdf['popup'] = await model.get_popup(gdf)
        # Add properties
        properties = await model.get_properties(gdf)
        columns = ['geom', 'status', 'popup']
        for property, values in properties.items():
            columns.append(property)
            gdf[property] = values
        geojson = gdf[columns].to_json(separators=(',', ':'),
                                       check_circular=False)
        ## Store to redis for caching
        if use_cache:
            await redis_store.store_json(model, geojson)
        resp = geojson
    else:
        raise HTTPException(status.HTTP_500_INTERNAL_SERVER_ERROR,
                            detail='Gino is for: Gino Is No Option')
        # logger.warn(f"{model} doesn't allow using dataframe for generating json!")
        # attrs, features_kwargs = await model.get_features_attrs(simplify)
        # ## Using gino: allows OO model (get_info, etc)
        # try:
        #     attrs['features'] = await model.get_features_in_bulk_gino(**features_kwargs)
        # except Exception as err:
        #     logger.exception(err)
        #     raise status.HTTP_500_INTERNAL_SERVER_ERROR
        # resp = attrs
    headers = {}
    if model.cache_enabled and ttag:
        headers['ETag'] = ttag
    return Response(content=resp,
                    media_type="application/json", headers=headers)
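

# Client sketch for the endpoint above. `simplify`, `preserveTopology` and the
# cached ETag are passed as HTTP headers (FastAPI maps the If_None_Match
# parameter to the standard If-None-Match header). Assumptions: httpx as the
# client, the /gj mount prefix, example host/port and store name; authentication
# (handled by get_current_active_user) is omitted here:
#
#     import httpx
#
#     resp = httpx.get('http://localhost:8080/gj/some_store',
#                      headers={'simplify': '0.5',
#                               'preserveTopology': 'true',
#                               'If-None-Match': '"cached-etag"'})
#     if resp.status_code == 200:
#         geojson = resp.json()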

# @api.get('/gj/{store_name}/popup/{id}')
# async def gj_popup(store_name: str, id: int):
#     model = registry.geom.get(store_name)
#     if not hasattr(model, 'get_popup_dynamic'):
#         return ''
#     obj = await model.get(id)
#     ## Escape characters for json
#     popup_more = obj.get_popup_dynamic().replace('"', '\\"').replace('\n', '\\n')
#     return {"text": popup_more}