Fix resource server error with scope
Some checks failed
/ build (push) Failing after 14s
/ test (push) Successful in 5s

This commit is contained in:
phil 2025-02-01 18:51:17 +01:00
parent e9bc6c671a
commit 8b8bbcd7a0
4 changed files with 20 additions and 15 deletions

View file

@ -221,6 +221,7 @@ async def get_user_from_token(
)
try:
user = await db.get_user(user_id)
user.access_token = payload
except UserNotInDB:
logger.info(
f"User {user_id} not found in DB, creating it (real apps can behave differently"
@ -230,5 +231,6 @@ async def get_user_from_token(
user_info=payload,
oidc_provider=getattr(authlib_oauth, auth_provider_id),
user_info_from_endpoint={},
access_token=payload,
)
return user

View file

@ -26,8 +26,10 @@ class Database:
user_info: dict,
oidc_provider: StarletteOAuth2App,
user_info_from_endpoint: dict,
access_token: dict,
) -> User:
user = User.from_auth(userinfo=user_info, oidc_provider=oidc_provider)
user.access_token = access_token
try:
raw_roles = user_info_from_endpoint["resource_access"][
oidc_provider.client_id

View file

@ -32,6 +32,7 @@ class User(UserBase):
also the key for the database 'table'""",
)
userinfo: dict = {}
access_token: dict = {}
oidc_provider: StarletteOAuth2App | None = None
@classmethod
@ -49,3 +50,9 @@ class User(UserBase):
@cached_property
def roles_as_set(self) -> set[str]:
return set([role.name for role in self.roles])
def has_scope(self, scope: str) -> bool:
"""Check if the scope is present in user info or access token"""
info_scopes = self.userinfo.get("scope", "").split(" ")
access_token_scopes = self.access_token.get("scope", "").split(" ")
return scope in set(info_scopes + access_token_scopes)

View file

@ -22,19 +22,15 @@ async def get_resource(resource_id: str, user: User) -> dict:
# but this has to be refined for production
required_scope = f"get:{resource_id}"
# Check if the required scope is in the scopes allowed in userinfo
if "required_scope" in user.userinfo:
user_scopes = user.userinfo["required_scope"].split(" ")
if required_scope in user_scopes:
await process(user, required_scope, resp)
else:
## For the showcase, giving a explanation.
## Alternatively, raise HTTP_401_UNAUTHORIZED
resp["sorry"] = (
f"No scope {required_scope} in the access token "
+ "but it is required for accessing this resource."
)
if user.has_scope(required_scope):
await process(user, resource_id, resp)
else:
resp["sorry"] = "There is no scope in id token"
## For the showcase, giving a explanation.
## Alternatively, raise HTTP_401_UNAUTHORIZED
resp["sorry"] = (
f"No scope {required_scope} in the access token "
+ "but it is required for accessing this resource."
)
return resp
@ -51,9 +47,7 @@ async def process(user, resource_id, resp):
bs = await client.get("https://corporatebs-generator.sameerkumar.website/")
resp["bs"] = bs.json().get("phrase", "Sorry, i am out of BS today.")
else:
resp["sorry"] = (
f"I don't known how to give '{resource_id}' but i know corporate bs."
)
resp["sorry"] = f"I don't known how to give '{resource_id}'."
# assert user.oidc_provider is not None