Geoapi: checkviewable permissions

This commit is contained in:
phil 2024-01-03 12:24:13 +05:30
parent 76a4e8f023
commit f04aac5667
3 changed files with 23 additions and 10 deletions

View file

@ -7,12 +7,14 @@ import logging
from typing import Annotated from typing import Annotated
from asyncio import CancelledError from asyncio import CancelledError
from fastapi import (FastAPI, HTTPException, Response, Header, WebSocket, WebSocketDisconnect, from fastapi import (Depends, FastAPI, HTTPException, Response, Header, WebSocket, WebSocketDisconnect,
status, responses) status, responses)
from gisaf.models.authentication import User
from gisaf.redis_tools import store as redis_store from gisaf.redis_tools import store as redis_store
from gisaf.live import live_server from gisaf.live import live_server
from gisaf.registry import registry from gisaf.registry import registry
from gisaf.security import get_current_active_user, can_view
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -72,6 +74,7 @@ async def live_layer(store: str, websocket: WebSocket):
@api.get('/{store_name}') @api.get('/{store_name}')
async def get_geojson(store_name, async def get_geojson(store_name,
user: User = Depends(get_current_active_user),
If_None_Match: Annotated[str | None, Header()] = None, If_None_Match: Annotated[str | None, Header()] = None,
simplify: Annotated[float | None, Header()] = 50.0, simplify: Annotated[float | None, Header()] = 50.0,
): ):
@ -86,8 +89,10 @@ async def get_geojson(store_name,
except KeyError: except KeyError:
raise HTTPException(status.HTTP_404_NOT_FOUND) raise HTTPException(status.HTTP_404_NOT_FOUND)
if hasattr(model, 'viewable_role') and model.viewable_role: if hasattr(model, 'viewable_role'):
await check_permission(request, model.viewable_role) if not(user and user.can_view(model)):
logger.info(f'{user.username if user else "Anonymous"} tried to access {model}')
raise HTTPException(status.HTTP_401_UNAUTHORIZED)
if await redis_store.has_channel(store_name): if await redis_store.has_channel(store_name):
## Live layers ## Live layers

View file

@ -16,6 +16,7 @@ class UserRoleLink(SQLModel, table=True):
class UserBase(SQLModel): class UserBase(SQLModel):
username: str username: str
email: str email: str
disabled: bool | None = False
class User(UserBase, table=True): class User(UserBase, table=True):
@ -25,6 +26,12 @@ class User(UserBase, table=True):
link_model=UserRoleLink) link_model=UserRoleLink)
password: str | None = None password: str | None = None
def can_view(self, model) -> bool:
if hasattr(model, 'viewable_role'):
return model.viewable_role in (role.name for role in self.roles)
else:
return True
class RoleBase(SQLModel): class RoleBase(SQLModel):
name: str = Field(unique=True) name: str = Field(unique=True)

View file

@ -1,5 +1,6 @@
from datetime import datetime, timedelta from datetime import datetime, timedelta
import logging import logging
from typing import Annotated
from fastapi import Depends, HTTPException, status from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer from fastapi.security import OAuth2PasswordBearer
@ -130,18 +131,18 @@ async def get_current_user(
async def authenticate_user(username: str, password: str): async def authenticate_user(username: str, password: str):
async with db_session() as session: async with db_session() as session:
user = await get_user(session, username) user = await get_user(session, username)
if not user: if not user or user.disabled:
return False return False
if not verify_password(user, password): if not verify_password(user, password):
return False return False
return user return user
# async def get_current_active_user( async def get_current_active_user(
# current_user: Annotated[UserRead, Depends(get_current_user)]): current_user: Annotated[UserRead, Depends(get_current_user)]):
# if current_user.disabled: if current_user is not None and current_user.disabled:
# raise HTTPException(status_code=400, detail="Inactive user") raise HTTPException(status_code=400, detail="Inactive user")
# return current_user return current_user
def create_access_token(data: dict, expires_delta: timedelta): def create_access_token(data: dict, expires_delta: timedelta):