CSXL authentication.py for compass

This commit is contained in:
Benjamin Lin 2024-11-12 19:32:53 -05:00
parent 6e3c24f265
commit b04a37f1df
6 changed files with 63 additions and 184 deletions

View File

@ -0,0 +1,28 @@
import jwt
from fastapi import Depends, HTTPException, Header, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from backend.models.user_model import User
from ..services import UserService
JWT_SECRET = "Sample Secret"
JWT_ALGORITHM = "HS256"
def registered_user(
token: HTTPAuthorizationCredentials = Depends(HTTPBearer()),
user_service: UserService = Depends()
) -> User:
try:
payload = jwt.decode(token.credentials, JWT_SECRET, algorithms=[JWT_ALGORITHM])
user = user_service.get(payload["pid"])
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found",
)
return user
except jwt.PyJWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid or expired token",
headers={"WWW-Authenticate": "Bearer"},
)

View File

@ -1,83 +0,0 @@
# token_utils.py
import jwt
from jwt import PyJWTError
from datetime import datetime, timedelta
from fastapi import HTTPException, status, Depends
from backend.models.user_model import User
from ..services import UserService
from passlib.context import CryptContext
from supabase import create_client, Client
# Supabase configuration
SUPABASE_URL = "placeholder"
SUPABASE_KEY = "sample key"
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
SECRET = "SECRET_KEY"
ALGORITHM = "HS256"
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def create_bearer_token(user_uuid: str, expires_delta: timedelta = None) -> str:
if expires_delta:
expire = datetime.now(datetime.UTC) + expires_delta
else:
expire = datetime.now(datetime.UTC) + timedelta(minutes=180)
to_encode = {
"sub": user_uuid,
"exp": expire,
}
token = jwt.encode(to_encode, SECRET, algorithm=ALGORITHM)
expires_at = expire.isoformat()
token_data = {
"user_uuid": user_uuid,
"token": token,
"expires_at": expires_at,
}
response = supabase.table("user_tokens").insert(token_data).execute()
def decode_token(token: str) -> User:
try:
payload = jwt.decode(token, SECRET, algorithms=[ALGORITHM])
user_uuid = payload.get("sub")
if user_uuid is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid credentials",
headers={"WWW-Authenticate": "Bearer"},
)
user_data = UserService.get_user_by_uuid(user_uuid)
if user_data is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found",
headers={"WWW-Authenticate": "Bearer"},
)
user = User(
id=user_data.id,
username=user_data.username,
email=user_data.email,
experience=user_data.experience,
group=user_data.group,
program=user_data.program,
role=user_data.role,
created_at=user_data.created_at,
uuid=user_data.uuid,
)
return user
except PyJWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid or expired token",
headers={"WWW-Authenticate": "Bearer"},
)

View File

@ -1,13 +1,11 @@
from fastapi import APIRouter, Depends, HTTPException, status from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from backend.api.decoder import decode_token
from backend.models.user_model import User
from ..services import ResourceService, UserService
from ..models.resource_model import Resource
from typing import List from typing import List
from .authentication import registered_user
from backend.models.user_model import User
from ..services import ResourceService
from ..models.resource_model import Resource
api = APIRouter(prefix="/api/resource") api = APIRouter(prefix="/api/resource")
openapi_tags = { openapi_tags = {
@ -15,51 +13,32 @@ openapi_tags = {
"description": "Resource search and related operations.", "description": "Resource search and related operations.",
} }
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
user = decode_token(token)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return user
# TODO: Add security using HTTP Bearer Tokens
# TODO: Enable authorization by passing user uuid to API
# TODO: Create custom exceptions
@api.post("", response_model=Resource, tags=["Resource"]) @api.post("", response_model=Resource, tags=["Resource"])
def create( def create(
subject: User = Depends(get_current_user), subject: User = Depends(registered_user),
resource: Resource = Depends(), resource: Resource = Depends(),
resource_svc: ResourceService = Depends(), resource_svc: ResourceService = Depends(),
): ):
return resource_svc.create(subject, resource) return resource_svc.create(subject, resource)
@api.get("", response_model=List[Resource], tags=["Resource"]) @api.get("", response_model=List[Resource], tags=["Resource"])
def get_all( def get_all(
subject: User = Depends(get_current_user), resource_svc: ResourceService = Depends() subject: User = Depends(registered_user),
resource_svc: ResourceService = Depends()
): ):
return resource_svc.get_resource_by_user(subject) return resource_svc.get_resource_by_user(subject)
@api.put("", response_model=Resource, tags=["Resource"]) @api.put("", response_model=Resource, tags=["Resource"])
def update( def update(
subject: User = Depends(get_current_user), subject: User = Depends(registered_user),
resource: Resource = Depends(), resource: Resource = Depends(),
resource_svc: ResourceService = Depends(), resource_svc: ResourceService = Depends(),
): ):
return resource_svc.update(subject, resource) return resource_svc.update(subject, resource)
@api.delete("", response_model=None, tags=["Resource"]) @api.delete("", response_model=None, tags=["Resource"])
def delete( def delete(
subject: User = Depends(get_current_user), subject: User = Depends(registered_user),
resource: Resource = Depends(), resource: Resource = Depends(),
resource_svc: ResourceService = Depends(), resource_svc: ResourceService = Depends(),
): ):

View File

@ -1,13 +1,11 @@
from fastapi import APIRouter, Depends, HTTPException, status from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from backend.api.decoder import decode_token
from backend.models.user_model import User
from ..services import ServiceService, UserService
from ..models.service_model import Service
from typing import List from typing import List
from .authentication import registered_user
from backend.models.user_model import User
from ..services import ServiceService
from ..models.service_model import Service
api = APIRouter(prefix="/api/service") api = APIRouter(prefix="/api/service")
openapi_tags = { openapi_tags = {
@ -15,54 +13,32 @@ openapi_tags = {
"description": "Service search and related operations.", "description": "Service search and related operations.",
} }
# Creates an OAuth instance
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
user = decode_token(token)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return user
# TODO: Add security using HTTP Bearer Tokens
# TODO: Enable authorization by passing user uuid to API
# TODO: Create custom exceptions
@api.post("", response_model=Service, tags=["Service"]) @api.post("", response_model=Service, tags=["Service"])
def create( def create(
subject: User = Depends(get_current_user), subject: User = Depends(registered_user),
service: Service = Depends(), service: Service = Depends(),
service_svc: ServiceService = Depends(), service_svc: ServiceService = Depends(),
): ):
return service_svc.create(subject, service) return service_svc.create(subject, service)
@api.get("", response_model=List[Service], tags=["Service"]) @api.get("", response_model=List[Service], tags=["Service"])
def get_all( def get_all(
subject: User = Depends(get_current_user), service_svc: ServiceService = Depends() subject: User = Depends(registered_user),
service_svc: ServiceService = Depends()
): ):
return service_svc.get_service_by_user(subject) return service_svc.get_service_by_user(subject)
@api.put("", response_model=Service, tags=["Service"]) @api.put("", response_model=Service, tags=["Service"])
def update( def update(
subject: User = Depends(get_current_user), subject: User = Depends(registered_user),
service: Service = Depends(), service: Service = Depends(),
service_svc: ServiceService = Depends(), service_svc: ServiceService = Depends(),
): ):
return service_svc.update(subject, service) return service_svc.update(subject, service)
@api.delete("", response_model=None, tags=["Service"]) @api.delete("", response_model=None, tags=["Service"])
def delete( def delete(
subject: User = Depends(get_current_user), subject: User = Depends(registered_user),
service: Service = Depends(), service: Service = Depends(),
service_svc: ServiceService = Depends(), service_svc: ServiceService = Depends(),
): ):

View File

@ -1,14 +1,10 @@
from fastapi import APIRouter, Depends, HTTPException, status from fastapi import APIRouter, Depends
from fastapi.security import OAuth2PasswordBearer from typing import List
from backend.api.decoder import decode_token
from .authentication import registered_user
from backend.models.tag_model import Tag from backend.models.tag_model import Tag
from backend.models.user_model import User from backend.models.user_model import User
from backend.services.tag import TagService from backend.services.tag import TagService
from ..services import ResourceService, UserService
from ..models.resource_model import Resource
from typing import List
api = APIRouter(prefix="/api/tag") api = APIRouter(prefix="/api/tag")
@ -17,49 +13,32 @@ openapi_tags = {
"description": "Tag CRUD operations.", "description": "Tag CRUD operations.",
} }
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
def get_current_user(token: str = Depends(oauth2_scheme)) -> User:
user = decode_token(token)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return user
# TODO: Add security using HTTP Bearer Tokens
# TODO: Enable authorization by passing user uuid to API
# TODO: Create custom exceptions
@api.post("", response_model=Tag, tags=["Tag"]) @api.post("", response_model=Tag, tags=["Tag"])
def create( def create(
subject: User = Depends(get_current_user), subject: User = Depends(registered_user),
tag: Tag = Depends(), tag: Tag = Depends(),
tag_service: TagService = Depends(), tag_service: TagService = Depends(),
): ):
return tag_service.create(subject, tag) return tag_service.create(subject, tag)
@api.get("", response_model=List[Tag], tags=["Tag"]) @api.get("", response_model=List[Tag], tags=["Tag"])
def get_all(subject: User = Depends(get_current_user), tag_svc: TagService = Depends()): def get_all(
subject: User = Depends(registered_user),
tag_svc: TagService = Depends()
):
return tag_svc.get_all() return tag_svc.get_all()
@api.put("", response_model=Tag, tags=["Tag"]) @api.put("", response_model=Tag, tags=["Tag"])
def update( def update(
subject: User = Depends(get_current_user), subject: User = Depends(registered_user),
tag: Tag = Depends(), tag: Tag = Depends(),
tag_svc: TagService = Depends(), tag_svc: TagService = Depends(),
): ):
return tag_svc.delete(subject, tag) return tag_svc.update(subject, tag)
@api.delete("", response_model=None, tags=["Tag"]) @api.delete("", response_model=None, tags=["Tag"])
def delete( def delete(
subject: User = Depends(get_current_user), subject: User = Depends(registered_user),
tag: Tag = Depends(), tag: Tag = Depends(),
tag_svc: TagService = Depends(), tag_svc: TagService = Depends(),
): ):

View File

@ -21,7 +21,7 @@ def get_all(user_id: str, user_svc: UserService = Depends()):
if subject.role != UserTypeEnum.ADMIN: if subject.role != UserTypeEnum.ADMIN:
raise Exception(f"Insufficient permissions for user {subject.uuid}") raise Exception(f"Insufficient permissions for user {subject.uuid}")
# create_bearer_token(user_id)
return user_svc.all() return user_svc.all()