Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 185 additions & 2 deletions docs/content.zh/docs/dev/table/functions/ptfs.md
Original file line number Diff line number Diff line change
Expand Up @@ -2275,6 +2275,189 @@ void testScalarOnly() throws Exception {
{{< /tab >}}
{{< /tabs >}}

#### Testing with State

The harness supports all PTF state types: value state, `Row`, `ListView`, and `MapView`.

{{< tabs "state-testing" >}}
{{< tab "Java" >}}
```java
// A PTF that uses all four state types: value state, Row, ListView, and MapView.
@DataTypeHint("ROW<count BIGINT>")
public class StatefulPTF extends ProcessTableFunction<Row> {
public static class ValueState {
public long count = 0L;
}

public void eval(
@StateHint ValueState valueState,
@StateHint(type = @DataTypeHint("ROW<lastValue INT>")) Row rowState,
@StateHint(type = @DataTypeHint("ARRAY<INT>")) ListView<Integer> listState,
@StateHint MapView<String, Integer> mapState,
@ArgumentHint(ArgumentTrait.SET_SEMANTIC_TABLE) Row input) throws Exception {
// Value state — increment counter
valueState.count++;

// Row state — track the last value seen
int value = input.getFieldAs("value");
rowState.setField("lastValue", value);

// ListView state — accumulate values
listState.add(value);

// MapView state — count occurrences by name
String name = input.getFieldAs("name");
Integer tagCount = mapState.get(name);
mapState.put(name, tagCount == null ? 1 : tagCount + 1);

collect(Row.of(valueState.count));
}
}

@Test
void testWithState() throws Exception {
try (ProcessTableFunctionTestHarness<Row> harness =
ProcessTableFunctionTestHarness.ofClass(StatefulPTF.class)
.withTableArgument("input", DataTypes.of("ROW<name STRING, value INT>"))
.withPartitionBy("input", "name")
.build()) {

harness.processElement(Row.of("Alice", 10));
harness.processElement(Row.of("Alice", 20));

List<Row> output = harness.getOutput();
assertThat(output.get(0)).isEqualTo(Row.of("Alice", 1L));
assertThat(output.get(1)).isEqualTo(Row.of("Alice", 2L));
Comment thread
autophagy marked this conversation as resolved.
}
}
```
{{< /tab >}}
{{< /tabs >}}

**Initial State Setup**: Use `.withInitialStateForKey()` to pre-populate state before processing.
State initialization is scoped per partition key:

{{< tabs "initial-state" >}}
{{< tab "Java" >}}
```java
@Test
void testWithInitialState() throws Exception {
// Value state
StatefulPTF.ValueState initialValue = new StatefulPTF.ValueState();
initialValue.count = 100L;

// Row state
Row initialRow = Row.withNames();
initialRow.setField("lastValue", 42);

// ListView state
ListView<Integer> initialList = new ListView<>();
initialList.add(10);
initialList.add(20);

// MapView state
MapView<String, Integer> initialMap = new MapView<>();
initialMap.put("Alice", 5);

try (ProcessTableFunctionTestHarness<Row> harness =
ProcessTableFunctionTestHarness.ofClass(StatefulPTF.class)
.withTableArgument("input", DataTypes.of("ROW<name STRING, value INT>"))
.withPartitionBy("input", "name")
// Initial state is set per partition key
.withInitialStateForKey("valueState", Row.of("Alice"), initialValue)
.withInitialStateForKey("rowState", Row.of("Alice"), initialRow)
.withInitialStateForKey("listState", Row.of("Alice"), initialList)
.withInitialStateForKey("mapState", Row.of("Alice"), initialMap)
.build()) {

harness.processElement(Row.of("Alice", 10));

List<Row> output = harness.getOutput();
assertThat(output).containsExactly(Row.of("Alice", 101L));
Comment thread
autophagy marked this conversation as resolved.
}
}
```
{{< /tab >}}
{{< /tabs >}}

**State Introspection**: Use `getStateForKey()`, `getKeysForState()`, and `getStateForAllKeys()` to inspect state during tests:

{{< tabs "state-introspection" >}}
{{< tab "Java" >}}
```java
@Test
void testStateIntrospection() throws Exception {
try (ProcessTableFunctionTestHarness<Row> harness =
ProcessTableFunctionTestHarness.ofClass(StatefulPTF.class)
.withTableArgument("input", DataTypes.of("ROW<name STRING, value INT>"))
.withPartitionBy("input", "name")
.build()) {

harness.processElement(Row.of("Alice", 10));
harness.processElement(Row.of("Bob", 20));

// Check value state
StatefulPTF.ValueState aliceState =
harness.getStateForKey("valueState", Row.of("Alice"));
assertThat(aliceState.count).isEqualTo(1L);

// Check Row state
Row aliceRowState = harness.getStateForKey("rowState", Row.of("Alice"));
assertThat(aliceRowState.getField("lastValue")).isEqualTo(10);

// Check ListView state
ListView<Integer> aliceList = harness.getStateForKey("listState", Row.of("Alice"));
assertThat(aliceList.getList()).containsExactly(10);

// Check MapView state
MapView<String, Integer> aliceMap = harness.getStateForKey("mapState", Row.of("Alice"));
assertThat(aliceMap.get("Alice")).isEqualTo(1);

// Get all partition keys with state
Set<Row> keys = harness.getKeysForState("valueState");
assertThat(keys).containsExactlyInAnyOrder(Row.of("Alice"), Row.of("Bob"));

// Get all state across partition keys
Map<Row, StatefulPTF.ValueState> allState =
harness.getStateForAllKeys("valueState");
assertThat(allState.get(Row.of("Bob")).count).isEqualTo(1L);
}
}
```
{{< /tab >}}
{{< /tabs >}}

Comment thread
autophagy marked this conversation as resolved.
**State Mutation**: Use `setStateForKey()`, `clearStateForKey()`, and `clearStateEntryForKey()` to modify state during tests:

{{< tabs "state-mutation" >}}
{{< tab "Java" >}}
```java
@Test
void testStateMutation() throws Exception {
try (ProcessTableFunctionTestHarness<Row> harness =
ProcessTableFunctionTestHarness.ofClass(StatefulPTF.class)
.withTableArgument("input", DataTypes.of("ROW<name STRING, value INT>"))
.withPartitionBy("input", "name")
.build()) {

harness.processElement(Row.of("Alice", 10));

// Overwrite a specific state entry for a partition key
StatefulPTF.ValueState newState = new StatefulPTF.ValueState();
newState.count = 100L;
harness.setStateForKey("valueState", Row.of("Alice"), newState);

// Clear a specific state entry (resets to default)
harness.clearStateEntryForKey("listState", Row.of("Alice"));

// Clear all state for a partition key
harness.clearStateForKey(Row.of("Alice"));
}
}
```
{{< /tab >}}
{{< /tabs >}}

#### Configuring Table Argument Types

In contexts where the harness can't infer the table argument types for table arguments (when using unannotated `Row` inputs,
Expand Down Expand Up @@ -2348,8 +2531,8 @@ void testPOJO() throws Exception {

### PTF Features Unsupported by the TestHarness

- `Context` paramter
- State (`@StateHint`)
- `Context` parameter
- Timers (`onTimer`)
- `on_time` / `rowtime`
- Update traits (`SUPPORTS_UPDATES`, `REQUIRE_UPDATE_BEFORE`)
- State TTL (state is supported but TTL expiration is not yet implemented)
Loading