Compare commits

...

2 Commits

Author SHA1 Message Date
Benjamin Lin
61dcfde469 Minor changes for API documentation page 2024-11-13 23:04:09 -05:00
Benjamin Lin
8960386cf6 Functional bearer token authentication system 2024-11-13 22:55:18 -05:00
2 changed files with 48 additions and 8 deletions

View File

@ -1,11 +1,27 @@
import jwt
from fastapi import Depends, HTTPException, Header, status
from datetime import datetime, timedelta
from fastapi import Depends, HTTPException, status, APIRouter
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from backend.models.user_model import User
from ..services import UserService
auth_router = APIRouter()
api = APIRouter(prefix="/api/authentication")
openapi_tags = {
"name": "Authentication",
"description": "Authentication of users and distributes bearer tokens",
}
JWT_SECRET = "Sample Secret"
JWT_ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
def create_access_token(user_id: str) -> str:
expiration = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
payload = {"user_id": user_id, "exp": expiration}
token = jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
return token
def registered_user(
token: HTTPAuthorizationCredentials = Depends(HTTPBearer()),
@ -13,16 +29,38 @@ def registered_user(
) -> User:
try:
payload = jwt.decode(token.credentials, JWT_SECRET, algorithms=[JWT_ALGORITHM])
user = user_service.get(payload["pid"])
user_id = payload.get("user_id")
user = user_service.get_user_by_id(user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found",
detail="User not found"
)
return user
except jwt.ExpiredSignatureError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token expired"
)
except jwt.PyJWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid or expired token",
headers={"WWW-Authenticate": "Bearer"},
)
detail="Invalid token"
)
@auth_router.post("/api/authentication", tags=["Authentication"])
def return_bearer_token(user_id: str, user_service: UserService = Depends()):
user = user_service.get_user_by_id(user_id)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid user ID"
)
access_token = create_access_token(user_id=user_id)
return {"access_token": access_token}
@auth_router.get("/api/authentication", tags=["Authentication"])
def get_user_id(user_service: UserService = Depends()):
return user_service.all()

View File

@ -4,7 +4,7 @@ from fastapi.middleware.gzip import GZipMiddleware
from .api import user, health, service, resource, tag
from .api import user, health, service, resource, tag, authentication
description = """
Welcome to the **COMPASS** RESTful Application Programming Interface.
@ -19,7 +19,8 @@ app = FastAPI(
health.openapi_tags,
service.openapi_tags,
resource.openapi_tags,
tag.openapi_tags
tag.openapi_tags,
authentication.openapi_tags
],
)
@ -30,6 +31,7 @@ feature_apis = [user, health, service, resource, tag]
for feature_api in feature_apis:
app.include_router(feature_api.api)
app.include_router(authentication.auth_router)
# Add application-wide exception handling middleware for commonly encountered API Exceptions
@app.exception_handler(Exception)