Fix primary keys (optional) Add baskets, importers, plugins, reactor Add fake replacement fro graphql defs (to_migrate) Add typing marker (py.typed)
166 lines
6 KiB
Python
166 lines
6 KiB
Python
"""
|
|
Geographical json stores, served under /gj
|
|
Used for displaying features on maps
|
|
"""
|
|
from json import JSONDecodeError
|
|
import logging
|
|
from typing import Annotated
|
|
from asyncio import CancelledError
|
|
|
|
from fastapi import (FastAPI, HTTPException, Response, Header, WebSocket, WebSocketDisconnect,
|
|
status, responses)
|
|
|
|
from gisaf.redis_tools import store as redis_store
|
|
from gisaf.live import live_server
|
|
from gisaf.registry import registry
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
api = FastAPI(
|
|
default_response_class=responses.ORJSONResponse,
|
|
)
|
|
|
|
class ConnectionManager:
|
|
active_connections: list[WebSocket]
|
|
def __init__(self):
|
|
self.active_connections = []
|
|
|
|
async def connect(self, websocket: WebSocket):
|
|
await websocket.accept()
|
|
self.active_connections.append(websocket)
|
|
|
|
def disconnect(self, websocket: WebSocket):
|
|
self.active_connections.remove(websocket)
|
|
|
|
async def send_personal_message(self, message: str, websocket: WebSocket):
|
|
await websocket.send_text(message)
|
|
|
|
async def broadcast(self, message: str):
|
|
for connection in self.active_connections:
|
|
await connection.send_text(message)
|
|
|
|
manager = ConnectionManager()
|
|
|
|
@api.websocket('/live/{store}')
|
|
async def live_layer(store: str, websocket: WebSocket):
|
|
"""
|
|
Websocket for live layer updates
|
|
"""
|
|
await websocket.accept()
|
|
try:
|
|
while True:
|
|
try:
|
|
msg_data = await websocket.receive_json()
|
|
except JSONDecodeError:
|
|
msg_text = await websocket.receive_text()
|
|
if msg_text == 'close':
|
|
await websocket.close()
|
|
continue
|
|
# else:
|
|
if 'message' in msg_data:
|
|
if msg_data['message'] == 'subscribeLiveLayer':
|
|
live_server.add_subscription(websocket, store)
|
|
elif msg_data['message'] == 'unsubscribeLiveLayer':
|
|
live_server.remove_subscription(websocket, store)
|
|
else:
|
|
logger.warning(f'Got websocket message with no message field: {msg_data}')
|
|
except WebSocketDisconnect:
|
|
logger.debug('Websocket disconnected')
|
|
|
|
# logger.debug('websocket connection closed')
|
|
|
|
@api.get('/{store_name}')
|
|
async def get_geojson(store_name,
|
|
If_None_Match: Annotated[str | None, Header()] = None,
|
|
simplify: Annotated[float | None, Header()] = 50.0,
|
|
):
|
|
"""
|
|
Some REST stores coded manually (route prefixed with "gj": geojson).
|
|
:param store_name: name of the model
|
|
:return: json
|
|
"""
|
|
use_cache = False
|
|
try:
|
|
model = registry.stores.loc[store_name].model
|
|
except KeyError:
|
|
raise HTTPException(status.HTTP_404_NOT_FOUND)
|
|
|
|
if hasattr(model, 'viewable_role') and model.viewable_role:
|
|
await check_permission(request, model.viewable_role)
|
|
|
|
if await redis_store.has_channel(store_name):
|
|
## Live layers
|
|
data = await redis_store.get_layer_as_json(store_name)
|
|
return Response(content=data.decode(),
|
|
media_type="application/json")
|
|
|
|
# elif not model:
|
|
# raise HTTPException(status.HTTP_404_NOT_FOUND)
|
|
|
|
if model.cache_enabled:
|
|
ttag = await redis_store.get_ttag(store_name)
|
|
if ttag and If_None_Match == ttag:
|
|
return status.HTTP_304_NOT_MODIFIED
|
|
|
|
if hasattr(model, 'get_geojson'):
|
|
geojson = await model.get_geojson(simplify_tolerance=simplify, registry=registry)
|
|
## Store to redis for caching
|
|
if use_cache:
|
|
await redis_store.store_json(model, geojson)
|
|
resp = geojson
|
|
|
|
elif model.can_get_features_as_df:
|
|
## Get the GeoDataframe (gdf) with GeoPandas
|
|
## get_popup and get_propertites get the gdf as argument and can use vectorised operations
|
|
try:
|
|
gdf = await model.get_geo_df(cast=True, with_related=True, filter_columns=True)
|
|
except CancelledError as err:
|
|
logger.debug(f'Request for {store_name} cancelled while getting gdf')
|
|
raise err
|
|
except Exception as err:
|
|
logger.exception(err)
|
|
raise status.HTTP_500_INTERNAL_SERVER_ERROR
|
|
## The query of category defined models gets the status (not sure how and this could be skipped)
|
|
## Other models do not have: just add it manually from the model itself
|
|
if 'status' not in gdf.columns:
|
|
gdf['status'] = model.status
|
|
if 'popup' not in gdf.columns:
|
|
gdf['popup'] = await model.get_popup(gdf)
|
|
properties = await model.get_properties(gdf)
|
|
columns = ['geometry', 'status', 'popup']
|
|
for property, values in properties.items():
|
|
columns.append(property)
|
|
gdf[property] = values
|
|
geojson = gdf[columns].to_json(separators=(',', ':'), check_circular=False)
|
|
## Store to redis for caching
|
|
if use_cache:
|
|
await redis_store.store_json(model, geojson)
|
|
resp = geojson
|
|
|
|
else:
|
|
logger.warn(f"{model} doesn't allow using dataframe for generating json!")
|
|
attrs, features_kwargs = await model.get_features_attrs(simplify)
|
|
## Using gino: allows OO model (get_info, etc)
|
|
try:
|
|
attrs['features'] = await model.get_features_in_bulk_gino(**features_kwargs)
|
|
except Exception as err:
|
|
logger.exception(err)
|
|
raise status.HTTP_500_INTERNAL_SERVER_ERROR
|
|
resp = attrs
|
|
|
|
headers = {}
|
|
if model.cache_enabled and ttag:
|
|
headers['ETag'] = ttag
|
|
return Response(content=resp, media_type="application/json", headers=headers)
|
|
|
|
|
|
@api.get('/gj/{store_name}/popup/{id}')
|
|
async def gj_popup(store_name: str, id: int):
|
|
model = registry.geom.get(store_name)
|
|
if not hasattr(model, 'get_popup_dynamic'):
|
|
return ''
|
|
obj = await model.get(id)
|
|
## Escape characters for json
|
|
popup_more = obj.get_popup_dynamic().replace('"', '\\"').replace('\n', '\\n')
|
|
return {"text": popup_more}
|