From 288b59bc3724d1a5b428b3dd91f79da72334cf2d Mon Sep 17 00:00:00 2001 From: jack Date: Mon, 22 Jun 2026 22:44:19 +0800 Subject: [PATCH 1/2] =?UTF-8?q?feat(kanban):=20board=20webhooks=20?= =?UTF-8?q?=E2=80=94=20registration=20+=20HMAC=20outbound=20delivery?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Register webhooks on the DB board and deliver kanban events to external URLs, signed with HMAC-SHA256. Implements D2 (next-features-design.md §3). - migration 0017_kanban_webhooks: kanban_webhooks (url, secret, event filter, board scope) + kanban_webhook_deliveries (queue + retry ledger). Coexists with B1's 0016. - handlers/kanban/webhook.rs: list / create / delete (owner-admin only). Create returns the plaintext secret ONCE (then a mask). HTTPS-only + SSRF guard (rejects loopback/private hosts). 20-per-workspace cap. `enqueue_event` queues a delivery for every enabled, subscribed webhook (board-scoped or all-boards). - card.rs: enqueue on create / move / archive (the key lifecycle events). - tasks/webhook_delivery.rs: worker that picks due deliveries, signs the JSON body (X-JType-Signature: sha256=…), POSTs via reqwest, and records the outcome with exponential backoff (dead after max_attempts). Spawned from run_from_env. - hmac crate; routes; module registration. - api.ts + KanbanWebhook(Created) types; Kanban.tsx Webhooks dialog (list / create with event + scope pickers / one-time secret reveal / delete). - tests: webhook_crud_and_enqueue (integration, MySQL) + HMAC known-vector unit test + hex_encode. Verified: cargo check + 39 kanban_tests pass against MySQL (create / secret-once / non-https reject / list / enqueue-on-card-create / delete + the 0017 migration); HMAC signing matches a known RFC-style vector; root + web tsc clean. Notes: migration 0017 (B1 takes 0016). Follow-ups: patch / rotate-secret / test / deliveries-history endpoints + UI; enqueue at patch/restore/delete sites; DNS- rebind hardening; realtime delivery-status events. The worker's live HTTP POST is covered by code + the HMAC unit test (an end-to-end POST needs a receiver). Co-Authored-By: Claude Opus 4.8 --- services/jtype-web/Cargo.lock | 1 + services/jtype-web/Cargo.toml | 1 + services/jtype-web/frontend/src/api.ts | 22 ++ .../jtype-web/frontend/src/pages/Kanban.tsx | 97 ++++++- .../migrations/0017_kanban_webhooks.down.sql | 2 + .../migrations/0017_kanban_webhooks.up.sql | 42 +++ services/jtype-web/src/db/migrations.rs | 7 + .../jtype-web/src/handlers/kanban/card.rs | 27 ++ services/jtype-web/src/handlers/kanban/mod.rs | 1 + .../jtype-web/src/handlers/kanban/webhook.rs | 266 ++++++++++++++++++ services/jtype-web/src/lib.rs | 10 + services/jtype-web/src/tasks/mod.rs | 1 + .../jtype-web/src/tasks/webhook_delivery.rs | 162 +++++++++++ services/jtype-web/tests/kanban_tests.rs | 52 ++++ 14 files changed, 690 insertions(+), 1 deletion(-) create mode 100644 services/jtype-web/migrations/0017_kanban_webhooks.down.sql create mode 100644 services/jtype-web/migrations/0017_kanban_webhooks.up.sql create mode 100644 services/jtype-web/src/handlers/kanban/webhook.rs create mode 100644 services/jtype-web/src/tasks/webhook_delivery.rs diff --git a/services/jtype-web/Cargo.lock b/services/jtype-web/Cargo.lock index 01dcdc1..55bf97a 100644 --- a/services/jtype-web/Cargo.lock +++ b/services/jtype-web/Cargo.lock @@ -1013,6 +1013,7 @@ dependencies = [ "bytes", "futures", "hex", + "hmac", "lettre", "mime_guess", "object_store", diff --git a/services/jtype-web/Cargo.toml b/services/jtype-web/Cargo.toml index e49b660..279ddc8 100644 --- a/services/jtype-web/Cargo.toml +++ b/services/jtype-web/Cargo.toml @@ -16,6 +16,7 @@ pulldown-cmark = "0.13" reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] } rand_core = { version = "0.6", features = ["std"] } rust-embed = "8" +hmac = "0.12" serde = { version = "1", features = ["derive"] } serde_json = "1" sha2 = "0.10" diff --git a/services/jtype-web/frontend/src/api.ts b/services/jtype-web/frontend/src/api.ts index 3cd80e7..abe79a1 100644 --- a/services/jtype-web/frontend/src/api.ts +++ b/services/jtype-web/frontend/src/api.ts @@ -332,6 +332,12 @@ export const api = { request(`/api/v1/workspaces/${workspaceId}/kanban/cards/${cardId}`, { method: 'DELETE' }), listTrash: (workspaceId: string, boardId: string) => request(`/api/v1/workspaces/${workspaceId}/kanban/boards/${boardId}/trash`), + listWebhooks: (workspaceId: string) => + request(`/api/v1/workspaces/${workspaceId}/kanban/webhooks`), + createWebhook: (workspaceId: string, data: { name: string; targetUrl: string; boardId?: string | null; eventTypes: string[] }) => + request(`/api/v1/workspaces/${workspaceId}/kanban/webhooks`, { method: 'POST', body: JSON.stringify(data) }), + deleteWebhook: (workspaceId: string, webhookId: string) => + request(`/api/v1/workspaces/${workspaceId}/kanban/webhooks/${webhookId}`, { method: 'DELETE' }), listLabels: (workspaceId: string, boardId: string) => request(`/api/v1/workspaces/${workspaceId}/kanban/boards/${boardId}/labels`), @@ -825,6 +831,22 @@ export interface KanbanBoardFull extends KanbanBoardSummary { labels: KanbanLabel[] } +export interface KanbanWebhook { + id: string + boardId?: string | null + name: string + targetUrl: string + eventTypes: string[] + enabled: boolean + secretMasked: string + lastDeliveryAt?: string | null + lastStatus?: string | null + createdAt: string +} +export interface KanbanWebhookCreated extends KanbanWebhook { + secret: string +} + export interface KanbanTrashItem { id: string cardId: string diff --git a/services/jtype-web/frontend/src/pages/Kanban.tsx b/services/jtype-web/frontend/src/pages/Kanban.tsx index 225c182..b019018 100644 --- a/services/jtype-web/frontend/src/pages/Kanban.tsx +++ b/services/jtype-web/frontend/src/pages/Kanban.tsx @@ -1,7 +1,7 @@ import { useState, useEffect, useCallback, useRef, useMemo } from 'react' import { useParams, useNavigate } from 'react-router-dom' import { Dialog, DialogPanel, Menu, MenuButton, MenuItems, MenuItem } from '@headlessui/react' -import { PlusIcon, EllipsisHorizontalIcon, TrashIcon, ArchiveBoxIcon, ArrowUturnLeftIcon, TagIcon, XMarkIcon, ChevronDownIcon } from '@heroicons/react/24/outline' +import { PlusIcon, EllipsisHorizontalIcon, TrashIcon, ArchiveBoxIcon, ArrowUturnLeftIcon, TagIcon, XMarkIcon, ChevronDownIcon, BoltIcon } from '@heroicons/react/24/outline' import { api, setSessionId, @@ -11,6 +11,8 @@ import { type KanbanLabel, type KanbanTrashItem, type KanbanPriority, + type KanbanWebhook, + type KanbanWebhookCreated, type UpdateKanbanCardRequest, type MemberInfo, } from '../api' @@ -40,6 +42,7 @@ export function Kanban() { const [error, setError] = useState('') const [showLabels, setShowLabels] = useState(false) const [showTrash, setShowTrash] = useState(false) + const [showWebhooks, setShowWebhooks] = useState(false) const [view, setView] = useState>({}) const { sessionId: wsSessionId, subscribe: wsSubscribe, status: wsStatus } = useWorkspaceSocket(workspaceId) @@ -360,6 +363,7 @@ export function Kanban() {
+ @@ -395,6 +399,9 @@ export function Kanban() { {showTrash && board && workspaceId && ( setShowTrash(false)} onChanged={reload} /> )} + {showWebhooks && workspaceId && ( + setShowWebhooks(false)} /> + )} +
+ {error &&

{error}

} + {revealed && ( +
+
Signing secret — shown once, copy it now:
+ {revealed.secret} +
+ )} +
    + {hooks.map((h) => ( +
  • +
    +
    {h.name}
    +
    {h.targetUrl}
    +
    {h.eventTypes.join(', ')}{h.boardId ? ' · this board' : ' · all boards'}{h.lastStatus ? ` · last: ${h.lastStatus}` : ''}
    +
    + +
  • + ))} + {hooks.length === 0 &&
  • No webhooks yet.
  • } +
+
{ e.preventDefault(); void create() }} className="space-y-2 border-t border-zinc-100 pt-3"> + setName(e.target.value)} /> + setUrl(e.target.value)} /> +
+ {WEBHOOK_EVENTS.map((ev) => ( + + ))} +
+
+ + {boardId && } +
+
+ +
+
+ + + + ) +} diff --git a/services/jtype-web/migrations/0017_kanban_webhooks.down.sql b/services/jtype-web/migrations/0017_kanban_webhooks.down.sql new file mode 100644 index 0000000..3cf9de2 --- /dev/null +++ b/services/jtype-web/migrations/0017_kanban_webhooks.down.sql @@ -0,0 +1,2 @@ +DROP TABLE IF EXISTS kanban_webhook_deliveries; +DROP TABLE IF EXISTS kanban_webhooks; diff --git a/services/jtype-web/migrations/0017_kanban_webhooks.up.sql b/services/jtype-web/migrations/0017_kanban_webhooks.up.sql new file mode 100644 index 0000000..dcd4d65 --- /dev/null +++ b/services/jtype-web/migrations/0017_kanban_webhooks.up.sql @@ -0,0 +1,42 @@ +CREATE TABLE kanban_webhooks ( + id CHAR(36) NOT NULL, + workspace_id CHAR(36) NOT NULL, + board_id CHAR(36) NULL, + name VARCHAR(160) NOT NULL, + target_url VARCHAR(2048) NOT NULL, + secret CHAR(64) NOT NULL, + event_types JSON NOT NULL, + enabled TINYINT(1) NOT NULL DEFAULT 1, + created_by_user_id CHAR(36) NOT NULL, + last_delivery_at TIMESTAMP NULL, + last_status VARCHAR(32) NULL, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY (id), + KEY idx_webhooks_workspace (workspace_id), + KEY idx_webhooks_enabled (workspace_id, enabled), + CONSTRAINT kanban_webhooks_workspace_fk FOREIGN KEY (workspace_id) REFERENCES workspaces(id) ON DELETE CASCADE, + CONSTRAINT kanban_webhooks_board_fk FOREIGN KEY (board_id) REFERENCES kanban_boards(id) ON DELETE CASCADE, + CONSTRAINT kanban_webhooks_creator_fk FOREIGN KEY (created_by_user_id) REFERENCES users(id) ON DELETE CASCADE +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; + +CREATE TABLE kanban_webhook_deliveries ( + id CHAR(36) NOT NULL, + webhook_id CHAR(36) NOT NULL, + workspace_id CHAR(36) NOT NULL, + event_type VARCHAR(64) NOT NULL, + payload JSON NOT NULL, + status ENUM('pending','succeeded','failed','dead') NOT NULL DEFAULT 'pending', + attempt_count INT NOT NULL DEFAULT 0, + max_attempts INT NOT NULL DEFAULT 6, + last_status_code INT NULL, + last_error VARCHAR(512) NULL, + next_retry_at TIMESTAMP NULL, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY (id), + KEY idx_deliveries_webhook (webhook_id), + KEY idx_deliveries_due (status, next_retry_at), + CONSTRAINT kanban_deliveries_webhook_fk FOREIGN KEY (webhook_id) REFERENCES kanban_webhooks(id) ON DELETE CASCADE, + CONSTRAINT kanban_deliveries_workspace_fk FOREIGN KEY (workspace_id) REFERENCES workspaces(id) ON DELETE CASCADE +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; diff --git a/services/jtype-web/src/db/migrations.rs b/services/jtype-web/src/db/migrations.rs index b3f7704..ce3b7c9 100644 --- a/services/jtype-web/src/db/migrations.rs +++ b/services/jtype-web/src/db/migrations.rs @@ -98,6 +98,13 @@ fn all_migrations() -> Vec { up: include_str!("../../migrations/0015_login_otp.up.sql"), down: include_str!("../../migrations/0015_login_otp.down.sql"), }, + // 0016 (kanban_comments) lands on its own branch; webhooks take 0017. + Migration { + version: 17, + name: "kanban_webhooks", + up: include_str!("../../migrations/0017_kanban_webhooks.up.sql"), + down: include_str!("../../migrations/0017_kanban_webhooks.down.sql"), + }, ] } diff --git a/services/jtype-web/src/handlers/kanban/card.rs b/services/jtype-web/src/handlers/kanban/card.rs index f419a58..afbd816 100644 --- a/services/jtype-web/src/handlers/kanban/card.rs +++ b/services/jtype-web/src/handlers/kanban/card.rs @@ -274,6 +274,15 @@ pub async fn create_card( ) .await; + super::webhook::enqueue_event( + &state.pool, + &workspace_id, + &board_id, + "kanban:card-updated", + json!({ "event": "kanban:card-updated", "cardId": card_id, "boardId": board_id }), + ) + .await; + // Re-fetch with DB timestamps fetch_card_response(&state, &workspace_id, &card_id).await } @@ -721,6 +730,15 @@ pub async fn move_card( ) .await; + super::webhook::enqueue_event( + &state.pool, + &workspace_id, + &card.board_id, + "kanban:card-updated", + json!({ "event": "kanban:card-updated", "cardId": card.id, "boardId": card.board_id }), + ) + .await; + Ok(Json(card).into_response()) } @@ -853,6 +871,15 @@ pub async fn archive_card( ) .await; + super::webhook::enqueue_event( + &state.pool, + &workspace_id, + &board_id, + "kanban:card-archived", + json!({ "event": "kanban:card-archived", "cardId": card_id, "boardId": board_id }), + ) + .await; + Ok(Json(json!({ "id": trash_id, "cardId": card_id, diff --git a/services/jtype-web/src/handlers/kanban/mod.rs b/services/jtype-web/src/handlers/kanban/mod.rs index 11f7c25..d54c7b3 100644 --- a/services/jtype-web/src/handlers/kanban/mod.rs +++ b/services/jtype-web/src/handlers/kanban/mod.rs @@ -12,6 +12,7 @@ pub mod board; pub mod card; pub mod column; pub mod label; +pub mod webhook; use sqlx::Row; diff --git a/services/jtype-web/src/handlers/kanban/webhook.rs b/services/jtype-web/src/handlers/kanban/webhook.rs new file mode 100644 index 0000000..29cf589 --- /dev/null +++ b/services/jtype-web/src/handlers/kanban/webhook.rs @@ -0,0 +1,266 @@ +//! Kanban webhook registration + outbound delivery enqueue (DB board). +//! +//! Endpoints (owner/admin only): +//! GET /api/v1/workspaces/:workspace_id/kanban/webhooks +//! POST /api/v1/workspaces/:workspace_id/kanban/webhooks +//! DELETE /api/v1/workspaces/:workspace_id/kanban/webhooks/:webhook_id +//! +//! On create the plaintext `secret` is returned ONCE; thereafter only a mask. +//! `enqueue_event` is called from the card handlers' broadcast sites to queue +//! deliveries; the `tasks::webhook_delivery` worker signs (HMAC-SHA256) and POSTs. + +use axum::{ + extract::{Path, State}, + http::HeaderMap, + response::{IntoResponse, Response}, + Json, +}; +use serde::{Deserialize, Serialize}; +use serde_json::Value as JsonValue; +use sqlx::{MySql, Pool, Row}; +use uuid::Uuid; + +use super::clamp_str; +use crate::error::AppError; +use crate::handlers::workspace::require_workspace_role; +use crate::middleware::auth::extract_user; +use crate::AppState; + +const MAX_NAME: usize = 160; +const MAX_URL: usize = 2048; +const MAX_WEBHOOKS_PER_WORKSPACE: i64 = 20; + +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct KanbanWebhook { + pub id: String, + pub board_id: Option, + pub name: String, + pub target_url: String, + pub event_types: Vec, + pub enabled: bool, + pub secret_masked: String, + pub last_delivery_at: Option, + pub last_status: Option, + pub created_at: String, +} + +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct KanbanWebhookCreated { + #[serde(flatten)] + pub webhook: KanbanWebhook, + /// Plaintext secret — returned only on create. + pub secret: String, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct CreateKanbanWebhookRequest { + pub name: String, + pub target_url: String, + #[serde(default)] + pub board_id: Option, + #[serde(default)] + pub event_types: Vec, +} + +fn row_to_webhook(r: &sqlx::mysql::MySqlRow) -> Result { + let event_types: JsonValue = r.try_get("event_types")?; + let events = match event_types { + JsonValue::Array(a) => a.into_iter().filter_map(|v| v.as_str().map(String::from)).collect(), + _ => Vec::new(), + }; + Ok(KanbanWebhook { + id: r.try_get("id")?, + board_id: r.try_get("board_id")?, + name: r.try_get("name")?, + target_url: r.try_get("target_url")?, + event_types: events, + enabled: r.try_get::("enabled")? != 0, + secret_masked: "whsec_••••".into(), + last_delivery_at: r.try_get("last_delivery_at")?, + last_status: r.try_get("last_status")?, + created_at: r.try_get("created_at")?, + }) +} + +const SELECT_WEBHOOK: &str = r#"SELECT id, board_id, name, target_url, event_types, enabled, + CAST(last_delivery_at AS CHAR) AS last_delivery_at, last_status, + CAST(created_at AS CHAR) AS created_at +FROM kanban_webhooks"#; + +/// Reject obvious SSRF targets (internal/loopback hosts) and non-HTTPS URLs. +fn validate_target_url(url: &str) -> Result<(), AppError> { + let lower = url.to_ascii_lowercase(); + if !lower.starts_with("https://") { + return Err(AppError::BadRequest("target_url must be https://".into())); + } + let host = lower["https://".len()..] + .split(['/', ':', '?']) + .next() + .unwrap_or(""); + let blocked = ["localhost", "127.", "0.0.0.0", "10.", "192.168.", "169.254.", "[::1]", "[fc", "[fd"]; + if host.is_empty() + || blocked.iter().any(|b| host.starts_with(b)) + || (host.starts_with("172.") + && host + .split('.') + .nth(1) + .and_then(|o| o.parse::().ok()) + .is_some_and(|n| (16..=31).contains(&n))) + { + return Err(AppError::BadRequest("target_url host is not allowed".into())); + } + Ok(()) +} + +pub async fn list_webhooks( + State(state): State, + headers: HeaderMap, + Path(workspace_id): Path, +) -> Result { + let user = extract_user(&state.pool, &headers).await?; + require_workspace_role(&state.pool, &workspace_id, &user.id, &["owner", "admin"]).await?; + let rows = sqlx::query(&format!("{SELECT_WEBHOOK} WHERE workspace_id = ? ORDER BY created_at DESC")) + .bind(&workspace_id) + .fetch_all(&state.pool) + .await?; + let out = rows.iter().map(row_to_webhook).collect::, _>>()?; + Ok(Json(out).into_response()) +} + +pub async fn create_webhook( + State(state): State, + headers: HeaderMap, + Path(workspace_id): Path, + Json(payload): Json, +) -> Result { + let user = extract_user(&state.pool, &headers).await?; + require_workspace_role(&state.pool, &workspace_id, &user.id, &["owner", "admin"]).await?; + + let name = clamp_str(payload.name.trim(), MAX_NAME); + if name.is_empty() { + return Err(AppError::BadRequest("name cannot be empty".into())); + } + let target_url = clamp_str(payload.target_url.trim(), MAX_URL); + validate_target_url(&target_url)?; + let events: Vec = payload + .event_types + .iter() + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .collect(); + if events.is_empty() { + return Err(AppError::BadRequest("event_types cannot be empty".into())); + } + if let Some(b) = &payload.board_id { + let ok: Option = sqlx::query_scalar("SELECT id FROM kanban_boards WHERE id = ? AND workspace_id = ?") + .bind(b) + .bind(&workspace_id) + .fetch_optional(&state.pool) + .await?; + if ok.is_none() { + return Err(AppError::BadRequest("board_id not found in workspace".into())); + } + } + let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM kanban_webhooks WHERE workspace_id = ?") + .bind(&workspace_id) + .fetch_one(&state.pool) + .await?; + if count >= MAX_WEBHOOKS_PER_WORKSPACE { + return Err(AppError::BadRequest("too many webhooks for this workspace".into())); + } + + let id = Uuid::new_v4().to_string(); + let secret = format!("{}{}", Uuid::new_v4().simple(), Uuid::new_v4().simple()); + let events_json = serde_json::to_value(&events).unwrap_or(JsonValue::Array(vec![])); + sqlx::query( + "INSERT INTO kanban_webhooks (id, workspace_id, board_id, name, target_url, secret, event_types, created_by_user_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + ) + .bind(&id) + .bind(&workspace_id) + .bind(&payload.board_id) + .bind(&name) + .bind(&target_url) + .bind(&secret) + .bind(&events_json) + .bind(&user.id) + .execute(&state.pool) + .await?; + + let row = sqlx::query(&format!("{SELECT_WEBHOOK} WHERE id = ?")) + .bind(&id) + .fetch_one(&state.pool) + .await?; + Ok(Json(KanbanWebhookCreated { webhook: row_to_webhook(&row)?, secret }).into_response()) +} + +pub async fn delete_webhook( + State(state): State, + headers: HeaderMap, + Path((workspace_id, webhook_id)): Path<(String, String)>, +) -> Result { + let user = extract_user(&state.pool, &headers).await?; + require_workspace_role(&state.pool, &workspace_id, &user.id, &["owner", "admin"]).await?; + let res = sqlx::query("DELETE FROM kanban_webhooks WHERE id = ? AND workspace_id = ?") + .bind(&webhook_id) + .bind(&workspace_id) + .execute(&state.pool) + .await?; + if res.rows_affected() == 0 { + return Err(AppError::NotFound); + } + Ok(axum::http::StatusCode::NO_CONTENT.into_response()) +} + +/// Queue a delivery for every enabled webhook in the workspace that subscribes +/// to `event_type` (exact match or `"*"`) and is scoped to this board (or all +/// boards). Best-effort: errors are logged, never propagated to the caller. +pub async fn enqueue_event( + pool: &Pool, + workspace_id: &str, + board_id: &str, + event_type: &str, + payload: JsonValue, +) { + let rows = match sqlx::query( + "SELECT id, event_types FROM kanban_webhooks WHERE workspace_id = ? AND enabled = 1 AND (board_id IS NULL OR board_id = ?)", + ) + .bind(workspace_id) + .bind(board_id) + .fetch_all(pool) + .await + { + Ok(r) => r, + Err(e) => { + eprintln!("webhook enqueue query failed: {e}"); + return; + } + }; + for r in rows { + let webhook_id: String = match r.try_get("id") { + Ok(v) => v, + Err(_) => continue, + }; + let events: JsonValue = r.try_get("event_types").unwrap_or(JsonValue::Null); + let subscribed = matches!(&events, JsonValue::Array(a) if a.iter().any(|v| v.as_str() == Some(event_type) || v.as_str() == Some("*"))); + if !subscribed { + continue; + } + let delivery_id = Uuid::new_v4().to_string(); + if let Err(e) = sqlx::query( + "INSERT INTO kanban_webhook_deliveries (id, webhook_id, workspace_id, event_type, payload) VALUES (?, ?, ?, ?, ?)", + ) + .bind(&delivery_id) + .bind(&webhook_id) + .bind(workspace_id) + .bind(event_type) + .bind(&payload) + .execute(pool) + .await + { + eprintln!("webhook enqueue insert failed: {e}"); + } + } +} diff --git a/services/jtype-web/src/lib.rs b/services/jtype-web/src/lib.rs index ca78c79..43422b3 100644 --- a/services/jtype-web/src/lib.rs +++ b/services/jtype-web/src/lib.rs @@ -94,6 +94,7 @@ pub async fn run_from_env() -> Result<(), AppError> { .map_err(|e| AppError::Server(e.to_string()))?; println!("jtype-web listening on http://{}", bind_addr); // Spawn periodic trash cleanup (document_trash + kanban_card_trash) + tasks::webhook_delivery::spawn(pool.clone()); tasks::cleanup_trash::spawn(pool); axum::serve(listener, app) .await @@ -370,6 +371,15 @@ pub fn build_app( "/api/v1/workspaces/:workspace_id/kanban/cards/:card_id/restore", post(handlers::kanban::card::restore_card), ) + // Webhooks + .route( + "/api/v1/workspaces/:workspace_id/kanban/webhooks", + get(handlers::kanban::webhook::list_webhooks).post(handlers::kanban::webhook::create_webhook), + ) + .route( + "/api/v1/workspaces/:workspace_id/kanban/webhooks/:webhook_id", + axum::routing::delete(handlers::kanban::webhook::delete_webhook), + ) // Labels .route( "/api/v1/workspaces/:workspace_id/kanban/boards/:board_id/labels", diff --git a/services/jtype-web/src/tasks/mod.rs b/services/jtype-web/src/tasks/mod.rs index 3754adb..c3637d4 100644 --- a/services/jtype-web/src/tasks/mod.rs +++ b/services/jtype-web/src/tasks/mod.rs @@ -7,3 +7,4 @@ //! All tasks spawned from `lib.rs::run_from_env`. pub mod cleanup_trash; +pub mod webhook_delivery; diff --git a/services/jtype-web/src/tasks/webhook_delivery.rs b/services/jtype-web/src/tasks/webhook_delivery.rs new file mode 100644 index 0000000..cc3b484 --- /dev/null +++ b/services/jtype-web/src/tasks/webhook_delivery.rs @@ -0,0 +1,162 @@ +//! Outbound kanban webhook delivery worker. +//! +//! Every tick it picks due deliveries (`pending`/`failed` with `next_retry_at` +//! elapsed), signs the JSON body with HMAC-SHA256 (key = the webhook secret), +//! POSTs to the target URL, and records the outcome with exponential backoff. +//! `dead` after `max_attempts`. + +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; + +use hmac::{Hmac, Mac}; +use sha2::Sha256; +use sqlx::{MySql, Pool, Row}; + +type HmacSha256 = Hmac; + +const TICK_SECS: u64 = 10; +const BATCH: i64 = 20; + +pub fn spawn(pool: Pool) { + let running = Arc::new(AtomicBool::new(false)); + tokio::spawn(async move { + let mut interval = tokio::time::interval(std::time::Duration::from_secs(TICK_SECS)); + interval.tick().await; + loop { + interval.tick().await; + if running.swap(true, Ordering::SeqCst) { + continue; + } + if let Err(e) = run_once(&pool).await { + eprintln!("webhook delivery tick failed: {e}"); + } + running.store(false, Ordering::SeqCst); + } + }); +} + +fn hex_encode(bytes: &[u8]) -> String { + let mut s = String::with_capacity(bytes.len() * 2); + for b in bytes { + s.push_str(&format!("{b:02x}")); + } + s +} + +/// Process one batch of due deliveries. Returns how many were attempted. +pub async fn run_once(pool: &Pool) -> Result { + let client = reqwest::Client::builder() + .user_agent("jtype-web") + .timeout(std::time::Duration::from_secs(10)) + .build() + .unwrap_or_else(|_| reqwest::Client::new()); + + let rows = sqlx::query( + r#"SELECT d.id, d.webhook_id, d.event_type, d.payload, d.attempt_count, d.max_attempts, + w.target_url, w.secret + FROM kanban_webhook_deliveries d + JOIN kanban_webhooks w ON w.id = d.webhook_id + WHERE d.status IN ('pending', 'failed') + AND (d.next_retry_at IS NULL OR d.next_retry_at <= NOW()) + ORDER BY d.created_at ASC + LIMIT ?"#, + ) + .bind(BATCH) + .fetch_all(pool) + .await?; + + let mut attempted = 0u64; + for r in rows { + let id: String = r.try_get("id")?; + let webhook_id: String = r.try_get("webhook_id")?; + let event_type: String = r.try_get("event_type")?; + let payload: serde_json::Value = r.try_get("payload")?; + let prev_attempts: i32 = r.try_get("attempt_count")?; + let max_attempts: i32 = r.try_get("max_attempts")?; + let target_url: String = r.try_get("target_url")?; + let secret: String = r.try_get("secret")?; + + let body = serde_json::to_vec(&payload).unwrap_or_default(); + let mut mac = match HmacSha256::new_from_slice(secret.as_bytes()) { + Ok(m) => m, + Err(_) => continue, + }; + mac.update(&body); + let signature = hex_encode(&mac.finalize().into_bytes()); + + let resp = client + .post(&target_url) + .header("Content-Type", "application/json") + .header("X-JType-Event", &event_type) + .header("X-JType-Delivery", &id) + .header("X-JType-Signature", format!("sha256={signature}")) + .body(body) + .send() + .await; + + let attempt = prev_attempts + 1; + let success = matches!(&resp, Ok(r) if r.status().is_success()); + if success { + let code = resp.map(|r| r.status().as_u16() as i32).unwrap_or(0); + sqlx::query( + "UPDATE kanban_webhook_deliveries SET status='succeeded', attempt_count=?, last_status_code=?, next_retry_at=NULL WHERE id=?", + ) + .bind(attempt) + .bind(code) + .bind(&id) + .execute(pool) + .await?; + sqlx::query("UPDATE kanban_webhooks SET last_delivery_at=NOW(), last_status='ok' WHERE id=?") + .bind(&webhook_id) + .execute(pool) + .await?; + } else { + let (code, err): (Option, String) = match resp { + Ok(r) => (Some(r.status().as_u16() as i32), format!("HTTP {}", r.status())), + Err(e) => (None, e.to_string()), + }; + let dead = attempt >= max_attempts; + let status = if dead { "dead" } else { "failed" }; + let backoff = 2_i64.saturating_pow(attempt.max(0) as u32).saturating_mul(30).min(3600); + let err: String = err.chars().take(512).collect(); + sqlx::query( + "UPDATE kanban_webhook_deliveries SET status=?, attempt_count=?, last_status_code=?, last_error=?, next_retry_at=DATE_ADD(NOW(), INTERVAL ? SECOND) WHERE id=?", + ) + .bind(status) + .bind(attempt) + .bind(code) + .bind(&err) + .bind(backoff) + .bind(&id) + .execute(pool) + .await?; + sqlx::query("UPDATE kanban_webhooks SET last_delivery_at=NOW(), last_status='failed' WHERE id=?") + .bind(&webhook_id) + .execute(pool) + .await?; + } + attempted += 1; + } + Ok(attempted) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn hex_encode_zero_pads() { + assert_eq!(hex_encode(&[0x00, 0x0f, 0xff]), "000fff"); + } + + #[test] + fn hmac_sha256_matches_known_vector() { + // RFC-style vector: HMAC-SHA256("key", "The quick brown fox jumps over the lazy dog") + let mut mac = HmacSha256::new_from_slice(b"key").unwrap(); + mac.update(b"The quick brown fox jumps over the lazy dog"); + assert_eq!( + hex_encode(&mac.finalize().into_bytes()), + "f7bc83f430538424b13298e6aa6fb143ef4d59a14946175997479dbc2d1a3cd8" + ); + } +} diff --git a/services/jtype-web/tests/kanban_tests.rs b/services/jtype-web/tests/kanban_tests.rs index 4d87c64..b766b5d 100644 --- a/services/jtype-web/tests/kanban_tests.rs +++ b/services/jtype-web/tests/kanban_tests.rs @@ -1582,3 +1582,55 @@ async fn conflict_response_includes_latest_card_snapshot() { assert!(body["latest"]["updatedClock"].as_i64().unwrap() > base_clock); assert_eq!(body["baseUpdatedClock"], base_clock); } + +#[tokio::test] +async fn webhook_crud_and_enqueue() { + let (app, pool) = common::setup().await; + let (token, _) = common::register_user(app.clone(), &common::uid()).await; + let ws_id = common::create_workspace(app.clone(), &token, &common::wname()).await; + let (_s, board) = common::req( + app.clone(), "POST", &format!("/api/v1/workspaces/{ws_id}/kanban/boards"), + Some(&token), Some(json!({ "name": "B" })), + ).await; + let board_id = board["id"].as_str().unwrap().to_string(); + let col = board["columns"][0]["id"].as_str().unwrap().to_string(); + + // create a webhook subscribed to card-updated + let (status, wh) = common::req( + app.clone(), "POST", &format!("/api/v1/workspaces/{ws_id}/kanban/webhooks"), + Some(&token), + Some(json!({ "name": "CI", "targetUrl": "https://example.com/hook", "eventTypes": ["kanban:card-updated"] })), + ).await; + assert_eq!(status, StatusCode::OK); + assert!(wh["secret"].is_string(), "plaintext secret returned once"); + assert_eq!(wh["secretMasked"], "whsec_••••"); + let wh_id = wh["id"].as_str().unwrap().to_string(); + + // reject non-https + let (bad, _b) = common::req( + app.clone(), "POST", &format!("/api/v1/workspaces/{ws_id}/kanban/webhooks"), + Some(&token), Some(json!({ "name": "x", "targetUrl": "http://localhost/hook", "eventTypes": ["*"] })), + ).await; + assert_eq!(bad, StatusCode::BAD_REQUEST); + + // list → 1 + let (_s, list) = common::req( + app.clone(), "GET", &format!("/api/v1/workspaces/{ws_id}/kanban/webhooks"), Some(&token), None, + ).await; + assert_eq!(list.as_array().unwrap().len(), 1); + + // creating a card enqueues a delivery (synchronously, before the worker runs) + let _ = common::req( + app.clone(), "POST", &format!("/api/v1/workspaces/{ws_id}/kanban/boards/{board_id}/cards"), + Some(&token), Some(json!({ "columnId": col, "title": "X" })), + ).await; + let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM kanban_webhook_deliveries WHERE webhook_id = ?") + .bind(&wh_id).fetch_one(&pool).await.unwrap(); + assert!(count >= 1, "card create enqueued a webhook delivery"); + + // delete webhook → 204 (cascade removes deliveries) + let (status, _) = common::req( + app, "DELETE", &format!("/api/v1/workspaces/{ws_id}/kanban/webhooks/{wh_id}"), Some(&token), None, + ).await; + assert_eq!(status, StatusCode::NO_CONTENT); +} From 85b1a6cccfced95295b302db138518e4c6467712 Mon Sep 17 00:00:00 2001 From: jack Date: Tue, 23 Jun 2026 11:54:48 +0800 Subject: [PATCH 2/2] fix(kanban): harden webhook SSRF guard (review #44) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - validate_target_url now parses the URL with the `url` crate and classifies the host structurally, so `https://user@127.0.0.1/`, private/link-local IPv4, IPv6 loopback/ULA, and `*.internal`/`*.local` can no longer slip past the old prefix check. IP literals are classified via std::net (is_private/is_loopback/…). - delivery worker: reqwest client uses redirect Policy::none() so a 3xx to an internal host can't bypass the create-time validation. - test: webhook_crud_and_enqueue now asserts rejection of userinfo, private-IP, IPv6-loopback and .internal targets. Co-Authored-By: Claude Opus 4.8 --- services/jtype-web/Cargo.lock | 1 + services/jtype-web/Cargo.toml | 1 + .../jtype-web/src/handlers/kanban/webhook.rs | 69 ++++++++++++++----- .../jtype-web/src/tasks/webhook_delivery.rs | 3 + services/jtype-web/tests/kanban_tests.rs | 22 ++++-- 5 files changed, 71 insertions(+), 25 deletions(-) diff --git a/services/jtype-web/Cargo.lock b/services/jtype-web/Cargo.lock index 55bf97a..f409945 100644 --- a/services/jtype-web/Cargo.lock +++ b/services/jtype-web/Cargo.lock @@ -1030,6 +1030,7 @@ dependencies = [ "tokio", "tower", "tower-http", + "url", "uuid", ] diff --git a/services/jtype-web/Cargo.toml b/services/jtype-web/Cargo.toml index 279ddc8..3620b07 100644 --- a/services/jtype-web/Cargo.toml +++ b/services/jtype-web/Cargo.toml @@ -17,6 +17,7 @@ reqwest = { version = "0.12", default-features = false, features = ["json", "rus rand_core = { version = "0.6", features = ["std"] } rust-embed = "8" hmac = "0.12" +url = "2" serde = { version = "1", features = ["derive"] } serde_json = "1" sha2 = "0.10" diff --git a/services/jtype-web/src/handlers/kanban/webhook.rs b/services/jtype-web/src/handlers/kanban/webhook.rs index 29cf589..ae26ae3 100644 --- a/services/jtype-web/src/handlers/kanban/webhook.rs +++ b/services/jtype-web/src/handlers/kanban/webhook.rs @@ -90,27 +90,58 @@ const SELECT_WEBHOOK: &str = r#"SELECT id, board_id, name, target_url, event_typ CAST(created_at AS CHAR) AS created_at FROM kanban_webhooks"#; -/// Reject obvious SSRF targets (internal/loopback hosts) and non-HTTPS URLs. -fn validate_target_url(url: &str) -> Result<(), AppError> { - let lower = url.to_ascii_lowercase(); - if !lower.starts_with("https://") { +fn is_blocked_v4(ip: std::net::Ipv4Addr) -> bool { + // loopback 127/8, private 10/8 + 172.16/12 + 192.168/16, link-local 169.254/16, + // unspecified 0.0.0.0, broadcast, and the 0/8 reserved block. + ip.is_loopback() + || ip.is_private() + || ip.is_link_local() + || ip.is_unspecified() + || ip.is_broadcast() + || ip.octets()[0] == 0 +} + +fn is_blocked_v6(ip: std::net::Ipv6Addr) -> bool { + // loopback ::1, unspecified ::, and ULA fc00::/7. + ip.is_loopback() || ip.is_unspecified() || (ip.segments()[0] & 0xfe00) == 0xfc00 +} + +/// Reject SSRF targets (internal/loopback/private hosts) and non-HTTPS URLs. +/// Parses the URL structurally so userinfo (`https://user@127.0.0.1/`) and IPv6 +/// literals can't slip past a prefix check, and classifies IP literals properly. +fn validate_target_url(raw: &str) -> Result<(), AppError> { + let parsed = url::Url::parse(raw).map_err(|_| AppError::BadRequest("invalid target_url".into()))?; + if parsed.scheme() != "https" { return Err(AppError::BadRequest("target_url must be https://".into())); } - let host = lower["https://".len()..] - .split(['/', ':', '?']) - .next() - .unwrap_or(""); - let blocked = ["localhost", "127.", "0.0.0.0", "10.", "192.168.", "169.254.", "[::1]", "[fc", "[fd"]; - if host.is_empty() - || blocked.iter().any(|b| host.starts_with(b)) - || (host.starts_with("172.") - && host - .split('.') - .nth(1) - .and_then(|o| o.parse::().ok()) - .is_some_and(|n| (16..=31).contains(&n))) - { - return Err(AppError::BadRequest("target_url host is not allowed".into())); + let blocked = || AppError::BadRequest("target_url host is not allowed".into()); + match parsed.host().ok_or_else(blocked)? { + url::Host::Ipv4(ip) => { + if is_blocked_v4(ip) { + return Err(blocked()); + } + } + url::Host::Ipv6(ip) => { + if is_blocked_v6(ip) { + return Err(blocked()); + } + } + url::Host::Domain(d) => { + let d = d.to_ascii_lowercase(); + if d == "localhost" + || d.ends_with(".localhost") + || d.ends_with(".local") + || d.ends_with(".internal") + { + return Err(blocked()); + } + // A bare IP that url parsed as a domain (defensive). + match d.parse::() { + Ok(std::net::IpAddr::V4(v4)) if is_blocked_v4(v4) => return Err(blocked()), + Ok(std::net::IpAddr::V6(v6)) if is_blocked_v6(v6) => return Err(blocked()), + _ => {} + } + } } Ok(()) } diff --git a/services/jtype-web/src/tasks/webhook_delivery.rs b/services/jtype-web/src/tasks/webhook_delivery.rs index cc3b484..12f4580 100644 --- a/services/jtype-web/src/tasks/webhook_delivery.rs +++ b/services/jtype-web/src/tasks/webhook_delivery.rs @@ -48,6 +48,9 @@ pub async fn run_once(pool: &Pool) -> Result { let client = reqwest::Client::builder() .user_agent("jtype-web") .timeout(std::time::Duration::from_secs(10)) + // Don't follow redirects: a 3xx to an internal host would bypass the + // create-time SSRF validation of target_url. + .redirect(reqwest::redirect::Policy::none()) .build() .unwrap_or_else(|_| reqwest::Client::new()); diff --git a/services/jtype-web/tests/kanban_tests.rs b/services/jtype-web/tests/kanban_tests.rs index b766b5d..62aa4df 100644 --- a/services/jtype-web/tests/kanban_tests.rs +++ b/services/jtype-web/tests/kanban_tests.rs @@ -1606,12 +1606,22 @@ async fn webhook_crud_and_enqueue() { assert_eq!(wh["secretMasked"], "whsec_••••"); let wh_id = wh["id"].as_str().unwrap().to_string(); - // reject non-https - let (bad, _b) = common::req( - app.clone(), "POST", &format!("/api/v1/workspaces/{ws_id}/kanban/webhooks"), - Some(&token), Some(json!({ "name": "x", "targetUrl": "http://localhost/hook", "eventTypes": ["*"] })), - ).await; - assert_eq!(bad, StatusCode::BAD_REQUEST); + // reject non-https and SSRF targets (incl. userinfo / private-IP / IPv6 bypasses) + for bad_url in [ + "http://localhost/hook", + "https://user@127.0.0.1/hook", + "https://10.0.0.5/hook", + "https://192.168.1.10/hook", + "https://169.254.169.254/latest", + "https://[::1]/hook", + "https://foo.internal/hook", + ] { + let (bad, _b) = common::req( + app.clone(), "POST", &format!("/api/v1/workspaces/{ws_id}/kanban/webhooks"), + Some(&token), Some(json!({ "name": "x", "targetUrl": bad_url, "eventTypes": ["*"] })), + ).await; + assert_eq!(bad, StatusCode::BAD_REQUEST, "should reject {bad_url}"); + } // list → 1 let (_s, list) = common::req(