mirror of
https://github.com/cssgunc/compass.git
synced 2025-04-09 22:00:18 -04:00
Compare commits
2 Commits
b04a37f1df
...
61dcfde469
Author | SHA1 | Date | |
---|---|---|---|
|
61dcfde469 | ||
|
8960386cf6 |
|
@ -1,11 +1,27 @@
|
|||
import jwt
|
||||
from fastapi import Depends, HTTPException, Header, status
|
||||
from datetime import datetime, timedelta
|
||||
from fastapi import Depends, HTTPException, status, APIRouter
|
||||
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
||||
from backend.models.user_model import User
|
||||
from ..services import UserService
|
||||
|
||||
auth_router = APIRouter()
|
||||
api = APIRouter(prefix="/api/authentication")
|
||||
|
||||
openapi_tags = {
|
||||
"name": "Authentication",
|
||||
"description": "Authentication of users and distributes bearer tokens",
|
||||
}
|
||||
|
||||
JWT_SECRET = "Sample Secret"
|
||||
JWT_ALGORITHM = "HS256"
|
||||
ACCESS_TOKEN_EXPIRE_MINUTES = 30
|
||||
|
||||
def create_access_token(user_id: str) -> str:
|
||||
expiration = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
|
||||
payload = {"user_id": user_id, "exp": expiration}
|
||||
token = jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALGORITHM)
|
||||
return token
|
||||
|
||||
def registered_user(
|
||||
token: HTTPAuthorizationCredentials = Depends(HTTPBearer()),
|
||||
|
@ -13,16 +29,38 @@ def registered_user(
|
|||
) -> User:
|
||||
try:
|
||||
payload = jwt.decode(token.credentials, JWT_SECRET, algorithms=[JWT_ALGORITHM])
|
||||
user = user_service.get(payload["pid"])
|
||||
user_id = payload.get("user_id")
|
||||
|
||||
user = user_service.get_user_by_id(user_id)
|
||||
if not user:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="User not found",
|
||||
detail="User not found"
|
||||
)
|
||||
return user
|
||||
except jwt.ExpiredSignatureError:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="Token expired"
|
||||
)
|
||||
except jwt.PyJWTError:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="Invalid or expired token",
|
||||
headers={"WWW-Authenticate": "Bearer"},
|
||||
)
|
||||
detail="Invalid token"
|
||||
)
|
||||
|
||||
@auth_router.post("/api/authentication", tags=["Authentication"])
|
||||
def return_bearer_token(user_id: str, user_service: UserService = Depends()):
|
||||
user = user_service.get_user_by_id(user_id)
|
||||
if not user:
|
||||
raise HTTPException(
|
||||
status_code=status.HTTP_401_UNAUTHORIZED,
|
||||
detail="Invalid user ID"
|
||||
)
|
||||
|
||||
access_token = create_access_token(user_id=user_id)
|
||||
return {"access_token": access_token}
|
||||
|
||||
@auth_router.get("/api/authentication", tags=["Authentication"])
|
||||
def get_user_id(user_service: UserService = Depends()):
|
||||
return user_service.all()
|
|
@ -4,7 +4,7 @@ from fastapi.middleware.gzip import GZipMiddleware
|
|||
|
||||
|
||||
|
||||
from .api import user, health, service, resource, tag
|
||||
from .api import user, health, service, resource, tag, authentication
|
||||
|
||||
description = """
|
||||
Welcome to the **COMPASS** RESTful Application Programming Interface.
|
||||
|
@ -19,7 +19,8 @@ app = FastAPI(
|
|||
health.openapi_tags,
|
||||
service.openapi_tags,
|
||||
resource.openapi_tags,
|
||||
tag.openapi_tags
|
||||
tag.openapi_tags,
|
||||
authentication.openapi_tags
|
||||
],
|
||||
)
|
||||
|
||||
|
@ -30,6 +31,7 @@ feature_apis = [user, health, service, resource, tag]
|
|||
for feature_api in feature_apis:
|
||||
app.include_router(feature_api.api)
|
||||
|
||||
app.include_router(authentication.auth_router)
|
||||
|
||||
# Add application-wide exception handling middleware for commonly encountered API Exceptions
|
||||
@app.exception_handler(Exception)
|
||||
|
|
Loading…
Reference in New Issue
Block a user