diff --git a/services/jtype-web/Cargo.lock b/services/jtype-web/Cargo.lock index 01dcdc1..f409945 100644 --- a/services/jtype-web/Cargo.lock +++ b/services/jtype-web/Cargo.lock @@ -1013,6 +1013,7 @@ dependencies = [ "bytes", "futures", "hex", + "hmac", "lettre", "mime_guess", "object_store", @@ -1029,6 +1030,7 @@ dependencies = [ "tokio", "tower", "tower-http", + "url", "uuid", ] diff --git a/services/jtype-web/Cargo.toml b/services/jtype-web/Cargo.toml index e49b660..3620b07 100644 --- a/services/jtype-web/Cargo.toml +++ b/services/jtype-web/Cargo.toml @@ -16,6 +16,8 @@ pulldown-cmark = "0.13" reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] } rand_core = { version = "0.6", features = ["std"] } rust-embed = "8" +hmac = "0.12" +url = "2" serde = { version = "1", features = ["derive"] } serde_json = "1" sha2 = "0.10" diff --git a/services/jtype-web/frontend/src/api.ts b/services/jtype-web/frontend/src/api.ts index 8869e1a..228808a 100644 --- a/services/jtype-web/frontend/src/api.ts +++ b/services/jtype-web/frontend/src/api.ts @@ -342,6 +342,12 @@ export const api = { request(`/api/v1/workspaces/${workspaceId}/kanban/comments/${commentId}`, { method: 'DELETE' }), getCardActivity: (workspaceId: string, cardId: string) => request(`/api/v1/workspaces/${workspaceId}/kanban/cards/${cardId}/activity`), + listWebhooks: (workspaceId: string) => + request(`/api/v1/workspaces/${workspaceId}/kanban/webhooks`), + createWebhook: (workspaceId: string, data: { name: string; targetUrl: string; boardId?: string | null; eventTypes: string[] }) => + request(`/api/v1/workspaces/${workspaceId}/kanban/webhooks`, { method: 'POST', body: JSON.stringify(data) }), + deleteWebhook: (workspaceId: string, webhookId: string) => + request(`/api/v1/workspaces/${workspaceId}/kanban/webhooks/${webhookId}`, { method: 'DELETE' }), listLabels: (workspaceId: string, boardId: string) => request(`/api/v1/workspaces/${workspaceId}/kanban/boards/${boardId}/labels`), @@ -835,6 +841,22 @@ export interface KanbanBoardFull extends KanbanBoardSummary { labels: KanbanLabel[] } +export interface KanbanWebhook { + id: string + boardId?: string | null + name: string + targetUrl: string + eventTypes: string[] + enabled: boolean + secretMasked: string + lastDeliveryAt?: string | null + lastStatus?: string | null + createdAt: string +} +export interface KanbanWebhookCreated extends KanbanWebhook { + secret: string +} + export interface KanbanComment { id: string cardId: string diff --git a/services/jtype-web/frontend/src/pages/Kanban.tsx b/services/jtype-web/frontend/src/pages/Kanban.tsx index ad6dc47..05da072 100644 --- a/services/jtype-web/frontend/src/pages/Kanban.tsx +++ b/services/jtype-web/frontend/src/pages/Kanban.tsx @@ -1,7 +1,7 @@ import { useState, useEffect, useCallback, useRef, useMemo } from 'react' import { useParams, useNavigate } from 'react-router-dom' import { Dialog, DialogPanel, Menu, MenuButton, MenuItems, MenuItem } from '@headlessui/react' -import { PlusIcon, EllipsisHorizontalIcon, TrashIcon, ArchiveBoxIcon, ArrowUturnLeftIcon, TagIcon, XMarkIcon, ChevronDownIcon } from '@heroicons/react/24/outline' +import { PlusIcon, EllipsisHorizontalIcon, TrashIcon, ArchiveBoxIcon, ArrowUturnLeftIcon, TagIcon, XMarkIcon, ChevronDownIcon, BoltIcon } from '@heroicons/react/24/outline' import { api, setSessionId, @@ -12,6 +12,8 @@ import { type KanbanLabel, type KanbanTrashItem, type KanbanPriority, + type KanbanWebhook, + type KanbanWebhookCreated, type UpdateKanbanCardRequest, type MemberInfo, } from '../api' @@ -41,6 +43,7 @@ export function Kanban() { const [error, setError] = useState('') const [showLabels, setShowLabels] = useState(false) const [showTrash, setShowTrash] = useState(false) + const [showWebhooks, setShowWebhooks] = useState(false) const [view, setView] = useState>({}) const { sessionId: wsSessionId, subscribe: wsSubscribe, status: wsStatus } = useWorkspaceSocket(workspaceId) @@ -373,6 +376,7 @@ export function Kanban() {
+ @@ -413,6 +417,9 @@ export function Kanban() { {showTrash && board && workspaceId && ( setShowTrash(false)} onChanged={reload} /> )} + {showWebhooks && workspaceId && ( + setShowWebhooks(false)} /> + )} +
+ {error &&

{error}

} + {revealed && ( +
+
Signing secret — shown once, copy it now:
+ {revealed.secret} +
+ )} +
    + {hooks.map((h) => ( +
  • +
    +
    {h.name}
    +
    {h.targetUrl}
    +
    {h.eventTypes.join(', ')}{h.boardId ? ' · this board' : ' · all boards'}{h.lastStatus ? ` · last: ${h.lastStatus}` : ''}
    +
    + +
  • + ))} + {hooks.length === 0 &&
  • No webhooks yet.
  • } +
+
{ e.preventDefault(); void create() }} className="space-y-2 border-t border-zinc-100 pt-3"> + setName(e.target.value)} /> + setUrl(e.target.value)} /> +
+ {WEBHOOK_EVENTS.map((ev) => ( + + ))} +
+
+ + {boardId && } +
+
+ +
+
+ + + + ) +} diff --git a/services/jtype-web/migrations/0017_kanban_webhooks.down.sql b/services/jtype-web/migrations/0017_kanban_webhooks.down.sql new file mode 100644 index 0000000..3cf9de2 --- /dev/null +++ b/services/jtype-web/migrations/0017_kanban_webhooks.down.sql @@ -0,0 +1,2 @@ +DROP TABLE IF EXISTS kanban_webhook_deliveries; +DROP TABLE IF EXISTS kanban_webhooks; diff --git a/services/jtype-web/migrations/0017_kanban_webhooks.up.sql b/services/jtype-web/migrations/0017_kanban_webhooks.up.sql new file mode 100644 index 0000000..dcd4d65 --- /dev/null +++ b/services/jtype-web/migrations/0017_kanban_webhooks.up.sql @@ -0,0 +1,42 @@ +CREATE TABLE kanban_webhooks ( + id CHAR(36) NOT NULL, + workspace_id CHAR(36) NOT NULL, + board_id CHAR(36) NULL, + name VARCHAR(160) NOT NULL, + target_url VARCHAR(2048) NOT NULL, + secret CHAR(64) NOT NULL, + event_types JSON NOT NULL, + enabled TINYINT(1) NOT NULL DEFAULT 1, + created_by_user_id CHAR(36) NOT NULL, + last_delivery_at TIMESTAMP NULL, + last_status VARCHAR(32) NULL, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY (id), + KEY idx_webhooks_workspace (workspace_id), + KEY idx_webhooks_enabled (workspace_id, enabled), + CONSTRAINT kanban_webhooks_workspace_fk FOREIGN KEY (workspace_id) REFERENCES workspaces(id) ON DELETE CASCADE, + CONSTRAINT kanban_webhooks_board_fk FOREIGN KEY (board_id) REFERENCES kanban_boards(id) ON DELETE CASCADE, + CONSTRAINT kanban_webhooks_creator_fk FOREIGN KEY (created_by_user_id) REFERENCES users(id) ON DELETE CASCADE +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; + +CREATE TABLE kanban_webhook_deliveries ( + id CHAR(36) NOT NULL, + webhook_id CHAR(36) NOT NULL, + workspace_id CHAR(36) NOT NULL, + event_type VARCHAR(64) NOT NULL, + payload JSON NOT NULL, + status ENUM('pending','succeeded','failed','dead') NOT NULL DEFAULT 'pending', + attempt_count INT NOT NULL DEFAULT 0, + max_attempts INT NOT NULL DEFAULT 6, + last_status_code INT NULL, + last_error VARCHAR(512) NULL, + next_retry_at TIMESTAMP NULL, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY (id), + KEY idx_deliveries_webhook (webhook_id), + KEY idx_deliveries_due (status, next_retry_at), + CONSTRAINT kanban_deliveries_webhook_fk FOREIGN KEY (webhook_id) REFERENCES kanban_webhooks(id) ON DELETE CASCADE, + CONSTRAINT kanban_deliveries_workspace_fk FOREIGN KEY (workspace_id) REFERENCES workspaces(id) ON DELETE CASCADE +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci; diff --git a/services/jtype-web/src/db/migrations.rs b/services/jtype-web/src/db/migrations.rs index 8f16d52..8a02670 100644 --- a/services/jtype-web/src/db/migrations.rs +++ b/services/jtype-web/src/db/migrations.rs @@ -104,6 +104,12 @@ fn all_migrations() -> Vec { up: include_str!("../../migrations/0016_kanban_comments.up.sql"), down: include_str!("../../migrations/0016_kanban_comments.down.sql"), }, + Migration { + version: 17, + name: "kanban_webhooks", + up: include_str!("../../migrations/0017_kanban_webhooks.up.sql"), + down: include_str!("../../migrations/0017_kanban_webhooks.down.sql"), + }, ] } diff --git a/services/jtype-web/src/handlers/kanban/card.rs b/services/jtype-web/src/handlers/kanban/card.rs index 713b545..65d9a18 100644 --- a/services/jtype-web/src/handlers/kanban/card.rs +++ b/services/jtype-web/src/handlers/kanban/card.rs @@ -274,6 +274,15 @@ pub async fn create_card( ) .await; + super::webhook::enqueue_event( + &state.pool, + &workspace_id, + &board_id, + "kanban:card-updated", + json!({ "event": "kanban:card-updated", "cardId": card_id, "boardId": board_id }), + ) + .await; + // Re-fetch with DB timestamps fetch_card_response(&state, &workspace_id, &card_id).await } @@ -721,6 +730,15 @@ pub async fn move_card( ) .await; + super::webhook::enqueue_event( + &state.pool, + &workspace_id, + &card.board_id, + "kanban:card-updated", + json!({ "event": "kanban:card-updated", "cardId": card.id, "boardId": card.board_id }), + ) + .await; + Ok(Json(card).into_response()) } @@ -853,6 +871,15 @@ pub async fn archive_card( ) .await; + super::webhook::enqueue_event( + &state.pool, + &workspace_id, + &board_id, + "kanban:card-archived", + json!({ "event": "kanban:card-archived", "cardId": card_id, "boardId": board_id }), + ) + .await; + Ok(Json(json!({ "id": trash_id, "cardId": card_id, diff --git a/services/jtype-web/src/handlers/kanban/mod.rs b/services/jtype-web/src/handlers/kanban/mod.rs index d777eda..895b48e 100644 --- a/services/jtype-web/src/handlers/kanban/mod.rs +++ b/services/jtype-web/src/handlers/kanban/mod.rs @@ -13,6 +13,7 @@ pub mod card; pub mod column; pub mod comment; pub mod label; +pub mod webhook; use sqlx::Row; diff --git a/services/jtype-web/src/handlers/kanban/webhook.rs b/services/jtype-web/src/handlers/kanban/webhook.rs new file mode 100644 index 0000000..ae26ae3 --- /dev/null +++ b/services/jtype-web/src/handlers/kanban/webhook.rs @@ -0,0 +1,297 @@ +//! Kanban webhook registration + outbound delivery enqueue (DB board). +//! +//! Endpoints (owner/admin only): +//! GET /api/v1/workspaces/:workspace_id/kanban/webhooks +//! POST /api/v1/workspaces/:workspace_id/kanban/webhooks +//! DELETE /api/v1/workspaces/:workspace_id/kanban/webhooks/:webhook_id +//! +//! On create the plaintext `secret` is returned ONCE; thereafter only a mask. +//! `enqueue_event` is called from the card handlers' broadcast sites to queue +//! deliveries; the `tasks::webhook_delivery` worker signs (HMAC-SHA256) and POSTs. + +use axum::{ + extract::{Path, State}, + http::HeaderMap, + response::{IntoResponse, Response}, + Json, +}; +use serde::{Deserialize, Serialize}; +use serde_json::Value as JsonValue; +use sqlx::{MySql, Pool, Row}; +use uuid::Uuid; + +use super::clamp_str; +use crate::error::AppError; +use crate::handlers::workspace::require_workspace_role; +use crate::middleware::auth::extract_user; +use crate::AppState; + +const MAX_NAME: usize = 160; +const MAX_URL: usize = 2048; +const MAX_WEBHOOKS_PER_WORKSPACE: i64 = 20; + +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct KanbanWebhook { + pub id: String, + pub board_id: Option, + pub name: String, + pub target_url: String, + pub event_types: Vec, + pub enabled: bool, + pub secret_masked: String, + pub last_delivery_at: Option, + pub last_status: Option, + pub created_at: String, +} + +#[derive(Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct KanbanWebhookCreated { + #[serde(flatten)] + pub webhook: KanbanWebhook, + /// Plaintext secret — returned only on create. + pub secret: String, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct CreateKanbanWebhookRequest { + pub name: String, + pub target_url: String, + #[serde(default)] + pub board_id: Option, + #[serde(default)] + pub event_types: Vec, +} + +fn row_to_webhook(r: &sqlx::mysql::MySqlRow) -> Result { + let event_types: JsonValue = r.try_get("event_types")?; + let events = match event_types { + JsonValue::Array(a) => a.into_iter().filter_map(|v| v.as_str().map(String::from)).collect(), + _ => Vec::new(), + }; + Ok(KanbanWebhook { + id: r.try_get("id")?, + board_id: r.try_get("board_id")?, + name: r.try_get("name")?, + target_url: r.try_get("target_url")?, + event_types: events, + enabled: r.try_get::("enabled")? != 0, + secret_masked: "whsec_••••".into(), + last_delivery_at: r.try_get("last_delivery_at")?, + last_status: r.try_get("last_status")?, + created_at: r.try_get("created_at")?, + }) +} + +const SELECT_WEBHOOK: &str = r#"SELECT id, board_id, name, target_url, event_types, enabled, + CAST(last_delivery_at AS CHAR) AS last_delivery_at, last_status, + CAST(created_at AS CHAR) AS created_at +FROM kanban_webhooks"#; + +fn is_blocked_v4(ip: std::net::Ipv4Addr) -> bool { + // loopback 127/8, private 10/8 + 172.16/12 + 192.168/16, link-local 169.254/16, + // unspecified 0.0.0.0, broadcast, and the 0/8 reserved block. + ip.is_loopback() + || ip.is_private() + || ip.is_link_local() + || ip.is_unspecified() + || ip.is_broadcast() + || ip.octets()[0] == 0 +} + +fn is_blocked_v6(ip: std::net::Ipv6Addr) -> bool { + // loopback ::1, unspecified ::, and ULA fc00::/7. + ip.is_loopback() || ip.is_unspecified() || (ip.segments()[0] & 0xfe00) == 0xfc00 +} + +/// Reject SSRF targets (internal/loopback/private hosts) and non-HTTPS URLs. +/// Parses the URL structurally so userinfo (`https://user@127.0.0.1/`) and IPv6 +/// literals can't slip past a prefix check, and classifies IP literals properly. +fn validate_target_url(raw: &str) -> Result<(), AppError> { + let parsed = url::Url::parse(raw).map_err(|_| AppError::BadRequest("invalid target_url".into()))?; + if parsed.scheme() != "https" { + return Err(AppError::BadRequest("target_url must be https://".into())); + } + let blocked = || AppError::BadRequest("target_url host is not allowed".into()); + match parsed.host().ok_or_else(blocked)? { + url::Host::Ipv4(ip) => { + if is_blocked_v4(ip) { + return Err(blocked()); + } + } + url::Host::Ipv6(ip) => { + if is_blocked_v6(ip) { + return Err(blocked()); + } + } + url::Host::Domain(d) => { + let d = d.to_ascii_lowercase(); + if d == "localhost" + || d.ends_with(".localhost") + || d.ends_with(".local") + || d.ends_with(".internal") + { + return Err(blocked()); + } + // A bare IP that url parsed as a domain (defensive). + match d.parse::() { + Ok(std::net::IpAddr::V4(v4)) if is_blocked_v4(v4) => return Err(blocked()), + Ok(std::net::IpAddr::V6(v6)) if is_blocked_v6(v6) => return Err(blocked()), + _ => {} + } + } + } + Ok(()) +} + +pub async fn list_webhooks( + State(state): State, + headers: HeaderMap, + Path(workspace_id): Path, +) -> Result { + let user = extract_user(&state.pool, &headers).await?; + require_workspace_role(&state.pool, &workspace_id, &user.id, &["owner", "admin"]).await?; + let rows = sqlx::query(&format!("{SELECT_WEBHOOK} WHERE workspace_id = ? ORDER BY created_at DESC")) + .bind(&workspace_id) + .fetch_all(&state.pool) + .await?; + let out = rows.iter().map(row_to_webhook).collect::, _>>()?; + Ok(Json(out).into_response()) +} + +pub async fn create_webhook( + State(state): State, + headers: HeaderMap, + Path(workspace_id): Path, + Json(payload): Json, +) -> Result { + let user = extract_user(&state.pool, &headers).await?; + require_workspace_role(&state.pool, &workspace_id, &user.id, &["owner", "admin"]).await?; + + let name = clamp_str(payload.name.trim(), MAX_NAME); + if name.is_empty() { + return Err(AppError::BadRequest("name cannot be empty".into())); + } + let target_url = clamp_str(payload.target_url.trim(), MAX_URL); + validate_target_url(&target_url)?; + let events: Vec = payload + .event_types + .iter() + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .collect(); + if events.is_empty() { + return Err(AppError::BadRequest("event_types cannot be empty".into())); + } + if let Some(b) = &payload.board_id { + let ok: Option = sqlx::query_scalar("SELECT id FROM kanban_boards WHERE id = ? AND workspace_id = ?") + .bind(b) + .bind(&workspace_id) + .fetch_optional(&state.pool) + .await?; + if ok.is_none() { + return Err(AppError::BadRequest("board_id not found in workspace".into())); + } + } + let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM kanban_webhooks WHERE workspace_id = ?") + .bind(&workspace_id) + .fetch_one(&state.pool) + .await?; + if count >= MAX_WEBHOOKS_PER_WORKSPACE { + return Err(AppError::BadRequest("too many webhooks for this workspace".into())); + } + + let id = Uuid::new_v4().to_string(); + let secret = format!("{}{}", Uuid::new_v4().simple(), Uuid::new_v4().simple()); + let events_json = serde_json::to_value(&events).unwrap_or(JsonValue::Array(vec![])); + sqlx::query( + "INSERT INTO kanban_webhooks (id, workspace_id, board_id, name, target_url, secret, event_types, created_by_user_id) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + ) + .bind(&id) + .bind(&workspace_id) + .bind(&payload.board_id) + .bind(&name) + .bind(&target_url) + .bind(&secret) + .bind(&events_json) + .bind(&user.id) + .execute(&state.pool) + .await?; + + let row = sqlx::query(&format!("{SELECT_WEBHOOK} WHERE id = ?")) + .bind(&id) + .fetch_one(&state.pool) + .await?; + Ok(Json(KanbanWebhookCreated { webhook: row_to_webhook(&row)?, secret }).into_response()) +} + +pub async fn delete_webhook( + State(state): State, + headers: HeaderMap, + Path((workspace_id, webhook_id)): Path<(String, String)>, +) -> Result { + let user = extract_user(&state.pool, &headers).await?; + require_workspace_role(&state.pool, &workspace_id, &user.id, &["owner", "admin"]).await?; + let res = sqlx::query("DELETE FROM kanban_webhooks WHERE id = ? AND workspace_id = ?") + .bind(&webhook_id) + .bind(&workspace_id) + .execute(&state.pool) + .await?; + if res.rows_affected() == 0 { + return Err(AppError::NotFound); + } + Ok(axum::http::StatusCode::NO_CONTENT.into_response()) +} + +/// Queue a delivery for every enabled webhook in the workspace that subscribes +/// to `event_type` (exact match or `"*"`) and is scoped to this board (or all +/// boards). Best-effort: errors are logged, never propagated to the caller. +pub async fn enqueue_event( + pool: &Pool, + workspace_id: &str, + board_id: &str, + event_type: &str, + payload: JsonValue, +) { + let rows = match sqlx::query( + "SELECT id, event_types FROM kanban_webhooks WHERE workspace_id = ? AND enabled = 1 AND (board_id IS NULL OR board_id = ?)", + ) + .bind(workspace_id) + .bind(board_id) + .fetch_all(pool) + .await + { + Ok(r) => r, + Err(e) => { + eprintln!("webhook enqueue query failed: {e}"); + return; + } + }; + for r in rows { + let webhook_id: String = match r.try_get("id") { + Ok(v) => v, + Err(_) => continue, + }; + let events: JsonValue = r.try_get("event_types").unwrap_or(JsonValue::Null); + let subscribed = matches!(&events, JsonValue::Array(a) if a.iter().any(|v| v.as_str() == Some(event_type) || v.as_str() == Some("*"))); + if !subscribed { + continue; + } + let delivery_id = Uuid::new_v4().to_string(); + if let Err(e) = sqlx::query( + "INSERT INTO kanban_webhook_deliveries (id, webhook_id, workspace_id, event_type, payload) VALUES (?, ?, ?, ?, ?)", + ) + .bind(&delivery_id) + .bind(&webhook_id) + .bind(workspace_id) + .bind(event_type) + .bind(&payload) + .execute(pool) + .await + { + eprintln!("webhook enqueue insert failed: {e}"); + } + } +} diff --git a/services/jtype-web/src/lib.rs b/services/jtype-web/src/lib.rs index 0eae63a..8218d99 100644 --- a/services/jtype-web/src/lib.rs +++ b/services/jtype-web/src/lib.rs @@ -94,6 +94,7 @@ pub async fn run_from_env() -> Result<(), AppError> { .map_err(|e| AppError::Server(e.to_string()))?; println!("jtype-web listening on http://{}", bind_addr); // Spawn periodic trash cleanup (document_trash + kanban_card_trash) + tasks::webhook_delivery::spawn(pool.clone()); tasks::cleanup_trash::spawn(pool); axum::serve(listener, app) .await @@ -370,6 +371,7 @@ pub fn build_app( "/api/v1/workspaces/:workspace_id/kanban/cards/:card_id/restore", post(handlers::kanban::card::restore_card), ) + // Comments .route( "/api/v1/workspaces/:workspace_id/kanban/cards/:card_id/comments", get(handlers::kanban::comment::list_comments).post(handlers::kanban::comment::create_comment), @@ -382,6 +384,15 @@ pub fn build_app( "/api/v1/workspaces/:workspace_id/kanban/cards/:card_id/activity", get(handlers::kanban::card::card_activity), ) + // Webhooks + .route( + "/api/v1/workspaces/:workspace_id/kanban/webhooks", + get(handlers::kanban::webhook::list_webhooks).post(handlers::kanban::webhook::create_webhook), + ) + .route( + "/api/v1/workspaces/:workspace_id/kanban/webhooks/:webhook_id", + axum::routing::delete(handlers::kanban::webhook::delete_webhook), + ) // Labels .route( "/api/v1/workspaces/:workspace_id/kanban/boards/:board_id/labels", diff --git a/services/jtype-web/src/tasks/mod.rs b/services/jtype-web/src/tasks/mod.rs index 3754adb..c3637d4 100644 --- a/services/jtype-web/src/tasks/mod.rs +++ b/services/jtype-web/src/tasks/mod.rs @@ -7,3 +7,4 @@ //! All tasks spawned from `lib.rs::run_from_env`. pub mod cleanup_trash; +pub mod webhook_delivery; diff --git a/services/jtype-web/src/tasks/webhook_delivery.rs b/services/jtype-web/src/tasks/webhook_delivery.rs new file mode 100644 index 0000000..12f4580 --- /dev/null +++ b/services/jtype-web/src/tasks/webhook_delivery.rs @@ -0,0 +1,165 @@ +//! Outbound kanban webhook delivery worker. +//! +//! Every tick it picks due deliveries (`pending`/`failed` with `next_retry_at` +//! elapsed), signs the JSON body with HMAC-SHA256 (key = the webhook secret), +//! POSTs to the target URL, and records the outcome with exponential backoff. +//! `dead` after `max_attempts`. + +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; + +use hmac::{Hmac, Mac}; +use sha2::Sha256; +use sqlx::{MySql, Pool, Row}; + +type HmacSha256 = Hmac; + +const TICK_SECS: u64 = 10; +const BATCH: i64 = 20; + +pub fn spawn(pool: Pool) { + let running = Arc::new(AtomicBool::new(false)); + tokio::spawn(async move { + let mut interval = tokio::time::interval(std::time::Duration::from_secs(TICK_SECS)); + interval.tick().await; + loop { + interval.tick().await; + if running.swap(true, Ordering::SeqCst) { + continue; + } + if let Err(e) = run_once(&pool).await { + eprintln!("webhook delivery tick failed: {e}"); + } + running.store(false, Ordering::SeqCst); + } + }); +} + +fn hex_encode(bytes: &[u8]) -> String { + let mut s = String::with_capacity(bytes.len() * 2); + for b in bytes { + s.push_str(&format!("{b:02x}")); + } + s +} + +/// Process one batch of due deliveries. Returns how many were attempted. +pub async fn run_once(pool: &Pool) -> Result { + let client = reqwest::Client::builder() + .user_agent("jtype-web") + .timeout(std::time::Duration::from_secs(10)) + // Don't follow redirects: a 3xx to an internal host would bypass the + // create-time SSRF validation of target_url. + .redirect(reqwest::redirect::Policy::none()) + .build() + .unwrap_or_else(|_| reqwest::Client::new()); + + let rows = sqlx::query( + r#"SELECT d.id, d.webhook_id, d.event_type, d.payload, d.attempt_count, d.max_attempts, + w.target_url, w.secret + FROM kanban_webhook_deliveries d + JOIN kanban_webhooks w ON w.id = d.webhook_id + WHERE d.status IN ('pending', 'failed') + AND (d.next_retry_at IS NULL OR d.next_retry_at <= NOW()) + ORDER BY d.created_at ASC + LIMIT ?"#, + ) + .bind(BATCH) + .fetch_all(pool) + .await?; + + let mut attempted = 0u64; + for r in rows { + let id: String = r.try_get("id")?; + let webhook_id: String = r.try_get("webhook_id")?; + let event_type: String = r.try_get("event_type")?; + let payload: serde_json::Value = r.try_get("payload")?; + let prev_attempts: i32 = r.try_get("attempt_count")?; + let max_attempts: i32 = r.try_get("max_attempts")?; + let target_url: String = r.try_get("target_url")?; + let secret: String = r.try_get("secret")?; + + let body = serde_json::to_vec(&payload).unwrap_or_default(); + let mut mac = match HmacSha256::new_from_slice(secret.as_bytes()) { + Ok(m) => m, + Err(_) => continue, + }; + mac.update(&body); + let signature = hex_encode(&mac.finalize().into_bytes()); + + let resp = client + .post(&target_url) + .header("Content-Type", "application/json") + .header("X-JType-Event", &event_type) + .header("X-JType-Delivery", &id) + .header("X-JType-Signature", format!("sha256={signature}")) + .body(body) + .send() + .await; + + let attempt = prev_attempts + 1; + let success = matches!(&resp, Ok(r) if r.status().is_success()); + if success { + let code = resp.map(|r| r.status().as_u16() as i32).unwrap_or(0); + sqlx::query( + "UPDATE kanban_webhook_deliveries SET status='succeeded', attempt_count=?, last_status_code=?, next_retry_at=NULL WHERE id=?", + ) + .bind(attempt) + .bind(code) + .bind(&id) + .execute(pool) + .await?; + sqlx::query("UPDATE kanban_webhooks SET last_delivery_at=NOW(), last_status='ok' WHERE id=?") + .bind(&webhook_id) + .execute(pool) + .await?; + } else { + let (code, err): (Option, String) = match resp { + Ok(r) => (Some(r.status().as_u16() as i32), format!("HTTP {}", r.status())), + Err(e) => (None, e.to_string()), + }; + let dead = attempt >= max_attempts; + let status = if dead { "dead" } else { "failed" }; + let backoff = 2_i64.saturating_pow(attempt.max(0) as u32).saturating_mul(30).min(3600); + let err: String = err.chars().take(512).collect(); + sqlx::query( + "UPDATE kanban_webhook_deliveries SET status=?, attempt_count=?, last_status_code=?, last_error=?, next_retry_at=DATE_ADD(NOW(), INTERVAL ? SECOND) WHERE id=?", + ) + .bind(status) + .bind(attempt) + .bind(code) + .bind(&err) + .bind(backoff) + .bind(&id) + .execute(pool) + .await?; + sqlx::query("UPDATE kanban_webhooks SET last_delivery_at=NOW(), last_status='failed' WHERE id=?") + .bind(&webhook_id) + .execute(pool) + .await?; + } + attempted += 1; + } + Ok(attempted) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn hex_encode_zero_pads() { + assert_eq!(hex_encode(&[0x00, 0x0f, 0xff]), "000fff"); + } + + #[test] + fn hmac_sha256_matches_known_vector() { + // RFC-style vector: HMAC-SHA256("key", "The quick brown fox jumps over the lazy dog") + let mut mac = HmacSha256::new_from_slice(b"key").unwrap(); + mac.update(b"The quick brown fox jumps over the lazy dog"); + assert_eq!( + hex_encode(&mac.finalize().into_bytes()), + "f7bc83f430538424b13298e6aa6fb143ef4d59a14946175997479dbc2d1a3cd8" + ); + } +} diff --git a/services/jtype-web/tests/kanban_tests.rs b/services/jtype-web/tests/kanban_tests.rs index 3eb2b0c..5510b71 100644 --- a/services/jtype-web/tests/kanban_tests.rs +++ b/services/jtype-web/tests/kanban_tests.rs @@ -1657,6 +1657,68 @@ async fn conflict_response_includes_latest_card_snapshot() { assert_eq!(body["baseUpdatedClock"], base_clock); } +#[tokio::test] +async fn webhook_crud_and_enqueue() { + let (app, pool) = common::setup().await; + let (token, _) = common::register_user(app.clone(), &common::uid()).await; + let ws_id = common::create_workspace(app.clone(), &token, &common::wname()).await; + let (_s, board) = common::req( + app.clone(), "POST", &format!("/api/v1/workspaces/{ws_id}/kanban/boards"), + Some(&token), Some(json!({ "name": "B" })), + ).await; + let board_id = board["id"].as_str().unwrap().to_string(); + let col = board["columns"][0]["id"].as_str().unwrap().to_string(); + + // create a webhook subscribed to card-updated + let (status, wh) = common::req( + app.clone(), "POST", &format!("/api/v1/workspaces/{ws_id}/kanban/webhooks"), + Some(&token), + Some(json!({ "name": "CI", "targetUrl": "https://example.com/hook", "eventTypes": ["kanban:card-updated"] })), + ).await; + assert_eq!(status, StatusCode::OK); + assert!(wh["secret"].is_string(), "plaintext secret returned once"); + assert_eq!(wh["secretMasked"], "whsec_••••"); + let wh_id = wh["id"].as_str().unwrap().to_string(); + + // reject non-https and SSRF targets (incl. userinfo / private-IP / IPv6 bypasses) + for bad_url in [ + "http://localhost/hook", + "https://user@127.0.0.1/hook", + "https://10.0.0.5/hook", + "https://192.168.1.10/hook", + "https://169.254.169.254/latest", + "https://[::1]/hook", + "https://foo.internal/hook", + ] { + let (bad, _b) = common::req( + app.clone(), "POST", &format!("/api/v1/workspaces/{ws_id}/kanban/webhooks"), + Some(&token), Some(json!({ "name": "x", "targetUrl": bad_url, "eventTypes": ["*"] })), + ).await; + assert_eq!(bad, StatusCode::BAD_REQUEST, "should reject {bad_url}"); + } + + // list → 1 + let (_s, list) = common::req( + app.clone(), "GET", &format!("/api/v1/workspaces/{ws_id}/kanban/webhooks"), Some(&token), None, + ).await; + assert_eq!(list.as_array().unwrap().len(), 1); + + // creating a card enqueues a delivery (synchronously, before the worker runs) + let _ = common::req( + app.clone(), "POST", &format!("/api/v1/workspaces/{ws_id}/kanban/boards/{board_id}/cards"), + Some(&token), Some(json!({ "columnId": col, "title": "X" })), + ).await; + let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM kanban_webhook_deliveries WHERE webhook_id = ?") + .bind(&wh_id).fetch_one(&pool).await.unwrap(); + assert!(count >= 1, "card create enqueued a webhook delivery"); + + // delete webhook → 204 (cascade removes deliveries) + let (status, _) = common::req( + app, "DELETE", &format!("/api/v1/workspaces/{ws_id}/kanban/webhooks/{wh_id}"), Some(&token), None, + ).await; + assert_eq!(status, StatusCode::NO_CONTENT); +} + #[tokio::test] async fn card_comments_crud() { let (app, _pool) = common::setup().await;