Remove OAuthToken from db (use authlib dict); basic OAuth2 service provider with Forgejo
Some checks failed
/ build (push) Failing after 13s
/ test (push) Successful in 4s

This commit is contained in:
phil 2025-01-18 06:20:44 +01:00
parent 21ccdad953
commit 2fe7536c53
10 changed files with 106 additions and 50 deletions

View file

@ -10,12 +10,15 @@ from urllib.parse import urlencode
from httpx import HTTPError
from fastapi import Depends, FastAPI, HTTPException, Request, status
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.responses import HTMLResponse, RedirectResponse, JSONResponse
from fastapi.templating import Jinja2Templates
from fastapi.security import OpenIdConnect
from starlette.middleware.sessions import SessionMiddleware
from authlib.integrations.starlette_client.apps import StarletteOAuth2App
from authlib.integrations.starlette_client import OAuthError
from authlib.integrations.base_client import OAuthError
from authlib.integrations.httpx_client import AsyncOAuth2Client
from authlib.oauth2.rfc6749 import OAuth2Token
from pkce import generate_code_verifier, generate_pkce_pair
from .settings import settings
from .models import User
@ -25,6 +28,7 @@ from .auth_utils import (
get_current_user_or_none,
get_current_user,
authlib_oauth,
get_token,
)
from .auth_misc import pretty_details
from .database import db
@ -77,8 +81,7 @@ for provider in settings.oidc.providers:
@app.get("/login/{oidc_provider_id}")
async def login(request: Request, oidc_provider_id: str) -> RedirectResponse:
"""Login with the provider id,
by giving the browser a redirect to its authorize page.
"""Login with the provider id, giving the browser a redirect to its authorize page.
The provider is expected to send the browser back to our own /auth/{oidc_provider_id} url
with the token.
"""
@ -87,9 +90,20 @@ async def login(request: Request, oidc_provider_id: str) -> RedirectResponse:
provider_: StarletteOAuth2App = getattr(authlib_oauth, oidc_provider_id)
except AttributeError:
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "No such provider")
if (
code_challenge_method := _providers[oidc_provider_id].code_challenge_method
) is not None:
client = AsyncOAuth2Client(..., code_challenge_method=code_challenge_method)
code_verifier = generate_code_verifier()
logger.debug("TODO: PKCE")
else:
code_verifier = None
try:
response = await provider_.authorize_redirect(
request, redirect_uri, access_type="offline"
request,
redirect_uri,
access_type="offline",
code_verifier=code_verifier,
)
return response
except HTTPError:
@ -106,7 +120,7 @@ async def auth(request: Request, oidc_provider_id: str) -> RedirectResponse:
except AttributeError:
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "No such provider")
try:
token = await oidc_provider.authorize_access_token(request)
token: OAuth2Token = await oidc_provider.authorize_access_token(request)
except OAuthError as error:
raise HTTPException(status.HTTP_401_UNAUTHORIZED, detail=error.error)
# Remember the oidc_provider in the session
@ -166,7 +180,7 @@ async def logout(
logger.warn(f"Cannot find end_session_endpoint for provider {provider.name}")
return RedirectResponse(request.url_for("non_compliant_logout"))
post_logout_uri = request.url_for("home")
if (id_token := await db.get_token(request.session.pop("token", None))) is None:
if (token := await db.get_token(request.session.pop("token", None))) is None:
logger.warn("No session in db for the token")
return RedirectResponse(request.url_for("home"))
logout_url = (
@ -175,7 +189,7 @@ async def logout(
+ urlencode(
{
"post_logout_redirect_uri": post_logout_uri,
"id_token_hint": id_token.raw_id_token,
"id_token_hint": token["id_token"],
"cliend_id": "oidc_local_test",
}
)
@ -260,6 +274,44 @@ async def get_protected_by_foorole_or_barrole(request: Request) -> HTMLResponse:
return HTMLResponse("<h1>Only users with foorole or barrole can see this</h1>")
@app.get("/introspect")
async def get_introspect(
request: Request,
provider: Annotated[StarletteOAuth2App, Depends(get_provider)],
token: Annotated[OAuth2Token, Depends(get_token)],
) -> JSONResponse:
if (
response := await provider.get(
provider.server_metadata["introspection_endpoint"],
token=token,
)
).is_success:
return response.json()
else:
raise HTTPException(status_code=response.status_code, detail=response.text)
@app.get("/oauth2-forgejo-test")
async def get_forgejo_user_info(
request: Request,
user: Annotated[User, Depends(get_current_user)],
provider: Annotated[StarletteOAuth2App, Depends(get_provider)],
token: Annotated[OAuth2Token, Depends(get_token)],
) -> HTMLResponse:
if (
response := await provider.get(
"/api/v1/user/repos",
# headers={"Authorization": f"token {token['access_token']}"},
token=token,
)
).is_success:
repos = response.json()
names = [repo["name"] for repo in repos]
return HTMLResponse(f"{user.name} has {len(repos)} repos: {', '.join(names)}")
else:
raise HTTPException(status_code=response.status_code, detail=response.text)
# @app.get("/fast_api_depends")
# def fast_api_depends(
# token: Annotated[str, Depends(fastapi_providers["Keycloak"])]