gisaf-backend/src/security.py

139 lines
4.3 KiB
Python
Raw Normal View History

2023-11-06 17:04:17 +05:30
from datetime import datetime, timedelta, timezone

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
from sqlalchemy.future import select

from .config import conf
from .models.authentication import User as UserInDB
# The JWT signing secret is not defined here: this module reads it from
# conf.security['secret_key']. Hints for generating such a value:
#   openssl rand -hex 32
#   import secrets
#   SECRET_KEY = secrets.token_hex(32)
# Algorithm name passed to jwt.encode() / jwt.decode() in this module.
ALGORITHM: str = "HS256"
# Pydantic model pairing an encoded access token with its token type.
# NOTE(review): presumably the response body of the login endpoint; that
# endpoint is not part of this module -- confirm against the router.
class Token(BaseModel):
    access_token: str
    token_type: str
# Pydantic model holding the username taken from a decoded token;
# get_current_user() fills it from the token's "sub" claim.
class TokenData(BaseModel):
    username: str | None = None
# Pydantic representation of a user as returned by get_current_user().
# It carries no password field, unlike the database model (imported in
# this module as UserInDB), whose "password" attribute is used elsewhere.
class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    # Checked by get_current_active_user(); a truthy value rejects the user.
    disabled: bool | None = None
# Dependency that supplies the `token` argument of get_current_user();
# configured with the relative token URL "token".
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# passlib context shared by get_password_hash() and verify_password();
# bcrypt is the only configured scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def get_password_hash(password: str):
    """Return the hash of *password* computed by the module's passlib context."""
    hashed = pwd_context.hash(password)
    return hashed
async def delete_user(username):
    """Delete the user named *username* from the database.

    Raises:
        SystemExit: when no user with that username exists.
    """
    async with conf.session_maker.begin() as db_session:
        target = await get_user(username)
        if target is not None:
            await db_session.delete(target)
            return
        raise SystemExit(f'User {username} does not exist in the database')
async def enable_user(username, enable=True):
    """Enable (default) or disable the user named *username*.

    Args:
        username: Name of the user to look up.
        enable: True to enable the account, False to disable it.

    Raises:
        SystemExit: when no user with that username exists.
    """
    async with conf.session_maker.begin() as db_session:
        account = await get_user(username)
        if account is None:
            raise SystemExit(f'User {username} does not exist in the database')
        # The stored flag is "disabled", i.e. the inverse of *enable*.
        account.disabled = not enable  # type: ignore
        db_session.add(account)
        await db_session.commit()
async def create_user(username: str, password: str, full_name: str,
                      email: str, **kwargs):
    """Create the user *username*, or update it when it already exists.

    A new user is stored enabled (``disabled=False``). For an existing
    user the full name, email and password hash are overwritten; its
    ``disabled`` flag is left as it is.

    Args:
        username: Login name, used to look up an existing user.
        password: Plain-text password; only its hash is stored.
        full_name: Full name to store.
        email: Email address to store.
        **kwargs: Accepted for call compatibility and ignored.
    """
    async with conf.session_maker.begin() as session:
        user_in_db: UserInDB | None = await get_user(username)
        if user_in_db is None:
            user_in_db = UserInDB(
                username=username,
                password=get_password_hash(password),
                full_name=full_name,
                email=email,
                disabled=False,
            )
        else:
            user_in_db.full_name = full_name  # type: ignore
            user_in_db.email = email  # type: ignore
            user_in_db.password = get_password_hash(password)  # type: ignore
        # get_user() loads the row in its own session, so an existing user
        # is not tracked by this one: it must be added here as well,
        # otherwise the changes above are never written (enable_user does
        # the same).
        session.add(user_in_db)
        await session.commit()
async def get_user(username: str) -> (UserInDB | None):
    """Return the database user whose username equals *username*, or None."""
    query = select(UserInDB).where(UserInDB.username == username)
    async with conf.session_maker.begin() as db_session:
        result = await db_session.execute(query)
        return result.scalar()
def verify_password(plain_password, hashed_password):
    """Return the passlib context's verdict on *plain_password* against *hashed_password*."""
    matches = pwd_context.verify(plain_password, hashed_password)
    return matches
async def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
    """Resolve a bearer token into the user it names.

    The token is decoded with the configured secret key, its "sub" claim
    is taken as the username, and that user is loaded from the database.

    Args:
        token: Encoded JWT, supplied through the ``oauth2_scheme`` dependency.

    Returns:
        A ``User`` built from the database record, including its
        ``disabled`` flag.

    Raises:
        HTTPException: 401 when the token cannot be decoded, has no "sub"
            claim, or names a user that is not in the database.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, conf.security['secret_key'], algorithms=[ALGORITHM])
        username: str = payload.get("sub", '')
        if username == '':
            raise credentials_exception
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception
    user = await get_user(username=token_data.username)  # type: ignore
    if user is None:
        raise credentials_exception
    # Carry the disabled flag over: without it User.disabled stays None and
    # the check in get_current_active_user() can never reject anyone.
    return User(username=user.username,  # type: ignore
                email=user.email,  # type: ignore
                full_name=user.full_name,  # type: ignore
                disabled=user.disabled)  # type: ignore
async def authenticate_user(username: str, password: str):
    """Check *username* / *password* against the stored user.

    Returns:
        The database user when it exists and the password verifies
        against its stored hash; ``False`` otherwise.
    """
    candidate = await get_user(username)
    if candidate and verify_password(password, candidate.password):
        return candidate
    return False
async def get_current_active_user(current_user: User = Depends(get_current_user)):
    """Return *current_user* unless it is flagged as disabled.

    Raises:
        HTTPException: 400 ("Inactive user") when ``current_user.disabled``
            is truthy.
    """
    if not current_user.disabled:
        return current_user
    raise HTTPException(status_code=400, detail="Inactive user")
def create_access_token(data: dict, expires_delta: timedelta):
    """Return a signed JWT carrying *data* plus an ``exp`` claim.

    Args:
        data: Claims to embed. The dict is copied, so the caller's object
            is not modified.
        expires_delta: Token lifetime, counted from the current time.

    Returns:
        The encoded token as produced by ``jwt.encode`` with the configured
        secret key and ``ALGORITHM``.
    """
    to_encode = data.copy()
    # Timezone-aware "now": datetime.utcnow() is deprecated since
    # Python 3.12 and yields a naive value.
    expire = datetime.now(timezone.utc) + expires_delta
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode,
                      conf.security['secret_key'],
                      algorithm=ALGORITHM)