- These links should get different response codes depending on the authorization: -
-- Resources for this provider: -
+This application provides all these resources, eventually protected with scope or roles:
diff --git a/.forgejo/workflows/build.yaml b/.forgejo/workflows/build.yaml
index e02bf47..379aaa8 100644
--- a/.forgejo/workflows/build.yaml
+++ b/.forgejo/workflows/build.yaml
@@ -19,7 +19,7 @@ jobs:
- name: Install the latest version of uv
uses: astral-sh/setup-uv@v4
with:
- version: "0.5.16"
+ version: "0.6.9"
- name: Install
run: uv sync
@@ -27,34 +27,26 @@ jobs:
- name: Run tests (API call)
run: .venv/bin/pytest -s tests/basic.py
- - name: Get version with git describe
- id: version
- run: |
- echo "version=$(git describe)" >> $GITHUB_OUTPUT
- echo "$VERSION"
+ - name: Get version
+ run: echo "VERSION=$(.venv/bin/dunamai from any --style semver)" >> $GITHUB_ENV
- - name: Check if the container should be built
- id: builder
- env:
- RUN: ${{ toJSON(inputs.build || !contains(steps.version.outputs.version, '-')) }}
- run: |
- echo "run=$RUN" >> $GITHUB_OUTPUT
- echo "Run build: $RUN"
+ - name: Version
+ run: echo $VERSION
- - name: Set the version in pyproject.toml (workaround for uv not supporting dynamic version)
- if: fromJSON(steps.builder.outputs.run)
- env:
- VERSION: ${{ steps.version.outputs.version }}
- run: sed "s/0.0.0/$VERSION/" -i pyproject.toml
+ - name: Get distance from tag
+ run: echo "DISTANCE=$(.venv/bin/dunamai from any --format '{distance}')" >> $GITHUB_ENV
+
+ - name: Distance
+ run: echo $DISTANCE
- name: Workaround for bug of podman-login
- if: fromJSON(steps.builder.outputs.run)
+ if: env.DISTANCE == '0'
run: |
mkdir -p $HOME/.docker
echo "{ \"auths\": {} }" > $HOME/.docker/config.json
- name: Log in to the container registry (with another workaround)
- if: fromJSON(steps.builder.outputs.run)
+ if: env.DISTANCE == '0'
uses: actions/podman-login@v1
with:
registry: ${{ vars.REGISTRY }}
@@ -63,30 +55,31 @@ jobs:
auth_file_path: /tmp/auth.json
- name: Build the container image
- if: fromJSON(steps.builder.outputs.run)
+ if: env.DISTANCE == '0'
uses: actions/buildah-build@v1
with:
image: oidc-fastapi-test
oci: true
labels: oidc-fastapi-test
- tags: latest ${{ steps.version.outputs.version }}
+ tags: "latest ${{ env.VERSION }}"
containerfiles: |
./Containerfile
- name: Push the image to the registry
- if: fromJSON(steps.builder.outputs.run)
+ if: env.DISTANCE == '0'
uses: actions/push-to-registry@v2
with:
registry: "docker://${{ vars.REGISTRY }}/${{ vars.ORGANISATION }}"
image: oidc-fastapi-test
- tags: latest ${{ steps.version.outputs.version }}
+ tags: "latest ${{ env.VERSION }}"
- name: Build wheel
- if: fromJSON(steps.builder.outputs.run)
+ if: env.DISTANCE == '0'
run: uv build --wheel
- name: Publish Python package (home)
- if: fromJSON(steps.builder.outputs.run)
+ if: env.DISTANCE == '0'
env:
LOCAL_PYPI_TOKEN: ${{ secrets.LOCAL_PYPI_TOKEN }}
run: uv publish --publish-url https://code.philo.ydns.eu/api/packages/philorg/pypi --token $LOCAL_PYPI_TOKEN
+ continue-on-error: true
diff --git a/.forgejo/workflows/test.yaml b/.forgejo/workflows/test.yaml
index a56a9ce..f4d994e 100644
--- a/.forgejo/workflows/test.yaml
+++ b/.forgejo/workflows/test.yaml
@@ -19,7 +19,7 @@ jobs:
- name: Install the latest version of uv
uses: astral-sh/setup-uv@v4
with:
- version: "0.5.16"
+ version: "0.6.3"
- name: Install
run: uv sync
diff --git a/Containerfile b/Containerfile
index aef57f8..0ec45d1 100644
--- a/Containerfile
+++ b/Containerfile
@@ -1,4 +1,4 @@
-FROM docker.io/library/python:alpine
+FROM docker.io/library/python:latest
COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /usr/local/bin/
@@ -9,6 +9,9 @@ WORKDIR /app
RUN uv pip install --system .
+# Add demo plugin
+RUN PIP_EXTRA_INDEX_URL=https://pypi.org/simple/ uv pip install --system --index-url https://code.philo.ydns.eu/api/packages/philorg/pypi/simple/ oidc-fastapi-test-resource-provider-demo
+
# Possible to run with:
#CMD ["oidc-test", "--port", "80"]
#CMD ["fastapi", "run", "src/oidc_test/main.py", "--port", "8873", "--root-path", "/oidc-test"]
diff --git a/README.md b/README.md
index 9e00474..68f335d 100644
--- a/README.md
+++ b/README.md
@@ -52,31 +52,59 @@ given by the OIDC providers.
For example:
```yaml
-oidc:
- secret_key: "ASecretNoOneKnows"
- show_session_details: yes
+secret_key: AVeryWellKeptSecret
+debug_token: no
+show_token: yes
+log: yes
+
+auth:
providers:
- id: auth0
name: Okta / Auth0
- url: "https://Not protected
")
-
-
-@app.get("/protected")
-async def get_protected(
- user: Annotated[User, Depends(get_current_user)]
-) -> HTMLResponse:
- assert user is not None # Just to keep QA checks happy
- return HTMLResponse("Only authenticated users can see this
")
-
-
-@app.get("/protected-by-foorole")
-@hasrole("foorole")
-async def get_protected_by_foorole(request: Request) -> HTMLResponse:
- assert request is not None # Just to keep QA checks happy
- return HTMLResponse("Only users with foorole can see this
")
-
-
-@app.get("/protected-by-barrole")
-@hasrole("barrole")
-async def get_protected_by_barrole(request: Request) -> HTMLResponse:
- assert request is not None # Just to keep QA checks happy
- return HTMLResponse("Protected by barrole
")
-
-
-@app.get("/protected-by-foorole-and-barrole")
-@hasrole("barrole")
-@hasrole("foorole")
-async def get_protected_by_foorole_and_barrole(request: Request) -> HTMLResponse:
- assert request is not None # Just to keep QA checks happy
- return HTMLResponse("Only users with foorole and barrole can see this
")
-
-
-@app.get("/protected-by-foorole-or-barrole")
-@hasrole(["foorole", "barrole"])
-async def get_protected_by_foorole_or_barrole(request: Request) -> HTMLResponse:
- assert request is not None # Just to keep QA checks happy
- return HTMLResponse("Only users with foorole or barrole can see this
")
-
-
-@app.get("/introspect")
-async def get_introspect(
+@app.get("/refresh")
+async def refresh(
request: Request,
- oidc_provider: Annotated[StarletteOAuth2App, Depends(get_oidc_provider)],
- token: Annotated[OAuth2Token, Depends(get_token)],
-) -> JSONResponse:
- assert request is not None # Just to keep QA checks happy
- if (url := oidc_provider.server_metadata.get("introspection_endpoint")) is None:
+ provider: Annotated[Provider, Depends(get_auth_provider)],
+ token: Annotated[OAuth2Token, Depends(get_token_from_session)],
+) -> RedirectResponse:
+ """Manually refresh token"""
+ new_token = await provider.authlib_client.fetch_access_token(
+ refresh_token=token["refresh_token"],
+ grant_type="refresh_token",
+ )
+ try:
+ await update_token(provider.id, new_token)
+ except PyJWTError as err:
+ logger.info(f"Cannot refresh token: {err.__class__.__name__}")
raise HTTPException(
- status_code=status.HTTP_401_UNAUTHORIZED,
- detail="No intrispection endpoint found for the OIDC provider",
+ status.HTTP_510_NOT_EXTENDED, f"Token refresh error: {err.__class__.__name__}"
)
- if (
- response := await oidc_provider.post(
- url,
- token=token,
- data={"token": token["access_token"]},
- )
- ).is_success:
- return response.json()
- else:
- raise HTTPException(status_code=response.status_code, detail=response.text)
+ return RedirectResponse(url=request.url_for("home"))
# Snippet for running standalone
@@ -397,9 +341,7 @@ def main():
parser.add_argument(
"-p", "--port", type=int, default=80, help="Port to listen to (default: 80)"
)
- parser.add_argument(
- "-v", "--version", action="store_true", help="Print version and exit"
- )
+ parser.add_argument("-v", "--version", action="store_true", help="Print version and exit")
args = parser.parse_args()
if args.version:
diff --git a/src/oidc_test/models.py b/src/oidc_test/models.py
index fc0dba7..7b6fd0e 100644
--- a/src/oidc_test/models.py
+++ b/src/oidc_test/models.py
@@ -1,6 +1,6 @@
import logging
from functools import cached_property
-from typing import Self, Any
+from typing import Any
from pydantic import (
computed_field,
@@ -8,7 +8,6 @@ from pydantic import (
EmailStr,
ConfigDict,
)
-from authlib.integrations.starlette_client.apps import StarletteOAuth2App
from sqlmodel import SQLModel, Field
logger = logging.getLogger("oidc-test")
@@ -37,7 +36,7 @@ class User(UserBase):
userinfo: dict = {}
access_token: str | None = None
access_token_decoded: dict[str, Any] | None = None
- oidc_provider: StarletteOAuth2App | None = None
+ auth_provider_id: str
@computed_field
@cached_property
@@ -50,16 +49,18 @@ class User(UserBase):
try:
access_token_scopes = self.decode_access_token().get("scope", "").split(" ")
except Exception as err:
- logger.info(f"Access token cannot be decoded: {err}")
+ logger.debug(f"Cannot find scope because the access token cannot be decoded: {err}")
access_token_scopes = []
return scope in set(info_scopes + access_token_scopes)
- def decode_access_token(self):
- assert self.access_token is not None
- assert self.oidc_provider is not None
- assert self.oidc_provider.name is not None
- from .settings import oidc_providers_settings
+ def decode_access_token(self, verify_signature: bool = True):
+ assert self.access_token is not None, "no access_token"
+ assert self.auth_provider_id is not None, "no auth_provider_id"
+ from .auth_providers import providers
- return oidc_providers_settings[self.oidc_provider.name].decode(
- self.access_token
+ return providers[self.auth_provider_id].decode(
+ self.access_token, verify_signature=verify_signature
)
+
+ def get_scope(self, verify_signature: bool = True):
+ return self.decode_access_token(verify_signature=verify_signature)["scope"]
diff --git a/src/oidc_test/registry.py b/src/oidc_test/registry.py
new file mode 100644
index 0000000..3b91ad4
--- /dev/null
+++ b/src/oidc_test/registry.py
@@ -0,0 +1,47 @@
+from importlib.metadata import entry_points
+import logging
+
+from pydantic import BaseModel, ConfigDict
+
+from oidc_test.models import User
+
+logger = logging.getLogger("registry")
+
+
+class ProcessResult(BaseModel):
+ model_config = ConfigDict(
+ extra="allow",
+ )
+
+
+class ProcessError(Exception):
+ pass
+
+
+class Resource(BaseModel):
+ name: str
+ scope_required: str | None = None
+ role_required: str | None = None
+ is_public: bool = False
+ default_resource_id: str | None = None
+
+ def __init__(self, name: str):
+ super().__init__()
+ self.__id__ = name
+
+ async def process(self, user: User | None, resource_id: str | None = None) -> ProcessResult:
+ logger.warning(f"{self.__id__} should define a process method")
+ return ProcessResult()
+
+
+class ResourceRegistry(BaseModel):
+ resources: dict[str, Resource] = {}
+
+ def make_registry(self):
+ for ep in entry_points().select(group="oidc_test.resource_provider"):
+ ResourceClass = ep.load()
+ if issubclass(ResourceClass, Resource):
+ self.resources[ep.name] = ResourceClass(ep.name)
+
+
+registry = ResourceRegistry()
diff --git a/src/oidc_test/resource_server.py b/src/oidc_test/resource_server.py
index 0d90533..ddc5762 100644
--- a/src/oidc_test/resource_server.py
+++ b/src/oidc_test/resource_server.py
@@ -1,68 +1,310 @@
-from datetime import datetime
+from typing import Annotated, Any
import logging
+from json import JSONDecodeError
-from httpx import AsyncClient
-from jwt.exceptions import ExpiredSignatureError, InvalidTokenError
-from fastapi import HTTPException, status
-from starlette.status import HTTP_401_UNAUTHORIZED
+from authlib.oauth2.rfc6749 import OAuth2Token
+from httpx import AsyncClient, HTTPError
+from jwt.exceptions import DecodeError, ExpiredSignatureError, InvalidTokenError
+from fastapi import FastAPI, HTTPException, Depends, Request, status
+from fastapi.middleware.cors import CORSMiddleware
-from .models import User
+# from starlette.middleware.sessions import SessionMiddleware
+# from authlib.integrations.starlette_client.apps import StarletteOAuth2App
+# from authlib.oauth2.rfc6749 import OAuth2Token
+
+from oidc_test.auth.provider import Provider
+from oidc_test.auth.utils import (
+ get_user_from_token_or_none,
+ oauth2_scheme_optional,
+)
+from oidc_test.auth_providers import providers
+from oidc_test.settings import ResourceProvider, settings
+from oidc_test.models import User
+from oidc_test.registry import ProcessError, ProcessResult, registry
logger = logging.getLogger("oidc-test")
+resource_server = FastAPI()
-async def get_resource(resource_id: str, user: User) -> dict:
- """
- Resource processing: build an informative rely as a simple showcase
- """
- pname = getattr(user.oidc_provider, "name", "?")
- resp = {
- "hello": f"Hi {user.name} from an OAuth resource provider",
- "comment": f"I received a request for '{resource_id}' "
- + f"with an access token signed by {pname}",
- }
- # For the demo, resource resource_id matches a scope get:resource_id,
- # but this has to be refined for production
- required_scope = f"get:{resource_id}"
- # Check if the required scope is in the scopes allowed in userinfo
- try:
- if user.has_scope(required_scope):
- await process(user, resource_id, resp)
- else:
- ## For the showcase, giving a explanation.
- ## Alternatively, raise HTTP_401_UNAUTHORIZED
- raise HTTPException(
- status.HTTP_401_UNAUTHORIZED,
- f"No scope {required_scope} in the access token "
- + "but it is required for accessing this resource.",
+
+resource_server.add_middleware(
+ CORSMiddleware,
+ allow_origins=settings.cors_origins,
+ allow_credentials=True,
+ allow_methods=["*"],
+ allow_headers=["*"],
+)
+
+# SessionMiddleware is required by authlib
+# resource_server.add_middleware(
+# SessionMiddleware,
+# secret_key=settings.secret_key,
+# )
+
+# Route for OAuth resource server
+
+
+# Routes for RBAC based tests
+
+
+@resource_server.get("/")
+async def resources() -> dict[str, dict[str, Any]]:
+ return {"internal": {}, "plugins": registry.resources}
+
+
+@resource_server.get("/{resource_name}")
+@resource_server.get("/{resource_name}/{resource_id}")
+async def get_resource(
+ resource_name: str,
+ user: Annotated[User | None, Depends(get_user_from_token_or_none)],
+ token: Annotated[OAuth2Token | None, Depends(oauth2_scheme_optional)],
+ resource_id: str | None = None,
+):
+ """Generic path for testing a resource provided by a provider.
+ There's no field validation (response type of ProcessResult) on purpose,
+ leaving the responsibility of the response validation to resource providers"""
+ # Get the resource if it's defined in user auth provider's resources (external)
+ if user is not None:
+ provider = providers[user.auth_provider_id]
+ if ":" in resource_name:
+ # Third-party resource provider: send the request with the request token
+ resource_provider_id, resource_name = resource_name.split(":", 1)
+ provider = providers[user.auth_provider_id]
+ resource_provider: ResourceProvider = provider.get_resource_provider(
+ resource_provider_id
)
- except ExpiredSignatureError:
- raise HTTPException(
- status.HTTP_401_UNAUTHORIZED, "The token's signature has expired"
- )
- except InvalidTokenError:
- raise HTTPException(status.HTTP_401_UNAUTHORIZED, "The token is invalid")
- return resp
-
-
-async def process(user, resource_id, resp):
- """
- Too simple to be serious.
- It's a good fit for a plugin architecture for production
- """
- assert user is not None
- if resource_id == "time":
- resp["time"] = datetime.now().strftime("%c")
- elif resource_id == "bs":
- async with AsyncClient() as client:
- bs = await client.get("https://corporatebs-generator.sameerkumar.website/")
- resp["bs"] = bs.json().get("phrase", "Sorry, i am out of BS today.")
+ resource_url = resource_provider.get_resource_url(resource_name)
+ async with AsyncClient(verify=resource_provider.verify_ssl) as client:
+ try:
+ logger.debug(f"GET request to {resource_url}")
+ resp = await client.get(
+ resource_url,
+ headers={
+ "Content-type": "application/json",
+ "Authorization": f"Bearer {token}",
+ "auth_provider": user.auth_provider_id,
+ },
+ )
+ except HTTPError as err:
+ raise HTTPException(
+ status.HTTP_503_SERVICE_UNAVAILABLE, err.__class__.__name__
+ )
+ except Exception as err:
+ raise HTTPException(
+ status.HTTP_500_INTERNAL_SERVER_ERROR, err.__class__.__name__
+ )
+ else:
+ if resp.is_success:
+ return resp.json()
+ else:
+ reason_str: str
+ try:
+ reason_str = resp.json().get("detail", str(resp))
+ except Exception:
+ reason_str = str(resp.text)
+ raise HTTPException(resp.status_code, reason_str)
+ # Third party resource (provided through the auth provider)
+ # The token is just passed on
+ # XXX: is this branch valid anymore?
+ if resource_name in [r.resource_name for r in provider.resources]:
+ return await get_auth_provider_resource(
+ provider=provider,
+ resource_name=resource_name,
+ token=token,
+ user=user,
+ )
+ # Internal resource (provided here)
+ if resource_name in registry.resources:
+ resource = registry.resources[resource_name]
+ reason: dict[str, str] = {}
+ if not resource.is_public:
+ if user is None:
+ raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Resource is not public")
+ else:
+ if resource.scope_required is not None and not user.has_scope(
+ resource.scope_required
+ ):
+ reason["scope"] = (
+ f"No scope {resource.scope_required} in the access token "
+ "but it is required for accessing this resource"
+ )
+ if (
+ resource.role_required is not None
+ and resource.role_required not in user.roles_as_set
+ ):
+ reason["role"] = (
+ f"You don't have the role {resource.role_required} "
+ "but it is required for accessing this resource"
+ )
+ if len(reason) == 0:
+ try:
+ resp = await resource.process(user=user, resource_id=resource_id)
+ return resp
+ except ProcessError as err:
+ raise HTTPException(
+ status.HTTP_401_UNAUTHORIZED, f"Cannot process resource: {err}"
+ )
+ else:
+ raise HTTPException(status.HTTP_401_UNAUTHORIZED, ", ".join(reason.values()))
else:
- raise HTTPException(
- status.HTTP_401_UNAUTHORIZED, f"I don't known how to give '{resource_id}'."
- )
+ raise HTTPException(status.HTTP_401_UNAUTHORIZED, "Unknown resource")
+ # return await get_resource_(resource_name, user, **request.query_params)
+async def get_auth_provider_resource(
+ provider: Provider, resource_name: str, token: OAuth2Token | None, user: User
+) -> ProcessResult:
+ if token is None:
+ raise HTTPException(status.HTTP_401_UNAUTHORIZED, "No auth token")
+ access_token = token
+ async with AsyncClient() as client:
+ resp = await client.get(
+ url=provider.get_resource_url(resource_name),
+ headers={
+ "Content-type": "application/json",
+ "Authorization": f"Bearer {access_token}",
+ },
+ )
+ if resp.is_error:
+ raise HTTPException(resp.status_code, f"Cannot fetch resource: {resp.reason_phrase}")
+ # Only a demo, real application would really process the response
+ resp_length = len(resp.text)
+ if resp_length > 1024:
+ return ProcessResult(
+ msg=f"The resource is too long ({resp_length} bytes) to show in this demo, here is just the begining in raw format",
+ start=resp.text[:100] + "...",
+ )
+ else:
+ try:
+ resp_json = resp.json()
+ except JSONDecodeError:
+ return ProcessResult(msg="The resource is not formatted in JSON", text=resp.text)
+ if isinstance(resp_json, dict):
+ return ProcessResult(**resp.json())
+ elif isinstance(resp_json, list):
+ return ProcessResult(**{str(i): line for i, line in enumerate(resp_json)})
+
+
+# @resource_server.get("/public")
+# async def public() -> dict:
+# return {"msg": "Not protected"}
+#
+#
+# @resource_server.get("/protected")
+# async def get_protected(user: Annotated[User, Depends(get_user_from_token)]):
+# assert user is not None # Just to keep QA checks happy
+# return {"msg": "Only authenticated users can see this"}
+#
+#
+# @resource_server.get("/protected-by-foorole")
+# async def get_protected_by_foorole(
+# user: Annotated[User, Depends(UserWithRole("foorole"))],
+# ):
+# assert user is not None
+# return {"msg": "Only users with foorole can see this"}
+#
+#
+# @resource_server.get("/protected-by-barrole")
+# async def get_protected_by_barrole(
+# user: Annotated[User, Depends(UserWithRole("barrole"))],
+# ):
+# assert user is not None
+# return {"msg": "Protected by barrole"}
+#
+#
+# @resource_server.get("/protected-by-foorole-and-barrole")
+# async def get_protected_by_foorole_and_barrole(
+# user: Annotated[User, Depends(UserWithRole("foorole")), Depends(UserWithRole("barrole"))],
+# ):
+# assert user is not None # Just to keep QA checks happy
+# return {"msg": "Only users with foorole and barrole can see this"}
+#
+#
+# @resource_server.get("/protected-by-foorole-or-barrole")
+# async def get_protected_by_foorole_or_barrole(
+# user: Annotated[User, Depends(UserWithRole(["foorole", "barrole"]))],
+# ):
+# assert user is not None # Just to keep QA checks happy
+# return {"msg": "Only users with foorole or barrole can see this"}
+
+# async def get_resource_(resource_id: str, user: User, **kwargs) -> dict:
+# """
+# Resource processing: build an informative rely as a simple showcase
+# """
+# if resource_id == "petition":
+# return await sign(user, kwargs["petition_id"])
+# provider = providers[user.auth_provider_id]
+# try:
+# pname = provider.name
+# except KeyError:
+# pname = "?"
+# resp = {
+# "hello": f"Hi {user.name} from an OAuth resource provider",
+# "comment": f"I received a request for '{resource_id}' "
+# + f"with an access token signed by {pname}",
+# }
+# # For the demo, resource resource_id matches a scope get:resource_id,
+# # but this has to be refined for production
+# required_scope = f"get:{resource_id}"
+# # Check if the required scope is in the scopes allowed in userinfo
+# try:
+# if user.has_scope(required_scope):
+# await process(user, resource_id, resp)
+# else:
+# ## For the showcase, giving a explanation.
+# ## Alternatively, raise HTTP_401_UNAUTHORIZED
+# raise HTTPException(
+# status.HTTP_401_UNAUTHORIZED,
+# f"No scope {required_scope} in the access token "
+# + "but it is required for accessing this resource",
+# )
+# except ExpiredSignatureError:
+# raise HTTPException(status.HTTP_401_UNAUTHORIZED, "The token's signature has expired")
+# except InvalidTokenError:
+# raise HTTPException(status.HTTP_401_UNAUTHORIZED, "The token is invalid")
+# return resp
+
+
+# async def process(user, resource_id, resp):
+# """
+# Too simple to be serious.
+# It's a good fit for a plugin architecture for production
+# """
+# if resource_id == "time":
+# resp["time"] = datetime.now().strftime("%c")
+# elif resource_id == "bs":
+# async with AsyncClient() as client:
+# bs = await client.get("https://corporatebs-generator.sameerkumar.website/")
+# resp["bs"] = bs.json().get("phrase", "Sorry, i am out of BS today.")
+# else:
+# raise HTTPException(
+# status.HTTP_401_UNAUTHORIZED, f"I don't known how to give '{resource_id}'."
+# )
+
+
+# @resource_server.get("/introspect")
+# async def get_introspect(
+# request: Request,
+# oidc_provider: Annotated[StarletteOAuth2App, Depends(get_oidc_provider)],
+# token: Annotated[OAuth2Token, Depends(get_token)],
+# ) -> JSONResponse:
+# assert request is not None # Just to keep QA checks happy
+# if (url := oidc_provider.server_metadata.get("introspection_endpoint")) is None:
+# raise HTTPException(
+# status_code=status.HTTP_401_UNAUTHORIZED,
+# detail="No introspection endpoint found for the OIDC provider",
+# )
+# if (
+# response := await oidc_provider.post(
+# url,
+# token=token,
+# data={"token": token["access_token"]},
+# )
+# ).is_success:
+# return response.json()
+# else:
+# raise HTTPException(status_code=response.status_code, detail=response.text)
+
# assert user.oidc_provider is not None
### Get some info (TODO: refactor)
# if (auth_provider_id := user.oidc_provider.name) is None:
diff --git a/src/oidc_test/settings.py b/src/oidc_test/settings.py
index 2544bd7..ad80c06 100644
--- a/src/oidc_test/settings.py
+++ b/src/oidc_test/settings.py
@@ -1,12 +1,10 @@
from os import environ
import string
import random
-from typing import Type, Tuple, Any
+from typing import Type, Tuple
from pathlib import Path
-import logging
-from jwt import decode
-from pydantic import BaseModel, computed_field, AnyUrl
+from pydantic import AnyHttpUrl, BaseModel, computed_field, AnyUrl
from pydantic_settings import (
BaseSettings,
SettingsConfigDict,
@@ -15,20 +13,33 @@ from pydantic_settings import (
)
from starlette.requests import Request
-from .models import User
-
-logger = logging.getLogger("oidc-test")
-
class Resource(BaseModel):
"""A resource with an URL that can be accessed with an OAuth2 access token"""
+ resource_name: str
+ name: str
+ url: str
+
+
+class ResourceProvider(BaseModel):
id: str
name: str
+ base_url: AnyUrl
+ resources: list[Resource] = []
+ verify_ssl: bool = True
+
+ def get_resource(self, resource_name: str) -> Resource:
+ return [
+ resource for resource in self.resources if resource.resource_name == resource_name
+ ][0]
+
+ def get_resource_url(self, resource_name: str) -> str:
+ return f"{self.base_url}{self.get_resource(resource_name).url}"
-class OIDCProvider(BaseModel):
- """OIDC provider, can also be a resource server"""
+class AuthProviderSettings(BaseModel):
+ """Auth provider, can also be a resource server"""
id: str
name: str
@@ -43,12 +54,14 @@ class OIDCProvider(BaseModel):
info_url: str | None = (
None # Used eg. for Keycloak's public key (see https://stackoverflow.com/questions/54318633/getting-keycloaks-public-key)
)
- info: dict[str, str | int] | None = (
- None # Info fetched from info_url, eg. public key
- )
public_key: str | None = None
+ public_key_url: str | None = None
signature_alg: str = "RS256"
resource_provider_scopes: list[str] = []
+ session_key: str = "sid"
+ skip_verify_signature: bool = True
+ disabled: bool = False
+ resource_providers: list[ResourceProvider] = []
@computed_field
@property
@@ -60,69 +73,20 @@ class OIDCProvider(BaseModel):
def token_url(self) -> str:
return "auth/" + self.id
- def get_account_url(self, request: Request, user: User) -> str | None:
+ def get_account_url(self, request: Request, user: dict) -> str | None:
if self.account_url_template:
- if not (
- self.url.endswith("/") or self.account_url_template.startswith("/")
- ):
+ if not (self.url.endswith("/") or self.account_url_template.startswith("/")):
sep = "/"
else:
sep = ""
- return (
- self.url
- + sep
- + self.account_url_template.format(request=request, user=user)
- )
+ return self.url + sep + self.account_url_template.format(request=request, user=user)
else:
return None
- def get_public_key(self) -> str:
- """Return the public key formatted for decoding token"""
- public_key = self.public_key or (
- self.info is not None and self.info["public_key"]
- )
- if public_key is None:
- raise AttributeError(f"Cannot get public key for {self.name}")
- return f"""
- -----BEGIN PUBLIC KEY-----
- {public_key}
- -----END PUBLIC KEY-----
- """
- def decode(self, token: str, verify_signature: bool = True) -> dict[str, Any]:
- """Decode the token with signature check"""
- decoded = decode(
- token,
- self.get_public_key(),
- algorithms=[self.signature_alg],
- audience=["account", "oidc-test", "oidc-test-web"],
- options={
- "verify_signature": False,
- "verify_aud": False,
- }, # not settings.insecure.skip_verify_signature},
- )
- logger.debug(str(decoded))
- return decode(
- token,
- self.get_public_key(),
- algorithms=[self.signature_alg],
- audience=["account", "oidc-test", "oidc-test-web"],
- options={
- "verify_signature": verify_signature,
- }, # not settings.insecure.skip_verify_signature},
- )
-
-
-class ResourceProvider(BaseModel):
- id: str
- name: str
- base_url: AnyUrl
- resources: list[Resource] = []
-
-
-class OIDCSettings(BaseModel):
+class AuthSettings(BaseModel):
show_session_details: bool = False
- providers: list[OIDCProvider] = []
+ providers: list[AuthProviderSettings] = []
swagger_provider: str = ""
@@ -137,12 +101,15 @@ class Settings(BaseSettings):
model_config = SettingsConfigDict(env_nested_delimiter="__")
- oidc: OIDCSettings = OIDCSettings()
- resource_providers: list[ResourceProvider] = []
+ auth: AuthSettings = AuthSettings()
secret_key: str = "".join(random.choice(string.ascii_letters) for _ in range(16))
log: bool = False
+ log_config_file: str = "log_conf.yaml"
insecure: Insecure = Insecure()
cors_origins: list[str] = []
+ debug_token: bool = False
+ show_token: bool = False
+ show_external_resource_providers_links: bool = False
@classmethod
def settings_customise_sources(
@@ -161,9 +128,7 @@ class Settings(BaseSettings):
settings_cls,
Path(
Path(
- environ.get(
- "OIDC_TEST_SETTINGS_FILE", Path.cwd() / "settings.yaml"
- ),
+ environ.get("OIDC_TEST_SETTINGS_FILE", Path.cwd() / "settings.yaml"),
)
),
),
@@ -172,8 +137,3 @@ class Settings(BaseSettings):
settings = Settings()
-
-
-oidc_providers_settings: dict[str, OIDCProvider] = dict(
- [(provider.id, provider) for provider in settings.oidc.providers]
-)
diff --git a/src/oidc_test/static/styles.css b/src/oidc_test/static/styles.css
index 7e1260b..1e8dc03 100644
--- a/src/oidc_test/static/styles.css
+++ b/src/oidc_test/static/styles.css
@@ -21,6 +21,12 @@ hr {
.hidden {
display: none;
}
+.version {
+ position: absolute;
+ font-size: 75%;
+ top: 0.3em;
+ right: 0.3em;
+}
.center {
text-align: center;
}
@@ -73,7 +79,6 @@ hr {
}
.debug-auth p {
border-bottom: 1px solid black;
- text-align: left;
}
.debug-auth ul {
padding: 0;
@@ -143,19 +148,27 @@ hr {
.providers .provider {
min-height: 2em;
}
-.providers .provider a.link {
+.providers .provider .link {
text-decoration: none;
max-height: 2em;
}
-.providers .provider .link div {
+.providers .provider .link {
background-color: #f7c7867d;
border-radius: 8px;
padding: 6px;
text-align: center;
color: black;
- font-weight: bold;
+ font-weight: 400;
cursor: pointer;
+ border: 0;
+ width: 100%;
}
+
+.providers .provider .link.disabled {
+ color: gray;
+ cursor: not-allowed;
+}
+
.providers .provider .hint {
font-size: 80%;
max-width: 13em;
@@ -171,11 +184,13 @@ hr {
gap: 0.5em;
flex-flow: wrap;
}
-.content .links-to-check a {
+.content .links-to-check button {
color: black;
padding: 5px 10px;
text-decoration: none;
border-radius: 8px;
+ border: none;
+ cursor: pointer;
}
.token {
@@ -183,16 +198,10 @@ hr {
font-family: monospace;
}
-.actions {
- display: flex;
- justify-content: center;
- gap: 0.5em;
-}
-
.resourceResult {
padding: 0.5em;
+ display: flex;
gap: 0.5em;
- flex-direction: column;
width: fit-content;
align-items: center;
margin: 5px auto;
diff --git a/src/oidc_test/static/utils.js b/src/oidc_test/static/utils.js
index a982267..e988dfe 100644
--- a/src/oidc_test/static/utils.js
+++ b/src/oidc_test/static/utils.js
@@ -1,28 +1,45 @@
-function checkHref(elem) {
- var xmlHttp = new XMLHttpRequest()
- xmlHttp.onreadystatechange = function () {
- if (xmlHttp.readyState == 4) {
- elem.classList.add("hasResponseStatus")
- elem.classList.add("status-" + xmlHttp.status)
- elem.title = "Response code: " + xmlHttp.status + " - " + xmlHttp.statusText
- }
+async function checkHref(elem, token, authProvider) {
+ const msg = document.getElementById("msg")
+ const resourceName = elem.getAttribute("resource-name")
+ const resourceId = elem.getAttribute("resource-id")
+ const resourceProviderId = elem.getAttribute("resource-provider-id") ? elem.getAttribute("resource-provider-id") : ""
+ const fqResourceName = resourceProviderId ? `${resourceProviderId}:${resourceName}` : resourceName
+ const url = resourceId ? `resource/${fqResourceName}/${resourceId}` : `resource/${fqResourceName}`
+ const resp = await fetch(url, {
+ method: "GET",
+ headers: new Headers({
+ "Content-type": "application/json",
+ "Authorization": `Bearer ${token}`,
+ "auth_provider": authProvider,
+ }),
+ }).catch(err => {
+ msg.innerHTML = "Cannot fetch resource: " + err.message
+ resourceElem.innerHTML = ""
+ })
+ if (resp === undefined) {
+ return
+ } else {
+ elem.classList.add("hasResponseStatus")
+ elem.classList.add("status-" + resp.status)
+ elem.title = "Response code: " + resp.status + " - " + resp.statusText
}
- xmlHttp.open("GET", elem.href, true) // true for asynchronous
- xmlHttp.send(null)
}
-function checkPerms(className) {
+function checkPerms(className, token, authProvider) {
var rootElems = document.getElementsByClassName(className)
Array.from(rootElems).forEach(elem =>
- Array.from(elem.children).forEach(elem => checkHref(elem))
+ Array.from(elem.children).forEach(elem => checkHref(elem, token, authProvider))
)
}
-async function get_resource(id, token, authProvider) {
+async function get_resource(resourceName, token, authProvider, resourceId, resourceProviderId) {
+ // BaseUrl for an external resource provider
//if (!keycloak.keycloak) { return }
const msg = document.getElementById("msg")
const resourceElem = document.getElementById('resource')
- const resp = await fetch("resource/" + id, {
+ const fqResourceName = resourceProviderId ? `${resourceProviderId}:${resourceName}` : resourceName
+ const url = resourceId ? `resource/${fqResourceName}/${resourceId}` : `resource/${fqResourceName}`
+ const resp = await fetch(url, {
method: "GET",
headers: new Headers({
"Content-type": "application/json",
@@ -45,15 +62,24 @@ async function get_resource(id, token, authProvider) {
msg.innerHTML = ""
resourceElem.innerHTML = ""
Object.entries(resource).forEach(
- ([k, v]) => {
+ ([key, value]) => {
let r = document.createElement('div')
let kElem = document.createElement('div')
- kElem.innerText = k
+ kElem.innerText = key
kElem.className = "key"
let vElem = document.createElement('div')
- vElem.innerText = v
+ if (typeof value == "object") {
+ Object.entries(value).forEach(v => {
+ const ne = document.createElement('div')
+ ne.innerHTML = `${v[0]}: ${v[1]}`
+ vElem.appendChild(ne)
+ })
+ }
+ else {
+ vElem.innerText = value
+ }
vElem.className = "value"
- if (k == "sorry") {
+ if (key == "sorry") {
vElem.classList.add("error")
}
r.appendChild(kElem)
diff --git a/src/oidc_test/templates/base.html b/src/oidc_test/templates/base.html
index 3bdb3f3..157e26f 100644
--- a/src/oidc_test/templates/base.html
+++ b/src/oidc_test/templates/base.html
@@ -4,7 +4,8 @@
-
+
+ OIDC-test - FastAPI client
{% block content %}
{% endblock %}
diff --git a/src/oidc_test/templates/home.html b/src/oidc_test/templates/home.html
index ce344cc..167616f 100644
--- a/src/oidc_test/templates/home.html
+++ b/src/oidc_test/templates/home.html
@@ -5,25 +5,28 @@
with OpenID Connect and OAuth2 with different providers.
Log in with:
-
- {{ provider.name }}
- |
- {{ provider.hint }} - | -
Log in with:
++ + | +{{ provider.hint }} + | +
- Fetch resources from the resource server with your authentication token: -
-- These links should get different response codes depending on the authorization: -
-- Resources for this provider: -
+This application provides all these resources, eventually protected with scope or roles:
User info
-{{ auth_provider.name }} is also defined as a provider for these resources:
+{{ auth_provider.name }} allows this application to request resources from third party resource providers:
+ {% for resource_provider in resource_providers %} +- This is because {{ oidc_provider.name }} does not provide "end_session_endpoint" in its metadata - (see: {{ oidc_provider._server_metadata_url }}). + This is because {{ auth_provider.name }} does not provide "end_session_endpoint" in its metadata + (see: {{ auth_provider.authlib_client._server_metadata_url }}).
You can just also go back to the application home page, but - it recommended to go to the OIDC provider's site + it recommended to go to the OIDC provider's site and log out explicitely from there.
{% endblock %} diff --git a/uv.lock b/uv.lock index 01b64de..0566bb5 100644 --- a/uv.lock +++ b/uv.lock @@ -1,4 +1,5 @@ version = 1 +revision = 1 requires-python = ">=3.13" [[package]] @@ -206,6 +207,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/68/1b/e0a87d256e40e8c888847551b20a017a6b98139178505dc7ffb96f04e954/dnspython-2.7.0-py3-none-any.whl", hash = "sha256:b4c34b7d10b51bcc3a5071e7b8dee77939f1e878477eeecc965e9835f63c6c86", size = 313632 }, ] +[[package]] +name = "dunamai" +version = "1.23.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "packaging" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/06/4e/a5c8c337a1d9ac0384298ade02d322741fb5998041a5ea74d1cd2a4a1d47/dunamai-1.23.0.tar.gz", hash = "sha256:a163746de7ea5acb6dacdab3a6ad621ebc612ed1e528aaa8beedb8887fccd2c4", size = 44681 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/21/4c/963169386309fec4f96fd61210ac0a0666887d0fb0a50205395674d20b71/dunamai-1.23.0-py3-none-any.whl", hash = "sha256:a0906d876e92441793c6a423e16a4802752e723e9c9a5aabdc5535df02dbe041", size = 26342 }, +] + [[package]] name = "ecdsa" version = "0.19.0" @@ -482,7 +495,6 @@ wheels = [ [[package]] name = "oidc-fastapi-test" -version = "0.0.0" source = { editable = "." } dependencies = [ { name = "authlib" }, @@ -501,6 +513,7 @@ dependencies = [ [package.dev-dependencies] dev = [ + { name = "dunamai" }, { name = "ipdb" }, { name = "pytest" }, ] @@ -523,6 +536,7 @@ requires-dist = [ [package.metadata.requires-dev] dev = [ + { name = "dunamai", specifier = ">=1.23.0" }, { name = "ipdb", specifier = ">=0.13.13" }, { name = "pytest", specifier = ">=8.3.4" }, ]