Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,15 @@ default String getOp() {
@JsonIgnore
Optional<List<Object>> getMaterializedValues();

/**
 * Name of a table the service has materialized the result set into (one column of member values).
 * Used for sets that are too large to inline as a literal list: the predicate is encoded as
 * {@code IN (SELECT <value column> FROM <table>)} against the pre-materialized, indexed table, so
 * the producing query runs once instead of being re-derived in every consumer.
 *
 * <p>When a table name is present, the SQL encoder uses it before it considers any materialized
 * literal values.
 *
 * @return the name of the materialized table, or empty if the result set has not been
 *     materialized into a table
 */
@JsonIgnore
Optional<String> getMaterializedTable();

@JsonIgnore
@Value.Lazy
default String getSetName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,10 @@ private String reduceSelectToColumnForTemplate(String expression) {

// output column alias of every result-set CTE; consumers reference it as `SELECT <col> FROM
// <cte>`
private static final String CTE_VALUE_COL = "rs_value";
/** Stable name of the single value column projected by a result-set producer. */
public static final String RESULT_SET_VALUE_COLUMN = "rs_value";

private static final String CTE_VALUE_COL = RESULT_SET_VALUE_COLUMN;

/**
* Collects the result-set subqueries of one top-level {@code inResultSet} predicate as named,
Expand Down Expand Up @@ -395,6 +398,15 @@ public String encodeResultSetProducer(InResultSet inResultSet) {
return resultSetProducerSelect(inResultSet, false, null);
}

/**
 * Builds the producer SELECT of a result set in the form used for materializing it into a table.
 * In contrast to {@link #encodeResultSetProducer(InResultSet)}, the value column is aliased to
 * {@link #RESULT_SET_VALUE_COLUMN}, which gives the resulting table a stable column name that can
 * be indexed and referenced from the consuming filters.
 *
 * @param inResultSet the result set whose producer is encoded
 * @return the producer SELECT statement with the aliased value column
 */
public String encodeResultSetProducerAliased(InResultSet inResultSet) {
  // the only difference to the plain producer encoding is this flag
  final boolean aliasValueColumn = true;
  return resultSetProducerSelect(inResultSet, aliasValueColumn, null);
}

/**
* Type of the value column of a result set, used to coerce and render its materialized values.
*/
Expand Down Expand Up @@ -2085,6 +2097,17 @@ private String encodeInResultSet(InResultSet inResultSet, String mainExpression)
return "1 = 0";
}

// if the result set has been materialized into a table, reference it directly so the producer
// runs once and each consumer only scans the indexed table
if (inResultSet.getMaterializedTable().isPresent()) {
return String.format(
mainExpression,
"",
String.format(
" IN (SELECT %s FROM %s)",
CTE_VALUE_COL, inResultSet.getMaterializedTable().get()));
}

// if the result set has been materialized up front, inline its values as a literal IN list
if (inResultSet.getMaterializedValues().isPresent()) {
List<Object> values = inResultSet.getMaterializedValues().get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import de.ii.xtraplatform.features.domain.MultiFeatureQuery.SubQuery;
import de.ii.xtraplatform.features.domain.SchemaBase;
import de.ii.xtraplatform.features.sql.domain.SqlClient;
import de.ii.xtraplatform.features.sql.domain.SqlDialect;
import de.ii.xtraplatform.features.sql.domain.SqlQueryOptions;
import de.ii.xtraplatform.features.sql.domain.SqlRow;
import java.util.ArrayList;
Expand All @@ -30,9 +31,12 @@
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.slf4j.Logger;
Expand All @@ -43,29 +47,54 @@
* run a single time (its dependencies already materialized as literal id lists), and the collected
* values are attached to the {@link InResultSet} nodes of the consuming filters so that they are
* encoded as a literal {@code IN} list instead of a per-statement nested subquery. A result set
* that exceeds the configured cap is left unmaterialized and falls back to the inline (CTE)
* encoding.
* that exceeds the configured cap is materialized once into an indexed table that consumers join
* (when the dialect supports it); otherwise it is left unmaterialized and falls back to the inline
* (CTE) re-evaluation.
*/
public class ResultSetMaterializer {

private static final Logger LOGGER = LoggerFactory.getLogger(ResultSetMaterializer.class);

// prefix for the request-scoped tables that hold oversized result sets
private static final String TABLE_PREFIX = "_rs_mat_";
// monotonic counter making each materialize() call's table names unique within this JVM
private static final AtomicLong SEQUENCE = new AtomicLong();

// supplier of the client used to run producer queries and the table DDL; resolved on every use
private final Supplier<SqlClient> sqlClient;
// encodes the result-set producers as SQL and reports the type of their value column
private final FilterEncoderSql filterEncoder;
// cap for inlining a set as a literal list; producers are fetched with LIMIT maxSetSize + 1
private final int maxSetSize;
// decides whether oversized sets can be materialized into tables and builds the DDL for them
private final SqlDialect dialect;
// random per-provider-instance token (32 bit, hex) that makes table name collisions between
// concurrent instances on the same database unlikely
private final String instanceId;

public ResultSetMaterializer(
Supplier<SqlClient> sqlClient, FilterEncoderSql filterEncoder, int maxSetSize) {
Supplier<SqlClient> sqlClient,
FilterEncoderSql filterEncoder,
int maxSetSize,
SqlDialect dialect) {
this.sqlClient = sqlClient;
this.filterEncoder = filterEncoder;
this.maxSetSize = maxSetSize;
this.dialect = dialect;
this.instanceId = Integer.toHexString(ThreadLocalRandom.current().nextInt());
}

/**
* Returns a copy of the query with every materializable result set computed and inlined. If the
* query uses no result sets, it is returned unchanged.
* query uses no result sets, it is returned unchanged. Any tables created for oversized sets are
* named uniquely per call; their names are not tracked and must be dropped by the caller via the
* overload that collects them.
*/
public MultiFeatureQuery materialize(MultiFeatureQuery query) {
  // NOTE(review): this list is discarded when the call returns, so the names of any tables
  // created for oversized sets are lost to the caller; use the two-argument overload when the
  // tables have to be dropped afterwards.
  List<String> untrackedTables = new ArrayList<>();
  return materialize(query, untrackedTables);
}

/**
* As {@link #materialize(MultiFeatureQuery)}, but records the names of any tables created for
* oversized result sets into {@code createdTables}. The caller owns their lifecycle and must
* {@link #dropTables(java.util.Collection) drop} them once the query's stream has completed.
*/
public MultiFeatureQuery materialize(MultiFeatureQuery query, List<String> createdTables) {
Map<String, InResultSet> sets = new LinkedHashMap<>();
for (SubQuery subQuery : query.getQueries()) {
for (Cql2Expression filter : subQuery.getFilters()) {
Expand All @@ -76,7 +105,12 @@ public MultiFeatureQuery materialize(MultiFeatureQuery query) {
return query;
}

// one sequence value per call gives every oversized set in this request a name that is unique
// across concurrent requests (and, with the instance token, across provider instances)
long sequence = SEQUENCE.incrementAndGet();
Map<String, List<Object>> materialized = new HashMap<>();
// oversized sets materialized into a request-scoped table: set name -> table name
Map<String, String> materializedTables = new HashMap<>();
int shortCircuited = 0;
// materialize level by level: within a level the producers are independent and run concurrently
// (bounded by the connection pool). SQL is built single-threaded between levels, so the filter
Expand All @@ -85,6 +119,10 @@ public MultiFeatureQuery materialize(MultiFeatureQuery query) {
for (List<String> level : topologicalLevels(sets)) {
Map<String, CompletableFuture<Collection<SqlRow>>> running = new LinkedHashMap<>();
Map<String, SchemaBase.Type> valueTypes = new HashMap<>();
// keep each level's prepared nodes (dependencies already applied) for the join phase,
// where an oversized set is materialized into a table from the very same producer
Map<String, InResultSet> prepared = new HashMap<>();
for (String name : level) {
InResultSet node = sets.get(name);
// if a dependency has already materialized to no members in a position that forces this
Expand All @@ -96,18 +134,21 @@ && isProvablyEmpty(node.getProducerFilter().get(), materialized)) {
shortCircuited++;
continue;
}
InResultSet prepared =
InResultSet preparedNode =
node.getProducerFilter().isPresent()
? new ImmutableInResultSet.Builder()
.from(node)
.producerFilter(applyMaterialized(node.getProducerFilter().get(), materialized))
.producerFilter(
applyMaterialized(
node.getProducerFilter().get(), materialized, materializedTables))
.build()
: node;
prepared.put(name, preparedNode);

// bound the fetch to one past the cap so an oversized set is detected without loading it
// all
String producerQuery =
filterEncoder.encodeResultSetProducer(prepared) + " LIMIT " + (maxSetSize + 1);
filterEncoder.encodeResultSetProducer(preparedNode) + " LIMIT " + (maxSetSize + 1);
valueTypes.put(name, filterEncoder.resultSetValueType(node));
running.put(name, sqlClient.get().run(producerQuery, SqlQueryOptions.single()));
}
Expand All @@ -116,7 +157,11 @@ && isProvablyEmpty(node.getProducerFilter().get(), materialized)) {
String name = entry.getKey();
Collection<SqlRow> rows = entry.getValue().join();
if (rows.size() > maxSetSize) {
if (LOGGER.isWarnEnabled()) {
if (dialect.supportsResultSetTables()) {
// too large to inline as a literal list: materialize the producer once into an indexed
// table; consumers reference it instead of re-deriving the producer each time
materializeTable(name, prepared.get(name), sequence, materializedTables, createdTables);
} else if (LOGGER.isWarnEnabled()) {
LOGGER.warn(
"Result set '{}' has more than the materialization cap of {} members; falling back"
+ " to inline evaluation for this set.",
Expand Down Expand Up @@ -150,7 +195,10 @@ && isProvablyEmpty(node.getProducerFilter().get(), materialized)) {
.from(subQuery)
.filters(
subQuery.getFilters().stream()
.map(filter -> applyMaterialized(filter, materialized))
.map(
filter ->
applyMaterialized(
filter, materialized, materializedTables))
.collect(Collectors.toList()))
.build())
.collect(Collectors.toList());
Expand Down Expand Up @@ -229,8 +277,57 @@ private static Set<String> dependenciesOf(InResultSet node, Map<String, InResult
}

private static Cql2Expression applyMaterialized(
Cql2Expression expression, Map<String, List<Object>> materialized) {
return (Cql2Expression) expression.accept(new ApplyMaterialized(materialized));
Cql2Expression expression,
Map<String, List<Object>> materialized,
Map<String, String> materializedTables) {
return (Cql2Expression)
expression.accept(new ApplyMaterialized(materialized, materializedTables));
}

/**
 * Materializes an oversized producer into an indexed table that consumers reference instead of
 * re-deriving the producer.
 *
 * <p>The table name is composed of the table prefix, the instance token, the call sequence, the
 * ordinal of the table within this materialize call and a sanitized form of the set name that is
 * truncated to 24 characters. Sanitizing and truncating are lossy, so the ordinal is what keeps
 * two sets of the same call apart when their names map to the same suffix.
 *
 * <p>The name is recorded in {@code createdTables} as soon as the table has been created, before
 * the index statement runs. If creating the index fails, the caller still knows about the table
 * and can drop it once the query's stream has completed.
 *
 * @param name name of the result set, the key in {@code materializedTables}
 * @param prepared the result set node with its dependencies already applied, used as producer
 * @param sequence sequence value of the current materialize call
 * @param materializedTables set name to table name; receives the new table after the table and
 *     its index have been created
 * @param createdTables receives the table name once the table exists
 * @throws java.util.concurrent.CompletionException if one of the statements fails
 */
private void materializeTable(
    String name,
    InResultSet prepared,
    long sequence,
    Map<String, String> materializedTables,
    List<String> createdTables) {
  String suffix = name.replaceAll("[^A-Za-z0-9_]", "_").toLowerCase(Locale.ROOT);
  if (suffix.length() > 24) {
    suffix = suffix.substring(0, 24);
  }
  // every successful call adds exactly one entry, so the current size is a per-call ordinal
  int ordinal = materializedTables.size();
  String table = TABLE_PREFIX + instanceId + "_" + sequence + "_" + ordinal + "_" + suffix;
  SqlClient client = sqlClient.get();
  client
      .run(
          dialect.createResultSetTable(
              table, filterEncoder.encodeResultSetProducerAliased(prepared)),
          SqlQueryOptions.ddl())
      .join();
  // the table exists from here on: track it before anything else can fail
  createdTables.add(table);
  client
      .run(
          dialect.createResultSetTableIndex(table, FilterEncoderSql.RESULT_SET_VALUE_COLUMN),
          SqlQueryOptions.ddl())
      .join();
  materializedTables.put(name, table);
}

/**
 * Drops the given result-set tables, best effort. Safe to call with an empty collection.
 *
 * <p>A failure to drop one table does not stop the remaining tables from being dropped. Each
 * failure is logged as a warning with its message; the full exception, including its cause, is
 * logged on debug level.
 *
 * @param tables names of the tables to drop
 */
public void dropTables(java.util.Collection<String> tables) {
  for (String table : tables) {
    try {
      sqlClient.get().run(dialect.dropResultSetTable(table), SqlQueryOptions.ddl()).join();
    } catch (RuntimeException e) {
      // deliberately not rethrown: cleanup must not fail the request or skip the other tables
      if (LOGGER.isWarnEnabled()) {
        LOGGER.warn("Could not drop result-set table '{}': {}", table, e.getMessage());
      }
      if (LOGGER.isDebugEnabled()) {
        LOGGER.debug("Stacktrace: ", e);
      }
    }
  }
}

/**
Expand Down Expand Up @@ -331,12 +428,15 @@ public CqlNode visit(BinaryScalarOperation scalarOperation, List<CqlNode> childr
}
}

/** Attaches materialized values to the {@link InResultSet} nodes that have them. */
/** Attaches materialized values (or a materialized table) to the {@link InResultSet} nodes. */
private static class ApplyMaterialized extends CqlVisitorCopy {
private final Map<String, List<Object>> materialized;
private final Map<String, String> materializedTables;

ApplyMaterialized(Map<String, List<Object>> materialized) {
ApplyMaterialized(
Map<String, List<Object>> materialized, Map<String, String> materializedTables) {
this.materialized = materialized;
this.materializedTables = materializedTables;
}

@Override
Expand All @@ -348,6 +448,10 @@ public CqlNode visit(BinaryScalarOperation scalarOperation, List<CqlNode> childr
if (values != null) {
return new ImmutableInResultSet.Builder().from(node).materializedValues(values).build();
}
String table = materializedTables.get(node.getSetName());
if (table != null) {
return new ImmutableInResultSet.Builder().from(node).materializedTable(table).build();
}
}
return copy;
}
Expand Down
Loading
Loading