From 1b4008ffc70e99b1650337ba7ba8749338721e39 Mon Sep 17 00:00:00 2001 From: Clemens Portele Date: Sun, 28 Jun 2026 15:15:19 +0200 Subject: [PATCH] streams: parallel execution of independent sources with ordered output Add two operators that let independent inner sources run concurrently without changing the order of the emitted items: - Transformer.flatMapConcurrent(fn, maxConcurrency, prefetch): subscribes up to maxConcurrency mapped inner sources at once while emitting their items in the original order (concatMapEager). - Source.guarded(acquire, release, inner): runs acquire on subscribe and release on termination on a worker thread, so a blocking acquire (e.g. taking permits from a semaphore that bounds resource use) does not block the subscribing thread. --- .../xtraplatform/streams/app/ReactiveRx.java | 17 ++++++++++++++ .../streams/app/SourceDefault.java | 23 +++++++++++++++++++ .../streams/app/TransformerDefault.java | 22 +++++++++++++++++- .../xtraplatform/streams/domain/Reactive.java | 23 +++++++++++++++++++ .../streams/domain/ReactiveRxSpec.groovy | 22 ++++++++++++++++++ 5 files changed, 106 insertions(+), 1 deletion(-) diff --git a/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/ReactiveRx.java b/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/ReactiveRx.java index fe7024b06..9d986b969 100644 --- a/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/ReactiveRx.java +++ b/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/ReactiveRx.java @@ -13,6 +13,7 @@ import hu.akarnokd.rxjava3.operators.Flowables; import io.reactivex.rxjava3.core.Flowable; import io.reactivex.rxjava3.processors.UnicastProcessor; +import io.reactivex.rxjava3.schedulers.Schedulers; import io.reactivex.rxjava3.subscribers.DefaultSubscriber; import jakarta.inject.Inject; import jakarta.inject.Singleton; @@ -130,6 +131,17 @@ static Flowable assemble(SourceDefault source) { byteBuffer -> Arrays.copyOfRange( byteBuffer.array(), byteBuffer.position(), byteBuffer.limit())); + case GUARDED: + // acquire on subscribe, release on terminate; run on a worker thread (subscribeOn) so a + // blocking acquire does not block the subscribing/drain thread + return Flowable.using( + () -> { + source.getAcquire().run(); + return Boolean.TRUE; + }, + ignored -> assemble(source.getGuardedInner()), + ignored -> source.getRelease().run()) + .subscribeOn(Schedulers.io()); } throw new IllegalStateException(); @@ -197,6 +209,11 @@ static Flowable assemble(Flowable flowable, TransformerDefault assemble(transformer.getFlatMap().apply(u))); + case FLATMAP_EAGER: + return flowable.concatMapEager( + u -> assemble(transformer.getFlatMap().apply(u)), + transformer.getMaxConcurrency(), + transformer.getPrefetch() > 0 ? transformer.getPrefetch() : Flowable.bufferSize()); } throw new IllegalStateException(); diff --git a/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/SourceDefault.java b/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/SourceDefault.java index 395d4b9db..5c834f965 100644 --- a/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/SourceDefault.java +++ b/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/SourceDefault.java @@ -26,6 +26,7 @@ enum Type { PUBLISHER, SINGLE, INPUT_STREAM, + GUARDED, } private final Type type; @@ -37,6 +38,9 @@ enum Type { private Source prepend; private Source mergeSorted; private Comparator mergeSortedComparator; + private Runnable acquire; + private Runnable release; + private Source guardedInner; public SourceDefault(Iterable iterable) { this(Type.ITERABLE, iterable, null, null, null); @@ -54,6 +58,13 @@ public SourceDefault(InputStream inputStream) { this(Type.INPUT_STREAM, null, null, null, inputStream); } + public SourceDefault(Runnable acquire, Runnable release, Source guardedInner) { + this(Type.GUARDED, null, null, null, null); + this.acquire = acquire; + this.release = release; + this.guardedInner = guardedInner; + } + SourceDefault( Type type, Iterable iterable, Publisher publisher, T item, InputStream inputStream) { this.type = type; @@ -148,4 +159,16 @@ public Optional> getMergeSorted() { public Optional> getMergeSortedComparator() { return Optional.ofNullable(mergeSortedComparator); } + + public Runnable getAcquire() { + return acquire; + } + + public Runnable getRelease() { + return release; + } + + public Source getGuardedInner() { + return guardedInner; + } } diff --git a/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/TransformerDefault.java b/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/TransformerDefault.java index 2968e0473..2ccce5b45 100644 --- a/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/TransformerDefault.java +++ b/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/app/TransformerDefault.java @@ -24,7 +24,8 @@ public enum Type { FILTER, PEEK, REDUCE, - FLATMAP + FLATMAP, + FLATMAP_EAGER } private final Type type; @@ -37,6 +38,8 @@ public enum Type { private Source prepend; private Source mergeSorted; private Comparator mergeSortedComparator; + private int prefetch; + private int maxConcurrency; public TransformerDefault(Function function) { this(Type.MAP, function, null, null, null, null, null); @@ -46,6 +49,15 @@ public static TransformerDefault flatMap(Function> fun return new TransformerDefault<>(Type.FLATMAP, null, null, null, null, null, function); } + public static TransformerDefault flatMapConcurrent( + Function> function, int maxConcurrency, int prefetch) { + TransformerDefault transformer = + new TransformerDefault<>(Type.FLATMAP_EAGER, null, null, null, null, null, function); + transformer.maxConcurrency = maxConcurrency; + transformer.prefetch = prefetch; + return transformer; + } + public TransformerDefault(Predicate predicate) { this(Type.FILTER, null, predicate, null, null, null, null); } @@ -123,6 +135,14 @@ public Function> getFlatMap() { return flatMap; } + public int getPrefetch() { + return prefetch; + } + + public int getMaxConcurrency() { + return maxConcurrency; + } + public Optional> getPrepend() { return Optional.ofNullable(prepend); } diff --git a/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/domain/Reactive.java b/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/domain/Reactive.java index 3df1c01ee..2f0815ba9 100644 --- a/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/domain/Reactive.java +++ b/xtraplatform-streams/src/main/java/de/ii/xtraplatform/streams/domain/Reactive.java @@ -86,6 +86,17 @@ static Source single(T item) { static Source inputStream(InputStream inputStream) { return new SourceDefault<>(inputStream); } + + /** + * Wraps {@code inner} so that {@code acquire} runs once when the source is subscribed and + * {@code release} runs once when it terminates (completion, error, or cancellation). The + * acquire/release run on a worker thread, so a blocking {@code acquire} (e.g. acquiring + * connection permits from a semaphore) does not block the subscribing thread. Used to bound the + * connections in flight across concurrently running sub-queries. + */ + static Source guarded(Runnable acquire, Runnable release, Source inner) { + return new SourceDefault<>(acquire, release, inner); + } } interface Transformer { @@ -145,6 +156,18 @@ static Transformer reduce(U zero, BiFunction reducer) { static Transformer flatMap(Function> flatMap) { return TransformerDefault.flatMap(flatMap); } + + /** + * Like {@link #flatMap(Function)} but subscribes to up to {@code maxConcurrency} of the mapped + * inner sources concurrently while still emitting their items strictly in the original order + * (RxJava {@code concatMapEager}). Each inner source may run ahead by up to {@code prefetch} + * buffered items (0 selects the implementation default). Use to overlap independent inner + * sources (e.g. parallel SQL sub-queries) without reordering the output. + */ + static Transformer flatMapConcurrent( + Function> flatMap, int maxConcurrency, int prefetch) { + return TransformerDefault.flatMapConcurrent(flatMap, maxConcurrency, prefetch); + } } interface TransformerCustom extends Transformer { diff --git a/xtraplatform-streams/src/test/groovy/de/ii/xtraplatform/streams/domain/ReactiveRxSpec.groovy b/xtraplatform-streams/src/test/groovy/de/ii/xtraplatform/streams/domain/ReactiveRxSpec.groovy index f6ed32b1d..7bf283445 100644 --- a/xtraplatform-streams/src/test/groovy/de/ii/xtraplatform/streams/domain/ReactiveRxSpec.groovy +++ b/xtraplatform-streams/src/test/groovy/de/ii/xtraplatform/streams/domain/ReactiveRxSpec.groovy @@ -203,6 +203,28 @@ class ReactiveRxSpec extends Specification { e.cause instanceof IllegalStateException } + def "flatMapConcurrent preserves the order of the inner sources"() { + given: + // each of the 10 inner sources expands to a contiguous 10-block; with up to 4 subscribed + // concurrently the blocks must still come out strictly in order (0..99), never interleaved + Reactive.Stream> stream = Source.iterable(0..9) + .via(Transformer.flatMapConcurrent({ Integer i -> Source.iterable((i * 10)..(i * 10 + 9)) }, 4, 8)) + .to(Sink.ignore()) + .withResult([ids: []] as Map) + .handleError((result, throwable) -> { result.error = throwable; return result; }) + .handleItem((result, id) -> { + result.ids << id + return result + }) + + when: + def result = runStream(stream) + + then: + result.error == null + result.ids == (0..99).toList() + } + static Transformer transformerLogging() { return Transformer.peek((Integer i) -> println(i)) }