List of resources for OIDC providers

This commit is contained in:
phil 2025-01-19 14:26:54 +01:00
parent f14d8d3114
commit 54345dcafd
6 changed files with 74 additions and 32 deletions
src/oidc_test

View file

@ -21,10 +21,11 @@ from authlib.integrations.httpx_client import AsyncOAuth2Client
from authlib.oauth2.rfc6749 import OAuth2Token
from pkce import generate_code_verifier, generate_pkce_pair
from .settings import settings
from .settings import settings, OIDCProvider
from .models import User
from .auth_utils import (
get_oidc_provider,
get_oidc_provider_or_none,
hasrole,
get_current_user_or_none,
get_current_user,
@ -55,8 +56,8 @@ app.add_middleware(
# Add oidc providers to authlib from the settings
fastapi_providers = {}
_providers = {}
# fastapi_providers: dict[str, OpenIdConnect] = {}
providers_settings: dict[str, OIDCProvider] = {}
for provider in settings.oidc.providers:
authlib_oauth.register(
@ -74,17 +75,27 @@ for provider in settings.oidc.providers:
# update_token=update_token,
# client_id="some-client-id", # if enabled, authlib will also check that the access token belongs to this client id (audience)
)
fastapi_providers[provider.id] = OpenIdConnect(
openIdConnectUrl=provider.openid_configuration
)
_providers[provider.id] = provider
# fastapi_providers[provider.id] = OpenIdConnect(
# openIdConnectUrl=provider.openid_configuration
# )
providers_settings[provider.id] = provider
@app.get("/")
async def home(
request: Request, user: Annotated[User, Depends(get_current_user_or_none)]
request: Request,
user: Annotated[User, Depends(get_current_user_or_none)],
oidc_provider: Annotated[
StarletteOAuth2App | None, Depends(get_oidc_provider_or_none)
],
) -> HTMLResponse:
now = datetime.now()
if oidc_provider and (
(provider := providers_settings.get(oidc_provider.name)) is not None
):
resources = provider.resources
else:
resources = []
return templates.TemplateResponse(
name="home.html",
request=request,
@ -92,6 +103,7 @@ async def home(
"settings": settings.model_dump(),
"user": user,
"now": now,
"resources": resources,
"user_info_details": (
pretty_details(user, now)
if user and settings.oidc.show_session_details
@ -112,11 +124,13 @@ async def login(request: Request, oidc_provider_id: str) -> RedirectResponse:
"""
redirect_uri = request.url_for("auth", oidc_provider_id=oidc_provider_id)
try:
provider_: StarletteOAuth2App = getattr(authlib_oauth, oidc_provider_id)
provider: StarletteOAuth2App = getattr(authlib_oauth, oidc_provider_id)
except AttributeError:
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "No such provider")
if (
code_challenge_method := _providers[oidc_provider_id].code_challenge_method
code_challenge_method := providers_settings[
oidc_provider_id
].code_challenge_method
) is not None:
client = AsyncOAuth2Client(..., code_challenge_method=code_challenge_method)
code_verifier = generate_code_verifier()
@ -124,7 +138,7 @@ async def login(request: Request, oidc_provider_id: str) -> RedirectResponse:
else:
code_verifier = None
try:
response = await provider_.authorize_redirect(
response = await provider.authorize_redirect(
request,
redirect_uri,
access_type="offline",
@ -238,26 +252,37 @@ async def non_compliant_logout(
# Route for OAuth resource server
@app.get("/resource/{name}")
@app.get("/resource/{id}")
async def get_resource(
name: str,
id: str,
request: Request,
user: Annotated[User, Depends(get_current_user)],
oidc_provider: Annotated[StarletteOAuth2App, Depends(get_oidc_provider)],
token: Annotated[OAuth2Token, Depends(get_token)],
) -> HTMLResponse:
) -> JSONResponse:
"""Generic path for testing a resource provided by a provider"""
provider = _providers[oidc_provider.name]
if oidc_provider is None:
raise HTTPException(
status.HTTP_406_NOT_ACCEPTABLE, detail="No such oidc provider"
)
if (provider := providers_settings.get(oidc_provider.name)) is None:
raise HTTPException(
status.HTTP_406_NOT_ACCEPTABLE, detail="No oidc provider setting"
)
try:
resource = next(x for x in provider.resources if x.id == id)
except StopIteration:
raise HTTPException(
status.HTTP_406_NOT_ACCEPTABLE, detail="No such resource for this provider"
)
if (
response := await oidc_provider.get(
"/api/v1/user/repos",
resource.url,
# headers={"Authorization": f"token {token['access_token']}"},
token=token,
)
).is_success:
repos = response.json()
names = [repo["name"] for repo in repos]
return HTMLResponse(f"{user.name} has {len(repos)} repos: {', '.join(names)}")
return JSONResponse(response.json())
else:
raise HTTPException(status_code=response.status_code, detail=response.text)