diff --git a/syncmaster/db/repositories/group.py b/syncmaster/db/repositories/group.py index 6a31d355..b3a86432 100644 --- a/syncmaster/db/repositories/group.py +++ b/syncmaster/db/repositories/group.py @@ -26,7 +26,7 @@ class GroupRepository(Repository[Group]): def __init__(self, session: AsyncSession): super().__init__(model=Group, session=session) - async def paginate_all( + async def paginate_for_superuser( self, page: int, page_size: int, @@ -200,29 +200,18 @@ async def update( except IntegrityError as e: self._raise_error(e) - async def get_member_role(self, group_id: int, user_id: int) -> GroupMemberRole: + async def get_member_role(self, user: User, group_id: int) -> GroupMemberRole: + if user.is_superuser: + return GroupMemberRole.Superuser + user_group = await self._session.get( UserGroup, { "group_id": group_id, - "user_id": user_id, + "user_id": user.id, }, ) - if user_group is None: # then it's either the owner or a superuser because permission exists - owner_query = ( - ( - select(Group).where( - Group.owner_id == user_id, - Group.id == group_id, - ) - ) - .exists() - .select() - ) - is_owner = await self._session.scalar(owner_query) - return GroupMemberRole.Owner if is_owner else GroupMemberRole.Superuser - - return user_group.role + return user_group.role if user_group else GroupMemberRole.Owner async def update_member_role( self, @@ -309,6 +298,9 @@ async def get_group_permission(self, user: User, group_id: int) -> Permission: if not await self._session.get(Group, group_id): raise GroupNotFoundError + if user.is_superuser: + return Permission.DELETE + owner_query = ( ( select(Group).where( @@ -321,8 +313,7 @@ async def get_group_permission(self, user: User, group_id: int) -> Permission: ) is_owner = await self._session.scalar(owner_query) - - if is_owner or user.is_superuser: + if is_owner: return Permission.DELETE group_role_query = select(UserGroup).where( @@ -331,11 +322,7 @@ async def get_group_permission(self, user: User, group_id: int) -> Permission: ) user_group = await self._session.scalar(group_role_query) - - if not user_group: - return Permission.NONE - - return Permission.READ + return Permission.READ if user_group else Permission.NONE async def get_user_group(self, group_id: int, user_id: int) -> UserGroup | None: return await self._session.get( diff --git a/syncmaster/db/repositories/queue.py b/syncmaster/db/repositories/queue.py index 4fc23964..9b8e719c 100644 --- a/syncmaster/db/repositories/queue.py +++ b/syncmaster/db/repositories/queue.py @@ -84,6 +84,11 @@ async def get_group_permission(self, user: User, group_id: int) -> Permission: Method for determining CRUD permissions in the specified group 'DEVELOPER' does not have WRITE permission in the QUEUE repository """ + if not await self._session.get(Group, group_id): + raise GroupNotFoundError + + if user.is_superuser: + return Permission.DELETE owner_query = ( ( @@ -97,7 +102,6 @@ async def get_group_permission(self, user: User, group_id: int) -> Permission: ) is_owner = await self._session.scalar(owner_query) - if is_owner: return Permission.DELETE @@ -107,23 +111,13 @@ async def get_group_permission(self, user: User, group_id: int) -> Permission: ) user_group = await self._session.scalar(group_role_query) - if not user_group: - # Check: group exists - if not await self._session.get(Group, group_id): - raise GroupNotFoundError + return Permission.NONE - # If the user is not in the group, then he is either a superuser or does not have any rights - if not user.is_superuser: - return Permission.NONE + if user_group.role == GroupMemberRole.Maintainer: return Permission.DELETE - group_role = user_group.role - - if group_role in (GroupMemberRole.Guest, GroupMemberRole.Developer): - return Permission.READ - - return Permission.DELETE + return Permission.READ async def get_resource_permission(self, user: User, resource_id: int) -> Permission: """ @@ -155,7 +149,6 @@ async def get_resource_permission(self, user: User, resource_id: int) -> Permiss ) is_owner = await self._session.scalar(owner_query) - if is_owner: return Permission.DELETE @@ -173,12 +166,10 @@ async def get_resource_permission(self, user: User, resource_id: int) -> Permiss if not user_group: return Permission.NONE - group_role = user_group.role - - if group_role in (GroupMemberRole.Guest, GroupMemberRole.Developer): - return Permission.READ + if user_group.role == GroupMemberRole.Maintainer: + return Permission.DELETE - return Permission.DELETE + return Permission.READ def _raise_error(self, err: DBAPIError) -> NoReturn: constraint = err.__cause__.__cause__.constraint_name # type: ignore[arg-type, union-attr] diff --git a/syncmaster/db/repositories/repository_with_owner.py b/syncmaster/db/repositories/repository_with_owner.py index fa671d1b..10573c7e 100644 --- a/syncmaster/db/repositories/repository_with_owner.py +++ b/syncmaster/db/repositories/repository_with_owner.py @@ -40,7 +40,6 @@ async def get_resource_permission(self, user: User, resource_id: int) -> Permiss ) is_owner = await self._session.scalar(owner_query) - if is_owner: return Permission.DELETE @@ -58,18 +57,22 @@ async def get_resource_permission(self, user: User, resource_id: int) -> Permiss if not user_group: return Permission.NONE - group_role = user_group.role - - if group_role == GroupMemberRole.Guest: + if user_group.role == GroupMemberRole.Guest: return Permission.READ - if group_role == GroupMemberRole.Developer: + if user_group.role == GroupMemberRole.Developer: return Permission.WRITE return Permission.DELETE # Maintainer async def get_group_permission(self, user: User, group_id: int) -> Permission: """Method for determining CRUD permissions in the specified group""" + if not await self._session.get(Group, group_id): + raise GroupNotFoundError + + if user.is_superuser: + return Permission.DELETE + owner_query = ( ( select(Group).where( @@ -82,7 +85,6 @@ async def get_group_permission(self, user: User, group_id: int) -> Permission: ) is_owner = await self._session.scalar(owner_query) - if is_owner: return Permission.DELETE @@ -92,23 +94,13 @@ async def get_group_permission(self, user: User, group_id: int) -> Permission: ) user_group = await self._session.scalar(group_role_query) - if not user_group: - # Check: group exists - if not await self._session.get(Group, group_id): - raise GroupNotFoundError + return Permission.NONE - # If the user is not in the group, then he is either a superuser or does not have any rights - if not user.is_superuser: - return Permission.NONE + if user_group.role == GroupMemberRole.Maintainer: return Permission.DELETE - group_role = user_group.role - - if group_role == GroupMemberRole.Guest: - return Permission.READ - - if group_role == GroupMemberRole.Developer: + if user_group.role == GroupMemberRole.Developer: return Permission.WRITE - return Permission.DELETE # Maintainer + return Permission.READ diff --git a/syncmaster/server/api/v1/groups.py b/syncmaster/server/api/v1/groups.py index a4fe2d10..e97dcf19 100644 --- a/syncmaster/server/api/v1/groups.py +++ b/syncmaster/server/api/v1/groups.py @@ -42,7 +42,7 @@ async def read_groups( # noqa: PLR0913 ] = None, ) -> GroupPageSchema: if current_user.is_superuser: - pagination = await unit_of_work.group.paginate_all( + pagination = await unit_of_work.group.paginate_for_superuser( page=page, page_size=page_size, search_query=search_query, @@ -88,7 +88,7 @@ async def read_group( raise GroupNotFoundError group = await unit_of_work.group.read_by_id(group_id=group_id) - user_role = await unit_of_work.group.get_member_role(group_id=group_id, user_id=current_user.id) + user_role = await unit_of_work.group.get_member_role(user=current_user, group_id=group_id) return GroupWithUserRoleSchema( data=ReadGroupSchema.model_validate(group, from_attributes=True), role=GroupMemberRole(user_role), diff --git a/tests/test_unit/test_groups/test_read_groups.py b/tests/test_unit/test_groups/test_read_groups.py index 617010f7..c3657ee9 100644 --- a/tests/test_unit/test_groups/test_read_groups.py +++ b/tests/test_unit/test_groups/test_read_groups.py @@ -46,7 +46,7 @@ async def test_group_member_can_read_groups( } -async def test_groupless_user_cannot_get_any_groups( +async def test_groupless_user_cannot_read_groups( client: AsyncClient, simple_user: MockUser, group: MockGroup, # do not delete this group, it is not used but is needed to show that the group is not read