Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,14 @@ public void onPush(SqlRow sqlRow) {

private void handleMetaRow(SqlRowMeta sqlRow) {

context
.metadata()
.numberReturned(
context.metadata().getNumberReturned().orElse(0) + sqlRow.getNumberReturned());
// a negative numberReturned marks it as not computed (single-shot/unpaged); leave it unset so
// the encoder omits numberReturned instead of reporting 0
if (sqlRow.getNumberReturned() >= 0) {
context
.metadata()
.numberReturned(
context.metadata().getNumberReturned().orElse(0) + sqlRow.getNumberReturned());
}
if (sqlRow.getNumberMatched().isPresent()) {
context
.metadata()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class FeatureQueryEncoderSql implements FeatureQueryEncoder<SqlQueryBatch
private final int chunkSize;
private final SqlDialect sqlDialect;
private final boolean geometryAsWkb;
private final boolean computeNumberMatched;

public FeatureQueryEncoderSql(
Map<String, List<SqlQueryTemplates>> allQueryTemplates,
Expand All @@ -65,6 +66,7 @@ public FeatureQueryEncoderSql(
this.allQueryTemplatesMutations = allQueryTemplatesMutations;
this.chunkSize = queryGeneratorSettings.getChunkSize();
this.geometryAsWkb = queryGeneratorSettings.getGeometryAsWkb();
this.computeNumberMatched = queryGeneratorSettings.getComputeNumberMatched();
this.sqlDialect = sqlDialect;
}

Expand Down Expand Up @@ -114,6 +116,7 @@ private SqlQueryBatch encode(FeatureQuery query, Map<String, String> additionalQ
query,
additionalQueryParameters,
query.returnsSingleFeature(),
false,
0)))
.flatMap(s -> s)
.collect(Collectors.toList());
Expand All @@ -131,7 +134,13 @@ private SqlQueryBatch encode(FeatureQuery query, Map<String, String> additionalQ

private SqlQueryBatch encode(
MultiFeatureQuery query, Map<String, String> additionalQueryParameters) {
int chunks = (query.getLimit() / chunkSize) + (query.getLimit() % chunkSize > 0 ? 1 : 0);
// A multi-query that does not support paging is executed single-shot: every matching row is
// read in one pass per (sub-query, table), with no meta query, chunking or key-range window.
// This drops numberReturned/numberMatched in exchange for far fewer statements and round-trips
// at large sizes (an optional per-sub-query maximum may still cap each sub-query).
boolean unpaged = !query.getSupportPaging() && !query.hitsOnly();
int chunks =
unpaged ? 1 : (query.getLimit() / chunkSize) + (query.getLimit() % chunkSize > 0 ? 1 : 0);

List<SqlQuerySet> querySets =
IntStream.range(0, query.getQueries().size())
Expand All @@ -156,7 +165,8 @@ private SqlQueryBatch encode(
typeQuery,
query,
additionalQueryParameters,
false,
unpaged,
unpaged,
queryIndex)))
.flatMap(s -> s);
})
Expand All @@ -168,6 +178,9 @@ private SqlQueryBatch encode(
.limit(query.getLimit())
.offset(query.getOffset())
.chunkSize(chunkSize)
.isUnpaged(unpaged)
// when paging, count every sub-query so numberMatched is the full invariant total
.isComputeNumberMatched(query.getSupportPaging() && computeNumberMatched)
.build()
.withQuerySets(querySets);
}
Expand All @@ -181,14 +194,14 @@ private SqlQuerySet createQuerySet(
Query query,
Map<String, String> additionalQueryParameters,
boolean skipMetaQuery,
boolean unpaged,
int queryIndex) {
List<SortKey> sortKeys =
transformSortKeys(typeQuery.getSortKeys(), queryTemplates.getMapping());
boolean useMinMaxKeys = queryTemplates.getMapping().getMainTable().isSortKeyUnique();
// a multi-query may opt out of computing numberMatched to avoid a count query per sub-query
boolean computeNumberMatched =
!(query instanceof MultiFeatureQuery)
|| ((MultiFeatureQuery) query).getComputeNumberMatched();
// a paged multi-query computes numberMatched; a single-shot one (no paging) does not
boolean supportPaging =
!(query instanceof MultiFeatureQuery) || ((MultiFeatureQuery) query).getSupportPaging();

BiFunction<Long, Long, Optional<String>> metaQuery =
(maxLimit, skipped) ->
Expand All @@ -209,27 +222,42 @@ private SqlQuerySet createQuerySet(
query.hitsOnly(),
// numberMatched is invariant across chunks, so compute it only on the
// first chunk of each collection; later chunks reuse that value
chunk == 0 && computeNumberMatched));
chunk == 0 && supportPaging));

TriFunction<SqlRowMeta, Long, Long, Stream<String>> valueQueries =
(metaResult, maxLimit, skipped) ->
queryTemplates.getValueQueryTemplates().stream()
.map(
valueQueryTemplate ->
valueQueryTemplate.generateValueQuery(
Math.min(limit, maxLimit),
Math.max(0L, offset - skipped),
sortKeys,
typeQuery.getFilter(),
typeQuery.forceSimpleFeatureGeometry(),
(useMinMaxKeys
&& ((Objects.nonNull(metaResult.getMinKey())
&& Objects.nonNull(metaResult.getMaxKey()))
|| metaResult.getNumberReturned() == 0))
? Optional.of(
Tuple.of(metaResult.getMinKey(), metaResult.getMaxKey()))
: Optional.empty(),
additionalQueryParameters));
// single-shot reads matching rows in one pass: no offset and no key-range
// window (which would otherwise constrain the result set to the meta
// query's minKey/maxKey); an optional per-sub-query maximum caps each
// sub-query (0 = no limit)
unpaged
? valueQueryTemplate.generateValueQuery(
query instanceof MultiFeatureQuery
? ((MultiFeatureQuery) query).getMaxFeaturesPerSubQuery()
: 0L,
0L,
sortKeys,
typeQuery.getFilter(),
typeQuery.forceSimpleFeatureGeometry(),
Optional.empty(),
additionalQueryParameters)
: valueQueryTemplate.generateValueQuery(
Math.min(limit, maxLimit),
Math.max(0L, offset - skipped),
sortKeys,
typeQuery.getFilter(),
typeQuery.forceSimpleFeatureGeometry(),
(useMinMaxKeys
&& ((Objects.nonNull(metaResult.getMinKey())
&& Objects.nonNull(metaResult.getMaxKey()))
|| metaResult.getNumberReturned() == 0))
? Optional.of(
Tuple.of(metaResult.getMinKey(), metaResult.getMaxKey()))
: Optional.empty(),
additionalQueryParameters));

// reuse SchemaSql instances instead of copying them; this is expensive and unnecessary, since
// they are immutable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,13 @@ class Paging {
private long lastNumberReturned;
private long lastNumberSkipped;
private boolean noOffset;
private final boolean computeNumberMatched;

public Paging(long limit, long offset, long chunkSize) {
public Paging(long limit, long offset, long chunkSize, boolean computeNumberMatched) {
this.limit = limit;
this.offset = offset;
this.chunkSize = chunkSize;
this.computeNumberMatched = computeNumberMatched;

this.featureCountdown = limit;
this.numberSkipped = 0L;
Expand All @@ -75,9 +77,14 @@ Optional<Tuple<Long, Long>> get(String currentTable) {
long found = lastNumberReturned + lastNumberSkipped;

// Once the limit is reached or the current collection is exhausted (its last chunk returned
// fewer rows than the chunk size), no further meta query is needed: there are no more rows to
// read and numberMatched was already computed on the collection's first chunk.
// fewer rows than the chunk size), no further rows need to be read. When numberMatched is
// computed, a not-yet-counted sub-query is still given a count-only meta query (maxLimit 0)
// so the reported total covers every sub-query, not only those contributing to the page; its
// value query is skipped downstream because the meta reports numberReturned 0.
if (featureCountdown <= 0 || (Objects.equals(lastTable, currentTable) && found < chunkSize)) {
if (computeNumberMatched && !Objects.equals(lastTable, currentTable)) {
return Optional.of(Tuple.of(0L, 0L));
}
return Optional.empty();
}

Expand Down Expand Up @@ -109,7 +116,11 @@ void register(String currentTable, SqlRowMeta metaResult) {
default Reactive.Source<SqlRow> getSourceStream(
SqlQueryBatch queryBatch, SqlQueryOptions options) {
Paging paging =
new Paging(queryBatch.getLimit(), queryBatch.getOffset(), queryBatch.getChunkSize());
new Paging(
queryBatch.getLimit(),
queryBatch.getOffset(),
queryBatch.getChunkSize(),
queryBatch.isComputeNumberMatched());

Source<SqlRow> sqlRowSource1 =
Source.iterable(queryBatch.getQuerySets())
Expand Down Expand Up @@ -196,10 +207,17 @@ default Reactive.Source<SqlRow> getSourceStream(
.via(
Transformer.flatMap(
plan -> {
if (queryBatch.isSingleFeature()) {
if (queryBatch.isSingleFeature() || queryBatch.isUnpaged()) {
boolean unpaged = queryBatch.isUnpaged();
List<SqlQuerySet> querySets = queryBatch.getQuerySets();
// a single-shot query computes neither count; -1 marks both as absent (the
// decoder leaves numberReturned unset and numberMatched stays empty)
// instead of reporting 0
ImmutableSqlRowMeta sqlRowMeta =
getMetaQueryResult(0L, 0L, 0L, 0L, -1L).build();
getMetaQueryResult(0L, 0L, unpaged ? -1L : 0L, unpaged ? -1L : 0L, -1L)
.build();
// a server-side cursor keeps memory bounded while streaming all rows
int fetchSize = unpaged ? (int) queryBatch.getChunkSize() : 0;
return Source.iterable(
IntStream.range(0, querySets.size())
.boxed()
Expand Down Expand Up @@ -235,6 +253,7 @@ default Reactive.Source<SqlRow> getSourceStream(
querySets
.get(index)
.getQueryIndex())
.fetchSize(fetchSize)
.build()))
.toArray(
(IntFunction<Source<SqlRow>[]>) Source[]::new);
Expand All @@ -247,11 +266,14 @@ default Reactive.Source<SqlRow> getSourceStream(
List<SqlQuerySet> querySets = plan.first();
SqlRowMeta aggregatedMetaResult = plan.second().get(0);
List<SqlRowMeta> metaResults = plan.second().subList(1, plan.second().size());
// the value phase only reads window sub-queries; count-only ones are skipped
// via the numberReturned<=0 guard below, so it needs no count-only handling
Paging paging2 =
new Paging(
queryBatch.getLimit(),
queryBatch.getOffset(),
queryBatch.getChunkSize());
queryBatch.getChunkSize(),
false);
int[] i = {0};

if (options.isHitsOnly()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,26 @@ default boolean isSingleFeature() {
return false;
}

/**
* Single-shot (unpaged) mode: every matching row is read in one pass per (sub-query, table)
* without a meta query, without chunking, and without a key-range window. {@code limit} does not
* page and {@code numberReturned}/{@code numberMatched} are not computed.
*/
@Value.Default
default boolean isUnpaged() {
return false;
}

/**
* Whether {@code numberMatched} is reported. When enabled for a multi-query, the count is
* computed for every sub-query (not only those contributing to the current page) so that the
* reported {@code numberMatched} is the invariant total across all sub-queries; the value queries
* are still executed only for the sub-queries needed for the page.
*/
@Value.Default
default boolean isComputeNumberMatched() {
return false;
}

List<SqlQuerySet> getQuerySets();
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,16 @@ default int getChunkSize() {
return 1000;
}

/**
* JDBC fetch size for the result set. When greater than 0, the query is executed inside a
* transaction so the database driver can use a server-side cursor and stream rows instead of
* buffering the whole result set in memory. Used for single-shot (unpaged) queries.
*/
@Value.Default
default int getFetchSize() {
return 0;
}

@Value.Default
default boolean isGeometryWkb() {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,21 +98,31 @@ public Reactive.Source<SqlRow> getSourceStream(String query, SqlQueryOptions opt
}
List<SqlRow> logBuffer = new ArrayList<>(5);

// TODO encapsulating the query in a transaction is a workaround for what appears to be a bug in
// rxjava3-jdbc, see https://github.com/interactive-instruments/ldproxy/issues/1293
Flowable<SqlRow> flowable =
session
.select(query)
.get(
resultSet -> {
SqlRow row = new SqlRowVals(collator).read(resultSet, options);
org.davidmoten.rxjava3.jdbc.ResultSetMapper<SqlRow> mapper =
resultSet -> {
SqlRow row = new SqlRowVals(collator).read(resultSet, options);

if (LOGGER.isDebugEnabled(MARKER.SQL_RESULT) && logBuffer.size() < 10) {
logBuffer.add(row);
}
if (LOGGER.isDebugEnabled(MARKER.SQL_RESULT) && logBuffer.size() < 10) {
logBuffer.add(row);
}

return row;
});
return row;
};

// A positive fetch size requires a transaction so the database driver uses a server-side cursor
// and streams rows instead of buffering the whole result set in memory (PostgreSQL ignores the
// fetch size with autoCommit=true).
// TODO encapsulating the query in a transaction is also a workaround for what appears to be a

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A new Todo was discovered. If it is not a priority right now,consider marking it for later attention.
TODO encapsulating the query in a transaction is also a workaround for what appears to be a

// bug in rxjava3-jdbc, see https://github.com/interactive-instruments/ldproxy/issues/1293
Flowable<SqlRow> flowable =
options.getFetchSize() > 0
? session
.select(query)
.transacted()
.fetchSize(options.getFetchSize())
.valuesOnly()
.get(mapper)
: session.select(query).get(mapper);

// TODO: prettify, see
// https://github.com/slick/slick/blob/main/slick/src/main/scala/slick/jdbc/StatementInvoker.scala
Expand Down
Loading
Loading