Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gradle/layers.versions.toml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[versions]
xtraplatform-core = '7.0.0-SNAPSHOT'
xtraplatform-core = '7.0.0-reactive-concurrent-flatmap-SNAPSHOT'
xtraplatform-native = '2.6.0-SNAPSHOT'

Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.concurrent.Semaphore;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.stream.Collectors;
Expand All @@ -34,6 +35,12 @@ public interface SqlConnector

int getMaxConnections();

/**
* Shared budget of connection permits (sized to the connection pool) that bounds how many
* connections the concurrently running single-shot sub-queries hold at once, across all requests.
*/
Semaphore getConnectionBudget();

int getMinConnections();

int getQueueSize();
Expand Down Expand Up @@ -111,6 +118,9 @@ void register(String currentTable, SqlRowMeta metaResult) {
}
}

// Per-sub-query row buffer for the concurrent single-shot value phase (see getSourceStream).
int UNPAGED_SUBQUERY_PREFETCH = 256;

// TODO: simplify, class SqlQueryRunner, remove options, singleFeature
@Override
default Reactive.Source<SqlRow> getSourceStream(
Expand Down Expand Up @@ -218,48 +228,94 @@ default Reactive.Source<SqlRow> getSourceStream(
.build();
// a server-side cursor keeps memory bounded while streaming all rows
int fetchSize = unpaged ? (int) queryBatch.getChunkSize() : 0;
Function<Integer, Source<SqlRow>> valuePhase =
index -> {
int[] i = {0};
Source<SqlRow>[] sqlRows =
querySets
.get(index)
.getValueQueries()
.apply(sqlRowMeta, 0L, 0L)
.map(
valueQuery ->
getSqlClient()
.getSourceStream(
valueQuery,
new ImmutableSqlQueryOptions.Builder()
.from(options)
.tableSchema(
querySets
.get(index)
.getTableSchemas()
.get(i[0]))
.type(
querySets
.get(index)
.getOptions()
.getType())
.containerPriority(i[0]++)
.queryIndex(
querySets.get(index).getQueryIndex())
.fetchSize(fetchSize)
.isParallel(unpaged)
.build()))
.toArray((IntFunction<Source<SqlRow>[]>) Source[]::new);

Source<SqlRow> merged = mergeAndSort(sqlRows);

if (!unpaged) {
return merged;
}

// Bound the connections held by concurrently running sub-queries: a
// sub-query needs all its table connections at once, so acquire them
// as
// a block before its rows are read and release them when it
// terminates.
// The semaphore is shared across requests, so concurrent searches can
// never deadlock by each grabbing part of the pool.
int tables = querySets.get(index).getTableSchemas().size();
return Reactive.Source.guarded(
() -> getConnectionBudget().acquireUninterruptibly(tables),
() -> getConnectionBudget().release(tables),
merged);
};

// Single-shot multi-queries read every sub-query in one pass with no
// cross-sub-query dependency (the result sets are already materialized), so
// the sub-queries can run concurrently. Their rows are still emitted
// strictly
// in sub-query order, which the decoder requires; only the JDBC reads
// overlap.
// Concurrency is bounded so the sub-queries in flight never request more
// than
// the connection pool can serve. Single-feature reads keep the serial path.
Transformer<Integer, SqlRow> valueTransformer;
if (unpaged) {
int maxTables =
Math.max(
1,
querySets.stream()
.mapToInt(qs -> qs.getTableSchemas().size())
.max()
.orElse(1));
int maxConcurrency =
Math.max(
1,
Math.min(
querySets.size(), (getMaxConnections() - 1) / maxTables));
valueTransformer =
Transformer.flatMapConcurrent(
valuePhase, maxConcurrency, UNPAGED_SUBQUERY_PREFETCH);
} else {
valueTransformer = Transformer.flatMap(valuePhase);
}

return Source.iterable(
IntStream.range(0, querySets.size())
.boxed()
.collect(Collectors.toList()))
.via(
Transformer.flatMap(
index -> {
int[] i = {0};
Source<SqlRow>[] sqlRows =
querySets
.get(index)
.getValueQueries()
.apply(sqlRowMeta, 0L, 0L)
.map(
valueQuery ->
getSqlClient()
.getSourceStream(
valueQuery,
new ImmutableSqlQueryOptions.Builder()
.from(options)
.tableSchema(
querySets
.get(index)
.getTableSchemas()
.get(i[0]))
.type(
querySets
.get(index)
.getOptions()
.getType())
.containerPriority(i[0]++)
.queryIndex(
querySets
.get(index)
.getQueryIndex())
.fetchSize(fetchSize)
.build()))
.toArray(
(IntFunction<Source<SqlRow>[]>) Source[]::new);

return mergeAndSort(sqlRows);
}))
.via(valueTransformer)
.prepend(Source.single(sqlRowMeta));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,16 @@ default int getFetchSize() {
return 0;
}

/**
* When enabled, the query is subscribed on a worker thread so that several such queries can
* execute concurrently (the blocking JDBC connection provider otherwise runs every query on the
* single subscribing thread). Used for the concurrent single-shot value phase.
*/
@Value.Default
default boolean isParallel() {
return false;
}

@Value.Default
default boolean isGeometryWkb() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import de.ii.xtraplatform.streams.domain.Reactive;
import de.ii.xtraplatform.streams.domain.Reactive.Transformer;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
Expand Down Expand Up @@ -159,6 +160,15 @@ public Reactive.Source<SqlRow> getSourceStream(String query, SqlQueryOptions opt
});
}

// The blocking connection provider runs connect+execute+read on the subscribing thread, so
// without this the whole stream is single-threaded. Subscribing on a worker thread lets several
// parallel-flagged queries (e.g. the concurrent single-shot value phase) run at once, each on
// its
// own connection.
if (options.isParallel()) {
flowable = flowable.subscribeOn(Schedulers.io());
}

return Reactive.Source.publisher(flowable);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.davidmoten.rxjava3.jdbc.Database;
Expand All @@ -72,6 +73,7 @@ public class SqlConnectorRx extends AbstractVolatilePolling implements SqlConnec
private final int maxConnections;
private final int minConnections;
private final int queueSize;
private final Semaphore connectionBudget;
private final Path dataDir;
private final String applicationName;
private final String providerId;
Expand Down Expand Up @@ -105,6 +107,7 @@ public SqlConnectorRx(
connectionInfo.getPool().getMinConnections() >= 0
? connectionInfo.getPool().getMinConnections()
: maxConnections;
this.connectionBudget = new Semaphore(Math.max(1, maxConnections), true);

// int capacity = maxConnections / maxQueries;
// TODO
Expand Down Expand Up @@ -138,6 +141,11 @@ public int getMaxConnections() {
return maxConnections;
}

@Override
public Semaphore getConnectionBudget() {
return connectionBudget;
}

@Override
public int getMinConnections() {
return minConnections;
Expand Down
Loading