Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import hu.akarnokd.rxjava3.operators.Flowables;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.processors.UnicastProcessor;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.subscribers.DefaultSubscriber;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
Expand Down Expand Up @@ -130,6 +131,17 @@ static <U> Flowable<U> assemble(SourceDefault<U> source) {
byteBuffer ->
Arrays.copyOfRange(
byteBuffer.array(), byteBuffer.position(), byteBuffer.limit()));
case GUARDED:
// acquire on subscribe, release on terminate; run on a worker thread (subscribeOn) so a
// blocking acquire does not block the subscribing/drain thread
return Flowable.using(
() -> {
source.getAcquire().run();
return Boolean.TRUE;
},
ignored -> assemble(source.getGuardedInner()),
ignored -> source.getRelease().run())
.subscribeOn(Schedulers.io());
}

throw new IllegalStateException();
Expand Down Expand Up @@ -197,6 +209,11 @@ static <U, V> Flowable<V> assemble(Flowable<U> flowable, TransformerDefault<U, V
return flowable.reduce(transformer.getItem(), transformer.getReducer()::apply).toFlowable();
case FLATMAP:
return flowable.concatMap(u -> assemble(transformer.getFlatMap().apply(u)));
case FLATMAP_EAGER:
return flowable.concatMapEager(
u -> assemble(transformer.getFlatMap().apply(u)),
transformer.getMaxConcurrency(),
transformer.getPrefetch() > 0 ? transformer.getPrefetch() : Flowable.bufferSize());
}

throw new IllegalStateException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ enum Type {
PUBLISHER,
SINGLE,
INPUT_STREAM,
GUARDED,
}

private final Type type;
Expand All @@ -37,6 +38,9 @@ enum Type {
private Source<T> prepend;
private Source<T> mergeSorted;
private Comparator<T> mergeSortedComparator;
private Runnable acquire;
private Runnable release;
private Source<T> guardedInner;

public SourceDefault(Iterable<T> iterable) {
this(Type.ITERABLE, iterable, null, null, null);
Expand All @@ -54,6 +58,13 @@ public SourceDefault(InputStream inputStream) {
this(Type.INPUT_STREAM, null, null, null, inputStream);
}

public SourceDefault(Runnable acquire, Runnable release, Source<T> guardedInner) {
this(Type.GUARDED, null, null, null, null);
this.acquire = acquire;
this.release = release;
this.guardedInner = guardedInner;
}

SourceDefault(
Type type, Iterable<T> iterable, Publisher<T> publisher, T item, InputStream inputStream) {
this.type = type;
Expand Down Expand Up @@ -148,4 +159,16 @@ public Optional<Source<T>> getMergeSorted() {
public Optional<Comparator<T>> getMergeSortedComparator() {
return Optional.ofNullable(mergeSortedComparator);
}

public Runnable getAcquire() {
return acquire;
}

public Runnable getRelease() {
return release;
}

public Source<T> getGuardedInner() {
return guardedInner;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ public enum Type {
FILTER,
PEEK,
REDUCE,
FLATMAP
FLATMAP,
FLATMAP_EAGER
}

private final Type type;
Expand All @@ -37,6 +38,8 @@ public enum Type {
private Source<U> prepend;
private Source<U> mergeSorted;
private Comparator<U> mergeSortedComparator;
private int prefetch;
private int maxConcurrency;

public TransformerDefault(Function<T, U> function) {
this(Type.MAP, function, null, null, null, null, null);
Expand All @@ -46,6 +49,15 @@ public static <T, U> TransformerDefault<T, U> flatMap(Function<T, Source<U>> fun
return new TransformerDefault<>(Type.FLATMAP, null, null, null, null, null, function);
}

public static <T, U> TransformerDefault<T, U> flatMapConcurrent(
Function<T, Source<U>> function, int maxConcurrency, int prefetch) {
TransformerDefault<T, U> transformer =
new TransformerDefault<>(Type.FLATMAP_EAGER, null, null, null, null, null, function);
transformer.maxConcurrency = maxConcurrency;
transformer.prefetch = prefetch;
return transformer;
}

public TransformerDefault(Predicate<T> predicate) {
this(Type.FILTER, null, predicate, null, null, null, null);
}
Expand Down Expand Up @@ -123,6 +135,14 @@ public Function<T, Source<U>> getFlatMap() {
return flatMap;
}

public int getPrefetch() {
return prefetch;
}

public int getMaxConcurrency() {
return maxConcurrency;
}

public Optional<Source<U>> getPrepend() {
return Optional.ofNullable(prepend);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,17 @@ static <T> Source<T> single(T item) {
static Source<byte[]> inputStream(InputStream inputStream) {
return new SourceDefault<>(inputStream);
}

/**
* Wraps {@code inner} so that {@code acquire} runs once when the source is subscribed and
* {@code release} runs once when it terminates (completion, error, or cancellation). The
* acquire/release run on a worker thread, so a blocking {@code acquire} (e.g. acquiring
* connection permits from a semaphore) does not block the subscribing thread. Used to bound the
* connections in flight across concurrently running sub-queries.
*/
static <T> Source<T> guarded(Runnable acquire, Runnable release, Source<T> inner) {
return new SourceDefault<>(acquire, release, inner);
}
}

interface Transformer<T, U> {
Expand Down Expand Up @@ -145,6 +156,18 @@ static <T, U> Transformer<T, U> reduce(U zero, BiFunction<U, T, U> reducer) {
static <T, U> Transformer<T, U> flatMap(Function<T, Source<U>> flatMap) {
return TransformerDefault.flatMap(flatMap);
}

/**
* Like {@link #flatMap(Function)} but subscribes to up to {@code maxConcurrency} of the mapped
* inner sources concurrently while still emitting their items strictly in the original order
* (RxJava {@code concatMapEager}). Each inner source may run ahead by up to {@code prefetch}
* buffered items (0 selects the implementation default). Use to overlap independent inner
* sources (e.g. parallel SQL sub-queries) without reordering the output.
*/
static <T, U> Transformer<T, U> flatMapConcurrent(
Function<T, Source<U>> flatMap, int maxConcurrency, int prefetch) {
return TransformerDefault.flatMapConcurrent(flatMap, maxConcurrency, prefetch);
}
}

interface TransformerCustom<T, U> extends Transformer<T, U> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,28 @@ class ReactiveRxSpec extends Specification {
e.cause instanceof IllegalStateException
}

def "flatMapConcurrent preserves the order of the inner sources"() {
given:
// each of the 10 inner sources expands to a contiguous 10-block; with up to 4 subscribed
// concurrently the blocks must still come out strictly in order (0..99), never interleaved
Reactive.Stream<Map<String, Object>> stream = Source.iterable(0..9)
.via(Transformer.flatMapConcurrent({ Integer i -> Source.iterable((i * 10)..(i * 10 + 9)) }, 4, 8))
.to(Sink.ignore())
.withResult([ids: []] as Map<String, Object>)
.handleError((result, throwable) -> { result.error = throwable; return result; })
.handleItem((result, id) -> {
result.ids << id
return result
})

when:
def result = runStream(stream)

then:
result.error == null
result.ids == (0..99).toList()
}

static Transformer<Integer, Integer> transformerLogging() {
return Transformer.peek((Integer i) -> println(i))
}
Expand Down