From 8701ee18e04d4e4fbce88a5adaac997901f2f8c7 Mon Sep 17 00:00:00 2001 From: Pedro Augusto <53156040+pedro3pv@users.noreply.github.com> Date: Thu, 9 Apr 2026 15:49:42 -0300 Subject: [PATCH] feat(converters): add parquet to csv conversion with compressor support --- bun.lock | 14 ++++ package.json | 3 + src/converters/main.ts | 5 ++ src/converters/parquet.ts | 118 +++++++++++++++++++++++++++++++ tests/converters/parquet.test.ts | 81 +++++++++++++++++++++ 5 files changed, 221 insertions(+) create mode 100644 src/converters/parquet.ts create mode 100644 tests/converters/parquet.test.ts diff --git a/bun.lock b/bun.lock index abf42043..22aa5172 100644 --- a/bun.lock +++ b/bun.lock @@ -1,5 +1,6 @@ { "lockfileVersion": 1, + "configVersion": 0, "workspaces": { "": { "name": "convertx-frontend", @@ -8,7 +9,10 @@ "@elysiajs/jwt": "^1.4.1", "@elysiajs/static": "^1.4.7", "@kitajs/html": "^4.2.13", + "csv-stringify": "^6.7.0", "elysia": "1.4.22", + "hyparquet": "^1.25.6", + "hyparquet-compressors": "^1.1.1", "sanitize-filename": "^1.6.4", "tar": "^7.5.13", }, @@ -277,6 +281,8 @@ "csstype": ["csstype@3.2.3", "", {}, "sha512-z1HGKcYy2xA8AGQfwrn0PAy+PB7X/GSj3UVJW9qKyn43xWa+gl5nXmU4qqLMRzWVLFC8KusUX8T/0kCiOYpAIQ=="], + "csv-stringify": ["csv-stringify@6.7.0", "", {}, "sha512-UdtziYp5HuTz7e5j8Nvq+a/3HQo+2/aJZ9xntNTpmRRIg/3YYqDVgiS9fvAhtNbnyfbv2ZBe0bqCHqzhE7FqWQ=="], + "debug": ["debug@4.4.3", "", { "dependencies": { "ms": "^2.1.3" } }, "sha512-RGwwWnwQvkVfavKVt22FGLw+xYSdzARwm0ru6DhTVA3umU5hZc28V3kO4stgYryrTlLpuvgI9GiijltAjNbcqA=="], "deep-is": ["deep-is@0.1.4", "", {}, "sha512-oIPzksmTg4/MriiaYGO+okXDT7ztn/w3Eptv/+gSIdMdKsJo0u4CfYNFJPy+4SKMuCqGw2wxnA+URMg3t8a/bQ=="], @@ -343,6 +349,8 @@ "formatly": ["formatly@0.3.0", "", { "dependencies": { "fd-package-json": "^2.0.0" }, "bin": { "formatly": "bin/index.mjs" } }, "sha512-9XNj/o4wrRFyhSMJOvsuyMwy8aUfBaZ1VrqHVfohyXf0Sw0e+yfKG+xZaY3arGCOMdwFsqObtzVOc1gU9KiT9w=="], + "fzstd": ["fzstd@0.1.1", "", {}, "sha512-dkuVSOKKwh3eas5VkJy1AW1vFpet8TA/fGmVA5krThl8YcOVE/8ZIoEA1+U1vEn5ckxxhLirSdY837azmbaNHA=="], + "get-caller-file": ["get-caller-file@2.0.5", "", {}, "sha512-DyFP3BM/3YHTQOCUL/w0OZHR0lpKeGrxotcHWcqNEdnltqFwXVfhEBQ94eIo34AfQpo0rGki4cyIiftY06h2Fg=="], "get-east-asian-width": ["get-east-asian-width@1.5.0", "", {}, "sha512-CQ+bEO+Tva/qlmw24dCejulK5pMzVnUOFOijVogd3KQs07HnRIgp8TGipvCCRT06xeYEbpbgwaCxglFyiuIcmA=="], @@ -355,6 +363,12 @@ "has-flag": ["has-flag@4.0.0", "", {}, "sha512-EykJT/Q1KjTWctppgIAgfSO0tKVuZUjhgMr17kqTumMl6Afv3EISleU7qZUzoXDFTAHTDC4NOoG/ZxU3EvlMPQ=="], + "hyparquet": ["hyparquet@1.25.6", "", {}, "sha512-Q9W5IjkVch3ZMnYd4qFv2q8suu5Jc36yt7J+zUNM9grwnP1S189icp0jdEQKM5HJvQkTVy8NMiQ8n/dM5QAt1A=="], + + "hyparquet-compressors": ["hyparquet-compressors@1.1.1", "", { "dependencies": { "fzstd": "0.1.1", "hysnappy": "1.0.0" } }, "sha512-yx7aA3Rhj0YycbdV71+XznQSLAefa4cT0urpgNXy4aM6eSeCknaVDNne8y45Uz74Fb15yyXUzOStlceOJBan7A=="], + + "hysnappy": ["hysnappy@1.0.0", "", {}, "sha512-MNrC4NfwDGPb889O6gIfEtbvEZCSWUsSEhsz4Oq2FRcpGtXHfeVz3KciSPp5Pnnz1NjFMgDQNfxdJozymJEDDA=="], + "ieee754": ["ieee754@1.2.1", "", {}, "sha512-dcyqhDvX1C46lXZcVqCpK+FtMRQVdIMN6/Df5js2zouUsqG7I6sFxitIC+7KYK29KdXOLHdu9zL4sFnoVQnqaA=="], "ignore": ["ignore@5.3.2", "", {}, "sha512-hsBTNUqQTDwkWtcdYI2i06Y/nUBEsNEDJKjWdigLvegy8kDuJAS8uRlpkkcQpyEXL0Z/pjDy5HBmMjRCJ2gq+g=="], diff --git a/package.json b/package.json index fb0dacf6..6ea69541 100644 --- a/package.json +++ b/package.json @@ -20,7 +20,10 @@ "@elysiajs/jwt": "^1.4.1", "@elysiajs/static": "^1.4.7", "@kitajs/html": "^4.2.13", + "csv-stringify": "^6.7.0", "elysia": "1.4.22", + "hyparquet": "^1.25.6", + "hyparquet-compressors": "^1.1.1", "sanitize-filename": "^1.6.4", "tar": "^7.5.13" }, diff --git a/src/converters/main.ts b/src/converters/main.ts index 711b5594..a49b2f59 100644 --- a/src/converters/main.ts +++ b/src/converters/main.ts @@ -25,6 +25,7 @@ import { convert as convertVtracer, properties as propertiesVtracer } from "./vt import { convert as convertVcf, properties as propertiesVcf } from "./vcf"; import { convert as convertxelatex, properties as propertiesxelatex } from "./xelatex"; import { convert as convertMarkitdown, properties as propertiesMarkitdown } from "./markitdown"; +import { convert as convertParquet, properties as propertiesParquet } from "./parquet"; // This should probably be reconstructed so that the functions are not imported instead the functions hook into this to make the converters more modular @@ -137,6 +138,10 @@ const properties: Record< properties: propertiesMarkitdown, converter: convertMarkitdown, }, + parquet: { + properties: propertiesParquet, + converter: convertParquet, + }, }; function chunks(arr: T[], size: number): T[][] { diff --git a/src/converters/parquet.ts b/src/converters/parquet.ts new file mode 100644 index 00000000..65522868 --- /dev/null +++ b/src/converters/parquet.ts @@ -0,0 +1,118 @@ +import * as fs from "node:fs/promises"; +import { createWriteStream } from "node:fs"; +import { stringify } from "csv-stringify"; +import { parquetMetadataAsync, parquetRead } from "hyparquet"; +import { compressors } from "hyparquet-compressors"; + +export const properties = { + from: { + data: ["parquet"], + }, + to: { + data: ["csv"], + }, +}; + +export async function convert( + filePath: string, + fileType: string, + convertTo: string, + targetPath: string, + options?: unknown, // eslint-disable-line @typescript-eslint/no-unused-vars +): Promise { + const fileHandle = await fs.open(filePath, "r"); + try { + const stat = await fileHandle.stat(); + const file = { + byteLength: stat.size, + async slice(start: number, end?: number): Promise { + const length = (end ?? stat.size) - start; + const buf = Buffer.allocUnsafe(length); + const { bytesRead } = await fileHandle.read(buf, 0, length, start); + return buf.buffer.slice(buf.byteOffset, buf.byteOffset + bytesRead); + }, + }; + + const metadata = await parquetMetadataAsync(file, { compressors } as never); + const stringifier = stringify({ + header: true, + cast: { + object: (value: unknown) => + JSON.stringify(value, (_key: string, val: unknown) => + typeof val === "bigint" ? Number(val) : val, + ), + }, + }); + const writeStream = createWriteStream(targetPath); + + return new Promise((resolve, reject) => { + stringifier.pipe(writeStream); + + let settled = false; + const settle = (err?: unknown) => { + if (settled) return; + settled = true; + fileHandle.close().catch(() => {}); + if (err) reject(err); + else resolve("Done"); + }; + + writeStream.on("finish", settle); + writeStream.on("error", settle); + stringifier.on("error", settle); + + const writeRow = async (rawRow: Record) => { + const row: Record = {}; + for (const [key, value] of Object.entries(rawRow)) { + row[key] = typeof value === "bigint" ? Number(value) : value; + } + if (stringifier.write(row)) return; + await new Promise((resolve) => { + stringifier.once("drain", resolve); + }); + }; + + const writePromises: Promise[] = []; + + (async () => { + try { + let rowStart = 0; + for (const rowGroup of metadata.row_groups) { + const numRows = Number(rowGroup.num_rows); + const rowEnd = rowStart + numRows; + + await parquetRead({ + file, + rowStart, + rowEnd, + rowFormat: "object", + compressors, + onComplete: (rows) => { + if (Array.isArray(rows)) { + writePromises.push( + (async () => { + for (const row of rows) { + await writeRow(row); + } + })(), + ); + } + }, + }); + rowStart = rowEnd; + } + + await Promise.all(writePromises); + stringifier.end(); + } catch (err) { + settle(err); + writeStream.destroy(err as Error); + stringifier.destroy(err as Error); + } + })(); + }); + } catch (err) { + await fileHandle.close(); + throw err; + } +} diff --git a/tests/converters/parquet.test.ts b/tests/converters/parquet.test.ts new file mode 100644 index 00000000..a4213a64 --- /dev/null +++ b/tests/converters/parquet.test.ts @@ -0,0 +1,81 @@ +import { expect, test, describe, mock } from "bun:test"; +import { Writable } from "node:stream"; + +const mockParquetMetadataAsync = mock(async () => { + return { + row_groups: [{ num_rows: 1 }], + }; +}); + +const mockParquetRead = mock( + async (options: { + file: unknown; + rowStart: number; + rowEnd: number; + rowFormat: string; + onComplete: (rows: { col1: string }[]) => void; + }) => { + if (options.onComplete) { + options.onComplete([{ col1: "value" }]); + } + return []; + }, +); + +const mockCreateWriteStream = mock(() => { + return new Writable({ + write(_chunk, _encoding, callback) { + callback(); + }, + }); +}); + +const mockFileHandle = { + stat: mock(async () => ({ size: 100 })), + read: mock(async (buf: Buffer) => { + return { bytesRead: buf.length, buffer: buf }; + }), + close: mock(async () => {}), +}; + +mock.module("node:fs/promises", () => ({ + open: mock(async () => mockFileHandle), +})); + +mock.module("hyparquet", () => { + return { + parquetMetadataAsync: mockParquetMetadataAsync, + parquetRead: mockParquetRead, + }; +}); + +mock.module("node:fs", () => { + return { + createWriteStream: mockCreateWriteStream, + }; +}); + +import { convert } from "../../src/converters/parquet"; + +describe("parquet converter", () => { + test("convert resolves when process succeeds", async () => { + const result = await convert("input.parquet", "parquet", "csv", "output.csv"); + expect(result).toBe("Done"); + }); + + test("convert rejects when parquetRead fails", async () => { + mockParquetRead.mockRejectedValueOnce(new Error("Read error")); + + await expect(convert("invalid.parquet", "parquet", "csv", "output.csv")).rejects.toThrow( + "Read error", + ); + }); + + test("convert rejects when metadata fails", async () => { + mockParquetMetadataAsync.mockRejectedValueOnce(new Error("Metadata error")); + + await expect(convert("invalid.parquet", "parquet", "csv", "output.csv")).rejects.toThrow( + "Metadata error", + ); + }); +});