Rewrite the parquet output adapter manager#712
Conversation
ParquetWriter builds RecordBatches and hands them to a pluggable RecordBatchSink (onStart/onBatch/onFileChange/onStop). Removes the old C++ file-writer hierarchy and unifies output conversion via visitCspValueType. Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Remove dead StructColumnArrayBuilder and parquet_dict_basket_output_adapter, de-virtualize scheduleEndCycleEvent, propagate file metadata to per-column files in split mode, and expand output tests. Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Closure-based C++ sink writes parquet/IPC/split-column files directly, removing the per-batch C++<->Python hop. Adds FileExistsError. Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
From the multi-model review:
- delete the now-dead Python sink (_parquet_rb_writer.py, rb_sink wiring,
TestOutputSinkDirect) since file I/O is all C++ now
- RecordBatchFileSink: guard mkdir(""), fix close-path exception safety,
resolve compression via Arrow's Codec API (case-insensitive)
- honor an explicit writeTimestampColumn
- fail fast on single-file + dict basket
- add regression tests
Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Remove the unused manager-level m_indexSink/setIndexSink (per-basket index sinks are unaffected) and stop all writers before destroying any in stop(). Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Encapsulate the scratch isSet reset behind StructField::clearIsSet, add a debug-only length assert in buildRecordBatch, and document that file_visitor runs synchronously on the engine thread. Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
ptomecek
left a comment
There was a problem hiding this comment.
I went through the whole output path (ParquetWriter -> RecordBatchSink -> makeFileSink), the dict-basket teardown, the Python wiring, and compared everything against the old FileWriterWrapper hierarchy. This is a clean rewrite. Each of the hardening fixes is real and correctly done: the mkdir guard for bare filenames, the close-path exception safety (no double-close, all sub-writers and the stream get closed even if one throws), compression resolved through Arrow's codec API, the explicit timestamp column being honored, the single-file + dict-basket fast failure, FileExistsError on overwrite, and the stop()-stops-everything-before-destroying-anything teardown. Arrow Status/Result values are checked at every call site. I didn't find any correctness, resource, or concurrency bugs.
Two low-severity things inline, plus one parity note I couldn't attach to a line:
Nested-struct field ordering: ArrowFieldWriter::NestedStructWriter (in ArrowFieldWriter.cpp, which isn't part of this PR) orders the arrow struct's child fields by declaration order, whereas the old code used the struct's memory-layout order. It reads back fine through the new name-based reader, but the on-disk child order differs from files written by older csp, which matters for anything reading those columns positionally. A test that pins down the struct-within-struct schema shape would be worth adding.
Overall this looks good to merge. The one thing I'd sort out first is the silently-dropped column below, since it turns an error into missing data.
Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
|
Looks good to me now |
AdamGlustein
left a comment
There was a problem hiding this comment.
As a mere human who attempted reading through this code, I find it very difficult to follow with the lambda-based approach. First, figuring out what function is actually being called is not easy, and deciphering what actually makes it into the closure of that function is even harder. It would be much simpler (in my opinion) to make these callbacks member functions and replace the lambda capture variables with member variables for these classes.
Also, I think some of the complexity here was necessitated by being backwards compatible with the original Parquet adapter. In my opinion, I think we can relax that constraint. There are a lot of overly specific things (like symbol columns, etc.) that we don't need to support and can just drop completely from the code. With a minor version bump we're not going to break any contracts, and I highly doubt some of these vestigial features are used by anyone anymore.
| catch( ... ) { if( !firstError ) firstError = std::current_exception(); } | ||
| }; | ||
|
|
||
| auto && indexBuilder = m_cycleIndexOutputAdapter -> getColumnArrayBuilder( 0 ); |
There was a problem hiding this comment.
This code is the same as cpp/csp/adapters/parquet/ParquetDictBasketOutputWriter.cpp:105, make it a private helper method and call it flushIndexBatch() or something
There was a problem hiding this comment.
Done — extracted the duplicated index-flush block into a private flushIndexBatch() helper. It's now called from onEndCycle() (chunk full), onFileNameChange() (rotation), and stop() (final flush), so the build-RecordBatch-and-hand-to-sink logic lives in one place.
| auto field = arrowBuilder -> scratchField(); | ||
| m_columnArrayBuilder = arrowBuilder; | ||
|
|
||
| csp::adapters::arrow::visitCspValueType( type -> type(), |
There was a problem hiding this comment.
I don't think we need m_type at all in this class anymore, we just use it to check if the type is supported at construction time and we have type available in the constructor anyways (just makes it more confusing that we check type -> type() for validity but throw the error with m_type -> type(); from what I see these are always the same)
There was a problem hiding this comment.
Done — removed m_type from ParquetOutputHandler entirely. The base ctor no longer takes or stores the type, and the unsupported-type error now reports the local type param, so the validity check and the error message reference the same value.
|
|
||
| RecordBatchSink sink; | ||
| sink.onStart = [schemaHolder]( const std::shared_ptr<::arrow::Schema> & schema ) { *schemaHolder = schema; }; | ||
| sink.onBatch = [current]( const std::shared_ptr<::arrow::RecordBatch> & rb ) |
There was a problem hiding this comment.
Some of the design here seems overly complicated to me, for example why do all of these callbacks (onStart, onBatch, onFileChange) need to be stored as lambda function members on the object? Can't they just be plain old member functions on the RecordBatchSink? I find it's quite hard to follow this design pattern
There was a problem hiding this comment.
Done — reworked this away from the closure-based design. The four callbacks are now plain member functions (onStart/onBatch/onFileChange/onStop) and the previously-captured state is member variables (m_schema, m_current, m_currentPath, plus the config). No more lambda captures to trace through.
I also collapsed the abstract RecordBatchSink interface and the concrete RecordBatchFileSink into a single concrete RecordBatchSink class — there's only one implementation, so the interface/unique_ptr/virtual dispatch wasn't earning its keep. The file is now RecordBatchSink.cpp.
| initFileWriterContainer( arrow::schema( arrowFields, m_fileMetaData ) ); | ||
|
|
||
| m_schema = ::arrow::schema( arrowFields, m_fileMetaData ); | ||
| if( m_sink.onStart ) |
There was a problem hiding this comment.
This code is quite confusing to me.
All onStart does (from what I can see) is transfer the schema to the sink object.
Yet m_fileOpen is true after. I can't even really see where we open the file in onFileChange either.
Overall let's get rid of all these lambda function callbacks if we can, trying to figure out a) which function is actually being used and b) the closure, is a nightmare when looking at the code.
There was a problem hiding this comment.
Done — start() should read clearly now: it hands the schema to the sink via onStart(), then does m_fileOpen = m_sink->onFileChange(fileName) to open the first file (if any). The actual file open happens inside RecordBatchSink::onFileChange (via openWriter()), and m_fileOpen is set from that method's bool return rather than being inferred. With the callbacks now being ordinary member functions, it's no longer a question of which lambda runs.
| flushBatch(); | ||
| if( m_sink.onFileChange ) | ||
| m_sink.onFileChange( fileName ); | ||
| m_fileOpen = !fileName.empty(); |
There was a problem hiding this comment.
This seems like a weird way of checking if the file change was successful (emptying the filename in the function that was called?). Why not just return a bool from m_sink.onFileChange if it was successful?
There was a problem hiding this comment.
Done — onFileChange now returns bool (true if a file is open after the call, i.e. the path was non-empty). ParquetWriter sets m_fileOpen directly from that return value, so the "empty the filename to signal success" trick is gone.
There was a problem hiding this comment.
Don't see us using this, we just hardcode 2.6 at cpp/csp/adapters/parquet/RecordBatchFileSink.cpp:83
There was a problem hiding this comment.
Done — removed the unused PARQUET_VERSION class attribute and the _get_default_parquet_version() helper, along with the now-dead importlib.metadata / packaging imports. As you noted, the C++ writer hardcodes PARQUET_2_6, so this value never flowed anywhere.
| { | ||
| switch( m_type -> type() ) | ||
| bool isBytes = false; | ||
| CspTypePtr effectiveType = type; |
There was a problem hiding this comment.
Any reason for this effectiveType variable?
There was a problem hiding this comment.
No reason — it was just an unmodified copy of type. Removed it and pass type straight to createArrowBackedArrayBuilder.
Signed-off-by: Arham Chopra <arham.chopra@cubistsystematic.com>
Rewrite the parquet output adapter for RecordBatch-based writing
Replaces the old per-format C++ file-writer hierarchy (
FileWriterWrapper/ParquetFileWriterWrapper/ArrowIPCFileWriterWrapper/FileWriterWrapperContainer) with a RecordBatch sink architecture:ParquetWriterbuilds ArrowRecordBatches and hands them to a smallRecordBatchSinkcallback interface, and a single closure-based C++ sink (makeFileSink) writes parquet files, Arrow IPC streams, and split-column directories. This mirrors the input-side rewrite in #704 and shares the same Arrow type machinery.Motivation
The old output path had:
FileWriterWrapperContainerto fan out to split-column filesDialectGenericListWriterInterfacefor list columns, duplicating type dispatch already implemented in the shared Arrow layerStructColumnArrayBuilder) that re-implemented nested-struct serialization the Arrow nodes already doparquet_dict_basket_output_adapter(dead since basket output goes through theparquet_dict_basket_writernode)The new implementation:
arrow::RecordBatchperbatch_sizerows and writes it through a 4-callback sink, so the file backend is a single, swappable componentArrowFieldWriter(viaArrowBackedArrayBuilder) for every column type — the same serialization code thestruct_to_record_batchesArrow node and the input adapter use (scalars, structs, nested structs, lists)Architecture
RecordBatchSink(new) is a struct of four callbacks (onStart/onBatch/onFileChange/onStop) — the only contract between the writer and the file backend.makeFileSink(new,RecordBatchFileSink.cpp) returns a closure-based sink (no class hierarchy) that owns overwrite checks, parent-directory creation, file rotation, compression, and the optionalfile_visitor. Compression is resolved through Arrow's ownarrow::util::CodecAPI rather than a hardcoded name map.ArrowBackedArrayBuilder(new) bridges csp's row-at-a-time "may-not-tick → null" model onto the batch-oriented sharedArrowFieldWriter, in two modes: scratch (single-field struct the tick writes into) and external (reads a field directly from a published struct).ParquetOutputAdapterManageris simplified to orchestrate the writer + per-basket dict writers and wire the sink (and a per-basket sink factory) at construction.What's removed
FileWriterWrapper/ParquetFileWriterWrapper/ArrowIPCFileWriterWrapper/FileWriterWrapperContainer— the old per-format C++ file-writer hierarchyDialectGenericListWriterInterface— list columns now go throughArrowBackedArrayBuilderStructColumnArrayBuilder— nested-struct columns are written by the sharedArrowFieldWriter::NestedStructWriterparquet_dict_basket_output_adapter— registered but unreachable dead adapterParquetOutputAdapter.cpp/ArrowSingleColumnArrayBuilder.hper-type boilerplate, folded into the shared Arrow writerBug fixes / hardening
Surfaced by a multi-model review of the new sink:
mkdir("")on a bare filename — writing to a relative path with no directory component (e.g."out.parquet") no longer fails withInvalid argument; the empty dirname is guarded.file_visitorruns (no double-close if it throws), and the output stream / all split sub-writers are always closed even if one close fails (no leaked file descriptors).arrow::util::Codec::GetCompressionType+IsAvailable(case-insensitive, tracks whatever the Arrow build supports, clear error otherwise) instead of a hardcoded lowercase map.writeTimestampColumnhonored — an explicitly requested timestamp column is no longer silently downgraded.split_columns_to_files=Truenow raises a clear error instead of a low-levelIOError.FileExistsError— writing over an existing file withallow_overwrite=Falseraises PythonFileExistsError.stop()flush/close all writers before destroying any of them.Performance
This is a maintainability/simplification change, not a performance optimization, and it is performance-neutral relative to
main:WriteTable/ IPC serialization). That code is unchanged here and produces byte-identical output tomain, so end-to-end write time is at parity withmain.mainon a microbenchmark. Routing every column through the sharedArrowFieldWriter(rather thanmain's per-type column builders) trades a small amount of per-cell speed for a large reduction in duplicated serialization code: on a 2M-row × 10-column workload the isolated per-row append loop is ~10–15% slower thanmain. That loop is a small fraction of the encode cost and is not observable end-to-end, where the parquet encode (and, in typical graphs, the upstream engine/feed) dominates the run.A direct typed-scalar-builder variant that closes the conversion gap was prototyped and measured: it recovers the append-loop difference but moves the end-to-end number by <0.5%, so it was not worth reintroducing per-type builder code into the simplified design.
A whole-file accuracy harness (schema, row groups, compression codec, metadata, and values) confirms the new sink produces logically identical output to
mainacross 19 scenarios.API compatibility
The public Python API (
ParquetWriter,ParquetOutputConfig,publish,publish_struct,publish_dict_basket,filename_provider,file_visitor,file_metadata/column_metadata) is unchanged.csp/tests/adapters/test_parquet_output.py(65 tests, covering all scalar/struct/list/numpy types, batch-size→row-group counts, compression codecs, Arrow IPC, split-column, rotation, dict baskets, metadata, overwrite/FileExistsError) and the existingtest_parquet.pysuite pass.