From d3cdeaebb2a8004e3f28d00ace2cce1f6cb9de2a Mon Sep 17 00:00:00 2001 From: Arham Chopra Date: Wed, 10 Jun 2026 15:43:43 -0400 Subject: [PATCH 01/11] Refactor parquet output to a RecordBatch sink architecture ParquetWriter builds RecordBatches and hands them to a pluggable RecordBatchSink (onStart/onBatch/onFileChange/onStop). Removes the old C++ file-writer hierarchy and unifies output conversion via visitCspValueType. Signed-off-by: Arham Chopra --- cpp/csp/adapters/arrow/ArrowTypeVisitor.h | 57 ++- .../parquet/ArrowBackedArrayBuilder.cpp | 158 ++++++++ .../parquet/ArrowBackedArrayBuilder.h | 86 +++++ .../parquet/ArrowIPCFileWriterWrapper.cpp | 49 --- .../parquet/ArrowIPCFileWriterWrapper.h | 44 --- .../parquet/ArrowSingleColumnArrayBuilder.h | 272 +------------- cpp/csp/adapters/parquet/CMakeLists.txt | 14 +- .../DialectGenericListWriterInterface.h | 55 --- .../parquet/FileNameGeneratorReplicator.h | 95 +++++ .../adapters/parquet/FileWriterWrapper.cpp | 49 --- cpp/csp/adapters/parquet/FileWriterWrapper.h | 54 --- .../parquet/FileWriterWrapperContainer.cpp | 116 ------ .../parquet/FileWriterWrapperContainer.h | 74 ---- .../parquet/ParquetDictBasketOutputWriter.cpp | 68 ++-- .../parquet/ParquetDictBasketOutputWriter.h | 7 +- .../parquet/ParquetFileWriterWrapper.cpp | 57 --- .../parquet/ParquetFileWriterWrapper.h | 43 --- .../adapters/parquet/ParquetOutputAdapter.cpp | 346 +++--------------- .../adapters/parquet/ParquetOutputAdapter.h | 78 ++-- .../parquet/ParquetOutputAdapterManager.cpp | 39 +- .../parquet/ParquetOutputAdapterManager.h | 29 +- cpp/csp/adapters/parquet/ParquetWriter.cpp | 127 +++---- cpp/csp/adapters/parquet/ParquetWriter.h | 33 +- cpp/csp/adapters/parquet/RecordBatchSink.h | 31 ++ .../adapters/parquet/RecordBatchWithFlag.h | 21 ++ .../python/adapters/parquetadapterimpl.cpp | 261 ++++++------- csp/adapters/_parquet_rb_writer.py | 197 ++++++++++ csp/adapters/output_adapters/parquet.py | 33 ++ 28 files changed, 1013 insertions(+), 1480 deletions(-) create mode 100644 cpp/csp/adapters/parquet/ArrowBackedArrayBuilder.cpp create mode 100644 cpp/csp/adapters/parquet/ArrowBackedArrayBuilder.h delete mode 100644 cpp/csp/adapters/parquet/ArrowIPCFileWriterWrapper.cpp delete mode 100644 cpp/csp/adapters/parquet/ArrowIPCFileWriterWrapper.h delete mode 100644 cpp/csp/adapters/parquet/DialectGenericListWriterInterface.h create mode 100644 cpp/csp/adapters/parquet/FileNameGeneratorReplicator.h delete mode 100644 cpp/csp/adapters/parquet/FileWriterWrapper.cpp delete mode 100644 cpp/csp/adapters/parquet/FileWriterWrapper.h delete mode 100644 cpp/csp/adapters/parquet/FileWriterWrapperContainer.cpp delete mode 100644 cpp/csp/adapters/parquet/FileWriterWrapperContainer.h delete mode 100644 cpp/csp/adapters/parquet/ParquetFileWriterWrapper.cpp delete mode 100644 cpp/csp/adapters/parquet/ParquetFileWriterWrapper.h create mode 100644 cpp/csp/adapters/parquet/RecordBatchSink.h create mode 100644 cpp/csp/adapters/parquet/RecordBatchWithFlag.h create mode 100644 csp/adapters/_parquet_rb_writer.py diff --git a/cpp/csp/adapters/arrow/ArrowTypeVisitor.h b/cpp/csp/adapters/arrow/ArrowTypeVisitor.h index 67e74abb7..eabfb6d92 100644 --- a/cpp/csp/adapters/arrow/ArrowTypeVisitor.h +++ b/cpp/csp/adapters/arrow/ArrowTypeVisitor.h @@ -1,9 +1,13 @@ // Arrow type visitor: maps arrow::Type::type to the corresponding C++ value type. +// Eliminates repeated switch statements on arrow types across the codebase. // // Usage: // visitArrowValueType( typeId, -// [&]( auto tag ) { using T = typename decltype(tag)::type; ... }, -// [&]() { /* unsupported */ } ); +// [&]( auto tag ) -> ReturnType { +// using T = typename decltype( tag )::type; +// return doSomething( ... ); +// }, +// [&]() -> ReturnType { /* unsupported type fallback */ } ); #ifndef _IN_CSP_ADAPTERS_ARROW_ArrowTypeVisitor_H #define _IN_CSP_ADAPTERS_ARROW_ArrowTypeVisitor_H @@ -20,12 +24,14 @@ namespace csp::adapters::arrow template struct TypeTag { using type = T; }; -// Invokes fn(TypeTag{}) for the C++ type corresponding to the arrow type. +// Invokes fn( TypeTag{} ) for the C++ value type corresponding to +// the given arrow type. Calls onDefault() for unrecognised arrow types. template decltype(auto) visitArrowValueType( ::arrow::Type::type typeId, Fn && fn, DefaultFn && onDefault ) { switch( typeId ) { + // --- Numeric --- case ::arrow::Type::BOOL: return fn( TypeTag{} ); case ::arrow::Type::INT8: return fn( TypeTag{} ); case ::arrow::Type::INT16: return fn( TypeTag{} ); @@ -39,6 +45,7 @@ decltype(auto) visitArrowValueType( ::arrow::Type::type typeId, Fn && fn, Defaul case ::arrow::Type::FLOAT: case ::arrow::Type::DOUBLE: return fn( TypeTag{} ); + // --- String / Binary --- case ::arrow::Type::STRING: case ::arrow::Type::LARGE_STRING: case ::arrow::Type::BINARY: @@ -47,6 +54,7 @@ decltype(auto) visitArrowValueType( ::arrow::Type::type typeId, Fn && fn, Defaul case ::arrow::Type::DICTIONARY: return fn( TypeTag{} ); + // --- Temporal --- case ::arrow::Type::TIMESTAMP: return fn( TypeTag{} ); case ::arrow::Type::DURATION: return fn( TypeTag{} ); case ::arrow::Type::DATE32: @@ -54,6 +62,7 @@ decltype(auto) visitArrowValueType( ::arrow::Type::type typeId, Fn && fn, Defaul case ::arrow::Type::TIME32: case ::arrow::Type::TIME64: return fn( TypeTag