Use experimental pydantic, sqlmodel 2 and sqlalchemy 2

JWT based user auth
pydantic_settings conf
This commit is contained in:
phil 2023-11-17 11:35:09 +05:30
parent 3355b9d716
commit 90091e8a25
14 changed files with 840 additions and 237 deletions

View file

@ -1,7 +1,12 @@
import logging
from datetime import timedelta
from time import time
from uuid import uuid1
from typing import Annotated
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi import Depends, FastAPI, HTTPException, status, Request
from fastapi.security import OAuth2PasswordRequestForm
from starlette.middleware.sessions import SessionMiddleware
from sqlmodel import select
from sqlmodel.ext.asyncio.session import AsyncSession
@ -17,22 +22,38 @@ from .models.category import (
CategoryGroup, CategoryModelType,
Category, CategoryRead
)
from .models.bootstrap import BootstrapData
from .database import get_db_session, pandas_query
from .security import (
User, Token,
authenticate_user, get_current_active_user, create_access_token,
User as UserAuth,
Token,
authenticate_user, get_current_user, create_access_token,
)
from .config import conf
api = FastAPI()
logger = logging.getLogger(__name__)
api = FastAPI()
api.add_middleware(SessionMiddleware, secret_key=conf.crypto.secret)
db_session = Annotated[AsyncSession, Depends(get_db_session)]
@api.get("/nothing")
async def get_nothing() -> str:
return ''
@api.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
@api.get('/bootstrap')
async def bootstrap(
user: Annotated[UserRead, Depends(get_current_user)]) -> BootstrapData:
return BootstrapData(user=user)
@api.post("/token")
async def login_for_access_token(
db_session: db_session,
form_data: OAuth2PasswordRequestForm = Depends()
) -> Token:
user = await authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(
@ -40,16 +61,14 @@ async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(
minutes=conf.security['access_token_expire_minutes'])
access_token = create_access_token(
data={"sub": user.username},
expires_delta=access_token_expires)
expires_delta=timedelta(seconds=conf.crypto.expire))
return {"access_token": access_token, "token_type": "bearer"}
@api.get("/users")
async def get_users(
*, db_session: AsyncSession = Depends(get_db_session)
db_session: db_session,
) -> list[UserRead]:
query = select(User).options(selectinload(User.roles))
data = await db_session.exec(query)
@ -57,7 +76,7 @@ async def get_users(
@api.get("/roles")
async def get_roles(
*, db_session: AsyncSession = Depends(get_db_session)
db_session: db_session,
) -> list[RoleRead]:
query = select(Role).options(selectinload(Role.users))
data = await db_session.exec(query)
@ -65,7 +84,7 @@ async def get_roles(
@api.get("/categories")
async def get_categories(
*, db_session: AsyncSession = Depends(get_db_session)
db_session: db_session,
) -> list[CategoryRead]:
query = select(Category)
data = await db_session.exec(query)
@ -74,7 +93,7 @@ async def get_categories(
@api.get("/categories_p")
async def get_categories_p(
*, db_session: AsyncSession = Depends(get_db_session)
db_session: db_session,
) -> list[CategoryRead]:
query = select(Category)
df = await db_session.run_sync(pandas_query, query)