Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 82 additions & 1 deletion native/spark-expr/src/math_funcs/wide_decimal_binary_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,16 @@ impl PhysicalExpr for WideDecimalBinaryExpr {
let left_val = self.left.evaluate(batch)?;
let right_val = self.right.evaluate(batch)?;

// Track scalar-ness so we can return a Scalar when both inputs are scalars.
// Without this, a (Scalar op Scalar) result would be returned as a length-1
// Array, and downstream comparisons against full batches would incorrectly
// see two Array operands with mismatched lengths instead of (Array, Scalar).
// See https://github.com/apache/datafusion-comet/issues/1615 (the q23 BHJ
// join-filter scalar-subquery crash).
let both_scalar = matches!(
(&left_val, &right_val),
(ColumnarValue::Scalar(_), ColumnarValue::Scalar(_))
);
let (left_arr, right_arr): (ArrayRef, ArrayRef) = match (&left_val, &right_val) {
(ColumnarValue::Array(l), ColumnarValue::Array(r)) => (Arc::clone(l), Arc::clone(r)),
(ColumnarValue::Scalar(l), ColumnarValue::Array(r)) => {
Expand Down Expand Up @@ -280,7 +290,16 @@ impl PhysicalExpr for WideDecimalBinaryExpr {
result
};
let result = result.with_data_type(DataType::Decimal128(p_out, s_out));
Ok(ColumnarValue::Array(Arc::new(result)))
if both_scalar {
// Convert the length-1 result back into a Scalar so downstream
// expressions (binary ops, comparisons) can take the scalar fast-path
// and propagate the scalar tag (Datum::is_scalar) through arrow-rs
// kernels.
let scalar = datafusion::common::ScalarValue::try_from_array(&result, 0)?;
Ok(ColumnarValue::Scalar(scalar))
} else {
Ok(ColumnarValue::Array(Arc::new(result)))
}
}

fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
Expand Down Expand Up @@ -557,4 +576,66 @@ mod tests {
let arr = result.as_primitive::<Decimal128Type>();
assert_eq!(arr.value(0), 20000); // 2.0000
}

/// Regression test for the TPC-DS q23 BroadcastHashJoin crash (issue #1615).
///
/// When both inputs are `ColumnarValue::Scalar`, `evaluate` must return a
/// `ColumnarValue::Scalar` -- not a length-1 `ColumnarValue::Array`. Otherwise
/// downstream comparisons against full batches see two `Array` operands with
/// mismatched lengths and arrow-ord's `compare_op` rejects them with
/// "Cannot compare arrays of different lengths, got N vs 1".
#[test]
fn test_scalar_scalar_returns_scalar() {
use datafusion::common::ScalarValue;
use datafusion::physical_expr::expressions::Literal;

// 0.95 * 100.00 -- the same Scalar x Scalar decimal multiply pattern that
// appears in the q23 filter `0.95 * scalar_subquery > ssales`.
let left: Arc<dyn PhysicalExpr> =
Arc::new(Literal::new(ScalarValue::Decimal128(Some(95), 38, 2)));
let right: Arc<dyn PhysicalExpr> =
Arc::new(Literal::new(ScalarValue::Decimal128(Some(10000), 38, 2)));

let expr = WideDecimalBinaryExpr::new(
left,
right,
WideDecimalOp::Multiply,
38,
2,
EvalMode::Legacy,
);

// Empty schema -- both inputs are Literals so no columns are needed.
let batch = RecordBatch::new_empty(Arc::new(Schema::empty()));
match expr.evaluate(&batch).unwrap() {
ColumnarValue::Scalar(ScalarValue::Decimal128(Some(v), 38, 2)) => {
// 0.95 * 100.00 = 95.00 -> at scale 2, integer 9500
assert_eq!(v, 9500);
}
ColumnarValue::Scalar(other) => {
panic!("expected Decimal128(Some(_), 38, 2), got {other:?}");
}
ColumnarValue::Array(_) => {
panic!(
"Scalar x Scalar must return ColumnarValue::Scalar, not Array. This is the q23 BHJ crash regression (issue #1615)."
);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As @coderfender mentioned, this is unrelated to #1615, which was about an Arrow C-Data offset-buffer bug. Also, there is some odd spacing in this message.

Suggested change
panic!(
"Scalar x Scalar must return ColumnarValue::Scalar, not Array. This is the q23 BHJ crash regression (issue #1615)."
);
panic!(
"Scalar x Scalar must return ColumnarValue::Scalar, not Array"
);

}
}
}

/// Companion to the scalar/scalar regression test: evaluates an `Add` over a
/// two-row batch of array inputs and checks that the result still has one
/// entry per input row (length 2), guarding against over-eager
/// scalar-unwrapping in the fix.
#[test]
fn test_array_input_returns_array() {
    // Two rows per side, both declared as Decimal128(38, 2).
    let left_values = vec![Some(150), Some(250)];
    let right_values = vec![Some(100), Some(200)];
    let batch = make_batch(left_values, 38, 2, right_values, 38, 2);

    let sum = eval_expr(&batch, WideDecimalOp::Add, 38, 2, EvalMode::Legacy).unwrap();

    // A result collapsed to a single scalar value would not have length 2.
    assert_eq!(sum.len(), 2);
}
}
Loading