Migrate all resources to json contents;
All checks were successful
/ build (push) Successful in 5s
/ test (push) Successful in 5s

improve token decoding & logging error messages
This commit is contained in:
phil 2025-02-07 16:09:49 +01:00
parent d39adf41ef
commit 3eb6dc3dcf
6 changed files with 77 additions and 87 deletions

View file

@ -5,7 +5,7 @@ import logging
from fastapi import HTTPException, Request, Depends, status from fastapi import HTTPException, Request, Depends, status
from fastapi.security import OAuth2PasswordBearer from fastapi.security import OAuth2PasswordBearer
from authlib.integrations.starlette_client import OAuth, OAuthError, StarletteOAuth2App from authlib.integrations.starlette_client import OAuth, OAuthError, StarletteOAuth2App
from jwt import ExpiredSignatureError, InvalidKeyError from jwt import ExpiredSignatureError, InvalidKeyError, DecodeError
from httpx import AsyncClient from httpx import AsyncClient
# from authlib.oauth1.auth import OAuthToken # from authlib.oauth1.auth import OAuthToken
@ -147,8 +147,8 @@ async def get_token(request: Request) -> OAuth2Token:
oidc_provider_settings, oidc_provider_settings,
request.session.get("sid"), request.session.get("sid"),
) )
except (TokenNotInDb, InvalidKeyError): except (TokenNotInDb, InvalidKeyError, DecodeError) as err:
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "No token") raise HTTPException(status.HTTP_401_UNAUTHORIZED, err.__class__.__name__)
async def get_current_user_or_none(request: Request) -> User | None: async def get_current_user_or_none(request: Request) -> User | None:
@ -208,9 +208,14 @@ async def get_user_from_token(
try: try:
auth_provider_settings = oidc_providers_settings[auth_provider_id] auth_provider_settings = oidc_providers_settings[auth_provider_id]
except KeyError: except KeyError:
raise HTTPException( if auth_provider_id == "":
status.HTTP_401_UNAUTHORIZED, f"Unknown auth provider '{auth_provider_id}'" raise HTTPException(status.HTTP_401_UNAUTHORIZED, "No auth provider")
) else:
raise HTTPException(
status.HTTP_401_UNAUTHORIZED, f"Unknown auth provider '{auth_provider_id}'"
)
if token == "":
raise HTTPException(status.HTTP_401_UNAUTHORIZED, "No token")
try: try:
payload = auth_provider_settings.decode(token) payload = auth_provider_settings.decode(token)
except ExpiredSignatureError as err: except ExpiredSignatureError as err:

View file

@ -4,8 +4,7 @@ import logging
from httpx import AsyncClient from httpx import AsyncClient
from jwt.exceptions import ExpiredSignatureError, InvalidTokenError from jwt.exceptions import ExpiredSignatureError, InvalidTokenError
from fastapi import FastAPI, HTTPException, Depends, Request, status from fastapi import FastAPI, HTTPException, Depends, status
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
# from starlette.middleware.sessions import SessionMiddleware # from starlette.middleware.sessions import SessionMiddleware
@ -16,8 +15,8 @@ from .models import User
from .auth_utils import ( from .auth_utils import (
get_user_from_token, get_user_from_token,
UserWithRole, UserWithRole,
get_oidc_provider, # get_oidc_provider,
get_token, # get_token,
) )
from .settings import settings from .settings import settings
@ -47,44 +46,46 @@ resource_server.add_middleware(
@resource_server.get("/public") @resource_server.get("/public")
async def public() -> HTMLResponse: async def public() -> dict:
return HTMLResponse("<h1>Not protected</h1>") return {"msg": "Not protected"}
@resource_server.get("/protected") @resource_server.get("/protected")
async def get_protected(user: Annotated[User, Depends(get_user_from_token)]) -> HTMLResponse: async def get_protected(user: Annotated[User, Depends(get_user_from_token)]):
assert user is not None # Just to keep QA checks happy assert user is not None # Just to keep QA checks happy
return HTMLResponse("<h1>Only authenticated users can see this</h1>") return {"msg": "Only authenticated users can see this"}
@resource_server.get("/protected-by-foorole") @resource_server.get("/protected-by-foorole")
async def get_protected_by_foorole( async def get_protected_by_foorole(
user: Annotated[User, Depends(UserWithRole("foorole"))] user: Annotated[User, Depends(UserWithRole("foorole"))],
) -> HTMLResponse: ):
return HTMLResponse("<h1>Only users with foorole can see this</h1>") assert user is not None
return {"msg": "Only users with foorole can see this"}
@resource_server.get("/protected-by-barrole") @resource_server.get("/protected-by-barrole")
async def get_protected_by_barrole( async def get_protected_by_barrole(
user: Annotated[User, Depends(UserWithRole("barrole"))] user: Annotated[User, Depends(UserWithRole("barrole"))],
) -> HTMLResponse: ):
return HTMLResponse("<h1>Protected by barrole</h1>") assert user is not None
return {"msg": "Protected by barrole"}
@resource_server.get("/protected-by-foorole-and-barrole") @resource_server.get("/protected-by-foorole-and-barrole")
async def get_protected_by_foorole_and_barrole( async def get_protected_by_foorole_and_barrole(
user: Annotated[User, Depends(UserWithRole("foorole")), Depends(UserWithRole("barrole"))], user: Annotated[User, Depends(UserWithRole("foorole")), Depends(UserWithRole("barrole"))],
) -> HTMLResponse: ):
assert user is not None # Just to keep QA checks happy assert user is not None # Just to keep QA checks happy
return HTMLResponse("<h1>Only users with foorole and barrole can see this</h1>") return {"msg": "Only users with foorole and barrole can see this"}
@resource_server.get("/protected-by-foorole-or-barrole") @resource_server.get("/protected-by-foorole-or-barrole")
async def get_protected_by_foorole_or_barrole( async def get_protected_by_foorole_or_barrole(
user: Annotated[User, Depends(UserWithRole(["foorole", "barrole"]))] user: Annotated[User, Depends(UserWithRole(["foorole", "barrole"]))],
) -> HTMLResponse: ):
assert user is not None # Just to keep QA checks happy assert user is not None # Just to keep QA checks happy
return HTMLResponse("<h1>Only users with foorole or barrole can see this</h1>") return {"msg": "Only users with foorole or barrole can see this"}
# @resource_server.get("/introspect") # @resource_server.get("/introspect")
@ -118,9 +119,9 @@ async def get_resource_(
# oidc_provider: Annotated[StarletteOAuth2App, Depends(get_oidc_provider)], # oidc_provider: Annotated[StarletteOAuth2App, Depends(get_oidc_provider)],
# token: Annotated[OAuth2Token, Depends(get_token)], # token: Annotated[OAuth2Token, Depends(get_token)],
user: Annotated[User, Depends(get_user_from_token)], user: Annotated[User, Depends(get_user_from_token)],
) -> JSONResponse: ):
"""Generic path for testing a resource provided by a provider""" """Generic path for testing a resource provided by a provider"""
return JSONResponse(await get_resource(id, user)) return await get_resource(id, user)
async def get_resource(resource_id: str, user: User) -> dict: async def get_resource(resource_id: str, user: User) -> dict:

View file

@ -43,9 +43,7 @@ class OIDCProvider(BaseModel):
info_url: str | None = ( info_url: str | None = (
None # Used eg. for Keycloak's public key (see https://stackoverflow.com/questions/54318633/getting-keycloaks-public-key) None # Used eg. for Keycloak's public key (see https://stackoverflow.com/questions/54318633/getting-keycloaks-public-key)
) )
info: dict[str, str | int] | None = ( info: dict[str, str | int] | None = None # Info fetched from info_url, eg. public key
None # Info fetched from info_url, eg. public key
)
public_key: str | None = None public_key: str | None = None
signature_alg: str = "RS256" signature_alg: str = "RS256"
resource_provider_scopes: list[str] = [] resource_provider_scopes: list[str] = []
@ -62,25 +60,17 @@ class OIDCProvider(BaseModel):
def get_account_url(self, request: Request, user: User) -> str | None: def get_account_url(self, request: Request, user: User) -> str | None:
if self.account_url_template: if self.account_url_template:
if not ( if not (self.url.endswith("/") or self.account_url_template.startswith("/")):
self.url.endswith("/") or self.account_url_template.startswith("/")
):
sep = "/" sep = "/"
else: else:
sep = "" sep = ""
return ( return self.url + sep + self.account_url_template.format(request=request, user=user)
self.url
+ sep
+ self.account_url_template.format(request=request, user=user)
)
else: else:
return None return None
def get_public_key(self) -> str: def get_public_key(self) -> str:
"""Return the public key formatted for decoding token""" """Return the public key formatted for decoding token"""
public_key = self.public_key or ( public_key = self.public_key or (self.info is not None and self.info["public_key"])
self.info is not None and self.info["public_key"]
)
if public_key is None: if public_key is None:
raise AttributeError(f"Cannot get public key for {self.name}") raise AttributeError(f"Cannot get public key for {self.name}")
return f""" return f"""
@ -91,17 +81,18 @@ class OIDCProvider(BaseModel):
def decode(self, token: str, verify_signature: bool = True) -> dict[str, Any]: def decode(self, token: str, verify_signature: bool = True) -> dict[str, Any]:
"""Decode the token with signature check""" """Decode the token with signature check"""
decoded = decode( if settings.debug_token:
token, decoded = decode(
self.get_public_key(), token,
algorithms=[self.signature_alg], self.get_public_key(),
audience=["account", "oidc-test", "oidc-test-web"], algorithms=[self.signature_alg],
options={ audience=["account", "oidc-test", "oidc-test-web"],
"verify_signature": False, options={
"verify_aud": False, "verify_signature": False,
}, # not settings.insecure.skip_verify_signature}, "verify_aud": False,
) }, # not settings.insecure.skip_verify_signature},
logger.debug(str(decoded)) )
logger.debug(str(decoded))
return decode( return decode(
token, token,
self.get_public_key(), self.get_public_key(),
@ -143,6 +134,7 @@ class Settings(BaseSettings):
log: bool = False log: bool = False
insecure: Insecure = Insecure() insecure: Insecure = Insecure()
cors_origins: list[str] = [] cors_origins: list[str] = []
debug_token: bool = False
@classmethod @classmethod
def settings_customise_sources( def settings_customise_sources(
@ -161,9 +153,7 @@ class Settings(BaseSettings):
settings_cls, settings_cls,
Path( Path(
Path( Path(
environ.get( environ.get("OIDC_TEST_SETTINGS_FILE", Path.cwd() / "settings.yaml"),
"OIDC_TEST_SETTINGS_FILE", Path.cwd() / "settings.yaml"
),
) )
), ),
), ),

View file

@ -171,11 +171,13 @@ hr {
gap: 0.5em; gap: 0.5em;
flex-flow: wrap; flex-flow: wrap;
} }
.content .links-to-check a { .content .links-to-check button {
color: black; color: black;
padding: 5px 10px; padding: 5px 10px;
text-decoration: none; text-decoration: none;
border-radius: 8px; border-radius: 8px;
border: none;
cursor: pointer;
} }
.token { .token {
@ -183,12 +185,6 @@ hr {
font-family: monospace; font-family: monospace;
} }
.actions {
display: flex;
justify-content: center;
gap: 0.5em;
}
.resourceResult { .resourceResult {
padding: 0.5em; padding: 0.5em;
gap: 0.5em; gap: 0.5em;

View file

@ -1,6 +1,7 @@
async function checkHref(elem, token, authProvider) { async function checkHref(elem, token, authProvider) {
const msg = document.getElementById("msg") const msg = document.getElementById("msg")
const resp = await fetch(elem.href, { const url = `resource/${elem.getAttribute("resource-id")}`
const resp = await fetch(url, {
headers: new Headers({ headers: new Headers({
"Content-type": "application/json", "Content-type": "application/json",
"Authorization": `Bearer ${token}`, "Authorization": `Bearer ${token}`,

View file

@ -61,33 +61,30 @@
</div> </div>
{% endif %} {% endif %}
<hr> <hr>
{% if user %}
<p class="center">
Fetch resources from the resource server with your authentication token:
</p>
<div class="actions">
<button onclick="get_resource('time', '{{ user.access_token }}', '{{ oidc_provider_settings.id }}')">Time</button>
<button onclick="get_resource('bs', '{{ user.access_token }}', '{{ oidc_provider_settings.id }}')">BS</button>
</div>
<div class="resourceResult">
<div id="resource" class="resource"></div>
<div id="msg" class="msg error"></div>
</div>
<hr>
{% endif %}
<div class="content"> <div class="content">
<p> <p class="center">
These links should get different response codes depending on the authorization: Resources validated by scope:
</p> </p>
<div class="links-to-check"> <div class="links-to-check">
<a href="resource/public">Public</a> <button resource-id="time" onclick="get_resource('time', '{{ user.access_token }}', '{{ oidc_provider_settings.id }}')">Time</button>
<a href="resource/protected">Auth protected content</a> <button resource-id="bs" onclick="get_resource('bs', '{{ user.access_token }}', '{{ oidc_provider_settings.id }}')">BS</button>
<a href="resource/protected-by-foorole">Auth + foorole protected content</a> </div>
<a href="resource/protected-by-foorole-or-barrole">Auth + foorole or barrole protected content</a> <p>
<a href="resource/protected-by-barrole">Auth + barrole protected content</a> Resources validated by role:
<a href="resource/protected-by-foorole-and-barrole">Auth + foorole and barrole protected content</a> </p>
<a href="resource/fast_api_depends" class="hidden">Using FastAPI Depends</a> <div class="links-to-check">
<!--<a href="resource/introspect">Introspect token (401 expected)</a>--> <button resource-id="public" onclick="get_resource('public', '{{ user.access_token }}', '{{ oidc_provider_settings.id }}')">Public</button>
<button resource-id="protected" onclick="get_resource('protected', '{{ user.access_token }}', '{{ oidc_provider_settings.id }}')">Auth protected content</button>
<button resource-id="protected-by-foorole" onclick="get_resource('protected-by-foorole', '{{ user.access_token }}', '{{ oidc_provider_settings.id }}')">Auth + foorole protected content</button>
<button resource-id="protected-by-foorole-or-barrole" onclick="get_resource('protected-by-foorole-or-barrole', '{{ user.access_token }}', '{{ oidc_provider_settings.id }}')">Auth + foorole or barrole protected content</button>
<button resource-id="protected-by-barrole" onclick="get_resource('protected-by-barrole', '{{ user.access_token }}', '{{ oidc_provider_settings.id }}')">Auth + barrole protected content</button>
<button resource-id="protected-by-foorole-and-barrole" onclick="get_resource('protected-by-foorole-and-barrole', '{{ user.access_token }}', '{{ oidc_provider_settings.id }}')">Auth + foorole and barrole protected content</button>
<button resource-id="fast_api_depends" class="hidden" onclick="get_resource('fast_api_depends', '{{ user.access_token }}', '{{ oidc_provider_settings.id }}')">Using FastAPI Depends</button>
<!--<button resource-id="introspect" onclick="get_resource('introspect', '{{ user.access_token }}', '{{ oidc_provider_settings.id }}')">Introspect token (401 expected)</button>-->
</div>
<div class="resourceResult">
<div id="resource" class="resource"></div>
<div id="msg" class="msg error"></div>
</div> </div>
{% if resources %} {% if resources %}
<p> <p>