diff --git a/frontend/app.py b/frontend/app.py index 5118778..1557b4e 100644 --- a/frontend/app.py +++ b/frontend/app.py @@ -14,6 +14,11 @@ POST /api/extract-functions — parse Python code for async functions GET /api/env — list .env vars (values masked) POST /api/env — upsert vars into .env {vars: {KEY: VALUE}} +GET /api/files — list a directory inside a mounted root (?root=&path=) +POST /api/files/mkdir — create a directory {root, path} +POST /api/files/upload — multipart upload (root, path, file) +GET /api/files/download — download a file (?root=&path=) +DELETE /api/files — delete a file/dir (?root=&path=&recursive=) POST /api/restart — send SIGTERM to restart server GET /api/config — UI feature flags (e.g. web_terminal) WS /ws/terminal — interactive PTY terminal (optional ?cmd=…) @@ -37,12 +42,21 @@ import textwrap import threading import traceback -from pathlib import Path +from pathlib import Path, PurePosixPath from typing import Any import yaml -from fastapi import FastAPI, HTTPException, Request, WebSocket, WebSocketDisconnect -from fastapi.responses import HTMLResponse, JSONResponse, StreamingResponse +from fastapi import ( + FastAPI, + File, + Form, + HTTPException, + Request, + UploadFile, + WebSocket, + WebSocketDisconnect, +) +from fastapi.responses import FileResponse, HTMLResponse, JSONResponse, StreamingResponse from config import CONFIG_DIR, ENV_FILE, FILES_DIR, REPOS_DIR @@ -553,9 +567,48 @@ def _safe_local_openapi_path(source: str) -> str: return str(candidate) -def create_app(config_dir: Path | None = None, env_file: Path | None = None) -> "FastAPI": +# --------------------------------------------------------------------------- +# File manager helpers +# --------------------------------------------------------------------------- +# +# The /api/files endpoints let the UI manage the volume-mounted directories +# (tools, files, repos) — e.g. create /app/tools/secrets and upload +# client_secret.json into it. Like the rest of the UI there is no auth: this +# is intended for a trusted, single-user/local admin UI. + +MAX_UPLOAD_BYTES = int(os.environ.get("MCPPROXY_MAX_UPLOAD_BYTES", 50 * 1024 * 1024)) + + +def _resolve_in_root(roots: dict[str, Path], root: str, rel: str) -> Path: + """Resolve ``rel`` inside the whitelisted root, rejecting escapes. + + Resolves symlinks before checking containment, so both ``..`` traversal + and symlinks pointing outside the root raise HTTPException(400). + """ + base = roots.get(root) + if base is None: + raise HTTPException(400, f"Unknown root {root!r} (expected one of {sorted(roots)})") + base = base.resolve() + target = (base / (rel or "").lstrip("/")).resolve() + try: + target.relative_to(base) + except ValueError: + raise HTTPException(400, f"Path escapes the {root} directory: {rel!r}") + return target + + +def create_app( + config_dir: Path | None = None, + env_file: Path | None = None, + file_roots: dict[str, Path] | None = None, +) -> "FastAPI": _config_dir = config_dir or CONFIG_DIR _env_file = env_file or ENV_FILE + _file_roots = file_roots or { + "tools": _config_dir, + "files": FILES_DIR, + "repos": REPOS_DIR, + } app = FastAPI(title="mcpproxy UI", docs_url=None, redoc_url=None) @@ -938,6 +991,133 @@ async def set_env(request: Request) -> dict: _write_env_file(_env_file, updates) return {"ok": True, "written": list(updates.keys())} + # ── File manager ───────────────────────────────────────────────────────── + # + # Browse / mkdir / upload / download / delete inside the volume-mounted + # roots (tools, files, repos). Paths are always relative to a whitelisted + # root and validated with resolve()+relative_to() — see _resolve_in_root. + + @app.get("/api/files") + async def list_dir(root: str = "tools", path: str = "") -> dict: + target = _resolve_in_root(_file_roots, root, path) + entries: list[dict[str, Any]] = [] + if target.is_dir(): + base = _file_roots[root].resolve() + for entry in target.iterdir(): + if entry.is_symlink(): + etype = "symlink" + elif entry.is_dir(): + etype = "directory" + else: + etype = "file" + try: + stat = entry.stat() if etype != "symlink" else entry.lstat() + size = stat.st_size if etype == "file" else 0 + mtime = stat.st_mtime + except OSError: + size, mtime = 0, 0 + entries.append({ + "name": entry.name, + "path": entry.relative_to(base).as_posix(), + "type": etype, + "size": size, + "mtime": mtime, + }) + entries.sort(key=lambda e: (e["type"] != "directory", e["name"].lower())) + return { + "ok": True, + "root": root, + "path": path, + "roots": sorted(_file_roots), + "entries": entries, + } + + @app.post("/api/files/mkdir") + async def make_dir(request: Request) -> dict: + body = await request.json() + root = (body.get("root") or "tools").strip() + rel = (body.get("path") or "").strip() + if not rel.strip("/"): + raise HTTPException(400, "path is required") + target = _resolve_in_root(_file_roots, root, rel) + if target.exists() and not target.is_dir(): + raise HTTPException(400, f"A file already exists at {rel!r}") + target.mkdir(parents=True, exist_ok=True) + return {"ok": True, "path": target.relative_to(_file_roots[root].resolve()).as_posix()} + + @app.post("/api/files/upload") + async def upload_file( + root: str = Form("tools"), + path: str = Form(""), + file: UploadFile = File(...), + ) -> dict: + """Upload a file into ``path`` (a directory) under the given root. + + The client-supplied filename is reduced to its basename, so it cannot + carry path segments. Existing files are overwritten — this is a + trusted single-admin UI and re-uploading a corrected file is the + common case. + """ + name = Path(file.filename or "").name + if not name or name in (".", ".."): + raise HTTPException(400, "Invalid filename") + target_dir = _resolve_in_root(_file_roots, root, path) + if target_dir.exists() and not target_dir.is_dir(): + raise HTTPException(400, f"Upload target {path!r} is not a directory") + target_dir.mkdir(parents=True, exist_ok=True) + target = target_dir / name + size = 0 + try: + with target.open("wb") as out: + while chunk := await file.read(1024 * 1024): + size += len(chunk) + if size > MAX_UPLOAD_BYTES: + raise HTTPException( + 413, f"File exceeds the upload limit ({MAX_UPLOAD_BYTES} bytes)" + ) + out.write(chunk) + except HTTPException: + target.unlink(missing_ok=True) + raise + rel = target.relative_to(_file_roots[root].resolve()).as_posix() + return {"ok": True, "path": rel, "size": size} + + @app.get("/api/files/download") + async def download_file(root: str = "tools", path: str = "") -> FileResponse: + target = _resolve_in_root(_file_roots, root, path) + if not target.is_file(): + raise HTTPException(404, f"File not found: {path!r}") + return FileResponse(target, filename=target.name) + + @app.delete("/api/files") + async def delete_path(root: str = "tools", path: str = "", recursive: bool = False) -> dict: + rel = PurePosixPath(path.strip("/")) + if not rel.name or rel.name in (".", ".."): + raise HTTPException(400, "path is required (cannot delete the root itself)") + # Validate the parent directory, then lstat the final component: a + # symlink pointing outside the root would fail _resolve_in_root, but + # deleting the link itself is safe — remove it without following it. + parent = _resolve_in_root(_file_roots, root, rel.parent.as_posix()) + target = parent / rel.name + if target.is_symlink(): + target.unlink() + return {"ok": True} + if not target.exists(): + raise HTTPException(404, f"Not found: {path!r}") + if target.is_dir(): + if recursive: + shutil.rmtree(target) + else: + try: + target.rmdir() + except OSError: + raise HTTPException( + 400, f"Directory {path!r} is not empty (pass recursive=true)" + ) + else: + target.unlink() + return {"ok": True} + # ── OpenAI-compatible tool endpoints ───────────────────────────────────── # # These endpoints let OpenAI-style callers (e.g. OpenWebUI tool servers) @@ -1277,6 +1457,10 @@ async def index(): + + MCP :8888  |  UI :8889 @@ -1826,6 +2010,67 @@ async def index(): + + + + + + @@ -1839,6 +2084,9 @@ async def index(): let currentProvider = null; // the structured JSON object being edited let codeEditor = null; // CodeMirror instance for the code block let secretsModal = null, wizModal = null, termModal = null; +let filesModal = null, ttModal = null; +let filesRoot = 'tools', filesPath = '', filesRoots = ['tools', 'files', 'repos']; +let ttTools = [], ttSelected = null; // tool tester: /v1/tools entries + selected name let webTerminalEnabled = false; let term = null, termFit = null, termSock = null; // xterm.js terminal state let wzType = null; // 'code' | 'package' | 'repository' | 'remote' | 'rest' @@ -1875,7 +2123,11 @@ async def index(): secretsModal = new bootstrap.Modal('#secrets-modal'); wizModal = new bootstrap.Modal('#wizard-modal'); termModal = new bootstrap.Modal('#terminal-modal'); + filesModal = new bootstrap.Modal('#files-modal'); + ttModal = new bootstrap.Modal('#tooltest-modal'); document.getElementById('terminal-modal').addEventListener('hidden.bs.modal', closeTerminal); + document.getElementById('files-list').addEventListener('click', filesListClick); + document.getElementById('tt-list').addEventListener('click', ttListClick); // Wizard: live function detection as the user types into the code textarea document.getElementById('wz-code-input').addEventListener('input', () => { clearTimeout(_wzAnalyzeDebounce); @@ -3682,6 +3934,320 @@ async def index(): if (currentName) await openProvider(currentName); } +// ───────────────────────────────────────────────────────────────────────────── +// File manager (/api/files — browse / mkdir / upload / download / delete) +// ───────────────────────────────────────────────────────────────────────────── +async function openFiles() { + filesModal.show(); + await filesRefresh(); +} + +function filesSetRoot(root) { + filesRoot = root; + filesPath = ''; + filesRefresh(); +} + +function filesNavigate(path) { + filesPath = path; + filesRefresh(); +} + +function _fmtSize(n) { + if (n >= 1048576) return (n / 1048576).toFixed(1) + ' MB'; + if (n >= 1024) return (n / 1024).toFixed(1) + ' KB'; + return n + ' B'; +} + +async function filesRefresh() { + let data; + try { + data = await api('GET', `/api/files?root=${encodeURIComponent(filesRoot)}&path=${encodeURIComponent(filesPath)}`); + } catch (e) { toast(e.message, false); return; } + filesRoots = data.roots; + + const sel = document.getElementById('files-root'); + sel.innerHTML = filesRoots.map(r => + ``).join(''); + + // Breadcrumb: root + each path segment is clickable. + const segs = filesPath ? filesPath.split('/') : []; + let crumbs = ``; + let acc = ''; + for (const s of segs) { + acc = acc ? acc + '/' + s : s; + crumbs += ``; + } + const crumbsEl = document.getElementById('files-crumbs'); + crumbsEl.innerHTML = crumbs; + crumbsEl.querySelectorAll('a[data-nav]').forEach(a => a.addEventListener('click', ev => { + ev.preventDefault(); + filesNavigate(a.dataset.nav); + })); + + const list = document.getElementById('files-list'); + if (!data.entries.length) { + list.innerHTML = '
Empty directory.
'; + return; + } + list.innerHTML = data.entries.map(e => { + const icon = e.type === 'directory' ? '📁' : e.type === 'symlink' ? '🔗' : '📄'; + const size = e.type === 'file' ? `${_fmtSize(e.size)}` : ''; + return `
+ ${icon} ${esc(e.name)} + ${size} + + +
`; + }).join(''); +} + +// Single delegated click handler — avoids quoting issues with arbitrary filenames. +function filesListClick(ev) { + const delBtn = ev.target.closest('[data-del]'); + if (delBtn) { + const row = delBtn.closest('.provider-item'); + filesDelete(delBtn.dataset.del, row.dataset.type === 'directory'); + return; + } + const row = ev.target.closest('.provider-item'); + if (!row) return; + if (row.dataset.type === 'directory') { + filesNavigate(row.dataset.path); + } else if (row.dataset.type === 'file') { + window.open(`/api/files/download?root=${encodeURIComponent(filesRoot)}&path=${encodeURIComponent(row.dataset.path)}`, '_blank'); + } +} + +async function filesMkdir() { + const name = prompt('New folder name (e.g. secrets):'); + if (!name) return; + const path = filesPath ? `${filesPath}/${name}` : name; + try { + await api('POST', '/api/files/mkdir', {root: filesRoot, path}); + toast(`Created ${path}`); + await filesRefresh(); + } catch (e) { toast(e.message, false); } +} + +async function filesUpload(fileList) { + // Multipart upload — must NOT go through api() (it forces a JSON content type). + for (const file of fileList) { + const form = new FormData(); + form.append('root', filesRoot); + form.append('path', filesPath); + form.append('file', file); + try { + const r = await fetch('/api/files/upload', {method: 'POST', body: form}); + const data = await r.json().catch(() => ({detail: r.statusText})); + if (!r.ok) throw new Error(data.detail || r.statusText); + toast(`Uploaded ${data.path}`); + } catch (e) { toast(`${file.name}: ${e.message}`, false); } + } + document.getElementById('files-upload-input').value = ''; + await filesRefresh(); +} + +async function filesDelete(path, isDir) { + if (!confirm(`Delete ${path}?`)) return; + let recursive = false; + try { + await api('DELETE', `/api/files?root=${encodeURIComponent(filesRoot)}&path=${encodeURIComponent(path)}`); + } catch (e) { + if (isDir && e.message.includes('not empty')) { + if (!confirm(`${path} is not empty — delete it and everything inside?`)) return; + recursive = true; + } else { toast(e.message, false); return; } + } + if (recursive) { + try { + await api('DELETE', `/api/files?root=${encodeURIComponent(filesRoot)}&path=${encodeURIComponent(path)}&recursive=true`); + } catch (e) { toast(e.message, false); return; } + } + toast(`Deleted ${path}`); + await filesRefresh(); +} + +// ───────────────────────────────────────────────────────────────────────────── +// Tool tester (/v1/tools list + /v1/tools/{name}/invoke) +// ───────────────────────────────────────────────────────────────────────────── +async function openToolTester() { + ttModal.show(); + try { + const r = await api('GET', '/v1/tools'); + ttTools = r.tools || []; + } catch (e) { toast(e.message, false); ttTools = []; } + ttRenderList(); + if (ttSelected && !ttTools.some(t => t.function.name === ttSelected)) { + ttSelected = null; + document.getElementById('tt-detail').innerHTML = + '
Select a tool to test it.
'; + } +} + +function ttRenderList() { + const q = (document.getElementById('tt-search').value || '').toLowerCase(); + const listEl = document.getElementById('tt-list'); + const tools = ttTools.filter(t => t.function.name.toLowerCase().includes(q)); + if (!ttTools.length) { + listEl.innerHTML = `
No tools registered.

+ The registry is populated at server startup — after provider changes, + restart and reopen this dialog.

+ +
`; + return; + } + if (!tools.length) { + listEl.innerHTML = '
No tools match the filter.
'; + return; + } + // Group by provider prefix (advertised names look like provider__tool). + const groups = {}; + for (const t of tools) { + const name = t.function.name; + const prov = name.includes('__') ? name.split('__')[0] : '(other)'; + (groups[prov] = groups[prov] || []).push(t); + } + let out = ''; + for (const prov of Object.keys(groups).sort()) { + out += `
${esc(prov)}
`; + out += groups[prov].map(t => { + const name = t.function.name; + const short = name.includes('__') ? name.split('__').slice(1).join('__') : name; + return `
+ ${esc(short)} +
`; + }).join(''); + } + listEl.innerHTML = out; +} + +function ttListClick(ev) { + const row = ev.target.closest('[data-tool]'); + if (!row) return; + ttSelected = row.dataset.tool; + ttRenderList(); + ttRenderForm(); +} + +function ttRenderForm() { + const tool = ttTools.find(t => t.function.name === ttSelected); + if (!tool) return; + const schema = tool.function.parameters || {}; + const props = schema.properties || {}; + const required = new Set(schema.required || []); + let fields = ''; + for (const [name, def] of Object.entries(props)) { + const isReq = required.has(name); + const badge = `${isReq ? 'required' : 'optional'}`; + const help = def.description + ? `
${esc(def.description)}
` : ''; + const type = def.type || (def.enum ? 'string' : 'json'); + let input; + if (def.enum) { + const blank = isReq ? '' : ''; + input = `'; + } else if (type === 'boolean') { + input = `
+ + +
`; + } else if (type === 'number' || type === 'integer') { + input = ``; + } else if (type === 'string') { + input = ``; + } else { // object / array / unknown — raw JSON + const placeholder = JSON.stringify(def.default ?? (type === 'array' ? [] : {}), null, 2); + input = ``; + } + fields += `
+ + ${input}${help} +
`; + } + if (!fields) fields = '
This tool takes no arguments.
'; + document.getElementById('tt-detail').innerHTML = ` +
${esc(tool.function.name)}
+
${esc(tool.function.description)}
+
Arguments
${fields}
+ +
`; +} + +function ttCollectArgs() { + const tool = ttTools.find(t => t.function.name === ttSelected); + const required = new Set((tool.function.parameters || {}).required || []); + const args = {}; + for (const el of document.querySelectorAll('#tt-form .tt-field')) { + const name = el.dataset.name, kind = el.dataset.kind; + if (kind === 'boolean') { args[name] = el.checked; continue; } + const raw = el.value; + if (raw === '') { // empty optional fields are omitted so handler defaults apply + if (required.has(name)) throw new Error(`'${name}' is required`); + continue; + } + if (kind === 'integer') args[name] = parseInt(raw, 10); + else if (kind === 'number') args[name] = Number(raw); + else if (kind === 'json') { + try { args[name] = JSON.parse(raw); } + catch { throw new Error(`'${name}' is not valid JSON`); } + } else args[name] = raw; + } + return args; +} + +async function ttInvoke() { + let args; + try { args = ttCollectArgs(); } + catch (e) { toast(e.message, false); return; } + const btn = document.getElementById('tt-invoke-btn'); + const resultEl = document.getElementById('tt-result'); + btn.disabled = true; + btn.innerHTML = 'Running…'; + resultEl.innerHTML = ''; + try { + const res = await api('POST', `/v1/tools/${encodeURIComponent(ttSelected)}/invoke`, {arguments: args}); + ttRenderResult(res); + } catch (e) { + ttRenderResult({content: [{type: 'text', text: e.message}], is_error: true}); + } finally { + btn.disabled = false; + btn.innerHTML = '▶ Invoke'; + } +} + +function ttRenderResult(res) { + const el = document.getElementById('tt-result'); + const border = res.is_error ? 'border-left:3px solid var(--red)' : 'border-left:3px solid var(--green)'; + const badge = res.is_error ? '✗ error' + : '✓ ok'; + el.innerHTML = `
+
Result ${badge}
+
`; + const body = document.getElementById('tt-result-body'); + for (const item of (res.content || [])) { + const pre = document.createElement('pre'); + pre.style.cssText = 'white-space:pre-wrap;word-break:break-word;font-size:.8em;margin:0 0 8px;color:#cdd6f4;max-height:50vh;overflow:auto'; + if (item.type === 'text') { + let text = item.text; + try { text = JSON.stringify(JSON.parse(item.text), null, 2); } catch {} + pre.textContent = text; // textContent — tool output is untrusted + } else { + pre.textContent = JSON.stringify(item, null, 2); + } + body.appendChild(pre); + } + if (!(res.content || []).length) body.innerHTML = '(empty result)'; +} + // ───────────────────────────────────────────────────────────────────────────── // Helpers // ───────────────────────────────────────────────────────────────────────────── diff --git a/requirements.txt b/requirements.txt index abad7c9..b4329d3 100755 --- a/requirements.txt +++ b/requirements.txt @@ -2,5 +2,6 @@ fastmcp pyyaml personalcapital2 fastapi +python-multipart uvicorn[standard] httpx diff --git a/tests/test_frontend.py b/tests/test_frontend.py index a8af607..4a6f39e 100644 --- a/tests/test_frontend.py +++ b/tests/test_frontend.py @@ -1512,3 +1512,183 @@ def test_full_wizard_sequence(self, app, tools_dir, tmp_path, monkeypatch): assert set(create["body_params"]) == {"name", "age"} # Secret env keys surface for the wizard's Secrets step. assert set(r.json()["secret_keys"]) >= {"DEMO_ID", "DEMO_SECRET"} + + +# --------------------------------------------------------------------------- +# File manager endpoints +# --------------------------------------------------------------------------- + +@pytest.fixture() +def file_roots(tmp_path: Path, tools_dir: Path) -> dict: + files_dir = tmp_path / "files" + repos_dir = tmp_path / "repos" + files_dir.mkdir() + repos_dir.mkdir() + return {"tools": tools_dir, "files": files_dir, "repos": repos_dir} + + +@pytest.fixture() +def files_client(tools_dir, env_path, file_roots): + app = create_app(config_dir=tools_dir, env_file=env_path, file_roots=file_roots) + return TestClient(app) + + +class TestFilesAPI: + def test_list_empty_root(self, files_client): + r = files_client.get("/api/files", params={"root": "tools"}) + assert r.status_code == 200 + data = r.json() + assert data["ok"] is True + assert data["entries"] == [] + assert set(data["roots"]) == {"tools", "files", "repos"} + + def test_list_entries_shape_and_order(self, files_client, file_roots): + root = file_roots["tools"] + (root / "zfile.txt").write_text("hello") + (root / "adir").mkdir() + r = files_client.get("/api/files", params={"root": "tools"}) + entries = r.json()["entries"] + # Directories sort first. + assert [e["name"] for e in entries] == ["adir", "zfile.txt"] + f = entries[1] + assert f["type"] == "file" and f["size"] == 5 and f["path"] == "zfile.txt" + assert entries[0]["type"] == "directory" + + def test_mkdir_nested_and_list_into(self, files_client, file_roots): + r = files_client.post("/api/files/mkdir", json={"root": "tools", "path": "secrets/inner"}) + assert r.status_code == 200, r.text + assert (file_roots["tools"] / "secrets" / "inner").is_dir() + r = files_client.get("/api/files", params={"root": "tools", "path": "secrets"}) + assert [e["name"] for e in r.json()["entries"]] == ["inner"] + + def test_mkdir_empty_path_rejected(self, files_client): + r = files_client.post("/api/files/mkdir", json={"root": "tools", "path": ""}) + assert r.status_code == 400 + + def test_upload_download_delete_roundtrip(self, files_client, file_roots): + files_client.post("/api/files/mkdir", json={"root": "tools", "path": "secrets"}) + r = files_client.post( + "/api/files/upload", + data={"root": "tools", "path": "secrets"}, + files={"file": ("client_secret.json", b'{"installed": {}}', "application/json")}, + ) + assert r.status_code == 200, r.text + assert r.json()["path"] == "secrets/client_secret.json" + on_disk = file_roots["tools"] / "secrets" / "client_secret.json" + assert on_disk.read_bytes() == b'{"installed": {}}' + + r = files_client.get( + "/api/files/download", params={"root": "tools", "path": "secrets/client_secret.json"} + ) + assert r.status_code == 200 + assert r.content == b'{"installed": {}}' + assert "client_secret.json" in r.headers["content-disposition"] + + r = files_client.delete( + "/api/files", params={"root": "tools", "path": "secrets/client_secret.json"} + ) + assert r.status_code == 200 + assert not on_disk.exists() + + def test_upload_creates_target_dir(self, files_client, file_roots): + r = files_client.post( + "/api/files/upload", + data={"root": "files", "path": "new/sub"}, + files={"file": ("a.txt", b"x", "text/plain")}, + ) + assert r.status_code == 200, r.text + assert (file_roots["files"] / "new" / "sub" / "a.txt").exists() + + def test_upload_filename_sanitized_to_basename(self, files_client, file_roots): + r = files_client.post( + "/api/files/upload", + data={"root": "tools", "path": ""}, + files={"file": ("../evil.txt", b"x", "text/plain")}, + ) + assert r.status_code == 200, r.text + assert (file_roots["tools"] / "evil.txt").exists() + assert not (file_roots["tools"].parent / "evil.txt").exists() + + def test_upload_size_limit(self, files_client, file_roots, monkeypatch): + import frontend.app as app_module + monkeypatch.setattr(app_module, "MAX_UPLOAD_BYTES", 10) + r = files_client.post( + "/api/files/upload", + data={"root": "tools", "path": ""}, + files={"file": ("big.bin", b"x" * 100, "application/octet-stream")}, + ) + assert r.status_code == 413 + assert not (file_roots["tools"] / "big.bin").exists() + + def test_download_directory_rejected(self, files_client, file_roots): + (file_roots["tools"] / "adir").mkdir() + r = files_client.get("/api/files/download", params={"root": "tools", "path": "adir"}) + assert r.status_code == 404 + + def test_delete_root_rejected(self, files_client): + for path in ("", "/", "."): + r = files_client.delete("/api/files", params={"root": "tools", "path": path}) + assert r.status_code == 400, path + + def test_delete_nonempty_dir_requires_recursive(self, files_client, file_roots): + d = file_roots["tools"] / "full" + d.mkdir() + (d / "x.txt").write_text("x") + r = files_client.delete("/api/files", params={"root": "tools", "path": "full"}) + assert r.status_code == 400 + r = files_client.delete( + "/api/files", params={"root": "tools", "path": "full", "recursive": "true"} + ) + assert r.status_code == 200 + assert not d.exists() + + def test_absolute_path_treated_as_root_relative(self, files_client): + # Leading slashes are stripped: "/etc/passwd" means "/etc/passwd", + # which does not exist — the real /etc/passwd is never touched. + r = files_client.get("/api/files/download", params={"root": "tools", "path": "/etc/passwd"}) + assert r.status_code == 404 + + def test_traversal_rejected(self, files_client): + for path in ("../../etc", "a/../../etc"): + for method, url, kwargs in ( + ("get", "/api/files", {"params": {"root": "tools", "path": path}}), + ("post", "/api/files/mkdir", {"json": {"root": "tools", "path": path}}), + ("get", "/api/files/download", {"params": {"root": "tools", "path": path}}), + ("delete", "/api/files", {"params": {"root": "tools", "path": path}}), + ): + r = getattr(files_client, method)(url, **kwargs) + assert r.status_code == 400, (method, url, path, r.text) + + def test_unknown_root_rejected(self, files_client): + r = files_client.get("/api/files", params={"root": "bogus"}) + assert r.status_code == 400 + + def test_symlink_escape_rejected_but_link_deletable(self, files_client, file_roots, tmp_path): + outside = tmp_path / "outside.txt" + outside.write_text("secret") + link = file_roots["tools"] / "leak" + link.symlink_to(outside) + # Reading through the link is rejected (resolves outside the root)... + r = files_client.get("/api/files/download", params={"root": "tools", "path": "leak"}) + assert r.status_code == 400 + # ...but it is listed as a symlink and the link itself can be removed. + entries = files_client.get("/api/files", params={"root": "tools"}).json()["entries"] + assert entries[0]["type"] == "symlink" + r = files_client.delete("/api/files", params={"root": "tools", "path": "leak"}) + assert r.status_code == 200 + assert not link.is_symlink() and outside.exists() + + def test_default_roots_use_config_dir(self, client, tools_dir): + # create_app without file_roots maps "tools" to the injected config_dir. + (tools_dir / "p.yaml").write_text("tools: []\n") + r = client.get("/api/files", params={"root": "tools"}) + assert r.status_code == 200 + assert [e["name"] for e in r.json()["entries"]] == ["p.yaml"] + + +class TestNewUISmoke: + def test_index_contains_files_and_tooltester_ui(self, client): + html = client.get("/").text + for needle in ("files-modal", "tooltest-modal", "openFiles()", "openToolTester()", + "filesUpload", "ttInvoke"): + assert needle in html, needle