-
Notifications
You must be signed in to change notification settings - Fork 1
Add kick extension and user management services #18
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
3f72657
945eede
40fde3b
fc31cd5
312ebb9
1923554
4d6d312
828d221
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,4 +11,5 @@ build/ | |
| logs/*.log | ||
| config/*.local.yaml | ||
| .idea | ||
| *.db | ||
| *.db | ||
| settings.json | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,2 @@ | ||
| class SpaceNotFoundError(Exception): | ||
| pass |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,55 @@ | ||
| from matrix import Context, Extension | ||
| from matrix.errors import CheckError | ||
|
|
||
| from bot.permissions import is_moderator | ||
| from .kick_service import kick_from_context | ||
| from .models import KickResult | ||
| from .errors import SpaceNotFoundError | ||
|
|
||
| extension = Extension("kick") | ||
|
|
||
| KICK_RESULT_TEMPLATE = ( | ||
| "{space}" | ||
| "Target: `{target}`\n" | ||
| "Kicked from: `{kicked}` rooms\n" | ||
| "Failed in: `{failed}` rooms\n" | ||
| "Reason: {reason}" | ||
| ) | ||
|
|
||
|
|
||
| def format_kick_result(result: KickResult) -> str: | ||
| return KICK_RESULT_TEMPLATE.format( | ||
| space=f"Space: `{result.space_id}`\n" if result.space_id else "", | ||
| target=result.target_user_id, | ||
| kicked=len(result.kicked_room_ids), | ||
| failed=len(result.failed_room_ids), | ||
| reason=result.reason, | ||
| ) | ||
|
|
||
|
|
||
| @extension.command( | ||
| "kick", | ||
| description="Kick a user from all rooms in the current space", | ||
| ) | ||
| async def kick( | ||
| ctx: Context, | ||
| user_id: str, | ||
| reason: str = "No reason provided", | ||
| ) -> None: | ||
| result = await kick_from_context(ctx, user_id, reason) | ||
| await ctx.reply(format_kick_result(result)) | ||
|
|
||
|
|
||
| @kick.error(SpaceNotFoundError) | ||
| async def kick_space_error(ctx: Context, error: SpaceNotFoundError) -> None: | ||
| await ctx.reply(f"Could not complete kick operation: {error}") | ||
|
|
||
|
|
||
| @kick.error(CheckError) | ||
| async def kick_check_error(ctx: Context, error: CheckError) -> None: | ||
| await ctx.reply(f"Could not complete kick operation: {error}") | ||
|
|
||
|
|
||
| @kick.check | ||
| async def can_kick(ctx: Context) -> bool: | ||
| return await is_moderator(ctx) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,73 @@ | ||
| from matrix import Context | ||
| from matrix.errors import MatrixError | ||
|
|
||
| from .errors import SpaceNotFoundError | ||
| from .models import KickResult | ||
| from .space_service import ( | ||
| get_parent_space_id, | ||
| collect_space_child_room_ids, | ||
| ) | ||
|
|
||
|
|
||
| async def kick_from_context( | ||
| ctx: Context, | ||
| user_id: str, | ||
| reason: str, | ||
| ) -> KickResult: | ||
| space_id = get_parent_space_id(ctx) | ||
|
|
||
| if space_id is None: | ||
| room_id = ctx.room.room_id | ||
|
|
||
| try: | ||
| await ctx.room.kick_user(user_id, reason=reason) | ||
| except MatrixError: | ||
| return KickResult( | ||
| target_user_id=user_id, | ||
| reason=reason, | ||
| space_id=None, | ||
| kicked_room_ids=[], | ||
| failed_room_ids=[room_id] if room_id else [], | ||
| ) | ||
|
|
||
| return KickResult( | ||
| target_user_id=user_id, | ||
| reason=reason, | ||
| space_id=None, | ||
| kicked_room_ids=[room_id] if room_id else [], | ||
| failed_room_ids=[room_id] if not room_id else [], | ||
| ) | ||
|
|
||
| space = ctx.bot.get_room(space_id) | ||
| if space is None: | ||
| raise SpaceNotFoundError( | ||
| f"Could not find parent space in room cache: {space_id}" | ||
| ) | ||
|
|
||
| room_ids: list[str] = [] | ||
| await collect_space_child_room_ids(ctx, space, room_ids, set()) | ||
|
|
||
| kicked: list[str] = [] | ||
| failed: list[str] = [] | ||
|
|
||
| for room_id in room_ids: | ||
| room = ctx.bot.get_room(room_id) | ||
| if room is None: | ||
| failed.append(room_id) | ||
| continue | ||
|
|
||
| try: | ||
| await room.kick_user(user_id, reason=reason) | ||
| except MatrixError: | ||
| failed.append(room_id) | ||
| continue | ||
|
|
||
| kicked.append(room_id) | ||
|
|
||
| return KickResult( | ||
| target_user_id=user_id, | ||
| reason=reason, | ||
| space_id=space_id, | ||
| kicked_room_ids=kicked, | ||
| failed_room_ids=failed, | ||
| ) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,10 @@ | ||
| from dataclasses import dataclass | ||
|
|
||
|
|
||
| @dataclass(frozen=True) | ||
| class KickResult: | ||
| target_user_id: str | ||
| reason: str | ||
| space_id: str | None | ||
| kicked_room_ids: list[str] | ||
| failed_room_ids: list[str] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,42 @@ | ||
| from matrix import Context, Room | ||
|
|
||
|
|
||
| def get_parent_space_id(ctx: Context) -> str | None: | ||
| matrix_room = ctx.room.matrix_room | ||
| parents: set[str] = matrix_room.parents | ||
|
|
||
| return next(iter(parents), None) | ||
|
|
||
|
|
||
| async def collect_space_child_room_ids( | ||
| ctx: Context, | ||
| room: Room, | ||
| room_ids: list[str], | ||
| seen: set[str], | ||
|
chrisdedman marked this conversation as resolved.
|
||
| depth: int = 0, | ||
| ) -> None: | ||
| """Collect non-space child room IDs from a Matrix space.""" | ||
| if depth >= 10: | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we can reduce this to 3 since, from what I researched, most clients limit it to 3.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Also, we might want to log it and ideally find a way to notify admins that we reached the max depth. I have a couple of ideas. Like, raising an error that will notify us that we reached the max depth and that it needs to be ran with a flag enabled to "force it"/means we acknowledged it. (the error would also return the space at the max depth so we can use it later on. |
||
| return | ||
|
|
||
| for child_room_id in room.children: | ||
| if child_room_id in seen: | ||
| continue | ||
|
|
||
| seen.add(child_room_id) | ||
|
|
||
| child_room = ctx.bot.get_room(child_room_id) | ||
| if child_room is None: | ||
|
chrisdedman marked this conversation as resolved.
|
||
| continue | ||
|
|
||
| if child_room.room_type == "m.space": | ||
| await collect_space_child_room_ids( | ||
| ctx, | ||
| child_room, | ||
| room_ids, | ||
| seen, | ||
| depth + 1, | ||
| ) | ||
| continue | ||
|
|
||
| room_ids.append(child_room_id) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,22 @@ | ||
| from matrix import Context | ||
|
|
||
|
|
||
| def _configured_moderators(ctx: Context) -> list[str]: | ||
| moderators = ctx.bot.config.get( | ||
| "moderators", | ||
| section="bot", | ||
| default=[], | ||
| ) | ||
|
|
||
| if isinstance(moderators, str): | ||
| return [ | ||
| moderator.strip() | ||
| for moderator in moderators.split(",") | ||
| if moderator.strip() | ||
| ] | ||
|
|
||
| return moderators | ||
|
|
||
|
|
||
| async def is_moderator(ctx: Context) -> bool: | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This could be fine for now, but I'm sure we can check a user's permission differently.
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| return ctx.sender in _configured_moderators(ctx) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,99 @@ | ||
| import pytest | ||
| from matrix.errors import MatrixError | ||
| from types import SimpleNamespace | ||
| from typing import cast | ||
| from unittest.mock import AsyncMock, MagicMock, patch | ||
|
|
||
| from bot.extensions.moderation.kick_service import kick_from_context | ||
| from matrix import Context | ||
|
|
||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_kick_from_context_kicks_current_room_when_not_in_space() -> None: | ||
| ctx = SimpleNamespace( | ||
| room=SimpleNamespace( | ||
| room_id="!room:example.com", | ||
| kick_user=AsyncMock(), | ||
| ), | ||
| ) | ||
|
|
||
| with patch( | ||
| "bot.extensions.moderation.kick_service.get_parent_space_id", | ||
| return_value=None, | ||
| ): | ||
| result = await kick_from_context( | ||
| cast(Context, ctx), "@target:example.com", "spam" | ||
| ) | ||
|
|
||
| ctx.room.kick_user.assert_awaited_once_with("@target:example.com", reason="spam") | ||
| assert result.kicked_room_ids == ["!room:example.com"] | ||
| assert result.failed_room_ids == [] | ||
|
|
||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_kick_from_context_records_current_room_failure() -> None: | ||
| ctx = SimpleNamespace( | ||
| room=SimpleNamespace( | ||
| room_id="!room:example.com", | ||
| kick_user=AsyncMock(side_effect=MatrixError("not allowed")), | ||
| ), | ||
| ) | ||
|
|
||
| with patch( | ||
| "bot.extensions.moderation.kick_service.get_parent_space_id", | ||
| return_value=None, | ||
| ): | ||
| result = await kick_from_context( | ||
| cast(Context, ctx), "@target:example.com", "spam" | ||
| ) | ||
|
|
||
| assert result.kicked_room_ids == [] | ||
| assert result.failed_room_ids == ["!room:example.com"] | ||
|
|
||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_kick_from_context_collects_space_successes_and_failures() -> None: | ||
| space = SimpleNamespace(room_id="!space:example.com") | ||
| successful_room = SimpleNamespace(kick_user=AsyncMock()) | ||
| failed_room = SimpleNamespace( | ||
| kick_user=AsyncMock(side_effect=MatrixError("denied")) | ||
| ) | ||
|
|
||
| bot = MagicMock() | ||
| bot.get_room.side_effect = lambda room_id: { | ||
| "!space:example.com": space, | ||
| "!success:example.com": successful_room, | ||
| "!failed:example.com": failed_room, | ||
| "!missing:example.com": None, | ||
| }[room_id] | ||
| ctx = SimpleNamespace(bot=bot) | ||
|
|
||
| async def collect_room_ids(_ctx, _space, room_ids, _seen): | ||
| room_ids.extend( | ||
| [ | ||
| "!success:example.com", | ||
| "!failed:example.com", | ||
| "!missing:example.com", | ||
| ] | ||
| ) | ||
|
|
||
| with ( | ||
| patch( | ||
| "bot.extensions.moderation.kick_service.get_parent_space_id", | ||
| return_value="!space:example.com", | ||
| ), | ||
| patch( | ||
| "bot.extensions.moderation.kick_service.collect_space_child_room_ids", | ||
| side_effect=collect_room_ids, | ||
| ), | ||
| ): | ||
| result = await kick_from_context( | ||
| cast(Context, ctx), "@target:example.com", "spam" | ||
| ) | ||
|
|
||
| successful_room.kick_user.assert_awaited_once_with( | ||
| "@target:example.com", reason="spam" | ||
| ) | ||
| failed_room.kick_user.assert_awaited_once_with("@target:example.com", reason="spam") | ||
| assert result.kicked_room_ids == ["!success:example.com"] | ||
| assert result.failed_room_ids == ["!failed:example.com", "!missing:example.com"] |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,37 @@ | ||
| import pytest | ||
| from types import SimpleNamespace | ||
|
|
||
| from bot.permissions import is_moderator | ||
|
|
||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_is_moderator_accepts_configured_list() -> None: | ||
| config = SimpleNamespace( | ||
| get=lambda _key, section, default: [ | ||
| "@admin:example.com", | ||
| "@mod:example.com", | ||
| ] | ||
| ) | ||
| ctx = SimpleNamespace(bot=SimpleNamespace(config=config), sender="@mod:example.com") | ||
|
|
||
| assert await is_moderator(ctx) is True # type: ignore[arg-type] | ||
|
|
||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_is_moderator_accepts_comma_separated_config_string() -> None: | ||
| config = SimpleNamespace( | ||
| get=lambda _key, section, default: ("@admin:example.com, @mod:example.com, ") | ||
| ) | ||
| ctx = SimpleNamespace(bot=SimpleNamespace(config=config), sender="@mod:example.com") | ||
|
|
||
| assert await is_moderator(ctx) is True # type: ignore[arg-type] | ||
|
|
||
|
|
||
| @pytest.mark.asyncio | ||
| async def test_is_moderator_rejects_unconfigured_user() -> None: | ||
| config = SimpleNamespace(get=lambda _key, section, default: ["@admin:example.com"]) | ||
| ctx = SimpleNamespace( | ||
| bot=SimpleNamespace(config=config), sender="@user:example.com" | ||
| ) | ||
|
|
||
| assert await is_moderator(ctx) is False # type: ignore[arg-type] |
Uh oh!
There was an error while loading. Please reload this page.