mirror of
https://github.com/cssgunc/compass.git
synced 2025-04-07 05:00:16 -04:00
152 lines
5.2 KiB
Python
152 lines
5.2 KiB
Python
from fastapi import Depends
|
|
from ..database import db_session
|
|
from sqlalchemy.orm import Session
|
|
from sqlalchemy import select
|
|
from ..models.resource_model import Resource
|
|
from ..entities.resource_entity import ResourceEntity
|
|
from ..models.user_model import User
|
|
|
|
from .exceptions import ResourceNotFoundException
|
|
|
|
class ResourceService:
|
|
|
|
def __init__(self, session: Session = Depends(db_session)):
|
|
self._session = session
|
|
|
|
def all(self, user: User) -> list[Resource]:
|
|
"""
|
|
Retrieves all Resources that the user has access to based on their role and group.
|
|
|
|
Parameters:
|
|
user: a valid User model representing the currently logged in User
|
|
|
|
Returns:
|
|
list[Resource]: list of accessible `Resource` for the user
|
|
"""
|
|
# Filter resources based on user's role and group
|
|
query = select(ResourceEntity).where(
|
|
ResourceEntity.role == user.role,
|
|
ResourceEntity.group == user.group
|
|
)
|
|
entities = self._session.scalars(query).all()
|
|
|
|
return [entity.to_model() for entity in entities]
|
|
|
|
def create(self, user: User, resource: Resource) -> Resource:
|
|
"""
|
|
Creates a resource based on the input object and adds it to the table if the user has the right permissions.
|
|
|
|
Parameters:
|
|
user: a valid User model representing the currently logged in User
|
|
resource: Resource object to add to table
|
|
|
|
Returns:
|
|
Resource: Object added to table
|
|
"""
|
|
if resource.role != user.role or resource.group != user.group:
|
|
raise PermissionError("User does not have permission to add resources in this role or group.")
|
|
|
|
resource_entity = ResourceEntity.from_model(resource)
|
|
self._session.add(resource_entity)
|
|
self._session.commit()
|
|
|
|
return resource_entity.to_model()
|
|
|
|
def get_by_id(self, user: User, id: int) -> Resource:
|
|
"""
|
|
Gets a resource based on the resource id that the user has access to
|
|
|
|
Parameters:
|
|
user: a valid User model representing the currently logged in User
|
|
id: int, the id of the resource
|
|
|
|
Returns:
|
|
Resource
|
|
|
|
Raises:
|
|
ResourceNotFoundException: If no resource is found with id
|
|
"""
|
|
resource = (
|
|
self._session.query(ResourceEntity)
|
|
.filter(ResourceEntity.id == id, ResourceEntity.role == user.role, ResourceEntity.group == user.group)
|
|
.one_or_none()
|
|
)
|
|
|
|
if resource is None:
|
|
raise ResourceNotFoundException(f"No resource found with id: {id}")
|
|
|
|
return resource.to_model()
|
|
|
|
def update(self, user: User, resource: ResourceEntity) -> Resource:
|
|
"""
|
|
Update the resource if the user has access
|
|
|
|
Parameters:
|
|
user: a valid User model representing the currently logged in User
|
|
resource (ResourceEntity): Resource to update
|
|
|
|
Returns:
|
|
Resource: Updated resource object
|
|
|
|
Raises:
|
|
ResourceNotFoundException: If no resource is found with the corresponding ID
|
|
"""
|
|
if resource.role != user.role or resource.group != user.group:
|
|
raise PermissionError("User does not have permission to update this resource.")
|
|
|
|
obj = self._session.get(ResourceEntity, resource.id) if resource.id else None
|
|
|
|
if obj is None:
|
|
raise ResourceNotFoundException(f"No resource found with matching id: {resource.id}")
|
|
|
|
obj.update_from_model(resource) # Assuming an update method exists
|
|
self._session.commit()
|
|
|
|
return obj.to_model()
|
|
|
|
def delete(self, user: User, id: int) -> None:
|
|
"""
|
|
Delete resource based on id that the user has access to
|
|
|
|
Parameters:
|
|
user: a valid User model representing the currently logged in User
|
|
id: int, a unique resource id
|
|
|
|
Raises:
|
|
ResourceNotFoundException: If no resource is found with the corresponding id
|
|
"""
|
|
resource = (
|
|
self._session.query(ResourceEntity)
|
|
.filter(ResourceEntity.id == id, ResourceEntity.role == user.role, ResourceEntity.group == user.group)
|
|
.one_or_none()
|
|
)
|
|
|
|
if resource is None:
|
|
raise ResourceNotFoundException(f"No resource found with matching id: {id}")
|
|
|
|
self._session.delete(resource)
|
|
self._session.commit()
|
|
|
|
def get_by_slug(self, user: User, search_string: str) -> list[Resource]:
|
|
"""
|
|
Get a list of resources given a search string that the user has access to
|
|
|
|
Parameters:
|
|
user: a valid User model representing the currently logged in User
|
|
search_string: a string to search resources by
|
|
|
|
Returns:
|
|
list[Resource]: list of resources relating to the string
|
|
|
|
Raises:
|
|
ResourceNotFoundException if no resource is found with the corresponding slug
|
|
"""
|
|
query = select(ResourceEntity).where(
|
|
ResourceEntity.title.ilike(f"%{search_string}%"),
|
|
ResourceEntity.role == user.role,
|
|
ResourceEntity.group == user.group
|
|
)
|
|
entities = self._session.scalars(query).all()
|
|
|
|
return [entity.to_model() for entity in entities]
|