# oidc-fastapi-test/src/oidc-test/main.py
#
# 127 lines
# 4 KiB
# Python

from typing import Annotated
from httpx import HTTPError
from fastapi import Depends, FastAPI, HTTPException, Request, status
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.templating import Jinja2Templates
from starlette.middleware.sessions import SessionMiddleware
from authlib.integrations.starlette_client.apps import StarletteOAuth2App
from authlib.integrations.starlette_client import OAuth, OAuthError
from .settings import settings
from .models import User
from .auth_utils import hasrole, get_current_user_or_none, get_current_user
# Templates are loaded from "src/templates". The path is relative, so it is
# resolved against the working directory the process is started from.
templates = Jinja2Templates("src/templates")

app = FastAPI(title="OIDC auth test")

# authlib needs a per-browser session, so SessionMiddleware must be installed.
# The session cookie is signed with the secret key taken from the settings.
app.add_middleware(SessionMiddleware, secret_key=settings.secret_key)
# Register every OIDC provider listed in the settings with authlib, keyed by
# the provider's name so that the routes can look a client up by that name.
authlib_oauth = OAuth()
for provider in settings.oidc.providers:
    # NOTE(review): an earlier comment here stated that, when client_id is
    # set, authlib also checks that the token was issued for this client
    # (audience) - confirm against the authlib documentation.
    authlib_oauth.register(
        name=provider.name,
        client_id=provider.client_id,
        client_secret=provider.client_secret,
        server_metadata_url=provider.provider_url,
        client_kwargs={"scope": "openid email offline_access profile roles"},
    )
@app.get("/login")
async def login(request: Request, provider: str) -> RedirectResponse:
    """Start the OIDC authorization flow with the named provider.

    Args:
        request: Incoming request, passed on to authlib.
        provider: Name of a provider registered on ``authlib_oauth``
            (required query parameter).

    Returns:
        The redirect response built by authlib's ``authorize_redirect``; the
        provider is asked to call back the ``auth`` route for this provider.

    Raises:
        HTTPException: 404 when no provider is registered under that name,
            502 when talking to the provider fails with an httpx ``HTTPError``.
    """
    # ``provider`` comes straight from the query string. ``create_client``
    # only consults the registry and returns None for unknown names, whereas
    # ``getattr`` would also hand back ordinary attributes of the OAuth object
    # (e.g. ``register``) and fail later with an unhandled AttributeError.
    provider_ = authlib_oauth.create_client(provider)
    if provider_ is None:
        # An unknown name is a client mistake, not a server failure.
        raise HTTPException(status.HTTP_404_NOT_FOUND, "No such provider")

    redirect_uri = request.url_for("auth", provider=provider)
    try:
        return await provider_.authorize_redirect(request, redirect_uri)
    except HTTPError as error:
        # The upstream provider could not be reached; this says nothing about
        # the caller's credentials, so report a gateway error instead of 401.
        raise HTTPException(
            status.HTTP_502_BAD_GATEWAY, "Cannot reach provider"
        ) from error
@app.get("/auth/{provider}")
async def auth(request: Request, provider: str) -> RedirectResponse:
    """Handle the provider's callback and store the user in the session.

    Args:
        request: Callback request, passed on to authlib for the token exchange.
        provider: Name of the provider, taken from the URL path.

    Returns:
        A redirect to ``/`` once the token's ``userinfo`` has been stored in
        the session under ``"user"``; a redirect back to ``/login`` for the
        same provider when the token carries no ``userinfo``.

    Raises:
        HTTPException: 404 when no provider is registered under that name,
            401 when authlib rejects the authorization response,
            502 when talking to the provider fails with an httpx ``HTTPError``.
    """
    from urllib.parse import urlencode

    # Look the client up through the registry only; ``getattr`` on a
    # user-supplied name would also return unrelated attributes of the OAuth
    # object and fail later with an unhandled AttributeError.
    provider_ = authlib_oauth.create_client(provider)
    if provider_ is None:
        raise HTTPException(status.HTTP_404_NOT_FOUND, "No such provider")

    try:
        token = await provider_.authorize_access_token(request)
    except OAuthError as error:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED, detail=error.error
        ) from error
    except HTTPError as error:
        # Same handling as in ``login``: the provider could not be reached.
        raise HTTPException(
            status.HTTP_502_BAD_GATEWAY, "Cannot reach provider"
        ) from error

    user = token.get("userinfo")
    if not user:
        # ``/login`` requires the ``provider`` query parameter, so a bare
        # "/login" redirect could never succeed; send the provider along.
        return RedirectResponse(url=f"/login?{urlencode({'provider': provider})}")

    request.session["user"] = dict(user)
    return RedirectResponse(url="/")
@app.get("/logout")
async def logout(request: Request) -> RedirectResponse:
    """Drop the ``"user"`` entry from the session and redirect to ``/``.

    Nothing happens to the session when no user is stored in it.
    """
    session = request.session
    session.pop("user", None)
    home_redirect = RedirectResponse(url="/")
    return home_redirect
@app.get("/")
async def home(
    request: Request, user: Annotated[User, Depends(get_current_user_or_none)]
) -> HTMLResponse:
    """Render ``index.html``.

    The template receives the dumped settings, the user resolved by the
    ``get_current_user_or_none`` dependency, and the raw ``"user"`` entry of
    the session (``None`` when the session has none).
    """
    # NOTE(review): settings also carries the session secret key and the
    # providers' client secrets; presumably model_dump() includes them -
    # confirm the template does not render them.
    page_context = {
        "settings": settings.model_dump(),
        "user": user,
        "auth_data": request.session.get("user"),
    }
    return templates.TemplateResponse(
        request=request,
        name="index.html",
        context=page_context,
    )
@app.get("/public")
async def public() -> HTMLResponse:
    """Serve a fixed HTML page; the route declares no auth dependency."""
    body = "<h1>Not protected</h1>"
    return HTMLResponse(content=body)
@app.get("/protected")
async def get_protected(
    user: Annotated[User, Depends(get_current_user)]
) -> HTMLResponse:
    """Serve a fixed HTML page behind the ``get_current_user`` dependency.

    ``user`` is not used in the body; it is declared so that the dependency
    runs (presumably rejecting unauthenticated requests - see auth_utils).
    """
    body = "<h1>Only authenticated users can see this</h1>"
    return HTMLResponse(content=body)
@app.get("/protected-by-foorole")
@hasrole("foorole")
async def get_protected_by_foorole(request: Request) -> HTMLResponse:
    """Serve a fixed HTML page guarded by ``hasrole("foorole")``.

    ``request`` is not used in the body; presumably the ``hasrole``
    decorator reads it - confirm in auth_utils.
    """
    body = "<h1>Only users with foorole can see this</h1>"
    return HTMLResponse(content=body)
@app.get("/protected-by-barrole")
@hasrole("barrole")
async def get_protected_by_barrole(request: Request) -> HTMLResponse:
    """Serve a fixed HTML page guarded by ``hasrole("barrole")``.

    ``request`` is not used in the body; presumably the ``hasrole``
    decorator reads it - confirm in auth_utils.
    """
    body = "<h1>Protected by barrole</h1>"
    return HTMLResponse(content=body)
@app.get("/protected-by-foorole-and-barrole")
@hasrole("barrole")
@hasrole("foorole")
async def get_protected_by_foorole_and_barrole(request: Request) -> HTMLResponse:
    """Serve a fixed HTML page guarded by two stacked ``hasrole`` decorators.

    The handler is wrapped by ``hasrole("foorole")`` first and by
    ``hasrole("barrole")`` around that, so both guards apply.
    ``request`` is not used in the body; presumably the decorators read it -
    confirm in auth_utils.
    """
    body = "<h1>Only users with foorole and barrole can see this</h1>"
    return HTMLResponse(content=body)
@app.get("/protected-by-foorole-or-barrole")
@hasrole(["foorole", "barrole"])
async def get_protected_by_foorole_or_barrole(request: Request) -> HTMLResponse:
    """Serve a fixed HTML page guarded by ``hasrole`` with a list of roles.

    Presumably a list means that any one of the listed roles is enough, as
    the page text says - confirm in auth_utils. ``request`` is not used in
    the body; presumably the decorator reads it.
    """
    body = "<h1>Only users with foorole or barrole can see this</h1>"
    return HTMLResponse(content=body)