diff --git a/gradle/layers.versions.toml b/gradle/layers.versions.toml index c4d40b332..2c4e079ee 100644 --- a/gradle/layers.versions.toml +++ b/gradle/layers.versions.toml @@ -1,4 +1,4 @@ [versions] -xtraplatform-core = '7.0.0-SNAPSHOT' +xtraplatform-core = '7.0.0-reactive-concurrent-flatmap-SNAPSHOT' xtraplatform-native = '2.6.0-SNAPSHOT' diff --git a/xtraplatform-features-sql/src/main/java/de/ii/xtraplatform/features/sql/domain/SqlConnector.java b/xtraplatform-features-sql/src/main/java/de/ii/xtraplatform/features/sql/domain/SqlConnector.java index 3b0d8f5ff..cf807662b 100644 --- a/xtraplatform-features-sql/src/main/java/de/ii/xtraplatform/features/sql/domain/SqlConnector.java +++ b/xtraplatform-features-sql/src/main/java/de/ii/xtraplatform/features/sql/domain/SqlConnector.java @@ -22,6 +22,7 @@ import java.util.Objects; import java.util.Optional; import java.util.OptionalLong; +import java.util.concurrent.Semaphore; import java.util.function.Function; import java.util.function.IntFunction; import java.util.stream.Collectors; @@ -34,6 +35,12 @@ public interface SqlConnector int getMaxConnections(); + /** + * Shared budget of connection permits (sized to the connection pool) that bounds how many + * connections the concurrently running single-shot sub-queries hold at once, across all requests. + */ + Semaphore getConnectionBudget(); + int getMinConnections(); int getQueueSize(); @@ -111,6 +118,9 @@ void register(String currentTable, SqlRowMeta metaResult) { } } + // Per-sub-query row buffer for the concurrent single-shot value phase (see getSourceStream). + int UNPAGED_SUBQUERY_PREFETCH = 256; + // TODO: simplify, class SqlQueryRunner, remove options, singleFeature @Override default Reactive.Source getSourceStream( @@ -218,48 +228,94 @@ default Reactive.Source getSourceStream( .build(); // a server-side cursor keeps memory bounded while streaming all rows int fetchSize = unpaged ? (int) queryBatch.getChunkSize() : 0; + Function> valuePhase = + index -> { + int[] i = {0}; + Source[] sqlRows = + querySets + .get(index) + .getValueQueries() + .apply(sqlRowMeta, 0L, 0L) + .map( + valueQuery -> + getSqlClient() + .getSourceStream( + valueQuery, + new ImmutableSqlQueryOptions.Builder() + .from(options) + .tableSchema( + querySets + .get(index) + .getTableSchemas() + .get(i[0])) + .type( + querySets + .get(index) + .getOptions() + .getType()) + .containerPriority(i[0]++) + .queryIndex( + querySets.get(index).getQueryIndex()) + .fetchSize(fetchSize) + .isParallel(unpaged) + .build())) + .toArray((IntFunction[]>) Source[]::new); + + Source merged = mergeAndSort(sqlRows); + + if (!unpaged) { + return merged; + } + + // Bound the connections held by concurrently running sub-queries: a + // sub-query needs all its table connections at once, so acquire them + // as + // a block before its rows are read and release them when it + // terminates. + // The semaphore is shared across requests, so concurrent searches can + // never deadlock by each grabbing part of the pool. + int tables = querySets.get(index).getTableSchemas().size(); + return Reactive.Source.guarded( + () -> getConnectionBudget().acquireUninterruptibly(tables), + () -> getConnectionBudget().release(tables), + merged); + }; + + // Single-shot multi-queries read every sub-query in one pass with no + // cross-sub-query dependency (the result sets are already materialized), so + // the sub-queries can run concurrently. Their rows are still emitted + // strictly + // in sub-query order, which the decoder requires; only the JDBC reads + // overlap. + // Concurrency is bounded so the sub-queries in flight never request more + // than + // the connection pool can serve. Single-feature reads keep the serial path. + Transformer valueTransformer; + if (unpaged) { + int maxTables = + Math.max( + 1, + querySets.stream() + .mapToInt(qs -> qs.getTableSchemas().size()) + .max() + .orElse(1)); + int maxConcurrency = + Math.max( + 1, + Math.min( + querySets.size(), (getMaxConnections() - 1) / maxTables)); + valueTransformer = + Transformer.flatMapConcurrent( + valuePhase, maxConcurrency, UNPAGED_SUBQUERY_PREFETCH); + } else { + valueTransformer = Transformer.flatMap(valuePhase); + } + return Source.iterable( IntStream.range(0, querySets.size()) .boxed() .collect(Collectors.toList())) - .via( - Transformer.flatMap( - index -> { - int[] i = {0}; - Source[] sqlRows = - querySets - .get(index) - .getValueQueries() - .apply(sqlRowMeta, 0L, 0L) - .map( - valueQuery -> - getSqlClient() - .getSourceStream( - valueQuery, - new ImmutableSqlQueryOptions.Builder() - .from(options) - .tableSchema( - querySets - .get(index) - .getTableSchemas() - .get(i[0])) - .type( - querySets - .get(index) - .getOptions() - .getType()) - .containerPriority(i[0]++) - .queryIndex( - querySets - .get(index) - .getQueryIndex()) - .fetchSize(fetchSize) - .build())) - .toArray( - (IntFunction[]>) Source[]::new); - - return mergeAndSort(sqlRows); - })) + .via(valueTransformer) .prepend(Source.single(sqlRowMeta)); } diff --git a/xtraplatform-features-sql/src/main/java/de/ii/xtraplatform/features/sql/domain/SqlQueryOptions.java b/xtraplatform-features-sql/src/main/java/de/ii/xtraplatform/features/sql/domain/SqlQueryOptions.java index d2ba5ebb1..8ef976657 100644 --- a/xtraplatform-features-sql/src/main/java/de/ii/xtraplatform/features/sql/domain/SqlQueryOptions.java +++ b/xtraplatform-features-sql/src/main/java/de/ii/xtraplatform/features/sql/domain/SqlQueryOptions.java @@ -89,6 +89,16 @@ default int getFetchSize() { return 0; } + /** + * When enabled, the query is subscribed on a worker thread so that several such queries can + * execute concurrently (the blocking JDBC connection provider otherwise runs every query on the + * single subscribing thread). Used for the concurrent single-shot value phase. + */ + @Value.Default + default boolean isParallel() { + return false; + } + @Value.Default default boolean isGeometryWkb() { return false; diff --git a/xtraplatform-features-sql/src/main/java/de/ii/xtraplatform/features/sql/infra/db/SqlClientRx.java b/xtraplatform-features-sql/src/main/java/de/ii/xtraplatform/features/sql/infra/db/SqlClientRx.java index dd671d55f..1afe2c7f9 100644 --- a/xtraplatform-features-sql/src/main/java/de/ii/xtraplatform/features/sql/infra/db/SqlClientRx.java +++ b/xtraplatform-features-sql/src/main/java/de/ii/xtraplatform/features/sql/infra/db/SqlClientRx.java @@ -21,6 +21,7 @@ import de.ii.xtraplatform.streams.domain.Reactive; import de.ii.xtraplatform.streams.domain.Reactive.Transformer; import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.schedulers.Schedulers; import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; @@ -159,6 +160,15 @@ public Reactive.Source getSourceStream(String query, SqlQueryOptions opt }); } + // The blocking connection provider runs connect+execute+read on the subscribing thread, so + // without this the whole stream is single-threaded. Subscribing on a worker thread lets several + // parallel-flagged queries (e.g. the concurrent single-shot value phase) run at once, each on + // its + // own connection. + if (options.isParallel()) { + flowable = flowable.subscribeOn(Schedulers.io()); + } + return Reactive.Source.publisher(flowable); } diff --git a/xtraplatform-features-sql/src/main/java/de/ii/xtraplatform/features/sql/infra/db/SqlConnectorRx.java b/xtraplatform-features-sql/src/main/java/de/ii/xtraplatform/features/sql/infra/db/SqlConnectorRx.java index 5f593ced5..3a81c4864 100644 --- a/xtraplatform-features-sql/src/main/java/de/ii/xtraplatform/features/sql/infra/db/SqlConnectorRx.java +++ b/xtraplatform-features-sql/src/main/java/de/ii/xtraplatform/features/sql/infra/db/SqlConnectorRx.java @@ -48,6 +48,7 @@ import java.util.Map.Entry; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import org.davidmoten.rxjava3.jdbc.Database; @@ -72,6 +73,7 @@ public class SqlConnectorRx extends AbstractVolatilePolling implements SqlConnec private final int maxConnections; private final int minConnections; private final int queueSize; + private final Semaphore connectionBudget; private final Path dataDir; private final String applicationName; private final String providerId; @@ -105,6 +107,7 @@ public SqlConnectorRx( connectionInfo.getPool().getMinConnections() >= 0 ? connectionInfo.getPool().getMinConnections() : maxConnections; + this.connectionBudget = new Semaphore(Math.max(1, maxConnections), true); // int capacity = maxConnections / maxQueries; // TODO @@ -138,6 +141,11 @@ public int getMaxConnections() { return maxConnections; } + @Override + public Semaphore getConnectionBudget() { + return connectionBudget; + } + @Override public int getMinConnections() { return minConnections;