diff --git a/src/basic_memory/cli/commands/cloud/__init__.py b/src/basic_memory/cli/commands/cloud/__init__.py index 273365ec..35a1d974 100644 --- a/src/basic_memory/cli/commands/cloud/__init__.py +++ b/src/basic_memory/cli/commands/cloud/__init__.py @@ -11,9 +11,11 @@ # Register snapshot sub-command group from basic_memory.cli.commands.cloud.snapshot import snapshot_app from basic_memory.cli.commands.cloud.workspace import workspace_app +from basic_memory.cli.commands.cloud.shares import share_app cloud_app.add_typer(snapshot_app, name="snapshot") cloud_app.add_typer(workspace_app, name="workspace") +cloud_app.add_typer(share_app, name="share") # Register restore command (directly on cloud_app via decorator) from basic_memory.cli.commands.cloud.restore import restore # noqa: F401, E402 diff --git a/src/basic_memory/cli/commands/cloud/shares.py b/src/basic_memory/cli/commands/cloud/shares.py new file mode 100644 index 00000000..a0566394 --- /dev/null +++ b/src/basic_memory/cli/commands/cloud/shares.py @@ -0,0 +1,533 @@ +"""Public share CLI commands for Basic Memory Cloud. + +Surfaces the cloud `/api/shares` endpoints so users can manage public share +links for notes without leaving the terminal: + +- POST /api/shares -> create +- GET /api/shares -> list +- PATCH /api/shares/{token} -> update (enable/disable, set expiration) +- DELETE /api/shares/{token} -> revoke + +Auth, config lookup, and error handling reuse the shared `make_api_request()` +helper, matching the `snapshot.py` command group. +""" + +import asyncio +from datetime import datetime +from typing import Optional +from urllib.parse import urlencode +from uuid import UUID + +import typer +from rich.console import Console +from rich.table import Table + +from basic_memory.cli.commands.cloud.api_client import ( + CloudAPIError, + SubscriptionRequiredError, + make_api_request, +) +from basic_memory.config import ConfigManager +from basic_memory.mcp.async_client import resolve_configured_workspace +from basic_memory.schemas.cloud import WorkspaceInfo + +console = Console() +share_app = typer.Typer(help="Manage public share links for notes") + +# Header the cloud uses to route a request to a specific tenant's workspace. +# Mirrors basic_memory.cli.commands.cloud.cloud_utils._workspace_headers: the +# cloud /api/shares endpoints resolve the workspace from X-Workspace-ID (see +# resolve_workspace in basic-memory-cloud deps.py), so without it a team +# workspace project would be evaluated against the caller's default tenant. +WORKSPACE_ID_HEADER = "X-Workspace-ID" + + +def _is_uuid(value: str) -> bool: + """Return True when value parses as a UUID in any standard textual form.""" + try: + UUID(value) + except ValueError: + return False + return True + + +def _match_workspace_identifier( + workspaces: list[WorkspaceInfo], identifier: str +) -> Optional[WorkspaceInfo]: + """Match a human workspace identifier with slug > tenant_id > name precedence. + + Mirrors project_context._match_workspace_identifier (PR #979): try the stable + slug first (case-insensitive), then the exact tenant_id, then the display name + (case-insensitive). The first tier that yields any match wins, so a display + name colliding with another workspace's slug never shadows the slug match. + """ + slug_matches = [ws for ws in workspaces if ws.slug.casefold() == identifier.casefold()] + if slug_matches: + return slug_matches[0] if len(slug_matches) == 1 else _ambiguous(slug_matches, identifier) + + tenant_matches = [ws for ws in workspaces if ws.tenant_id == identifier] + if tenant_matches: + return tenant_matches[0] + + name_matches = [ws for ws in workspaces if ws.name.casefold() == identifier.casefold()] + if name_matches: + return name_matches[0] if len(name_matches) == 1 else _ambiguous(name_matches, identifier) + + return None + + +def _ambiguous(matches: list[WorkspaceInfo], identifier: str) -> WorkspaceInfo: + """Fail with a clear, copyable error when an identifier matches >1 workspace. + + Trigger: a display name (or slug, defensively) resolves to multiple workspaces. + Why: silently picking one would route a share to the wrong tenant. + Outcome: list candidate slugs/tenant_ids and exit non-zero so the user re-runs + with an unambiguous slug or tenant_id. + """ + candidates = "\n".join(f" - {ws.slug} (tenant_id: {ws.tenant_id})" for ws in matches) + console.print( + f"[red]Workspace '{identifier}' is ambiguous; it matches multiple workspaces.[/red]\n" + "[yellow]Re-run with a unique workspace slug or tenant_id:[/yellow]\n" + f"{candidates}" + ) + raise typer.Exit(1) + + +async def _resolve_workspace_to_tenant_id(identifier: str) -> str: + """Resolve a human workspace identifier (slug/name/tenant_id) to a tenant UUID. + + Constraint: the cloud's X-Workspace-ID resolver only accepts a workspace/tenant + UUID, but users see slugs and display names (in list-workspaces output and + memory:// URLs). So when --workspace (or the configured default) is not already + a UUID, fetch the caller's workspaces once and map it to the tenant_id here. + + get_available_workspaces is the same workspace-fetch seam project.py uses for + CLI workspace resolution; it is awaited directly here because the share + commands already run inside their own event loop. + """ + from basic_memory.mcp.project_context import get_available_workspaces + + workspaces = await get_available_workspaces() + match = _match_workspace_identifier(workspaces, identifier) + if match is None: + available = "\n".join(f" - {ws.slug}" for ws in workspaces) + console.print( + f"[red]Workspace '{identifier}' was not found.[/red]\n" + "[yellow]Use one of these workspace slugs (or a tenant_id):[/yellow]\n" + f"{available}" + ) + raise typer.Exit(1) + return match.tenant_id + + +async def _workspace_headers( + *, + project_name: Optional[str] = None, + workspace: Optional[str] = None, +) -> dict[str, str]: + """Resolve the target workspace and build the routing header, if any. + + Resolution chain (see resolve_configured_workspace): explicit --workspace, + then the project's configured workspace_id, then the global default. Returns + an empty dict when nothing resolves so the request falls back to the + caller's default tenant exactly as before. + + The cloud's X-Workspace-ID resolver only accepts a workspace/tenant UUID. A + UUID is forwarded verbatim (covers per-project config workspace_id values and + the default chain, zero extra API calls); any other value is treated as a + human identifier and resolved to the tenant UUID via one workspace lookup. + """ + resolved = resolve_configured_workspace(project_name=project_name, workspace=workspace) + if resolved is None: + return {} + if _is_uuid(resolved): + return {WORKSPACE_ID_HEADER: resolved} + tenant_id = await _resolve_workspace_to_tenant_id(resolved) + return {WORKSPACE_ID_HEADER: tenant_id} + + +def _format_timestamp(iso_timestamp: Optional[str]) -> str: + """Format an ISO timestamp to a human-readable form, or '-' when absent.""" + if not iso_timestamp: + return "-" + try: + dt = datetime.fromisoformat(iso_timestamp.replace("Z", "+00:00")) + return dt.strftime("%Y-%m-%d %H:%M:%S") + except (ValueError, AttributeError): + return iso_timestamp + + +def _parse_expires_at(value: str) -> str: + """Validate an --expires-at value and normalize it to an ISO 8601 string. + + Accepts either a full ISO timestamp ("2025-12-31T23:59:00") or a bare date + ("2025-12-31"). Exits with a clear error on anything we can't parse so the + server never sees a malformed payload. + """ + try: + dt = datetime.fromisoformat(value) + except ValueError: + console.print( + f"[red]Invalid --expires-at value '{value}'. " + "Use ISO format, e.g. 2025-12-31 or 2025-12-31T23:59:00.[/red]" + ) + raise typer.Exit(1) + return dt.isoformat() + + +def _print_share_details(data: dict) -> None: + """Print a single share's fields in the snapshot-style detail layout.""" + console.print(f" Token: {data.get('token', 'unknown')}") + console.print(f" URL: [blue underline]{data.get('share_url', '-')}[/blue underline]") + console.print(f" Project: {data.get('project_name', '-')}") + console.print(f" Note: {data.get('note_permalink', '-')}") + console.print(f" Enabled: {'yes' if data.get('enabled', False) else 'no'}") + console.print(f" Expires: {_format_timestamp(data.get('expires_at'))}") + console.print(f" Views: {data.get('view_count', 0)}") + console.print(f" Created: {_format_timestamp(data.get('created_at'))}") + + +@share_app.command("create") +def create( + project: str = typer.Argument( + ..., + help="Name of the project the note belongs to", + ), + permalink: str = typer.Argument( + ..., + help="Permalink of the note to share", + ), + expires_at: Optional[str] = typer.Option( + None, + "--expires-at", + "-e", + help="Optional expiration date/time (ISO 8601, e.g. 2025-12-31)", + ), + workspace: Optional[str] = typer.Option( + None, + "--workspace", + help="Workspace to route to: a workspace slug, display name, or tenant ID", + ), +) -> None: + """Create a public share link for a note. + + Examples: + bm cloud share create my-project notes/my-idea + bm cloud share create my-project notes/my-idea --expires-at 2025-12-31 + bm cloud share create my-project notes/my-idea --workspace acme + """ + + # Validate --expires-at before any async/API work so a parse error surfaces + # a single clean message and exits, rather than being re-wrapped by the broad + # handler below as "Unexpected error: 1" (typer.Exit subclasses Exception). + payload: dict = { + "project_name": project, + "note_permalink": permalink, + } + if expires_at is not None: + payload["expires_at"] = _parse_expires_at(expires_at) + + async def _create(): + try: + config_manager = ConfigManager() + config = config_manager.config + host_url = config.cloud_host.rstrip("/") + + console.print("[blue]Creating share link...[/blue]") + + response = await make_api_request( + method="POST", + url=f"{host_url}/api/shares", + json_data=payload, + headers=await _workspace_headers(project_name=project, workspace=workspace), + ) + + data = response.json() + + console.print("[green]Share link created successfully[/green]") + _print_share_details(data) + + except typer.Exit: + raise + except SubscriptionRequiredError as e: + console.print("\n[red]Subscription Required[/red]\n") + console.print(f"[yellow]{e.args[0]}[/yellow]\n") + console.print(f"Subscribe at: [blue underline]{e.subscribe_url}[/blue underline]\n") + raise typer.Exit(1) + except CloudAPIError as e: + if e.status_code == 404: + console.print(f"[red]Note not found: {permalink} (project: {project})[/red]") + else: + console.print(f"[red]Failed to create share link: {e}[/red]") + raise typer.Exit(1) + except Exception as e: + console.print(f"[red]Unexpected error: {e}[/red]") + raise typer.Exit(1) + + asyncio.run(_create()) + + +@share_app.command("list") +def list_shares( + project: Optional[str] = typer.Option( + None, + "--project", + "-p", + help="Filter shares by project name", + ), + workspace: Optional[str] = typer.Option( + None, + "--workspace", + help="Workspace to route to: a workspace slug, display name, or tenant ID", + ), +) -> None: + """List public share links. + + Examples: + bm cloud share list + bm cloud share list --project my-project + bm cloud share list --project my-project --workspace acme + """ + + async def _list(): + try: + config_manager = ConfigManager() + config = config_manager.config + host_url = config.cloud_host.rstrip("/") + + url = f"{host_url}/api/shares" + if project: + # Encode the filter so project names with query-reserved + # characters (&, +, #, spaces) reach the server intact rather + # than being parsed as extra query parameters. + url += f"?{urlencode({'project_name': project})}" + + console.print("[blue]Fetching share links...[/blue]") + + response = await make_api_request( + method="GET", + url=url, + headers=await _workspace_headers(project_name=project, workspace=workspace), + ) + + data = response.json() + shares = data.get("shares", []) + total = data.get("total", len(shares)) + + if not shares: + console.print("[yellow]No share links found[/yellow]") + console.print( + "\n[dim]Create a share with: bm cloud share create [/dim]" + ) + return + + table = Table(title=f"Public Shares ({total} total)") + table.add_column("Token", style="cyan", no_wrap=True) + table.add_column("Project", style="yellow") + table.add_column("Note", style="white") + table.add_column("Enabled", style="green") + table.add_column("Expires", style="green") + table.add_column("Views", style="magenta", justify="right") + table.add_column("URL", style="blue", overflow="fold") + + for share in shares: + table.add_row( + share.get("token", "unknown"), + share.get("project_name", "-"), + share.get("note_permalink", "-"), + "yes" if share.get("enabled", False) else "no", + _format_timestamp(share.get("expires_at")), + str(share.get("view_count", 0)), + share.get("share_url", "-"), + ) + + console.print(table) + + # Re-raise typer.Exit before the broad handler below: workspace resolution + # raises typer.Exit (a subclass of Exception) for ambiguous/unknown + # identifiers, and that must not be re-wrapped as "Unexpected error: 1". + except typer.Exit: + raise + except SubscriptionRequiredError as e: + console.print("\n[red]Subscription Required[/red]\n") + console.print(f"[yellow]{e.args[0]}[/yellow]\n") + console.print(f"Subscribe at: [blue underline]{e.subscribe_url}[/blue underline]\n") + raise typer.Exit(1) + except CloudAPIError as e: + console.print(f"[red]Failed to list share links: {e}[/red]") + raise typer.Exit(1) + except Exception as e: + console.print(f"[red]Unexpected error: {e}[/red]") + raise typer.Exit(1) + + asyncio.run(_list()) + + +@share_app.command("update") +def update( + token: str = typer.Argument( + ..., + help="The token of the share to update", + ), + enable: bool = typer.Option( + False, + "--enable", + help="Enable the share link", + ), + disable: bool = typer.Option( + False, + "--disable", + help="Disable the share link without deleting it", + ), + expires_at: Optional[str] = typer.Option( + None, + "--expires-at", + "-e", + help="New expiration date/time (ISO 8601). Use 'none' to clear it.", + ), + workspace: Optional[str] = typer.Option( + None, + "--workspace", + help="Workspace the share belongs to: a workspace slug, display name, or tenant ID", + ), +) -> None: + """Update a share link: enable/disable it or change its expiration. + + Examples: + bm cloud share update abc123 --disable + bm cloud share update abc123 --enable + bm cloud share update abc123 --expires-at 2026-01-01 + bm cloud share update abc123 --expires-at none + bm cloud share update abc123 --disable --workspace acme + """ + + async def _update(): + try: + # --- Validate flags --- + # Trigger: both toggles passed, or neither toggle and no expiry change. + # Why: PATCH needs at least one concrete field, and enable/disable + # conflict; reject up front so we don't send an empty/ambiguous body. + if enable and disable: + console.print("[red]Cannot use --enable and --disable together[/red]") + raise typer.Exit(1) + if not enable and not disable and expires_at is None: + console.print( + "[red]Nothing to update. Pass --enable, --disable, or --expires-at.[/red]" + ) + raise typer.Exit(1) + + payload: dict = {} + if enable: + payload["enabled"] = True + if disable: + payload["enabled"] = False + if expires_at is not None: + # "none" clears the expiration; anything else is parsed as a date. + payload["expires_at"] = ( + None if expires_at.lower() == "none" else _parse_expires_at(expires_at) + ) + + config_manager = ConfigManager() + config = config_manager.config + host_url = config.cloud_host.rstrip("/") + + console.print("[blue]Updating share link...[/blue]") + + response = await make_api_request( + method="PATCH", + url=f"{host_url}/api/shares/{token}", + json_data=payload, + headers=await _workspace_headers(workspace=workspace), + ) + + data = response.json() + + console.print("[green]Share link updated successfully[/green]") + _print_share_details(data) + + except typer.Exit: + raise + except SubscriptionRequiredError as e: + console.print("\n[red]Subscription Required[/red]\n") + console.print(f"[yellow]{e.args[0]}[/yellow]\n") + console.print(f"Subscribe at: [blue underline]{e.subscribe_url}[/blue underline]\n") + raise typer.Exit(1) + except CloudAPIError as e: + if e.status_code == 404: + console.print(f"[red]Share not found: {token}[/red]") + else: + console.print(f"[red]Failed to update share link: {e}[/red]") + raise typer.Exit(1) + except Exception as e: + console.print(f"[red]Unexpected error: {e}[/red]") + raise typer.Exit(1) + + asyncio.run(_update()) + + +@share_app.command("revoke") +def revoke( + token: str = typer.Argument( + ..., + help="The token of the share to revoke", + ), + force: bool = typer.Option( + False, + "--force", + "-f", + help="Skip confirmation prompt", + ), + workspace: Optional[str] = typer.Option( + None, + "--workspace", + help="Workspace the share belongs to: a workspace slug, display name, or tenant ID", + ), +) -> None: + """Revoke (delete) a public share link. + + Examples: + bm cloud share revoke abc123 + bm cloud share revoke abc123 --force + bm cloud share revoke abc123 --force --workspace acme + """ + + async def _revoke(): + try: + config_manager = ConfigManager() + config = config_manager.config + host_url = config.cloud_host.rstrip("/") + + if not force: + confirmed = typer.confirm(f"Are you sure you want to revoke share '{token}'?") + if not confirmed: + console.print("[yellow]Revocation cancelled[/yellow]") + raise typer.Exit(0) + + console.print("[blue]Revoking share link...[/blue]") + + await make_api_request( + method="DELETE", + url=f"{host_url}/api/shares/{token}", + headers=await _workspace_headers(workspace=workspace), + ) + + console.print(f"[green]Share {token} revoked successfully[/green]") + + except typer.Exit: + raise + except SubscriptionRequiredError as e: + console.print("\n[red]Subscription Required[/red]\n") + console.print(f"[yellow]{e.args[0]}[/yellow]\n") + console.print(f"Subscribe at: [blue underline]{e.subscribe_url}[/blue underline]\n") + raise typer.Exit(1) + except CloudAPIError as e: + if e.status_code == 404: + console.print(f"[red]Share not found: {token}[/red]") + else: + console.print(f"[red]Failed to revoke share link: {e}[/red]") + raise typer.Exit(1) + except Exception as e: + console.print(f"[red]Unexpected error: {e}[/red]") + raise typer.Exit(1) + + asyncio.run(_revoke()) diff --git a/tests/cli/test_share_commands.py b/tests/cli/test_share_commands.py new file mode 100644 index 00000000..4b9f3499 --- /dev/null +++ b/tests/cli/test_share_commands.py @@ -0,0 +1,999 @@ +"""Tests for cloud share CLI commands. + +Issue #880: Tests for share create, list, update, revoke commands that surface +the cloud /api/shares endpoints. +""" + +from unittest.mock import AsyncMock, Mock, patch + +import httpx +from typer.testing import CliRunner + +from basic_memory.cli.app import app +from basic_memory.cli.commands.cloud.api_client import ( + CloudAPIError, + SubscriptionRequiredError, +) +from basic_memory.schemas.cloud import WorkspaceInfo + +# A real workspace/tenant UUID is forwarded verbatim as X-Workspace-ID with no +# workspace lookup; the cloud's resolver only accepts this UUID form. +TENANT_UUID = "5ccbae40-ca03-43a2-b23d-9931eb130e22" + + +def _workspace(slug: str, tenant_id: str, name: str) -> WorkspaceInfo: + """Build a WorkspaceInfo for workspace-resolution tests.""" + return WorkspaceInfo( + tenant_id=tenant_id, + workspace_type="organization", + slug=slug, + name=name, + role="owner", + is_default=False, + ) + + +def _patch_available_workspaces(workspaces): + """Patch the workspace list fetch used when --workspace is a slug/name. + + Asserts can wrap the returned mock to confirm the lookup was (or was not) + performed for a given invocation. + """ + return patch( + "basic_memory.mcp.project_context.get_available_workspaces", + new=AsyncMock(return_value=workspaces), + ) + + +SHARE_RESPONSE = { + "id": "11111111-1111-1111-1111-111111111111", + "token": "abc123", + "project_name": "my-project", + "note_permalink": "notes/my-idea", + "note_external_id": "ext-1", + "enabled": True, + "expires_at": None, + "share_url": "https://share.example.com/abc123", + "view_count": 0, + "last_viewed_at": None, + "created_at": "2025-01-18T12:00:00Z", +} + + +def _mock_config_manager(): + mock_config = Mock() + mock_config.cloud_host = "https://cloud.example.com" + mock_config_manager = Mock() + mock_config_manager.config = mock_config + return mock_config_manager + + +def _patch_workspace(resolved): + """Patch the workspace resolver used by the share commands. + + Returns whatever ``resolved`` is for every lookup, so tests can assert the + X-Workspace-ID header is built (and routed) the way the cloud expects + without depending on real config files. + """ + return patch( + "basic_memory.cli.commands.cloud.shares.resolve_configured_workspace", + return_value=resolved, + ) + + +class TestShareCreateCommand: + """Tests for 'bm cloud share create' command.""" + + def test_create_share_success(self): + runner = CliRunner() + + mock_response = Mock(spec=httpx.Response) + mock_response.status_code = 200 + mock_response.json.return_value = SHARE_RESPONSE + + captured = {} + + async def mock_make_api_request(*args, **kwargs): + captured["json_data"] = kwargs.get("json_data") + captured["headers"] = kwargs.get("headers") + return mock_response + + with patch( + "basic_memory.cli.commands.cloud.shares.make_api_request", + side_effect=mock_make_api_request, + ): + with patch( + "basic_memory.cli.commands.cloud.shares.ConfigManager", + return_value=_mock_config_manager(), + ): + with _patch_workspace(TENANT_UUID): + with _patch_available_workspaces([]) as fetch: + result = runner.invoke( + app, ["cloud", "share", "create", "my-project", "notes/my-idea"] + ) + + assert result.exit_code == 0 + assert "Share link created successfully" in result.stdout + assert "abc123" in result.stdout + assert "https://share.example.com/abc123" in result.stdout + # Payload should match the cloud CreateShareRequest contract. + assert captured["json_data"] == { + "project_name": "my-project", + "note_permalink": "notes/my-idea", + } + # Workspace routing: a resolved tenant UUID travels verbatim as the + # X-Workspace-ID header so team-workspace projects aren't evaluated + # against the caller's default tenant. + assert captured["headers"] == {"X-Workspace-ID": TENANT_UUID} + # A UUID needs no resolution: the workspace list is never fetched. + fetch.assert_not_called() + + def test_create_share_slug_resolves_to_tenant_uuid(self): + """A --workspace slug is resolved to the tenant UUID before routing.""" + runner = CliRunner() + + mock_response = Mock(spec=httpx.Response) + mock_response.status_code = 200 + mock_response.json.return_value = SHARE_RESPONSE + + captured = {} + + async def mock_make_api_request(*args, **kwargs): + captured["headers"] = kwargs.get("headers") + return mock_response + + seen = {} + + def fake_resolve(*, project_name=None, workspace=None): + seen["project_name"] = project_name + seen["workspace"] = workspace + return workspace + + workspaces = [ + _workspace("basic-memory-7020de4e925843c68c9056c60d101d9e", TENANT_UUID, "Acme Org"), + _workspace("other-slug", "11111111-1111-1111-1111-111111111111", "Other"), + ] + + with patch( + "basic_memory.cli.commands.cloud.shares.make_api_request", + side_effect=mock_make_api_request, + ): + with patch( + "basic_memory.cli.commands.cloud.shares.ConfigManager", + return_value=_mock_config_manager(), + ): + with patch( + "basic_memory.cli.commands.cloud.shares.resolve_configured_workspace", + side_effect=fake_resolve, + ): + with _patch_available_workspaces(workspaces) as fetch: + result = runner.invoke( + app, + [ + "cloud", + "share", + "create", + "my-project", + "notes/my-idea", + "--workspace", + "basic-memory-7020de4e925843c68c9056c60d101d9e", + ], + ) + + assert result.exit_code == 0 + assert seen == { + "project_name": "my-project", + "workspace": "basic-memory-7020de4e925843c68c9056c60d101d9e", + } + # The slug was mapped to the workspace's tenant UUID. + assert captured["headers"] == {"X-Workspace-ID": TENANT_UUID} + fetch.assert_awaited_once() + + def test_create_share_display_name_resolves_case_insensitively(self): + """A --workspace display name resolves case-insensitively to the tenant UUID.""" + runner = CliRunner() + + mock_response = Mock(spec=httpx.Response) + mock_response.status_code = 200 + mock_response.json.return_value = SHARE_RESPONSE + + captured = {} + + async def mock_make_api_request(*args, **kwargs): + captured["headers"] = kwargs.get("headers") + return mock_response + + workspaces = [_workspace("acme-slug", TENANT_UUID, "Acme Org")] + + with patch( + "basic_memory.cli.commands.cloud.shares.make_api_request", + side_effect=mock_make_api_request, + ): + with patch( + "basic_memory.cli.commands.cloud.shares.ConfigManager", + return_value=_mock_config_manager(), + ): + with _patch_workspace("acme org"): + with _patch_available_workspaces(workspaces): + result = runner.invoke( + app, ["cloud", "share", "create", "my-project", "notes/my-idea"] + ) + + assert result.exit_code == 0 + assert captured["headers"] == {"X-Workspace-ID": TENANT_UUID} + + def test_create_share_tenant_id_input_passthrough(self): + """A tenant UUID resolved from config is forwarded without a lookup.""" + runner = CliRunner() + + mock_response = Mock(spec=httpx.Response) + mock_response.status_code = 200 + mock_response.json.return_value = SHARE_RESPONSE + + captured = {} + + async def mock_make_api_request(*args, **kwargs): + captured["headers"] = kwargs.get("headers") + return mock_response + + with patch( + "basic_memory.cli.commands.cloud.shares.make_api_request", + side_effect=mock_make_api_request, + ): + with patch( + "basic_memory.cli.commands.cloud.shares.ConfigManager", + return_value=_mock_config_manager(), + ): + with _patch_workspace(TENANT_UUID): + with _patch_available_workspaces([]) as fetch: + result = runner.invoke( + app, + [ + "cloud", + "share", + "create", + "my-project", + "notes/my-idea", + "--workspace", + TENANT_UUID, + ], + ) + + assert result.exit_code == 0 + assert captured["headers"] == {"X-Workspace-ID": TENANT_UUID} + fetch.assert_not_called() + + def test_create_share_ambiguous_name_errors_with_candidate_slugs(self): + """A display name matching multiple workspaces errors and lists candidates.""" + runner = CliRunner() + + async def mock_make_api_request(*args, **kwargs): # pragma: no cover + raise AssertionError("API should not be called when workspace is ambiguous") + + workspaces = [ + _workspace("acme-prod", TENANT_UUID, "Acme"), + _workspace("acme-staging", "11111111-1111-1111-1111-111111111111", "Acme"), + ] + + with patch( + "basic_memory.cli.commands.cloud.shares.make_api_request", + side_effect=mock_make_api_request, + ): + with patch( + "basic_memory.cli.commands.cloud.shares.ConfigManager", + return_value=_mock_config_manager(), + ): + with _patch_workspace("Acme"): + with _patch_available_workspaces(workspaces): + result = runner.invoke( + app, ["cloud", "share", "create", "my-project", "notes/my-idea"] + ) + + assert result.exit_code == 1 + assert "ambiguous" in result.stdout + # Candidate slugs are listed so the user can disambiguate. + assert "acme-prod" in result.stdout + assert "acme-staging" in result.stdout + # The typer.Exit must not be re-wrapped by the broad handler. + assert "Unexpected error" not in result.stdout + + def test_create_share_unknown_workspace_errors_with_available_slugs(self): + """An unknown identifier errors and lists the available workspace slugs.""" + runner = CliRunner() + + async def mock_make_api_request(*args, **kwargs): # pragma: no cover + raise AssertionError("API should not be called for an unknown workspace") + + workspaces = [ + _workspace("acme-prod", TENANT_UUID, "Acme"), + _workspace("widget-co", "11111111-1111-1111-1111-111111111111", "Widget Co"), + ] + + with patch( + "basic_memory.cli.commands.cloud.shares.make_api_request", + side_effect=mock_make_api_request, + ): + with patch( + "basic_memory.cli.commands.cloud.shares.ConfigManager", + return_value=_mock_config_manager(), + ): + with _patch_workspace("does-not-exist"): + with _patch_available_workspaces(workspaces): + result = runner.invoke( + app, ["cloud", "share", "create", "my-project", "notes/my-idea"] + ) + + assert result.exit_code == 1 + assert "was not found" in result.stdout + assert "acme-prod" in result.stdout + assert "widget-co" in result.stdout + assert "Unexpected error" not in result.stdout + + def test_create_share_no_workspace_sends_no_header(self): + """When nothing resolves, no routing header is added (default tenant).""" + runner = CliRunner() + + mock_response = Mock(spec=httpx.Response) + mock_response.status_code = 200 + mock_response.json.return_value = SHARE_RESPONSE + + captured = {} + + async def mock_make_api_request(*args, **kwargs): + captured["headers"] = kwargs.get("headers") + return mock_response + + with patch( + "basic_memory.cli.commands.cloud.shares.make_api_request", + side_effect=mock_make_api_request, + ): + with patch( + "basic_memory.cli.commands.cloud.shares.ConfigManager", + return_value=_mock_config_manager(), + ): + with _patch_workspace(None): + result = runner.invoke( + app, ["cloud", "share", "create", "my-project", "notes/my-idea"] + ) + + assert result.exit_code == 0 + assert captured["headers"] == {} + + def test_create_share_with_expires_at(self): + runner = CliRunner() + + mock_response = Mock(spec=httpx.Response) + mock_response.status_code = 200 + mock_response.json.return_value = SHARE_RESPONSE + + captured = {} + + async def mock_make_api_request(*args, **kwargs): + captured["json_data"] = kwargs.get("json_data") + return mock_response + + with patch( + "basic_memory.cli.commands.cloud.shares.make_api_request", + side_effect=mock_make_api_request, + ): + with patch( + "basic_memory.cli.commands.cloud.shares.ConfigManager", + return_value=_mock_config_manager(), + ): + result = runner.invoke( + app, + [ + "cloud", + "share", + "create", + "my-project", + "notes/my-idea", + "--expires-at", + "2025-12-31", + ], + ) + + assert result.exit_code == 0 + assert captured["json_data"]["expires_at"].startswith("2025-12-31") + + def test_create_share_invalid_expires_at(self): + runner = CliRunner() + + async def mock_make_api_request(*args, **kwargs): # pragma: no cover + raise AssertionError("API should not be called on invalid input") + + with patch( + "basic_memory.cli.commands.cloud.shares.make_api_request", + side_effect=mock_make_api_request, + ): + with patch( + "basic_memory.cli.commands.cloud.shares.ConfigManager", + return_value=_mock_config_manager(), + ): + result = runner.invoke( + app, + [ + "cloud", + "share", + "create", + "my-project", + "notes/my-idea", + "--expires-at", + "not-a-date", + ], + ) + + assert result.exit_code == 1 + assert "Invalid --expires-at" in result.stdout + # A parse error must produce a single clean message, not a + # spurious "Unexpected error: 1" from the broad handler + # re-catching typer.Exit. See issue #880 review. + assert "Unexpected error" not in result.stdout + + def test_create_share_note_not_found(self): + runner = CliRunner() + + async def mock_make_api_request(*args, **kwargs): + raise CloudAPIError("Not found", status_code=404) + + with patch( + "basic_memory.cli.commands.cloud.shares.make_api_request", + side_effect=mock_make_api_request, + ): + with patch( + "basic_memory.cli.commands.cloud.shares.ConfigManager", + return_value=_mock_config_manager(), + ): + result = runner.invoke( + app, ["cloud", "share", "create", "my-project", "notes/missing"] + ) + + assert result.exit_code == 1 + assert "Note not found" in result.stdout + + def test_create_share_subscription_required(self): + runner = CliRunner() + + async def mock_make_api_request(*args, **kwargs): + raise SubscriptionRequiredError( + message="Active subscription required", + subscribe_url="https://basicmemory.com/subscribe", + ) + + with patch( + "basic_memory.cli.commands.cloud.shares.make_api_request", + side_effect=mock_make_api_request, + ): + with patch( + "basic_memory.cli.commands.cloud.shares.ConfigManager", + return_value=_mock_config_manager(), + ): + result = runner.invoke( + app, ["cloud", "share", "create", "my-project", "notes/my-idea"] + ) + + assert result.exit_code == 1 + assert "Subscription Required" in result.stdout + + def test_create_share_api_error(self): + runner = CliRunner() + + async def mock_make_api_request(*args, **kwargs): + raise CloudAPIError("Server error", status_code=500) + + with patch( + "basic_memory.cli.commands.cloud.shares.make_api_request", + side_effect=mock_make_api_request, + ): + with patch( + "basic_memory.cli.commands.cloud.shares.ConfigManager", + return_value=_mock_config_manager(), + ): + result = runner.invoke( + app, ["cloud", "share", "create", "my-project", "notes/my-idea"] + ) + + assert result.exit_code == 1 + assert "Failed to create share link" in result.stdout + + +class TestShareListCommand: + """Tests for 'bm cloud share list' command.""" + + def test_list_shares_success(self): + # Wide terminal so the rich table doesn't truncate cell contents. + runner = CliRunner(env={"COLUMNS": "200"}) + + mock_response = Mock(spec=httpx.Response) + mock_response.status_code = 200 + mock_response.json.return_value = { + "shares": [ + SHARE_RESPONSE, + { + **SHARE_RESPONSE, + "token": "def456", + "note_permalink": "notes/second", + "enabled": False, + "expires_at": "2025-12-31T00:00:00Z", + "view_count": 7, + }, + ], + "total": 2, + } + + async def mock_make_api_request(*args, **kwargs): + return mock_response + + with patch( + "basic_memory.cli.commands.cloud.shares.make_api_request", + side_effect=mock_make_api_request, + ): + with patch( + "basic_memory.cli.commands.cloud.shares.ConfigManager", + return_value=_mock_config_manager(), + ): + result = runner.invoke(app, ["cloud", "share", "list"]) + + assert result.exit_code == 0 + assert "abc123" in result.stdout + assert "def456" in result.stdout + assert "notes/second" in result.stdout + + def test_list_shares_empty(self): + runner = CliRunner() + + mock_response = Mock(spec=httpx.Response) + mock_response.status_code = 200 + mock_response.json.return_value = {"shares": [], "total": 0} + + async def mock_make_api_request(*args, **kwargs): + return mock_response + + with patch( + "basic_memory.cli.commands.cloud.shares.make_api_request", + side_effect=mock_make_api_request, + ): + with patch( + "basic_memory.cli.commands.cloud.shares.ConfigManager", + return_value=_mock_config_manager(), + ): + result = runner.invoke(app, ["cloud", "share", "list"]) + + assert result.exit_code == 0 + assert "No share links found" in result.stdout + + def test_list_shares_with_project_filter(self): + runner = CliRunner() + + mock_response = Mock(spec=httpx.Response) + mock_response.status_code = 200 + mock_response.json.return_value = {"shares": [SHARE_RESPONSE], "total": 1} + + captured = {} + + async def mock_make_api_request(*args, **kwargs): + captured["url"] = kwargs.get("url", args[1] if len(args) > 1 else "") + captured["headers"] = kwargs.get("headers") + return mock_response + + with patch( + "basic_memory.cli.commands.cloud.shares.make_api_request", + side_effect=mock_make_api_request, + ): + with patch( + "basic_memory.cli.commands.cloud.shares.ConfigManager", + return_value=_mock_config_manager(), + ): + with _patch_workspace(TENANT_UUID): + result = runner.invoke( + app, ["cloud", "share", "list", "--project", "my-project"] + ) + + assert result.exit_code == 0 + assert "project_name=my-project" in captured["url"] + assert captured["headers"] == {"X-Workspace-ID": TENANT_UUID} + + def test_list_shares_unknown_workspace_errors_without_double_error(self): + """An unknown --workspace on list errors cleanly (no 'Unexpected error'). + + Exercises the list handler's typer.Exit re-raise: workspace resolution + raises typer.Exit, which must not be re-wrapped by the broad handler. + """ + runner = CliRunner() + + async def mock_make_api_request(*args, **kwargs): # pragma: no cover + raise AssertionError("API should not be called for an unknown workspace") + + workspaces = [_workspace("acme-prod", TENANT_UUID, "Acme")] + + with patch( + "basic_memory.cli.commands.cloud.shares.make_api_request", + side_effect=mock_make_api_request, + ): + with patch( + "basic_memory.cli.commands.cloud.shares.ConfigManager", + return_value=_mock_config_manager(), + ): + with _patch_workspace("does-not-exist"): + with _patch_available_workspaces(workspaces): + result = runner.invoke(app, ["cloud", "share", "list"]) + + assert result.exit_code == 1 + assert "was not found" in result.stdout + assert "acme-prod" in result.stdout + assert "Unexpected error" not in result.stdout + + def test_list_shares_ambiguous_slug_errors_with_candidates(self): + """A slug colliding across workspaces errors and lists candidates. + + Exercises the slug tier of _match_workspace_identifier raising on >1 match. + """ + runner = CliRunner() + + async def mock_make_api_request(*args, **kwargs): # pragma: no cover + raise AssertionError("API should not be called when the slug is ambiguous") + + workspaces = [ + _workspace("shared-slug", TENANT_UUID, "Acme"), + _workspace("shared-slug", "11111111-1111-1111-1111-111111111111", "Widget"), + ] + + with patch( + "basic_memory.cli.commands.cloud.shares.make_api_request", + side_effect=mock_make_api_request, + ): + with patch( + "basic_memory.cli.commands.cloud.shares.ConfigManager", + return_value=_mock_config_manager(), + ): + with _patch_workspace("shared-slug"): + with _patch_available_workspaces(workspaces): + result = runner.invoke(app, ["cloud", "share", "list"]) + + assert result.exit_code == 1 + assert "ambiguous" in result.stdout + assert TENANT_UUID in result.stdout + assert "Unexpected error" not in result.stdout + + def test_list_shares_project_filter_url_encoded(self): + """Project names with query-reserved chars must be percent-encoded. + + A name like "R&D+notes #1" interpolated raw would split into bogus + query params (project_name=R, plus a stray "D+notes #1" key); encoding + keeps it a single faithful project_name value. + """ + runner = CliRunner() + + mock_response = Mock(spec=httpx.Response) + mock_response.status_code = 200 + mock_response.json.return_value = {"shares": [SHARE_RESPONSE], "total": 1} + + captured = {} + + async def mock_make_api_request(*args, **kwargs): + captured["url"] = kwargs.get("url", args[1] if len(args) > 1 else "") + return mock_response + + with patch( + "basic_memory.cli.commands.cloud.shares.make_api_request", + side_effect=mock_make_api_request, + ): + with patch( + "basic_memory.cli.commands.cloud.shares.ConfigManager", + return_value=_mock_config_manager(), + ): + with _patch_workspace(None): + result = runner.invoke( + app, ["cloud", "share", "list", "--project", "R&D+notes #1"] + ) + + assert result.exit_code == 0 + # Reserved characters are percent-encoded into a single value. + assert "project_name=R%26D%2Bnotes+%231" in captured["url"] + # And the raw, ambiguous form never reaches the wire. + assert "project_name=R&D" not in captured["url"] + + def test_list_shares_api_error(self): + runner = CliRunner() + + async def mock_make_api_request(*args, **kwargs): + raise CloudAPIError("Server error", status_code=500) + + with patch( + "basic_memory.cli.commands.cloud.shares.make_api_request", + side_effect=mock_make_api_request, + ): + with patch( + "basic_memory.cli.commands.cloud.shares.ConfigManager", + return_value=_mock_config_manager(), + ): + result = runner.invoke(app, ["cloud", "share", "list"]) + + assert result.exit_code == 1 + assert "Failed to list share links" in result.stdout + + +class TestShareUpdateCommand: + """Tests for 'bm cloud share update' command.""" + + def test_update_disable(self): + runner = CliRunner() + + mock_response = Mock(spec=httpx.Response) + mock_response.status_code = 200 + mock_response.json.return_value = {**SHARE_RESPONSE, "enabled": False} + + captured = {} + + async def mock_make_api_request(*args, **kwargs): + captured["json_data"] = kwargs.get("json_data") + captured["method"] = kwargs.get("method") + captured["headers"] = kwargs.get("headers") + return mock_response + + with patch( + "basic_memory.cli.commands.cloud.shares.make_api_request", + side_effect=mock_make_api_request, + ): + with patch( + "basic_memory.cli.commands.cloud.shares.ConfigManager", + return_value=_mock_config_manager(), + ): + with _patch_workspace(TENANT_UUID): + result = runner.invoke( + app, + ["cloud", "share", "update", "abc123", "--disable", "--workspace", "acme"], + ) + + assert result.exit_code == 0 + assert "updated successfully" in result.stdout + assert captured["method"] == "PATCH" + assert captured["json_data"] == {"enabled": False} + assert captured["headers"] == {"X-Workspace-ID": TENANT_UUID} + + def test_update_enable(self): + runner = CliRunner() + + mock_response = Mock(spec=httpx.Response) + mock_response.status_code = 200 + mock_response.json.return_value = SHARE_RESPONSE + + captured = {} + + async def mock_make_api_request(*args, **kwargs): + captured["json_data"] = kwargs.get("json_data") + return mock_response + + with patch( + "basic_memory.cli.commands.cloud.shares.make_api_request", + side_effect=mock_make_api_request, + ): + with patch( + "basic_memory.cli.commands.cloud.shares.ConfigManager", + return_value=_mock_config_manager(), + ): + result = runner.invoke(app, ["cloud", "share", "update", "abc123", "--enable"]) + + assert result.exit_code == 0 + assert captured["json_data"] == {"enabled": True} + + def test_update_expires_at(self): + runner = CliRunner() + + mock_response = Mock(spec=httpx.Response) + mock_response.status_code = 200 + mock_response.json.return_value = SHARE_RESPONSE + + captured = {} + + async def mock_make_api_request(*args, **kwargs): + captured["json_data"] = kwargs.get("json_data") + return mock_response + + with patch( + "basic_memory.cli.commands.cloud.shares.make_api_request", + side_effect=mock_make_api_request, + ): + with patch( + "basic_memory.cli.commands.cloud.shares.ConfigManager", + return_value=_mock_config_manager(), + ): + result = runner.invoke( + app, + ["cloud", "share", "update", "abc123", "--expires-at", "2026-01-01"], + ) + + assert result.exit_code == 0 + assert captured["json_data"]["expires_at"].startswith("2026-01-01") + + def test_update_clear_expires_at(self): + runner = CliRunner() + + mock_response = Mock(spec=httpx.Response) + mock_response.status_code = 200 + mock_response.json.return_value = SHARE_RESPONSE + + captured = {} + + async def mock_make_api_request(*args, **kwargs): + captured["json_data"] = kwargs.get("json_data") + return mock_response + + with patch( + "basic_memory.cli.commands.cloud.shares.make_api_request", + side_effect=mock_make_api_request, + ): + with patch( + "basic_memory.cli.commands.cloud.shares.ConfigManager", + return_value=_mock_config_manager(), + ): + result = runner.invoke( + app, ["cloud", "share", "update", "abc123", "--expires-at", "none"] + ) + + assert result.exit_code == 0 + assert captured["json_data"] == {"expires_at": None} + + def test_update_enable_and_disable_conflict(self): + runner = CliRunner() + + async def mock_make_api_request(*args, **kwargs): # pragma: no cover + raise AssertionError("API should not be called on conflicting flags") + + with patch( + "basic_memory.cli.commands.cloud.shares.make_api_request", + side_effect=mock_make_api_request, + ): + with patch( + "basic_memory.cli.commands.cloud.shares.ConfigManager", + return_value=_mock_config_manager(), + ): + result = runner.invoke( + app, + ["cloud", "share", "update", "abc123", "--enable", "--disable"], + ) + + assert result.exit_code == 1 + assert "Cannot use --enable and --disable together" in result.stdout + + def test_update_nothing_to_change(self): + runner = CliRunner() + + async def mock_make_api_request(*args, **kwargs): # pragma: no cover + raise AssertionError("API should not be called with empty update") + + with patch( + "basic_memory.cli.commands.cloud.shares.make_api_request", + side_effect=mock_make_api_request, + ): + with patch( + "basic_memory.cli.commands.cloud.shares.ConfigManager", + return_value=_mock_config_manager(), + ): + result = runner.invoke(app, ["cloud", "share", "update", "abc123"]) + + assert result.exit_code == 1 + assert "Nothing to update" in result.stdout + + def test_update_not_found(self): + runner = CliRunner() + + async def mock_make_api_request(*args, **kwargs): + raise CloudAPIError("Not found", status_code=404) + + with patch( + "basic_memory.cli.commands.cloud.shares.make_api_request", + side_effect=mock_make_api_request, + ): + with patch( + "basic_memory.cli.commands.cloud.shares.ConfigManager", + return_value=_mock_config_manager(), + ): + result = runner.invoke(app, ["cloud", "share", "update", "missing", "--disable"]) + + assert result.exit_code == 1 + assert "Share not found" in result.stdout + + +class TestShareRevokeCommand: + """Tests for 'bm cloud share revoke' command.""" + + def test_revoke_success_with_force(self): + runner = CliRunner() + + mock_response = Mock(spec=httpx.Response) + mock_response.status_code = 204 + mock_response.json.return_value = {} + + captured = {} + + async def mock_make_api_request(*args, **kwargs): + captured["method"] = kwargs.get("method") + captured["url"] = kwargs.get("url") + captured["headers"] = kwargs.get("headers") + return mock_response + + with patch( + "basic_memory.cli.commands.cloud.shares.make_api_request", + side_effect=mock_make_api_request, + ): + with patch( + "basic_memory.cli.commands.cloud.shares.ConfigManager", + return_value=_mock_config_manager(), + ): + with _patch_workspace(TENANT_UUID): + result = runner.invoke( + app, + ["cloud", "share", "revoke", "abc123", "--force", "--workspace", "acme"], + ) + + assert result.exit_code == 0 + assert "revoked successfully" in result.stdout + assert captured["method"] == "DELETE" + assert captured["url"].endswith("/api/shares/abc123") + assert captured["headers"] == {"X-Workspace-ID": TENANT_UUID} + + def test_revoke_cancelled(self): + runner = CliRunner() + + call_count = 0 + + async def mock_make_api_request(*args, **kwargs): + nonlocal call_count + call_count += 1 + return Mock(spec=httpx.Response) + + with patch( + "basic_memory.cli.commands.cloud.shares.make_api_request", + side_effect=mock_make_api_request, + ): + with patch( + "basic_memory.cli.commands.cloud.shares.ConfigManager", + return_value=_mock_config_manager(), + ): + result = runner.invoke(app, ["cloud", "share", "revoke", "abc123"], input="n\n") + + assert result.exit_code == 0 + assert "cancelled" in result.stdout + assert call_count == 0 + + def test_revoke_not_found(self): + runner = CliRunner() + + async def mock_make_api_request(*args, **kwargs): + raise CloudAPIError("Not found", status_code=404) + + with patch( + "basic_memory.cli.commands.cloud.shares.make_api_request", + side_effect=mock_make_api_request, + ): + with patch( + "basic_memory.cli.commands.cloud.shares.ConfigManager", + return_value=_mock_config_manager(), + ): + result = runner.invoke(app, ["cloud", "share", "revoke", "missing", "--force"]) + + assert result.exit_code == 1 + assert "Share not found" in result.stdout + + def test_revoke_subscription_required(self): + runner = CliRunner() + + async def mock_make_api_request(*args, **kwargs): + raise SubscriptionRequiredError( + message="Active subscription required", + subscribe_url="https://basicmemory.com/subscribe", + ) + + with patch( + "basic_memory.cli.commands.cloud.shares.make_api_request", + side_effect=mock_make_api_request, + ): + with patch( + "basic_memory.cli.commands.cloud.shares.ConfigManager", + return_value=_mock_config_manager(), + ): + result = runner.invoke(app, ["cloud", "share", "revoke", "abc123", "--force"]) + + assert result.exit_code == 1 + assert "Subscription Required" in result.stdout