# feat: support Parquet field ID matching in `native_datafusion` scan (#4216)
**Open** · mbutrovich wants to merge 8 commits into `apache:main`
Merge-conflict resolution touched:

- `native/core/src/parquet/parquet_support.rs`
- `native/proto/src/proto/operator.proto`
- `spark/src/main/scala/org/apache/comet/serde/operator/CometNativeScan.scala`
## Which issue does this PR close?
Closes #3434. Closes #4189.
## Rationale for this change
The `native_datafusion` scan was the only Comet read path that did not honor Parquet field IDs. When `spark.sql.parquet.fieldId.read.enabled=true` and the requested schema carried IDs, `CometScanRule` fell back to Spark's reader (added in #3415). The legacy V1 reader and `native_iceberg_compat` already handle field-ID matching.

This change resolves columns by `parquet.field.id` end-to-end on the `native_datafusion` path, matching the precedence rules of Spark's `ParquetReadSupport.clipParquetGroupFields` so behavior stays consistent with the V1 reader.

## What changes are included in this PR?
- Removes the check in `CometScanRule.transformV1Scan` that forced Spark to handle reads with field IDs.
- Proto changes: a `metadata` map on `SparkStructField` and a parallel `field_metadata` on `StructInfo`, plus `use_field_id`/`ignore_missing_field_id` flags on `NativeScanCommon`. The JVM serde emits `parquet.field.id` under arrow-rs's `PARQUET_FIELD_ID_META_KEY` so both sides agree on the key. A new `CometParquetUtils.PARQUET_FIELD_ID_META_KEY` constant centralizes the string.
- `SparkPhysicalExprAdapterFactory` extends its existing case-insensitive remap with ID-first matching; at nested struct levels, `parquet_convert_struct_to_struct` does the same. ID-bearing logical fields match only by ID (a missing ID resolves to NULL); non-ID-bearing fields fall back to name. This mirrors `clipParquetGroupFields`.
- Duplicate field IDs in a file raise `_LEGACY_ERROR_TEMP_2094` "Found duplicate field(s)" (mirrors `foundDuplicateFieldInFieldIdLookupModeError`).
- A read schema that carries IDs against a file with none raises `RuntimeException("Spark read schema expects field Ids, but Parquet file schema doesn't contain any field Ids")`, unless `spark.sql.parquet.fieldId.read.ignoreMissing=true`.
- With `ignoreMissing=true` and a file without IDs, ID-bearing logical fields resolve to NULL (matching Spark's `generateFakeColumnName` trick that prevents accidental name fallback).
- New `SparkError` variants (`DuplicateFieldByFieldId`, `ParquetMissingFieldIds`) plus shim cases in all three `ShimSparkErrorConverter` files (3.4 / 3.5 / 4.x).
- The new behavior is gated: it applies only when `spark.sql.parquet.fieldId.read.enabled=true` and the requested schema actually carries IDs. When false, all paths take the same code as before.
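The ID-first precedence above can be sketched as follows. This is a hypothetical, simplified model — the `Field` and `Resolution` types and the `resolve` function are illustrative, not Comet's actual code; the real logic lives in `SparkPhysicalExprAdapterFactory` and `parquet_convert_struct_to_struct`.

```rust
// Simplified sketch of the matching precedence described in this PR.
#[derive(Debug, Clone, PartialEq)]
struct Field {
    name: String,
    field_id: Option<i32>,
}

#[derive(Debug, PartialEq)]
enum Resolution {
    /// No match: the logical column is filled with NULLs.
    Null,
    /// Matched a file column, by index into the file schema.
    File(usize),
}

fn resolve(logical: &Field, file_fields: &[Field]) -> Result<Resolution, String> {
    match logical.field_id {
        // ID-bearing logical fields match ONLY by ID; when the ID is absent
        // from the file, resolve to NULL rather than falling back to a
        // same-named column by accident.
        Some(id) => {
            let mut hits = file_fields
                .iter()
                .enumerate()
                .filter(|(_, f)| f.field_id == Some(id));
            match (hits.next(), hits.next()) {
                (None, _) => Ok(Resolution::Null),
                (Some((i, _)), None) => Ok(Resolution::File(i)),
                (Some(_), Some(_)) => Err("Found duplicate field(s)".to_string()),
            }
        }
        // Non-ID-bearing logical fields fall back to case-insensitive
        // name matching, as in the existing remap.
        None => Ok(file_fields
            .iter()
            .position(|f| f.name.eq_ignore_ascii_case(&logical.name))
            .map(Resolution::File)
            .unwrap_or(Resolution::Null)),
    }
}
```

Note the duplicate-ID case surfaces as an error, mirroring the "Found duplicate field(s)" behavior listed above.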
## How are these changes tested?

Four new tests in `ParquetReadSuite` exercise the path under `SCAN_NATIVE_DATAFUSION` with `checkSparkAnswerAndOperator`, covering (among other cases) the "Found duplicate field(s)" error and the `ignoreMissing=true` NULL behavior.

Spark's `ParquetFieldIdIOSuite` and `ParquetFieldIdSchemaSuite` are already unignored in `dev/diffs/{3.4.3,3.5.8,4.0.2,4.1.1}.diff` and currently pass via the Spark fallback; with this change they exercise the native path.