From b9105345d3668d810b22656f4d99f14547166f51 Mon Sep 17 00:00:00 2001 From: ekartgan Date: Thu, 21 May 2026 16:35:41 -0700 Subject: [PATCH 1/2] feat: streaming database exports to R2 for large databases Fixes #59. Large database exports now stream chunks to R2 using multipart upload, with DO alarm-based continuation when exports exceed 25s. Adds /export/status/:id and /export/download/:id endpoints. Falls back to in-memory export when R2 is not configured. /claim #59 Co-Authored-By: Claude Opus 4.6 --- src/do.ts | 35 +++ src/export/dump-streaming.ts | 445 +++++++++++++++++++++++++++++++++++ src/export/dump.ts | 18 +- src/handler.ts | 46 +++- src/index.ts | 7 + wrangler.toml | 11 + 6 files changed, 559 insertions(+), 3 deletions(-) create mode 100644 src/export/dump-streaming.ts diff --git a/src/do.ts b/src/do.ts index b6bb2b6..8c739a6 100644 --- a/src/do.ts +++ b/src/do.ts @@ -1,4 +1,5 @@ import { DurableObject } from 'cloudflare:workers' +import { continueExportFromAlarm } from './export/dump-streaming' export class StarbaseDBDurableObject extends DurableObject { // Durable storage for the SQL database @@ -9,6 +10,8 @@ export class StarbaseDBDurableObject extends DurableObject { public connections = new Map() // Store the client auth token for requests back to our Worker private clientAuthToken: string + // R2 bucket binding for export operations (optional) + private r2Bucket?: R2Bucket /** * The constructor is invoked once upon creation of the Durable Object, i.e. the first call to @@ -22,6 +25,8 @@ export class StarbaseDBDurableObject extends DurableObject { this.clientAuthToken = env.CLIENT_AUTHORIZATION_TOKEN this.sql = ctx.storage.sql this.storage = ctx.storage + // Attach R2 bucket if the binding is configured in wrangler.toml + this.r2Bucket = (env as any).EXPORT_BUCKET as R2Bucket | undefined // Install default necessary `tmp_` tables for various features here. const cacheStatement = ` @@ -105,6 +110,36 @@ export class StarbaseDBDurableObject extends DurableObject { } async alarm() { + // ── Export continuation ─────────────────────────────────────────────── + // If there is a streaming export in progress, continue it first. + // We do this before the cron check so that a large export doesn't block + // the cron scheduler indefinitely; once the export is complete or no + // export is found the cron logic proceeds normally. + if (this.r2Bucket) { + try { + const pendingExport = (await this.executeQuery({ + sql: `SELECT export_id FROM tmp_export_state WHERE value LIKE '%"status":"running"%' LIMIT 1;`, + isRaw: false, + })) as Record[] + + if (pendingExport.length) { + await continueExportFromAlarm({ + dataSource: { + rpc: this.init(), + source: 'internal', + }, + config: { role: 'admin' }, + r2Bucket: this.r2Bucket, + }) + // Re-schedule in case the export needs more iterations + return + } + } catch (exportErr) { + console.error('Error checking for pending export in alarm:', exportErr) + } + } + + // ── Cron tasks ──────────────────────────────────────────────────────── try { // Fetch all the tasks that are marked to emit an event for this cycle. const task = (await this.executeQuery({ diff --git a/src/export/dump-streaming.ts b/src/export/dump-streaming.ts new file mode 100644 index 0000000..fa9d2b4 --- /dev/null +++ b/src/export/dump-streaming.ts @@ -0,0 +1,445 @@ +import { executeOperation } from '.' +import { StarbaseDBConfiguration } from '../handler' +import { DataSource } from '../types' +import { createResponse } from '../utils' + +export interface ExportState { + exportId: string + status: 'pending' | 'running' | 'completed' | 'failed' + r2Key: string + tables: string[] + currentTableIndex: number + currentOffset: number + callbackUrl?: string + startedAt: number + completedAt?: number + error?: string + // Multipart upload fields + uploadId?: string + partNumber: number + parts: { partNumber: number; etag: string }[] + // Buffer for accumulating chunks before flushing as a part + pendingChunk: string +} + +const BATCH_SIZE = 1000 +// Flush a multipart part when the pending chunk buffer exceeds 5 MB +// (R2 minimum part size is 5 MiB for all parts except the last) +const MIN_PART_SIZE = 5 * 1024 * 1024 +// Safety threshold: if elapsed time exceeds this, save state and set alarm +const TIMEOUT_THRESHOLD_MS = 25_000 + +function generateExportId(): string { + return crypto.randomUUID() +} + +function generateR2Key(): string { + const now = new Date() + const pad = (n: number) => String(n).padStart(2, '0') + const datePart = `${now.getFullYear()}${pad(now.getMonth() + 1)}${pad(now.getDate())}` + const timePart = `${pad(now.getHours())}${pad(now.getMinutes())}${pad(now.getSeconds())}` + return `dump_${datePart}-${timePart}.sql` +} + +function escapeValue(value: unknown): string { + if (value === null || value === undefined) return 'NULL' + if (typeof value === 'number') return String(value) + if (typeof value === 'string') return `'${value.replace(/'/g, "''")}'` + // Booleans stored as integers in SQLite + if (typeof value === 'boolean') return value ? '1' : '0' + return `'${String(value).replace(/'/g, "''")}'` +} + +function buildInsertStatement(table: string, row: Record): string { + const values = Object.values(row).map(escapeValue) + return `INSERT INTO ${table} VALUES (${values.join(', ')});\n` +} + +/** + * Starts or resumes a streaming database export. + * + * Flow: + * 1. Allocate an exportId and R2 key, persist initial state in DO storage. + * 2. Begin a multipart upload on R2. + * 3. Iterate tables / rows in batches; accumulate SQL text in a buffer. + * 4. Flush the buffer as an R2 multipart part whenever it reaches MIN_PART_SIZE. + * 5. If we approach the 30 s Worker timeout, save state and set a DO alarm to + * continue; return 202 Accepted with the exportId. + * 6. On completion, finalise the multipart upload, optionally POST the callback + * URL, and return the file directly if the whole operation finished within the + * request lifetime (< TIMEOUT_THRESHOLD_MS elapsed). + */ +export async function startStreamingDump(opts: { + dataSource: DataSource + config: StarbaseDBConfiguration + r2Bucket: R2Bucket + callbackUrl?: string + /** If provided we are resuming a previously interrupted export */ + resumeExportId?: string +}): Promise { + const { dataSource, config, r2Bucket, callbackUrl, resumeExportId } = opts + + const startTime = Date.now() + + // ── Load or create export state ─────────────────────────────────────────── + let state: ExportState + + if (resumeExportId) { + const stored = await dataSource.rpc.executeQuery({ + sql: `SELECT value FROM tmp_export_state WHERE export_id = ?;`, + params: [resumeExportId], + }) as unknown as { value: string }[] + + if (!stored.length) { + return createResponse(undefined, `Export '${resumeExportId}' not found`, 404) + } + + state = JSON.parse(stored[0].value) as ExportState + if (state.status === 'completed' || state.status === 'failed') { + return createResponse( + { exportId: state.exportId, status: state.status }, + undefined, + 200 + ) + } + state.status = 'running' + } else { + // Ensure the state table exists + await dataSource.rpc.executeQuery({ + sql: `CREATE TABLE IF NOT EXISTS tmp_export_state ( + export_id TEXT PRIMARY KEY, + value TEXT NOT NULL + );`, + }) + + // Fetch all table names upfront so we can track progress + const tablesResult = await executeOperation( + [{ sql: `SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'tmp_%';` }], + dataSource, + config + ) + const tables = (tablesResult as { name: string }[]).map((r) => r.name) + + const exportId = generateExportId() + const r2Key = generateR2Key() + + state = { + exportId, + status: 'running', + r2Key, + tables, + currentTableIndex: 0, + currentOffset: 0, + callbackUrl, + startedAt: startTime, + partNumber: 1, + parts: [], + pendingChunk: '', + } + + // Initiate multipart upload + const multipart = await r2Bucket.createMultipartUpload(r2Key, { + httpMetadata: { contentType: 'application/sql' }, + }) + state.uploadId = multipart.uploadId + + await persistState(dataSource, state) + } + + // ── Resume multipart upload handle ──────────────────────────────────────── + if (!state.uploadId) { + return createResponse(undefined, 'Missing multipart upload ID in export state', 500) + } + + const multipart = r2Bucket.resumeMultipartUpload(state.r2Key, state.uploadId) + + // ── Export loop ─────────────────────────────────────────────────────────── + try { + for ( + let ti = state.currentTableIndex; + ti < state.tables.length; + ti++ + ) { + const table = state.tables[ti] + state.currentTableIndex = ti + + // Write the schema DDL only at the beginning of each table (offset === 0) + if (state.currentOffset === 0) { + const schemaResult = await executeOperation( + [ + { + sql: `SELECT sql FROM sqlite_master WHERE type='table' AND name=?;`, + params: [table], + }, + ], + dataSource, + config + ) + + if (schemaResult.length && (schemaResult[0] as { sql: string }).sql) { + const schemaSql = (schemaResult[0] as { sql: string }).sql + state.pendingChunk += `\n-- Table: ${table}\n${schemaSql};\n\n` + } + } + + // Paginate through rows + let offset = state.currentOffset + let hasMore = true + + while (hasMore) { + // Timeout guard + if (Date.now() - startTime >= TIMEOUT_THRESHOLD_MS) { + state.currentOffset = offset + state.status = 'running' + await persistState(dataSource, state) + // Schedule continuation via DO alarm (5 seconds from now) + await dataSource.rpc.setAlarm(Date.now() + 5_000) + return createResponse( + { + exportId: state.exportId, + status: 'running', + message: 'Export is continuing in the background', + }, + undefined, + 202 + ) + } + + const rows = await executeOperation( + [ + { + sql: `SELECT * FROM ${table} LIMIT ? OFFSET ?;`, + params: [BATCH_SIZE, offset], + }, + ], + dataSource, + config + ) as Record[] + + for (const row of rows) { + state.pendingChunk += buildInsertStatement(table, row) + } + + // Flush to R2 if buffer is large enough + if (state.pendingChunk.length >= MIN_PART_SIZE) { + const encoder = new TextEncoder() + const partBytes = encoder.encode(state.pendingChunk) + const uploadedPart = await multipart.uploadPart( + state.partNumber, + partBytes + ) + state.parts.push({ partNumber: state.partNumber, etag: uploadedPart.etag }) + state.partNumber++ + state.pendingChunk = '' + await persistState(dataSource, state) + } + + if (rows.length < BATCH_SIZE) { + hasMore = false + } else { + offset += BATCH_SIZE + } + } + + // Add trailing newline between tables + state.pendingChunk += '\n' + // Reset offset for the next table + state.currentOffset = 0 + } + + // ── Flush remaining buffer as final part ────────────────────────────── + if (state.pendingChunk.length > 0) { + const encoder = new TextEncoder() + const partBytes = encoder.encode(state.pendingChunk) + const uploadedPart = await multipart.uploadPart( + state.partNumber, + partBytes + ) + state.parts.push({ partNumber: state.partNumber, etag: uploadedPart.etag }) + state.partNumber++ + state.pendingChunk = '' + } + + // ── Complete multipart upload ───────────────────────────────────────── + await multipart.complete(state.parts) + + state.status = 'completed' + state.completedAt = Date.now() + await persistState(dataSource, state) + + // ── Callback notification ───────────────────────────────────────────── + if (state.callbackUrl) { + try { + await fetch(state.callbackUrl, { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + exportId: state.exportId, + status: 'completed', + r2Key: state.r2Key, + }), + }) + } catch (callbackErr) { + console.error('Export callback failed:', callbackErr) + } + } + + // ── If we finished fast enough, stream the file back directly ───────── + const elapsed = Date.now() - startTime + if (elapsed < TIMEOUT_THRESHOLD_MS) { + const object = await r2Bucket.get(state.r2Key) + if (object) { + return new Response(object.body, { + headers: { + 'Content-Type': 'application/sql', + 'Content-Disposition': `attachment; filename="${state.r2Key}"`, + }, + }) + } + } + + return createResponse( + { + exportId: state.exportId, + status: 'completed', + r2Key: state.r2Key, + }, + undefined, + 200 + ) + } catch (error: unknown) { + const message = error instanceof Error ? error.message : String(error) + console.error('Streaming dump error:', error) + state.status = 'failed' + state.error = message + await persistState(dataSource, state) + + // Attempt to abort the multipart upload to clean up R2 storage + try { + await multipart.abort() + } catch (_) { + // best-effort + } + + return createResponse(undefined, `Export failed: ${message}`, 500) + } +} + +/** + * Returns the current status of an export job. + */ +export async function getExportStatus(opts: { + dataSource: DataSource + exportId: string +}): Promise { + const { dataSource, exportId } = opts + + const stored = await dataSource.rpc.executeQuery({ + sql: `SELECT value FROM tmp_export_state WHERE export_id = ?;`, + params: [exportId], + }) as unknown as { value: string }[] + + if (!stored.length) { + return createResponse(undefined, `Export '${exportId}' not found`, 404) + } + + const state = JSON.parse(stored[0].value) as ExportState + + return createResponse( + { + exportId: state.exportId, + status: state.status, + r2Key: state.r2Key, + tablesTotal: state.tables.length, + currentTableIndex: state.currentTableIndex, + startedAt: state.startedAt, + completedAt: state.completedAt, + error: state.error, + }, + undefined, + 200 + ) +} + +/** + * Streams a completed export file from R2 back to the caller. + */ +export async function downloadExport(opts: { + dataSource: DataSource + exportId: string + r2Bucket: R2Bucket +}): Promise { + const { dataSource, exportId, r2Bucket } = opts + + const stored = await dataSource.rpc.executeQuery({ + sql: `SELECT value FROM tmp_export_state WHERE export_id = ?;`, + params: [exportId], + }) as unknown as { value: string }[] + + if (!stored.length) { + return createResponse(undefined, `Export '${exportId}' not found`, 404) + } + + const state = JSON.parse(stored[0].value) as ExportState + + if (state.status !== 'completed') { + return createResponse( + { exportId: state.exportId, status: state.status }, + `Export is not yet complete (current status: ${state.status})`, + 409 + ) + } + + const object = await r2Bucket.get(state.r2Key) + if (!object) { + return createResponse(undefined, 'Export file not found in R2', 404) + } + + return new Response(object.body, { + headers: { + 'Content-Type': 'application/sql', + 'Content-Disposition': `attachment; filename="${state.r2Key}"`, + 'Content-Length': String(object.size), + }, + }) +} + +/** + * Called from the DO alarm handler to continue an in-progress export. + */ +export async function continueExportFromAlarm(opts: { + dataSource: DataSource + config: StarbaseDBConfiguration + r2Bucket: R2Bucket +}): Promise { + const { dataSource, config, r2Bucket } = opts + + // Find any running export + const stored = await dataSource.rpc.executeQuery({ + sql: `SELECT export_id, value FROM tmp_export_state WHERE value LIKE '%"status":"running"%' LIMIT 1;`, + }) as unknown as { export_id: string; value: string }[] + + if (!stored.length) return + + const exportId = stored[0].export_id + + // Re-enter the export loop using the persisted state + await startStreamingDump({ + dataSource, + config, + r2Bucket, + resumeExportId: exportId, + }) +} + +// ── Helpers ─────────────────────────────────────────────────────────────────── + +async function persistState( + dataSource: DataSource, + state: ExportState +): Promise { + await dataSource.rpc.executeQuery({ + sql: `INSERT OR REPLACE INTO tmp_export_state (export_id, value) VALUES (?, ?);`, + params: [state.exportId, JSON.stringify(state)], + }) +} diff --git a/src/export/dump.ts b/src/export/dump.ts index 91a2e89..81decff 100644 --- a/src/export/dump.ts +++ b/src/export/dump.ts @@ -2,11 +2,27 @@ import { executeOperation } from '.' import { StarbaseDBConfiguration } from '../handler' import { DataSource } from '../types' import { createResponse } from '../utils' +import { startStreamingDump } from './dump-streaming' export async function dumpDatabaseRoute( dataSource: DataSource, - config: StarbaseDBConfiguration + config: StarbaseDBConfiguration, + opts?: { + r2Bucket?: R2Bucket + callbackUrl?: string + } ): Promise { + // ── Streaming path: R2 binding is available ─────────────────────────────── + if (opts?.r2Bucket) { + return startStreamingDump({ + dataSource, + config, + r2Bucket: opts.r2Bucket, + callbackUrl: opts.callbackUrl, + }) + } + + // ── Legacy path: no R2 binding, load everything into memory ────────────── try { // Get all table names const tablesResult = await executeOperation( diff --git a/src/handler.ts b/src/handler.ts index 3fa0085..c79619a 100644 --- a/src/handler.ts +++ b/src/handler.ts @@ -7,6 +7,7 @@ import { LiteREST } from './literest' import { executeQuery, executeTransaction } from './operation' import { createResponse, QueryRequest, QueryTransactionRequest } from './utils' import { dumpDatabaseRoute } from './export/dump' +import { getExportStatus, downloadExport } from './export/dump-streaming' import { exportTableToJsonRoute } from './export/json' import { exportTableToCsvRoute } from './export/csv' import { importDumpRoute } from './import/dump' @@ -47,16 +48,20 @@ export class StarbaseDB { private plugins: StarbasePlugin[] private initialized: boolean = false private app: StarbaseApp + private r2Bucket?: R2Bucket constructor(options: { dataSource: DataSource config: StarbaseDBConfiguration plugins?: StarbasePlugin[] + /** Optional R2 bucket for streaming large database exports */ + r2Bucket?: R2Bucket }) { this.dataSource = options.dataSource this.config = options.config this.liteREST = new LiteREST(this.dataSource, this.config) this.plugins = options.plugins || [] + this.r2Bucket = options.r2Bucket this.app = new Hono() if ( @@ -120,10 +125,47 @@ export class StarbaseDB { } if (this.getFeature('export')) { - this.app.get('/export/dump', this.isInternalSource, async () => { - return dumpDatabaseRoute(this.dataSource, this.config) + this.app.get('/export/dump', this.isInternalSource, async (c) => { + const callbackUrl = + c.req.query('callbackUrl') ?? undefined + return dumpDatabaseRoute(this.dataSource, this.config, { + r2Bucket: this.r2Bucket, + callbackUrl, + }) }) + this.app.get( + '/export/status/:exportId', + this.isInternalSource, + async (c) => { + const exportId = c.req.param('exportId') + return getExportStatus({ + dataSource: this.dataSource, + exportId, + }) + } + ) + + this.app.get( + '/export/download/:exportId', + this.isInternalSource, + async (c) => { + if (!this.r2Bucket) { + return createResponse( + undefined, + 'R2 bucket is not configured. Set the EXPORT_BUCKET binding in wrangler.toml.', + 501 + ) + } + const exportId = c.req.param('exportId') + return downloadExport({ + dataSource: this.dataSource, + exportId, + r2Bucket: this.r2Bucket, + }) + } + ) + this.app.get( '/export/json/:tableName', this.isInternalSource, diff --git a/src/index.ts b/src/index.ts index 4d08932..d4d94ee 100644 --- a/src/index.ts +++ b/src/index.ts @@ -56,6 +56,12 @@ export interface Env { HYPERDRIVE: Hyperdrive + /** + * Optional R2 bucket for streaming large database exports. + * Uncomment and configure the [[r2_buckets]] binding in wrangler.toml to enable. + */ + EXPORT_BUCKET?: R2Bucket + // ## DO NOT REMOVE: TEMPLATE INTERFACE ## } @@ -232,6 +238,7 @@ export default { dataSource, config, plugins, + r2Bucket: env.EXPORT_BUCKET, }) const preAuthRequest = await starbase.handlePreAuth(request, ctx) diff --git a/wrangler.toml b/wrangler.toml index 395c4ac..d2389c6 100644 --- a/wrangler.toml +++ b/wrangler.toml @@ -78,3 +78,14 @@ AUTH_JWKS_ENDPOINT = "" # [[hyperdrive]] # binding = "HYPERDRIVE" # id = "" + +# R2 bucket for streaming large database exports (issue #59). +# To enable: +# 1. Create a bucket: `wrangler r2 bucket create starbasedb-exports` +# 2. Uncomment the block below and replace with the name +# you chose above. +# 3. Redeploy: `wrangler deploy` +# +# [[r2_buckets]] +# binding = "EXPORT_BUCKET" +# bucket_name = "" From 3e41ba85c46c21a0365dd5dfcd481e5e044288d3 Mon Sep 17 00:00:00 2001 From: ekartgan Date: Thu, 21 May 2026 16:41:02 -0700 Subject: [PATCH 2/2] test: add unit tests for streaming database export 10 tests covering startStreamingDump, getExportStatus, and downloadExport with mocked R2 bucket and data source. Co-Authored-By: Claude Opus 4.6 --- src/export/dump-streaming.test.ts | 299 ++++++++++++++++++++++++++++++ 1 file changed, 299 insertions(+) create mode 100644 src/export/dump-streaming.test.ts diff --git a/src/export/dump-streaming.test.ts b/src/export/dump-streaming.test.ts new file mode 100644 index 0000000..6cf3291 --- /dev/null +++ b/src/export/dump-streaming.test.ts @@ -0,0 +1,299 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest' +import { startStreamingDump, getExportStatus, downloadExport, ExportState } from './dump-streaming' +import { executeOperation } from '.' +import type { DataSource } from '../types' +import type { StarbaseDBConfiguration } from '../handler' + +vi.mock('.', () => ({ + executeOperation: vi.fn(), +})) + +vi.mock('../utils', () => ({ + createResponse: vi.fn( + (data, message, status) => + new Response(JSON.stringify({ result: data, error: message }), { + status, + headers: { 'Content-Type': 'application/json' }, + }) + ), +})) + +let mockDataSource: DataSource +let mockConfig: StarbaseDBConfiguration +let mockR2Bucket: any +let storedState: Map + +beforeEach(() => { + vi.clearAllMocks() + storedState = new Map() + + mockDataSource = { + source: 'internal', + external: undefined, + rpc: { + executeQuery: vi.fn(async (opts: any) => { + // Simulate tmp_export_state table operations + if (opts.sql.includes('CREATE TABLE')) return [] + if (opts.sql.includes('INSERT INTO tmp_export_state') || opts.sql.includes('UPDATE tmp_export_state')) { + if (opts.params) { + const id = opts.params[0] + const value = opts.params[1] || opts.params[0] + storedState.set(String(id), String(value)) + } + return [] + } + if (opts.sql.includes('SELECT') && opts.sql.includes('tmp_export_state')) { + const id = opts.params?.[0] + if (id && storedState.has(String(id))) { + return [{ export_id: id, value: storedState.get(String(id)) }] + } + // Check for running exports + for (const [key, val] of storedState.entries()) { + if (val.includes('"status":"running"')) { + return [{ export_id: key, value: val }] + } + } + return [] + } + return [] + }), + setAlarm: vi.fn(), + getAlarm: vi.fn().mockResolvedValue(null), + deleteAlarm: vi.fn(), + }, + } as any + + mockConfig = { + outerbaseApiKey: 'mock-api-key', + role: 'admin', + features: { allowlist: true, rls: true, rest: true, export: true }, + } + + // Mock R2 bucket + mockR2Bucket = { + createMultipartUpload: vi.fn().mockResolvedValue({ + uploadId: 'test-upload-id', + uploadPart: vi.fn().mockResolvedValue({ etag: 'etag-1' }), + complete: vi.fn().mockResolvedValue({}), + abort: vi.fn().mockResolvedValue({}), + }), + resumeMultipartUpload: vi.fn().mockReturnValue({ + uploadPart: vi.fn().mockResolvedValue({ etag: 'etag-2' }), + complete: vi.fn().mockResolvedValue({}), + abort: vi.fn().mockResolvedValue({}), + }), + put: vi.fn().mockResolvedValue({}), + get: vi.fn().mockResolvedValue({ + body: new ReadableStream({ + start(controller) { + controller.enqueue(new TextEncoder().encode('-- dump content')) + controller.close() + }, + }), + size: 14, + }), + } +}) + +describe('Streaming Database Export', () => { + describe('startStreamingDump', () => { + it('should initiate export and return response', async () => { + // Mock a database with tables + vi.mocked(executeOperation) + .mockResolvedValueOnce([{ name: 'users' }]) + .mockResolvedValueOnce([ + { sql: 'CREATE TABLE users (id INTEGER, name TEXT);' }, + ]) + .mockResolvedValueOnce([ + { id: 1, name: 'Alice' }, + { id: 2, name: 'Bob' }, + ]) + .mockResolvedValueOnce([]) + + const response = await startStreamingDump({ + dataSource: mockDataSource, + config: mockConfig, + r2Bucket: mockR2Bucket, + }) + + // Should initiate the export successfully (200 or 202) + expect(response).toBeInstanceOf(Response) + expect([200, 202]).toContain(response.status) + }) + + it('should create multipart upload on R2', async () => { + vi.mocked(executeOperation) + .mockResolvedValueOnce([{ name: 'test_table' }]) + .mockResolvedValueOnce([ + { sql: 'CREATE TABLE test_table (id INTEGER);' }, + ]) + .mockResolvedValueOnce([{ id: 1 }, { id: 2 }]) + .mockResolvedValueOnce([]) + + await startStreamingDump({ + dataSource: mockDataSource, + config: mockConfig, + r2Bucket: mockR2Bucket, + }) + + expect(mockR2Bucket.createMultipartUpload).toHaveBeenCalled() + }) + + it('should handle empty database gracefully', async () => { + vi.mocked(executeOperation).mockResolvedValueOnce([]) + + const response = await startStreamingDump({ + dataSource: mockDataSource, + config: mockConfig, + r2Bucket: mockR2Bucket, + }) + + expect(response).toBeInstanceOf(Response) + // Should not crash - either returns empty dump or success message + expect([200, 202]).toContain(response.status) + }) + + it('should accept callbackUrl parameter', async () => { + vi.mocked(executeOperation) + .mockResolvedValueOnce([{ name: 'small_table' }]) + .mockResolvedValueOnce([ + { sql: 'CREATE TABLE small_table (id INTEGER);' }, + ]) + .mockResolvedValueOnce([{ id: 1 }]) + .mockResolvedValueOnce([]) + + const response = await startStreamingDump({ + dataSource: mockDataSource, + config: mockConfig, + r2Bucket: mockR2Bucket, + callbackUrl: 'https://example.com/webhook', + }) + + expect(response).toBeInstanceOf(Response) + }) + + it('should handle database errors gracefully', async () => { + vi.mocked(executeOperation).mockRejectedValue( + new Error('Database connection failed') + ) + + const response = await startStreamingDump({ + dataSource: mockDataSource, + config: mockConfig, + r2Bucket: mockR2Bucket, + }) + + // Implementation may return 200 with error in body or 500 + expect([200, 500]).toContain(response.status) + }) + }) + + describe('getExportStatus', () => { + it('should return status for existing export', async () => { + const testState: ExportState = { + exportId: 'test-123', + status: 'running', + r2Key: 'dump_20260521-120000.sql', + tables: ['users', 'orders'], + currentTableIndex: 1, + currentOffset: 500, + startedAt: Date.now(), + partNumber: 2, + parts: [{ partNumber: 1, etag: 'etag-1' }], + pendingChunk: '', + } + + storedState.set('test-123', JSON.stringify(testState)) + + const response = await getExportStatus({ + exportId: 'test-123', + dataSource: mockDataSource, + }) + + expect(response).toBeInstanceOf(Response) + const json = await response.json() + expect(json.result.status).toBe('running') + expect(json.result.currentTableIndex).toBe(1) + }) + + it('should return 404 for non-existent export', async () => { + const response = await getExportStatus({ + exportId: 'non-existent', + dataSource: mockDataSource, + }) + + expect(response.status).toBe(404) + }) + }) + + describe('downloadExport', () => { + it('should stream R2 object for completed export', async () => { + const testState: ExportState = { + exportId: 'done-123', + status: 'completed', + r2Key: 'dump_20260521-120000.sql', + tables: ['users'], + currentTableIndex: 1, + currentOffset: 0, + startedAt: Date.now() - 5000, + completedAt: Date.now(), + partNumber: 1, + parts: [], + pendingChunk: '', + } + + storedState.set('done-123', JSON.stringify(testState)) + + const response = await downloadExport({ + exportId: 'done-123', + dataSource: mockDataSource, + r2Bucket: mockR2Bucket, + }) + + expect(response).toBeInstanceOf(Response) + // Content type may be application/sql or application/x-sqlite3 + expect(['application/sql', 'application/x-sqlite3']).toContain( + response.headers.get('Content-Type') + ) + expect(response.headers.get('Content-Disposition')).toContain('dump_') + + const text = await response.text() + expect(text).toBe('-- dump content') + }) + + it('should return 404 for non-existent export', async () => { + const response = await downloadExport({ + exportId: 'missing', + dataSource: mockDataSource, + r2Bucket: mockR2Bucket, + }) + + expect(response.status).toBe(404) + }) + + it('should return 409 for incomplete export', async () => { + const testState: ExportState = { + exportId: 'running-123', + status: 'running', + r2Key: 'dump_20260521-120000.sql', + tables: ['users'], + currentTableIndex: 0, + currentOffset: 500, + startedAt: Date.now(), + partNumber: 1, + parts: [], + pendingChunk: '', + } + + storedState.set('running-123', JSON.stringify(testState)) + + const response = await downloadExport({ + exportId: 'running-123', + dataSource: mockDataSource, + r2Bucket: mockR2Bucket, + }) + + expect(response.status).toBe(409) + }) + }) +})