From a735ca8c70403975496ba99b7496f01f4c8c1d44 Mon Sep 17 00:00:00 2001 From: Asish Kumar Date: Sun, 24 May 2026 14:36:43 +0530 Subject: [PATCH] fix(amber): round-trip LogicalLink operator identities --- .../apache/texera/workflow/LogicalLink.scala | 42 +++++++++++++++++-- .../texera/workflow/LogicalLinkSpec.scala | 41 +++++++----------- .../amber/compiler/model/LogicalLink.scala | 42 +++++++++++++++++-- .../WorkflowCompilationResourceSpec.scala | 15 +++++++ 4 files changed, 107 insertions(+), 33 deletions(-) diff --git a/amber/src/main/scala/org/apache/texera/workflow/LogicalLink.scala b/amber/src/main/scala/org/apache/texera/workflow/LogicalLink.scala index e6553e3cdf1..145b1cf1c85 100644 --- a/amber/src/main/scala/org/apache/texera/workflow/LogicalLink.scala +++ b/amber/src/main/scala/org/apache/texera/workflow/LogicalLink.scala @@ -20,9 +20,31 @@ package org.apache.texera.workflow import com.fasterxml.jackson.annotation.{JsonCreator, JsonProperty} +import com.fasterxml.jackson.databind.JsonNode import org.apache.texera.amber.core.virtualidentity.OperatorIdentity import org.apache.texera.amber.core.workflow.PortIdentity +object LogicalLink { + private def readOperatorIdentity(node: JsonNode, fieldName: String): OperatorIdentity = { + if (node == null || node.isNull) { + OperatorIdentity(null) + } else if (node.isTextual) { + OperatorIdentity(node.asText()) + } else if (node.isObject) { + val idNode = node.get("id") + if (idNode == null || idNode.isNull) { + OperatorIdentity(null) + } else { + OperatorIdentity(idNode.asText()) + } + } else { + throw new IllegalArgumentException( + s"LogicalLink $fieldName must be a string or an object with an id field" + ) + } + } +} + case class LogicalLink( @JsonProperty("fromOpId") fromOpId: OperatorIdentity, fromPortId: PortIdentity, @@ -42,13 +64,27 @@ case class LogicalLink( s"LogicalLink self-loop not allowed: fromOpId == toOpId == ${fromOpId.id}" ) - @JsonCreator def this( - @JsonProperty("fromOpId") fromOpId: String, + fromOpId: String, fromPortId: PortIdentity, - @JsonProperty("toOpId") toOpId: String, + toOpId: String, toPortId: PortIdentity ) = { this(OperatorIdentity(fromOpId), fromPortId, OperatorIdentity(toOpId), toPortId) } + + @JsonCreator + def this( + @JsonProperty("fromOpId") fromOpId: JsonNode, + fromPortId: PortIdentity, + @JsonProperty("toOpId") toOpId: JsonNode, + toPortId: PortIdentity + ) = { + this( + LogicalLink.readOperatorIdentity(fromOpId, "fromOpId"), + fromPortId, + LogicalLink.readOperatorIdentity(toOpId, "toOpId"), + toPortId + ) + } } diff --git a/amber/src/test/scala/org/apache/texera/workflow/LogicalLinkSpec.scala b/amber/src/test/scala/org/apache/texera/workflow/LogicalLinkSpec.scala index bd56aa7d5f6..9e371173056 100644 --- a/amber/src/test/scala/org/apache/texera/workflow/LogicalLinkSpec.scala +++ b/amber/src/test/scala/org/apache/texera/workflow/LogicalLinkSpec.scala @@ -20,7 +20,7 @@ package org.apache.texera.workflow import com.fasterxml.jackson.databind.JsonNode -import com.fasterxml.jackson.databind.exc.{MismatchedInputException, ValueInstantiationException} +import com.fasterxml.jackson.databind.exc.ValueInstantiationException import org.apache.texera.amber.core.virtualidentity.OperatorIdentity import org.apache.texera.amber.core.workflow.PortIdentity import org.apache.texera.amber.util.JSONUtils.objectMapper @@ -131,10 +131,10 @@ class LogicalLinkSpec extends AnyFlatSpec { } // --------------------------------------------------------------------------- - // Secondary @JsonCreator constructor (string opId variant) + // Secondary string opId constructor // --------------------------------------------------------------------------- - "LogicalLink secondary @JsonCreator constructor" should "wrap raw String op ids in OperatorIdentity" in { + "LogicalLink secondary String constructor" should "wrap raw String op ids in OperatorIdentity" in { val link = new LogicalLink( fromOpId = "op-A", fromPortId = PortIdentity(0), @@ -160,7 +160,7 @@ class LogicalLinkSpec extends AnyFlatSpec { assert(link.toOpId == OperatorIdentity("my.op-2")) } - it should "reject the empty string as an op id via the @JsonCreator constructor" in { + it should "reject the empty string as an op id via the secondary String constructor" in { intercept[IllegalArgumentException] { new LogicalLink("", PortIdentity(0), "op-B", PortIdentity(1)) } @@ -169,7 +169,7 @@ class LogicalLinkSpec extends AnyFlatSpec { } } - it should "reject a null string op id via the @JsonCreator constructor" in { + it should "reject a null string op id via the secondary String constructor" in { intercept[IllegalArgumentException] { new LogicalLink(null: String, PortIdentity(0), "op-B", PortIdentity(1)) } @@ -178,7 +178,7 @@ class LogicalLinkSpec extends AnyFlatSpec { } } - it should "reject a self-loop via the @JsonCreator constructor (same string op id)" in { + it should "reject a self-loop via the secondary String constructor (same string op id)" in { val ex = intercept[IllegalArgumentException] { new LogicalLink("op-A", PortIdentity(0), "op-A", PortIdentity(1)) } @@ -194,12 +194,11 @@ class LogicalLinkSpec extends AnyFlatSpec { // wiring (annotations, default-Scala-module config) surfaces here. "LogicalLink Jackson deserialization" should - "deserialize fromOpId / toOpId from raw String values via the secondary @JsonCreator constructor" in { + "deserialize fromOpId / toOpId from raw String values via the Jackson creator" in { // Build the JSON by hand to mimic a user-saved workflow file where // `fromOpId` and `toOpId` are written as plain strings (the only shape // production actually receives, since the frontend emits them as - // strings). Jackson dispatches to the @JsonCreator string-overload - // constructor. + // strings). Jackson dispatches to the @JsonCreator constructor. val node = objectMapper.createObjectNode() node.put("fromOpId", "op-A") node.set("fromPortId", objectMapper.valueToTree[JsonNode](PortIdentity(0))) @@ -245,17 +244,7 @@ class LogicalLinkSpec extends AnyFlatSpec { assert(tree.has("toPortId")) } - it should "NOT round-trip through writeValueAsString (the @JsonCreator string overload is incompatible with the object-shape OperatorIdentity that writeValueAsString emits)" in { - // Characterization of a real asymmetry tracked by - // https://github.com/apache/texera/issues/5042. Production reads - // user-saved workflow JSON where `fromOpId`/`toOpId` are plain - // strings, but `objectMapper.writeValueAsString` writes - // OperatorIdentity as `{"id":"op-A"}` (the case-class object form). - // Re-reading the emitted JSON fails because Jackson dispatches on the - // @JsonCreator string overload, which can't accept an object for - // fromOpId. When the issue is fixed (additional @JsonCreator object - // overload or a custom @JsonDeserialize), this test must flip to a - // passing round-trip assertion alongside the fix. + it should "round-trip through writeValueAsString when OperatorIdentity fields use object shape" in { val original = LogicalLink( OperatorIdentity("op-A"), PortIdentity(0), @@ -269,16 +258,14 @@ class LogicalLinkSpec extends AnyFlatSpec { val tree = objectMapper.readTree(json) assert(tree.path("fromOpId").isObject, s"expected fromOpId to be an object: $json") assert(tree.path("fromOpId").path("id").asText() == "op-A") - // Re-reading the just-emitted JSON fails because the @JsonCreator - // String overload can't accept the object-shape fromOpId. - intercept[MismatchedInputException] { - objectMapper.readValue(json, classOf[LogicalLink]) - } + + val roundTripped = objectMapper.readValue(json, classOf[LogicalLink]) + assert(roundTripped == original) } - it should "reject missing string op-id fields when deserializing via Jackson" in { + it should "reject missing op-id fields when deserializing via Jackson" in { // When `fromOpId` / `toOpId` are omitted, Jackson invokes the - // @JsonCreator with `null` for the missing String args. The primary + // @JsonCreator with `null` for the missing args. The primary // constructor's `require` on non-null/non-empty ids then throws, and // Jackson wraps it in `ValueInstantiationException` with the original // `IllegalArgumentException` as the cause. diff --git a/workflow-compiling-service/src/main/scala/org/apache/texera/amber/compiler/model/LogicalLink.scala b/workflow-compiling-service/src/main/scala/org/apache/texera/amber/compiler/model/LogicalLink.scala index 5c7662f9668..24c09a2af2a 100644 --- a/workflow-compiling-service/src/main/scala/org/apache/texera/amber/compiler/model/LogicalLink.scala +++ b/workflow-compiling-service/src/main/scala/org/apache/texera/amber/compiler/model/LogicalLink.scala @@ -20,22 +20,58 @@ package org.apache.texera.amber.compiler.model import com.fasterxml.jackson.annotation.{JsonCreator, JsonProperty} +import com.fasterxml.jackson.databind.JsonNode import org.apache.texera.amber.core.virtualidentity.OperatorIdentity import org.apache.texera.amber.core.workflow.PortIdentity +object LogicalLink { + private def readOperatorIdentity(node: JsonNode, fieldName: String): OperatorIdentity = { + if (node == null || node.isNull) { + OperatorIdentity(null) + } else if (node.isTextual) { + OperatorIdentity(node.asText()) + } else if (node.isObject) { + val idNode = node.get("id") + if (idNode == null || idNode.isNull) { + OperatorIdentity(null) + } else { + OperatorIdentity(idNode.asText()) + } + } else { + throw new IllegalArgumentException( + s"LogicalLink $fieldName must be a string or an object with an id field" + ) + } + } +} + case class LogicalLink( @JsonProperty("fromOpId") fromOpId: OperatorIdentity, fromPortId: PortIdentity, @JsonProperty("toOpId") toOpId: OperatorIdentity, toPortId: PortIdentity ) { - @JsonCreator def this( - @JsonProperty("fromOpId") fromOpId: String, + fromOpId: String, fromPortId: PortIdentity, - @JsonProperty("toOpId") toOpId: String, + toOpId: String, toPortId: PortIdentity ) = { this(OperatorIdentity(fromOpId), fromPortId, OperatorIdentity(toOpId), toPortId) } + + @JsonCreator + def this( + @JsonProperty("fromOpId") fromOpId: JsonNode, + fromPortId: PortIdentity, + @JsonProperty("toOpId") toOpId: JsonNode, + toPortId: PortIdentity + ) = { + this( + LogicalLink.readOperatorIdentity(fromOpId, "fromOpId"), + fromPortId, + LogicalLink.readOperatorIdentity(toOpId, "toOpId"), + toPortId + ) + } } diff --git a/workflow-compiling-service/src/test/scala/org/apache/texera/service/resource/WorkflowCompilationResourceSpec.scala b/workflow-compiling-service/src/test/scala/org/apache/texera/service/resource/WorkflowCompilationResourceSpec.scala index 87246fd7f34..f9464c155f1 100644 --- a/workflow-compiling-service/src/test/scala/org/apache/texera/service/resource/WorkflowCompilationResourceSpec.scala +++ b/workflow-compiling-service/src/test/scala/org/apache/texera/service/resource/WorkflowCompilationResourceSpec.scala @@ -25,6 +25,7 @@ import jakarta.ws.rs.client.Entity import jakarta.ws.rs.core.{MediaType, Response} import org.apache.texera.amber.compiler.model.{LogicalLink, LogicalPlanPojo} import org.apache.texera.amber.core.tuple.{Attribute, AttributeType} +import org.apache.texera.amber.core.virtualidentity.OperatorIdentity import org.apache.texera.amber.core.workflow.PortIdentity import org.apache.texera.amber.operator.filter.{ ComparisonType, @@ -127,6 +128,20 @@ class WorkflowCompilationResourceSpec extends AnyFlatSpec with BeforeAndAfterAll compilationResponse.asInstanceOf[WorkflowCompilationSuccess] } + it should "round-trip LogicalLink JSON emitted by the production objectMapper" in { + val original = LogicalLink( + OperatorIdentity("op-A"), + PortIdentity(0), + OperatorIdentity("op-B"), + PortIdentity(1) + ) + + val json = objectMapper.writeValueAsString(original) + val roundTripped = objectMapper.readValue(json, classOf[LogicalLink]) + + assertThat(roundTripped).isEqualTo(original) + } + it should "compile workflow successfully with multiple filter and limit operations" in { // construct the LogicalPlan: CSVScan --> Projection --> Limit --> Filter (TotalProfit > 10000) --> Filter (Region != "JPN") --> Limit val localCsvFilePath =