Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitattributes
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
#
# These are explicitly windows files and should use crlf
*.bat text eol=crlf

gradlew text eol=lf
5 changes: 3 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@

# NATS stuff #
# NATS / custom #
##############
gnatsd.log
.claude/**
**/Debug*.java
*.csv

# Compiled source #
Expand Down
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ plugins {
id("signing")
}

def jarVersion = "0.0.0"
def jarVersion = "0.0.1"
group = 'io.synadia'

def isRelease = System.getenv("BUILD_EVENT") == "release"
Expand Down Expand Up @@ -40,7 +40,7 @@ repositories {
}

dependencies {
implementation 'io.nats:jnats:2.25.2'
implementation 'io.nats:jnats:2.25.3'
implementation 'org.jspecify:jspecify:1.0.0'

testImplementation 'io.nats:jnats-server-runner:3.1.0'
Expand Down
2 changes: 1 addition & 1 deletion downloader/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ repositories {
}

dependencies {
implementation 'io.nats:jnats:2.25.2'
implementation 'io.nats:jnats:2.25.3'
implementation 'io.synadia:active-passive:0.0.0-SNAPSHOT'
implementation 'io.synadia:active-passive-jdk17:0.0.0-SNAPSHOT'
implementation 'io.synadia:active-passive-jdk21:0.0.0-SNAPSHOT'
Expand Down
2 changes: 1 addition & 1 deletion downloader/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
<dependency>
<groupId>io.nats</groupId>
<artifactId>jnats</artifactId>
<version>2.25.2</version>
<version>2.25.3</version>
</dependency>
<dependency>
<groupId>io.synadia</groupId>
Expand Down
72 changes: 68 additions & 4 deletions src/main/java/io/nats/client/impl/ApPassiveServerPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,44 @@
import org.jspecify.annotations.NonNull;
import org.jspecify.annotations.Nullable;

import java.net.URISyntaxException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

public class ApPassiveServerPool implements ServerPool {
final ServerPool pool;
final AtomicReference<NatsUri> activeServerRef;
final Map<String, List<String>> resolvedMap;
Options.HostnameResolveMode resolveMode;

public ApPassiveServerPool(ServerPool pool) {
this.pool = pool;
activeServerRef = new AtomicReference<>();
resolvedMap = new HashMap<>();
}

public void setActiveServer(NatsUri activeNuri) {
activeServerRef.set(activeNuri);
resolve(activeNuri);
}

@Override
public void initialize(@NonNull Options opts) {
resolveMode = opts.hostnameResolveMode();
pool.initialize(opts);
resolve(pool.getServerList());
}

@Override
public boolean acceptDiscoveredUrls(@NonNull List<@NonNull String> discoveredServers) {
return pool.acceptDiscoveredUrls(discoveredServers);
boolean accepted = pool.acceptDiscoveredUrls(discoveredServers);
if (accepted) {
resolve(pool.getServerList());
}
return accepted;
}

@Override
Expand All @@ -41,7 +55,7 @@ public boolean acceptDiscoveredUrls(@NonNull List<@NonNull String> discoveredSer

NatsUri firstPeek = pool.peekNextServer();
NatsUri peek = firstPeek;
while (peek != null && peek.equivalent(active)) {
while (peek != null && isEquivalent(peek, active)) {
pool.nextServer(); // advance and peek again
peek = pool.peekNextServer();
if (peek == firstPeek) { // if we've looped around, nothing else we can do
Expand All @@ -51,6 +65,26 @@ public boolean acceptDiscoveredUrls(@NonNull List<@NonNull String> discoveredSer
return peek;
}

private boolean isEquivalent(NatsUri test, NatsUri active) {
String testHost = test.getHost();
String activeHost = active.getHost();
if (testHost.equals(activeHost)) {
return true;
}
if (resolveMode.resolve) {
// The NATS server reports discovered servers as IP addresses, while users typically
// supply hostnames - so a comparison is always host-vs-IP, never hostname-vs-hostname
// by resolution (two distinct hostnames only match by the direct equality check above).
if (active.hostIsIpAddress()) {
// active is an IP: equivalent only to a test hostname that resolves to that IP.
return !test.hostIsIpAddress() && _resolveHostToIps(testHost).contains(activeHost);
}
// active is a hostname: equivalent only to a test IP that is one of its resolved IPs.
return test.hostIsIpAddress() && _resolveHostToIps(activeHost).contains(testHost);
}
return false;
}

@Override
public @Nullable NatsUri nextServer() {
NatsUri active = activeServerRef.get();
Expand All @@ -59,7 +93,7 @@ public boolean acceptDiscoveredUrls(@NonNull List<@NonNull String> discoveredSer
}
NatsUri firstServer = pool.nextServer();
NatsUri server = firstServer;
while (server != null && server.equivalent(active)) {
while (server != null && isEquivalent(server, active)) {
server = pool.nextServer(); // get the next nextServer
if (server == firstServer) { // if we've looped around, nothing else we can do
break;
Expand All @@ -70,7 +104,37 @@ public boolean acceptDiscoveredUrls(@NonNull List<@NonNull String> discoveredSer

@Override
public @Nullable List<String> resolveHostToIps(@NonNull String host) {
return pool.resolveHostToIps(host);
return _resolveHostToIps(host);
}

private @NonNull List<String> _resolveHostToIps(@NonNull String host) {
List<String> resolved = resolvedMap.get(host);
if (resolved == null) {
resolved = pool.resolveHostToIps(host, resolveMode.maxOneResult, resolveMode.includeIPV6);
if (resolved == null || resolved.isEmpty()) {
// placeholder so we don't keep re-resolving a host that resolves to nothing
resolved = Collections.emptyList();
}
resolvedMap.put(host, resolved);
}
return resolved;
}

private void resolve(NatsUri nuri) {
if (!nuri.hostIsIpAddress() && !nuri.isWebsocket() && resolveMode.resolve) {
_resolveHostToIps(nuri.getHost());
}
}

private void resolve(@NonNull List<String> serverList) {
for (String server : serverList) {
try {
resolve(new NatsUri(server));
}
catch (URISyntaxException e) {
throw new RuntimeException(e); // this should never happen, if it does it's a user error
}
}
}

@Override
Expand Down
Loading
Loading