Add resource provider
Some checks failed
/ build (push) Failing after 14s
/ test (push) Successful in 4s

This commit is contained in:
phil 2025-01-28 19:48:35 +01:00
parent 61be70054b
commit 5b31ef888c
7 changed files with 152 additions and 83 deletions

View file

@ -13,6 +13,7 @@ from fastapi import Depends, FastAPI, HTTPException, Request, status
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
from fastapi.templating import Jinja2Templates
from fastapi.middleware.cors import CORSMiddleware
from starlette.middleware.sessions import SessionMiddleware
from authlib.integrations.starlette_client.apps import StarletteOAuth2App
from authlib.integrations.base_client import OAuthError
@ -23,7 +24,7 @@ from authlib.oauth2.rfc6749 import OAuth2Token
# from fastapi.security import OpenIdConnect
# from pkce import generate_code_verifier, generate_pkce_pair
from .settings import settings, OIDCProvider
from .settings import settings
from .models import User
from .auth_utils import (
get_oidc_provider,
@ -31,21 +32,37 @@ from .auth_utils import (
hasrole,
get_current_user_or_none,
get_current_user,
get_resource_user,
authlib_oauth,
get_token,
oidc_providers_settings,
)
from .auth_misc import pretty_details
from .database import db
from .resource_server import get_resource
logger = logging.getLogger("uvicorn.error")
templates = Jinja2Templates(Path(__file__).parent / "templates")
origins = [
"https://tiptop:3002",
"https://philo.ydns.eu/",
]
app = FastAPI(
title="OIDC auth test",
)
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.mount(
"/static", StaticFiles(directory=Path(__file__).parent / "static"), name="static"
)
@ -56,32 +73,6 @@ app.add_middleware(
secret_key=settings.secret_key,
)
# Add oidc providers to authlib from the settings
# fastapi_providers: dict[str, OpenIdConnect] = {}
oidc_providers_settings: dict[str, OIDCProvider] = {}
for provider in settings.oidc.providers:
authlib_oauth.register(
name=provider.id,
server_metadata_url=provider.openid_configuration,
client_kwargs={
"scope": "openid email offline_access profile",
},
client_id=provider.client_id,
client_secret=provider.client_secret,
api_base_url=provider.url,
# For PKCE (not implemented yet):
# code_challenge_method="S256",
# fetch_token=fetch_token,
# update_token=update_token,
# client_id="some-client-id", # if enabled, authlib will also check that the access token belongs to this client id (audience)
)
# fastapi_providers[provider.id] = OpenIdConnect(
# openIdConnectUrl=provider.openid_configuration
# )
oidc_providers_settings[provider.id] = provider
@app.get("/")
async def home(
@ -281,43 +272,16 @@ async def non_compliant_logout(
@app.get("/resource/{id}")
async def get_resource(
async def get_resource_(
id: str,
request: Request,
user: Annotated[User, Depends(get_current_user)],
oidc_provider: Annotated[StarletteOAuth2App, Depends(get_oidc_provider)],
token: Annotated[OAuth2Token, Depends(get_token)],
# user: Annotated[User, Depends(get_current_user)],
# oidc_provider: Annotated[StarletteOAuth2App, Depends(get_oidc_provider)],
# token: Annotated[OAuth2Token, Depends(get_token)],
user: Annotated[User, Depends(get_resource_user)],
) -> JSONResponse:
"""Generic path for testing a resource provided by a provider"""
assert user is not None # Just to keep QA checks happy
if oidc_provider is None:
raise HTTPException(
status.HTTP_406_NOT_ACCEPTABLE, detail="No such oidc provider"
)
if (
provider := oidc_providers_settings.get(
request.session.get("oidc_provider_id", "")
)
) is None:
raise HTTPException(
status.HTTP_406_NOT_ACCEPTABLE, detail="No oidc provider setting"
)
try:
resource = next(x for x in provider.resources if x.id == id)
except StopIteration:
raise HTTPException(
status.HTTP_406_NOT_ACCEPTABLE, detail="No such resource for this provider"
)
if (
response := await oidc_provider.get(
resource.url,
# headers={"Authorization": f"token {token['access_token']}"},
token=token,
)
).is_success:
return JSONResponse(response.json())
else:
raise HTTPException(status_code=response.status_code, detail=response.text)
return JSONResponse(await get_resource(id, user))
# Routes for test