From ae19ba9f2777b33bb91af0664cadf033b530f2e4 Mon Sep 17 00:00:00 2001 From: phil Date: Sat, 11 May 2024 09:52:34 +0200 Subject: [PATCH] Code cosmetic --- src/gisaf/api/geoapi.py | 102 +++++++++++++++++++++++---------------- src/gisaf/live.py | 37 ++++++++------ src/gisaf/redis_tools.py | 12 ++--- 3 files changed, 89 insertions(+), 62 deletions(-) diff --git a/src/gisaf/api/geoapi.py b/src/gisaf/api/geoapi.py index 36b5c11..c32ca17 100644 --- a/src/gisaf/api/geoapi.py +++ b/src/gisaf/api/geoapi.py @@ -2,13 +2,22 @@ Geographical json stores, served under /gj Used for displaying features on maps """ + from json import JSONDecodeError import logging from typing import Annotated from asyncio import CancelledError -from fastapi import (Depends, APIRouter, HTTPException, Response, Header, - WebSocket, WebSocketDisconnect, status) +from fastapi import ( + Depends, + APIRouter, + HTTPException, + Response, + Header, + WebSocket, + WebSocketDisconnect, + status, +) from gisaf.models.authentication import User from gisaf.redis_tools import store as redis_store @@ -25,8 +34,10 @@ api = APIRouter( responses={404: {"description": "Not found"}}, ) + class ConnectionManager: active_connections: list[WebSocket] + def __init__(self): self.active_connections = [] @@ -44,9 +55,11 @@ class ConnectionManager: for connection in self.active_connections: await connection.send_text(message) + manager = ConnectionManager() -@api.websocket('/live/{store}') + +@api.websocket("/live/{store}") async def live_layer(store: str, websocket: WebSocket): """ Websocket for live layer updates @@ -58,29 +71,33 @@ async def live_layer(store: str, websocket: WebSocket): msg_data = await websocket.receive_json() except JSONDecodeError: msg_text = await websocket.receive_text() - if msg_text == 'close': + if msg_text == "close": await websocket.close() continue # else: - if 'message' in msg_data: - if msg_data['message'] == 'subscribeLiveLayer': + if "message" in msg_data: + if msg_data["message"] == "subscribeLiveLayer": live_server.add_subscription(websocket, store) - elif msg_data['message'] == 'unsubscribeLiveLayer': + elif msg_data["message"] == "unsubscribeLiveLayer": live_server.remove_subscription(websocket, store) else: - logger.warning(f'Got websocket message with no message field: {msg_data}') + logger.warning( + f"Got websocket message with no message field: {msg_data}" + ) except WebSocketDisconnect: - logger.debug('Websocket disconnected') + logger.debug("Websocket disconnected") # logger.debug('websocket connection closed') -@api.get('/{store_name}') -async def get_geojson(store_name, - user: User = Depends(get_current_active_user), - If_None_Match: Annotated[str | None, Header()] = None, - simplify: Annotated[float | None, Header()] = None, - preserveTopology: Annotated[bool|None, Header()] = None, - ): + +@api.get("/{store_name}") +async def get_geojson( + store_name, + user: User = Depends(get_current_active_user), + If_None_Match: Annotated[str | None, Header()] = None, + simplify: Annotated[float | None, Header()] = None, + preserveTopology: Annotated[bool | None, Header()] = None, +): """ Some REST stores coded manually (route prefixed with "gj": geojson). :param store_name: name of the model @@ -91,23 +108,23 @@ async def get_geojson(store_name, model = registry.stores.loc[store_name].model except KeyError: raise HTTPException(status.HTTP_404_NOT_FOUND) - if getattr(model, 'viewable_role', None): - if not(user and user.can_view(model)): + if getattr(model, "viewable_role", None): + if not (user and user.can_view(model)): username = user.username if user else "Anonymous" - logger.info(f'{username} tried to access {model}') + logger.info(f"{username} tried to access {model}") raise HTTPException(status.HTTP_401_UNAUTHORIZED) if await redis_store.has_channel(store_name): ## Live layers data = await redis_store.get_layer_as_json(store_name) - return Response(content=data.decode(), - media_type="application/json") + return Response(content=data.decode(), media_type="application/json") if model.cache_enabled: ttag = await redis_store.get_ttag(store_name) if ttag and If_None_Match == ttag: raise HTTPException(status.HTTP_304_NOT_MODIFIED) - if hasattr(model, 'get_geojson'): - geojson = await model.get_geojson(simplify_tolerance=simplify, - preserve_topology=preserveTopology) + if hasattr(model, "get_geojson"): + geojson = await model.get_geojson( + simplify_tolerance=simplify, preserve_topology=preserveTopology + ) ## Store to redis for caching if use_cache: await redis_store.store_json(model, geojson) @@ -117,12 +134,15 @@ async def get_geojson(store_name, ## get_popup and get_propertites get the gdf as argument ## and can use vectorised operations try: - gdf = await model.get_gdf(cast=True, with_related=True, - # filter_columns=True, - preserve_topology=preserveTopology, - simplify_tolerance=simplify) + gdf = await model.get_gdf( + cast=True, + with_related=True, + # filter_columns=True, + preserve_topology=preserveTopology, + simplify_tolerance=simplify, + ) except CancelledError as err: - logger.debug(f'Getting {store_name} cancelled while getting gdf') + logger.debug(f"Getting {store_name} cancelled while getting gdf") raise err except Exception as err: logger.exception(err) @@ -130,25 +150,26 @@ async def get_geojson(store_name, ## The query of category defined models gets the status ## (not sure how and this could be skipped) ## Other models do not have: just add it manually from the model itself - if 'status' not in gdf.columns: - gdf['status'] = model.status - if 'popup' not in gdf.columns: - gdf['popup'] = await model.get_popup(gdf) + if "status" not in gdf.columns: + gdf["status"] = model.status + if "popup" not in gdf.columns: + gdf["popup"] = await model.get_popup(gdf) # Add properties properties = await model.get_properties(gdf) - columns = ['geom', 'status', 'popup'] + columns = ["geom", "status", "popup"] for property, values in properties.items(): columns.append(property) gdf[property] = values - geojson = gdf[columns].to_json(separators=(',', ':'), - check_circular=False) + geojson = gdf[columns].to_json(separators=(",", ":"), check_circular=False) ## Store to redis for caching if use_cache: await redis_store.store_json(model, geojson) resp = geojson else: - raise HTTPException(status.HTTP_500_INTERNAL_SERVER_ERROR, - detail='Gino is for: Gino Is No Option') + raise HTTPException( + status.HTTP_500_INTERNAL_SERVER_ERROR, + detail="Gino is for: Gino Is No Option", + ) # logger.warn(f"{model} doesn't allow using dataframe for generating json!") # attrs, features_kwargs = await model.get_features_attrs(simplify) # ## Using gino: allows OO model (get_info, etc) @@ -161,9 +182,8 @@ async def get_geojson(store_name, headers = {} if model.cache_enabled and ttag: - headers['ETag'] = ttag - return Response(content=resp, - media_type="application/json", headers=headers) + headers["ETag"] = ttag + return Response(content=resp, media_type="application/json", headers=headers) # @api.get('/gj/{store_name}/popup/{id}') diff --git a/src/gisaf/live.py b/src/gisaf/live.py index bc69984..ce76c34 100644 --- a/src/gisaf/live.py +++ b/src/gisaf/live.py @@ -9,6 +9,7 @@ from gisaf.redis_tools import store logger = logging.getLogger(__name__) + class LiveServer: def __init__(self): self.ws_clients = defaultdict(set) @@ -18,10 +19,10 @@ class LiveServer: Setup for the live server """ if with_mqtt: - logger.warning('Gisaf LiveServer does not support with_mqtt: ignoring') + logger.warning("Gisaf LiveServer does not support with_mqtt: ignoring") if listen_to_redis: self.pub = store.redis.pubsub() - await self.pub.psubscribe('live:*:json') + await self.pub.psubscribe("live:*:json") asyncio.create_task(self._listen_to_redis()) async def _listen_to_redis(self): @@ -30,9 +31,10 @@ class LiveServer: and send the messages to websockets """ async for msg in self.pub.listen(): - if msg['type'] == 'pmessage': - await self._send_to_ws_clients(msg['channel'].decode(), - msg['data'].decode()) + if msg["type"] == "pmessage": + await self._send_to_ws_clients( + msg["channel"].decode(), msg["data"].decode() + ) async def _send_to_ws_clients(self, store_name, json_data): """ @@ -40,32 +42,36 @@ class LiveServer: to that channel (store_name) """ if len(self.ws_clients[store_name]) > 0: - logger.debug(f'WS channel {store_name} got {len(json_data)} bytes to send to:' - f' {", ".join([str(id(ws)) for ws in self.ws_clients[store_name]])}') + logger.debug( + f"WS channel {store_name} got {len(json_data)} bytes to send to:" + f" {', '.join([str(id(ws)) for ws in self.ws_clients[store_name]])}" + ) for ws in self.ws_clients[store_name]: - if ws.client_state.name != 'CONNECTED': - logger.debug(f'Cannot send {store_name} for WS {id(ws)}, state: {ws.client_state.name}') + if ws.client_state.name != "CONNECTED": + logger.debug( + f"Cannot send {store_name} for WS {id(ws)}, state: {ws.client_state.name}" + ) continue try: await ws.send_text(json_data) - logger.debug(f'Sent live update for WS {id(ws)}: {len(json_data)}') + logger.debug(f"Sent live update for WS {id(ws)}: {len(json_data)}") except RuntimeError as err: ## The ws is probably closed, remove it from the clients - logger.debug(f'Cannot send live update for {store_name}: {err}') + logger.debug(f"Cannot send live update for {store_name}: {err}") del self.ws_clients[store_name] else: pass - #logger.debug(f'WS channel {store_name} has no clients') + # logger.debug(f'WS channel {store_name} has no clients') - def add_subscription(self, ws, store_name): + def add_subscription(self, ws: WebSocket, store_name: str): """ Add the websocket subscription to the layer """ channel = store.get_json_channel(store_name) - logger.debug(f'WS {id(ws)} subscribed to {channel}') + logger.debug(f"WS {id(ws)} subscribed to {channel}") self.ws_clients[channel].add(ws) - def remove_subscription(self, ws, store_name): + def remove_subscription(self, ws: WebSocket, store_name: str): """ Remove the websocket subscription to the layer """ @@ -78,4 +84,5 @@ async def setup_live(): global live_server await live_server.setup(listen_to_redis=True) + live_server = LiveServer() diff --git a/src/gisaf/redis_tools.py b/src/gisaf/redis_tools.py index 3a2411b..f19df5b 100644 --- a/src/gisaf/redis_tools.py +++ b/src/gisaf/redis_tools.py @@ -125,37 +125,37 @@ class Store: self.uuid = await self.redis.client_getname() self.uuid = str(uuid1()) - def get_json_channel(self, store_name): + def get_json_channel(self, store_name) -> str: """ Name of the Redis channel for the json representation """ return f"{store_name}:json" - def get_gdf_channel(self, store_name): + def get_gdf_channel(self, store_name) -> str: """ Name of the Redis channel for the source gdf, in pickle format """ return f"{store_name}:gdf" - def get_layer_def_channel(self, store_name): + def get_layer_def_channel(self, store_name) -> str: """ Name of the Redis channel for the layer definition """ return f"{store_name}:layer_def" - def get_mapbox_layout_channel(self, store_name): + def get_mapbox_layout_channel(self, store_name) -> str: """ Name of the Redis channel for the mapbox layout style definition """ return f"{store_name}:mapbox_layout" - def get_mapbox_paint_channel(self, store_name): + def get_mapbox_paint_channel(self, store_name) -> str: """ Name of the Redis channel for the mapbox paint style definition """ return f"{store_name}:mapbox_paint" - async def store_json(self, model, geojson, **kwargs): + async def store_json(self, model, geojson, **kwargs) -> None: """ Store the json representation of the gdf for caching. """