oidc-fastapi-test/src/oidc_test/registry.py
phil 381ce1ebc1
All checks were successful
/ build (push) Successful in 5s
/ test (push) Successful in 5s
Use pydantic on ResourceServer
2025-02-13 12:23:18 +01:00

43 lines
1.1 KiB
Python

from importlib.metadata import entry_points
import logging
from typing import Any
from pydantic import BaseModel
from oidc_test.models import User
# Logger shared by this module, registered under the fixed name "registry".
logger = logging.getLogger("registry")
class ProcessResult(BaseModel):
    """Value returned by ``ResourceProvider.process``."""

    # Free-form mapping with string keys; empty when no value is supplied.
    result: dict[str, Any] = {}
class ProcessError(Exception):
    """Exception type for resource processing errors.

    NOTE(review): presumably raised by provider implementations; confirm
    against the callers, since nothing in this block raises it.
    """
class ResourceProvider(BaseModel):
    """Base class for resource providers.

    The ``process`` method defined here is only a fallback: it logs a warning
    saying the provider should define its own ``process`` and returns an
    empty ``ProcessResult``.
    """

    # Both settings default to None. How they are consumed is not visible in
    # this class -- presumably by the code that dispatches to providers.
    scope_required: str | None = None
    default_resource_id: str | None = None

    def __init__(self, name: str):
        """Build the provider and remember *name*.

        The model is initialised without any field data, so the fields keep
        their class-level defaults. The name is stored on the instance as
        ``__name__`` and is used in the fallback warning message.
        """
        super().__init__()
        self.__name__ = name

    async def process(self, user: User, resource_id: str | None = None) -> ProcessResult:
        """Fallback handler: warn that ``process`` is missing and return an empty result.

        Neither *user* nor *resource_id* is used by this implementation.
        """
        logger.warning(f"{self.__name__} should define a process method")
        return ProcessResult()
class ResourceRegistry(BaseModel):
    """Registry of resource providers discovered through entry points."""

    # Provider instances keyed by the name of the entry point declaring them.
    resource_providers: dict[str, ResourceProvider] = {}

    def make_registry(self) -> None:
        """Populate ``resource_providers`` from the installed entry points.

        Every entry point in the ``oidc_test.resource_provider`` group is
        loaded. When it resolves to ``ResourceProvider`` or a subclass, the
        class is instantiated with the entry point name and stored under that
        name, replacing any provider already registered under it. Entry
        points resolving to anything else are skipped with a warning.

        Exceptions raised while loading an entry point or while instantiating
        a provider are not caught here and propagate to the caller.
        """
        for ep in entry_points().select(group="oidc_test.resource_provider"):
            provider_class = ep.load()
            # An entry point may resolve to any object (function, module,
            # instance...), and issubclass() raises TypeError when its first
            # argument is not a class, so check that first.
            is_provider = isinstance(provider_class, type) and issubclass(
                provider_class, ResourceProvider
            )
            if not is_provider:
                logger.warning(
                    "Entry point %s does not reference a ResourceProvider subclass, ignoring it",
                    ep.name,
                )
                continue
            self.resource_providers[ep.name] = provider_class(ep.name)
# Module-level registry instance. It is created without any providers;
# make_registry() has to be called on it to load them from the entry points.
registry = ResourceRegistry()