oidc-fastapi-test/src/oidc_test/database.py

48 lines
1.4 KiB
Python
Raw Normal View History

# Implement a fake in-memory database interface for demo purpose
import logging
from authlib.integrations.starlette_client.apps import StarletteOAuth2App
from .models import User, OAuth2Token, Role
logger = logging.getLogger(__name__)
class Database:
users: dict[str, User] = {}
2025-01-09 23:41:32 +01:00
tokens: dict[str, OAuth2Token] = {}
# Last sessions for the user (key: users's subject id (sub))
async def add_user(
self,
sub: str,
user_info: dict,
oidc_provider: StarletteOAuth2App,
user_info_from_endpoint: dict,
) -> User:
user = User.from_auth(userinfo=user_info, oidc_provider=oidc_provider)
try:
raw_roles = user_info_from_endpoint["resource_access"][
oidc_provider.client_id
]["roles"]
except Exception as err:
logger.debug(f"Cannot read additional roles: {err}")
raw_roles = []
for raw_role in raw_roles:
user.roles.append(Role(name=raw_role))
self.users[sub] = user
return user
2025-01-09 23:41:32 +01:00
async def get_user(self, sub: str) -> User:
return self.users[sub]
2025-01-09 23:41:32 +01:00
async def add_token(self, token_dict: dict, user: User) -> None:
2025-01-11 20:41:33 +01:00
self.tokens[token_dict['id_token']] = OAuth2Token.from_dict(token_dict=token_dict, user=user)
2025-01-09 23:41:32 +01:00
async def get_token(self, name) -> OAuth2Token | None:
return self.tokens.get(name)
db = Database()