Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions bun.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
"@elysiajs/jwt": "^1.4.1",
"@elysiajs/static": "^1.4.7",
"@kitajs/html": "^4.2.13",
"csv-stringify": "^6.7.0",
"elysia": "1.4.22",
"hyparquet": "^1.25.6",
Comment thread
pedro3pv marked this conversation as resolved.
"hyparquet-compressors": "^1.1.1",
"sanitize-filename": "^1.6.4",
"tar": "^7.5.13"
},
Expand Down
5 changes: 5 additions & 0 deletions src/converters/main.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import { convert as convertVtracer, properties as propertiesVtracer } from "./vt
import { convert as convertVcf, properties as propertiesVcf } from "./vcf";
import { convert as convertxelatex, properties as propertiesxelatex } from "./xelatex";
import { convert as convertMarkitdown, properties as propertiesMarkitdown } from "./markitdown";
import { convert as convertParquet, properties as propertiesParquet } from "./parquet";

// This should probably be reconstructed so that the functions are not imported instead the functions hook into this to make the converters more modular

Expand Down Expand Up @@ -137,6 +138,10 @@ const properties: Record<
properties: propertiesMarkitdown,
converter: convertMarkitdown,
},
parquet: {
properties: propertiesParquet,
converter: convertParquet,
},
};

function chunks<T>(arr: T[], size: number): T[][] {
Expand Down
118 changes: 118 additions & 0 deletions src/converters/parquet.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import * as fs from "node:fs/promises";
import { createWriteStream } from "node:fs";
import { stringify } from "csv-stringify";
import { parquetMetadataAsync, parquetRead } from "hyparquet";
import { compressors } from "hyparquet-compressors";

export const properties = {
from: {
data: ["parquet"],
},
to: {
data: ["csv"],
},
};

export async function convert(
filePath: string,
fileType: string,
convertTo: string,
targetPath: string,
options?: unknown, // eslint-disable-line @typescript-eslint/no-unused-vars
): Promise<string> {
const fileHandle = await fs.open(filePath, "r");
try {
const stat = await fileHandle.stat();
const file = {
byteLength: stat.size,
async slice(start: number, end?: number): Promise<ArrayBuffer> {
const length = (end ?? stat.size) - start;
const buf = Buffer.allocUnsafe(length);
const { bytesRead } = await fileHandle.read(buf, 0, length, start);
return buf.buffer.slice(buf.byteOffset, buf.byteOffset + bytesRead);
},
};

const metadata = await parquetMetadataAsync(file, { compressors } as never);
const stringifier = stringify({
header: true,
cast: {
object: (value: unknown) =>
JSON.stringify(value, (_key: string, val: unknown) =>
typeof val === "bigint" ? Number(val) : val,
),
},
});
const writeStream = createWriteStream(targetPath);

return new Promise((resolve, reject) => {
stringifier.pipe(writeStream);

let settled = false;
const settle = (err?: unknown) => {
if (settled) return;
settled = true;
fileHandle.close().catch(() => {});
if (err) reject(err);
else resolve("Done");
};

writeStream.on("finish", settle);
writeStream.on("error", settle);
stringifier.on("error", settle);

const writeRow = async (rawRow: Record<string, unknown>) => {
const row: Record<string, unknown> = {};
for (const [key, value] of Object.entries(rawRow)) {
row[key] = typeof value === "bigint" ? Number(value) : value;
}
if (stringifier.write(row)) return;
await new Promise<void>((resolve) => {
stringifier.once("drain", resolve);
});
};

const writePromises: Promise<void>[] = [];

(async () => {
try {
let rowStart = 0;
for (const rowGroup of metadata.row_groups) {
const numRows = Number(rowGroup.num_rows);
const rowEnd = rowStart + numRows;

await parquetRead({
file,
rowStart,
rowEnd,
rowFormat: "object",
compressors,
onComplete: (rows) => {
if (Array.isArray(rows)) {
writePromises.push(
(async () => {
for (const row of rows) {
await writeRow(row);
}
})(),
);
}
},
});
rowStart = rowEnd;
}

await Promise.all(writePromises);
stringifier.end();
} catch (err) {
settle(err);
writeStream.destroy(err as Error);
stringifier.destroy(err as Error);
}
})();
});
} catch (err) {
await fileHandle.close();
throw err;
}
}
81 changes: 81 additions & 0 deletions tests/converters/parquet.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
import { expect, test, describe, mock } from "bun:test";
import { Writable } from "node:stream";

const mockParquetMetadataAsync = mock(async () => {
return {
row_groups: [{ num_rows: 1 }],
};
});

const mockParquetRead = mock(
async (options: {
file: unknown;
rowStart: number;
rowEnd: number;
rowFormat: string;
onComplete: (rows: { col1: string }[]) => void;
}) => {
if (options.onComplete) {
options.onComplete([{ col1: "value" }]);
}
return [];
},
);

const mockCreateWriteStream = mock(() => {
return new Writable({
write(_chunk, _encoding, callback) {
callback();
},
});
});

const mockFileHandle = {
stat: mock(async () => ({ size: 100 })),
read: mock(async (buf: Buffer) => {
return { bytesRead: buf.length, buffer: buf };
}),
close: mock(async () => {}),
};

mock.module("node:fs/promises", () => ({
open: mock(async () => mockFileHandle),
}));

mock.module("hyparquet", () => {
return {
parquetMetadataAsync: mockParquetMetadataAsync,
parquetRead: mockParquetRead,
};
});

mock.module("node:fs", () => {
return {
createWriteStream: mockCreateWriteStream,
};
});

import { convert } from "../../src/converters/parquet";

describe("parquet converter", () => {
test("convert resolves when process succeeds", async () => {
const result = await convert("input.parquet", "parquet", "csv", "output.csv");
expect(result).toBe("Done");
});
Comment thread
pedro3pv marked this conversation as resolved.

test("convert rejects when parquetRead fails", async () => {
mockParquetRead.mockRejectedValueOnce(new Error("Read error"));

await expect(convert("invalid.parquet", "parquet", "csv", "output.csv")).rejects.toThrow(
"Read error",
);
});

test("convert rejects when metadata fails", async () => {
mockParquetMetadataAsync.mockRejectedValueOnce(new Error("Metadata error"));

await expect(convert("invalid.parquet", "parquet", "csv", "output.csv")).rejects.toThrow(
"Metadata error",
);
});
});
Loading