diff --git a/CHANGELOG.md b/CHANGELOG.md index 021b930d..efd207dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,17 @@ The format is inspired by Keep a Changelog, and this project adheres to semantic ## [Unreleased] +## [2.0.5] - 2026-05-26 + +### Fixed +- `NostrRelayClient.send()` no longer leaves the client stuck in the "request in flight" state when the write fails for a reason other than overflow. A plain transport `IOException` from the gated send previously bypassed `pendingRequest` cleanup, so the next `send()` (including an `@NostrRetryable` retry) hit `IllegalStateException: A request is already in flight` instead of retrying. `send()` now clears `pendingRequest` on any send failure, not only `SessionLimitExceededException`. (PR #525 review follow-up.) +- `closeQuietly()` now logs the swallowed throwable (with stack trace) and the relay URI instead of just the exception message, preserving diagnostics for best-effort closes. + +## [2.0.4] - 2026-05-26 + +### Fixed +- Close-vs-write race in `NostrRelayClient` (spec-026 US3). The client wrapped its session in Spring's `ConcurrentWebSocketSessionDecorator` but called the **no-arg** `clientSession.close()` (in `close()` and the `send()` timeout path). Spring 6.2.x's decorator overrides only `close(CloseStatus)` (guarded by `closeLock`); the no-arg `close()` falls through to `WebSocketSessionDecorator.close()` → `delegate.close()` with no coordination, sending a CLOSE frame straight to the Tomcat delegate while a `sendMessage` flush was in flight (Tomcat permits only one write in flight → `IllegalStateException: Concurrent write operations are not permitted`). Routing through `close(CloseStatus)` alone is insufficient because its `closeLock` is a separate lock from the `flushLock` guarding `delegate.sendMessage`. Introduced a `ReentrantReadWriteLock` session gate: every write (`send`/`subscribe`) holds the read side (sends stay concurrent — the decorator still serialises the delegate writes among them), every close holds the write side and always uses `close(CloseStatus)`, so a close waits for all in-flight sends to drain before sending the CLOSE frame. No lock nesting → no added deadlock risk. + ### Removed - Dead code cleanup — deleted unused classes: `IContent`, `JsonContent`, `Reaction` enum, `Response`, `Nip05Content`, `Nip05ContentDecoder`, `BaseAuthMessage`, `GenericMessage`, `IKey`, `GenericEventConverter`, `GenericEventTypeClassifier`, `GenericEventDecoder`, `FiltersDecoder`, `BaseTagDecoder`, `GenericEventValidator`, `GenericEventSerializer`, `GenericEventUpdater`, `GenericTagQuery`, `HttpClientProvider`, `DefaultHttpClientProvider`. - `testAuthMessage` test and `GenericEventSupportTest` removed (tested deleted classes). diff --git a/nostr-java-client/pom.xml b/nostr-java-client/pom.xml index fc9eb0b2..2f1f6a46 100644 --- a/nostr-java-client/pom.xml +++ b/nostr-java-client/pom.xml @@ -4,7 +4,7 @@ xyz.tcheeric nostr-java - 2.0.3 + 2.0.5 ../pom.xml diff --git a/nostr-java-client/src/main/java/nostr/client/springwebsocket/NostrRelayClient.java b/nostr-java-client/src/main/java/nostr/client/springwebsocket/NostrRelayClient.java index 0fda1289..858b962a 100644 --- a/nostr-java-client/src/main/java/nostr/client/springwebsocket/NostrRelayClient.java +++ b/nostr-java-client/src/main/java/nostr/client/springwebsocket/NostrRelayClient.java @@ -38,6 +38,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Consumer; /** @@ -116,6 +117,18 @@ public class NostrRelayClient extends TextWebSocketHandler implements AutoClosea */ private final String relayUri; private final ReentrantLock sendLock = new ReentrantLock(); + /** + * Gates session writes against session close. Every {@code clientSession} + * write ({@code sendMessage}) holds the READ side; every close holds the + * WRITE side. This lets concurrent sends proceed in parallel — the + * {@link ConcurrentWebSocketSessionDecorator} still serialises the actual + * delegate writes among them — while a close waits for all in-flight sends + * to drain before sending a CLOSE frame. Needed because the decorator's + * {@code close(CloseStatus)} uses a separate lock from the flush lock, so it + * cannot by itself prevent a CLOSE frame from racing an in-flight write on + * the Tomcat delegate (which permits only one write in flight). + */ + private final ReentrantReadWriteLock sessionGate = new ReentrantReadWriteLock(); private PendingRequest pendingRequest; private final Map listeners = new ConcurrentHashMap<>(); private final AtomicReference connectionState = @@ -476,26 +489,29 @@ public List send(String json) throws IOException { } request = new PendingRequest(maxEventsPerRequest); pendingRequest = request; - log.info("Sending request to relay {}: {}", relayUri, json); - try { - clientSession.sendMessage(new TextMessage(json)); - } catch (SessionLimitExceededException e) { - // OverflowStrategy.TERMINATE only sets the limitExceeded flag and throws; - // it does NOT close the delegate session. Close it explicitly here so - // upstream callers' isOpen()==false reconnect contract holds. - pendingRequest = null; - try { - clientSession.close(CloseStatus.SESSION_NOT_RELIABLE); - } catch (IOException closeEx) { - // Logged but not propagated — the original cause is the signal. - log.warn("Failed to close session after overflow: {}", closeEx.getMessage()); - } - throw new IOException("Failed to send relay payload", e); - } } finally { sendLock.unlock(); } + log.info("Sending request to relay {}: {}", relayUri, json); + try { + sendFrameGated(json); + } catch (SessionLimitExceededException e) { + // OverflowStrategy.TERMINATE only sets the limitExceeded flag and throws; + // it does NOT close the delegate session. Close it explicitly (under the + // write gate, outside the read gate just released) so upstream callers' + // isOpen()==false reconnect contract holds. + clearPendingRequest(request); + closeQuietly(CloseStatus.SESSION_NOT_RELIABLE, "after overflow"); + throw new IOException("Failed to send relay payload", e); + } catch (IOException | RuntimeException e) { + // Any other send failure must also release the in-flight slot, or the next + // send() — including @NostrRetryable's own retry — hits the "request already + // in flight" guard instead of retrying. + clearPendingRequest(request); + throw e; + } + long timeout = awaitTimeoutMs > 0 ? awaitTimeoutMs : DEFAULT_AWAIT_TIMEOUT_MS; log.debug("Waiting for relay response with timeout={}ms", timeout); @@ -513,11 +529,7 @@ public List send(String json) throws IOException { } finally { sendLock.unlock(); } - try { - clientSession.close(); - } catch (IOException closeEx) { - log.warn("Error closing session after timeout", closeEx); - } + closeQuietly(CloseStatus.SESSION_NOT_RELIABLE, "after timeout"); throw new RelayTimeoutException(timeout); } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -595,7 +607,7 @@ public AutoCloseable subscribe( new ListenerRegistration(messageListener, errorListener, closeListener)); try { - clientSession.sendMessage(new TextMessage(requestJson)); + sendFrameGated(requestJson); } catch (IOException e) { listeners.remove(listenerId); throw e; @@ -605,12 +617,7 @@ public AutoCloseable subscribe( // it does NOT close the delegate session. Close it explicitly here so // upstream callers' isOpen()==false reconnect contract holds. if (e instanceof SessionLimitExceededException) { - try { - clientSession.close(CloseStatus.SESSION_NOT_RELIABLE); - } catch (IOException closeEx) { - // Logged but not propagated — the original cause is the signal. - log.warn("Failed to close session after overflow: {}", closeEx.getMessage()); - } + closeQuietly(CloseStatus.SESSION_NOT_RELIABLE, "after overflow"); } throw new IOException("Failed to send subscription payload", e); } @@ -698,7 +705,39 @@ public AutoCloseable recoverSubscription( @Override public void close() throws IOException { - if (clientSession != null) { + closeGated(CloseStatus.NORMAL); + } + + /** + * Write one frame on the delegate while holding the READ side of the session + * gate, so it can run concurrently with other sends (the + * {@link ConcurrentWebSocketSessionDecorator} serialises the delegate writes + * among them) but never overlaps a close (which takes the WRITE side). + */ + private void sendFrameGated(String json) throws IOException { + sessionGate.readLock().lock(); + try { + clientSession.sendMessage(new TextMessage(json)); + } finally { + sessionGate.readLock().unlock(); + } + } + + /** + * Close the underlying session through {@code close(CloseStatus)} while + * holding the WRITE side of the session gate. This excludes every in-flight + * {@link #sendFrameGated} (which holds the read side), so the delegate never + * sees a CLOSE frame and a data frame at the same time. The no-arg + * {@code close()} is deliberately never used: it is not overridden by + * {@link ConcurrentWebSocketSessionDecorator} and would bypass its + * close/flush coordination entirely. + */ + private void closeGated(CloseStatus status) throws IOException { + if (clientSession == null) { + return; + } + sessionGate.writeLock().lock(); + try { boolean open = false; try { open = clientSession.isOpen(); @@ -706,8 +745,38 @@ public void close() throws IOException { log.warn("Exception while checking if clientSession is open during close()", e); } if (open) { - clientSession.close(); + clientSession.close(status); + } + } finally { + sessionGate.writeLock().unlock(); + } + } + + /** + * Best-effort {@link #closeGated} that logs rather than propagates failures — + * including the {@code SessionLimitExceededException} the decorator may raise + * while closing, so a best-effort close never masks the original timeout or + * overflow signal the caller is about to throw. + */ + private void closeQuietly(CloseStatus status, String reason) { + try { + closeGated(status); + } catch (Exception e) { + // Swallow but preserve diagnostics — pass the throwable so the stack trace + // and root cause are logged, not just the message. + log.warn("Error closing session {} on relay {}", reason, relayUri, e); + } + } + + /** Clear {@link #pendingRequest} if it still points at {@code request}. */ + private void clearPendingRequest(PendingRequest request) { + sendLock.lock(); + try { + if (pendingRequest == request) { + pendingRequest = null; } + } finally { + sendLock.unlock(); } } diff --git a/nostr-java-client/src/test/java/nostr/client/springwebsocket/NostrRelayClientCloseWriteRaceTest.java b/nostr-java-client/src/test/java/nostr/client/springwebsocket/NostrRelayClientCloseWriteRaceTest.java new file mode 100644 index 00000000..7ddc6b93 --- /dev/null +++ b/nostr-java-client/src/test/java/nostr/client/springwebsocket/NostrRelayClientCloseWriteRaceTest.java @@ -0,0 +1,223 @@ +package nostr.client.springwebsocket; + +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; +import org.mockito.stubbing.Answer; +import org.springframework.web.socket.CloseStatus; +import org.springframework.web.socket.TextMessage; +import org.springframework.web.socket.WebSocketSession; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +/** + * Regression tests for the close-vs-write race (spec-026 US3). + * + *

Spring's {@link org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator} + * (Spring 6.2.x) overrides only {@code close(CloseStatus)} — guarded by a + * {@code closeLock} that is separate from the {@code flushLock} guarding + * {@code delegate.sendMessage}. It does not override the no-arg + * {@code close()}, which falls through to {@code WebSocketSessionDecorator.close()} + * → {@code delegate.close()} with no coordination at all. So a no-arg close sends a + * CLOSE frame straight to the Tomcat delegate while a {@code sendMessage} flush is in + * flight, tripping Tomcat's single-in-flight-write guard + * ({@code IllegalStateException: Concurrent write operations are not permitted}). + * + *

The fix has two parts, both asserted here: + *

    + *
  1. Routing — every close goes through {@code close(CloseStatus)}, never + * the no-arg {@code close()} (tests {@link #close_routesThroughCloseStatus_neverNoArgClose} + * and {@link #sendTimeout_routesCloseThroughCloseStatus_neverNoArgClose}).
  2. + *
  3. Serialisation — {@code subscribe()} / {@code send()} writes hold the + * READ side of the client's {@code sessionGate} ({@code ReentrantReadWriteLock}) + * and {@code close()} holds the WRITE side, so a close waits for all in-flight + * writes to drain and the delegate never sees a write and a close at once + * (test {@link #concurrentSubscribeAndClose_neverOverlapOnDelegate}).
  4. + *
+ */ +class NostrRelayClientCloseWriteRaceTest { + + private static final long TEST_AWAIT_TIMEOUT_MS = 5_000L; + private static final String REQ = "[\"REQ\",\"sub-1\",{}]"; + + // ---- Routing: close() must use close(CloseStatus), never the no-arg close() ---- + @Test + void close_routesThroughCloseStatus_neverNoArgClose() throws Exception { + AtomicBoolean isOpen = new AtomicBoolean(true); + WebSocketSession raw = Mockito.mock(WebSocketSession.class); + Mockito.when(raw.isOpen()).thenAnswer(inv -> isOpen.get()); + Mockito.doAnswer(inv -> { isOpen.set(false); return null; }) + .when(raw).close(any(CloseStatus.class)); + + NostrRelayClient client = + NostrRelayClient.forTestWithDecoratedSession(raw, TEST_AWAIT_TIMEOUT_MS); + + client.close(); + + // The decorator's close(CloseStatus) calls super.close(status) → delegate.close(status). + verify(raw, times(1)).close(any(CloseStatus.class)); + verify(raw, never()).close(); + } + + // ---- Routing: the send() timeout path must close via CloseStatus, not no-arg ---- + @Test + void sendTimeout_routesCloseThroughCloseStatus_neverNoArgClose() throws Exception { + AtomicBoolean isOpen = new AtomicBoolean(true); + WebSocketSession raw = Mockito.mock(WebSocketSession.class); + Mockito.when(raw.isOpen()).thenAnswer(inv -> isOpen.get()); + // sendMessage succeeds but no response is ever dispatched back, so send() times out. + Mockito.doNothing().when(raw).sendMessage(any(TextMessage.class)); + Mockito.doAnswer(inv -> { isOpen.set(false); return null; }) + .when(raw).close(any(CloseStatus.class)); + + // Short await timeout so the test does not block for the 5 s default. + NostrRelayClient client = + NostrRelayClient.forTestWithDecoratedSession(raw, 200L); + + assertThrows(RelayTimeoutException.class, () -> client.send(REQ)); + + verify(raw, times(1)).close(any(CloseStatus.class)); + verify(raw, never()).close(); + } + + // ---- Serialisation: an in-flight subscribe write and a concurrent close + // must never overlap on the delegate. ---- + @Test + void concurrentSubscribeAndClose_neverOverlapOnDelegate() throws Exception { + AtomicBoolean isOpen = new AtomicBoolean(true); + AtomicInteger delegateOps = new AtomicInteger(); // writes + closes in flight on the delegate + AtomicInteger maxDelegateOps = new AtomicInteger(); + AtomicBoolean raceDetected = new AtomicBoolean(false); + + CountDownLatch sendInFlight = new CountDownLatch(1); // signalled while the send is parked + CountDownLatch releaseSend = new CountDownLatch(1); // test releases the parked send + + WebSocketSession raw = Mockito.mock(WebSocketSession.class); + Mockito.when(raw.isOpen()).thenAnswer(inv -> isOpen.get()); + + // First sendMessage parks in-flight (holding the decorator flushLock and this + // client's sessionGate read lock) until the test releases it; later sends do + // not park. + AtomicBoolean parkedOnce = new AtomicBoolean(false); + Answer sendAnswer = inv -> { + enter(delegateOps, maxDelegateOps, raceDetected); + try { + if (parkedOnce.compareAndSet(false, true)) { + sendInFlight.countDown(); + if (!releaseSend.await(10, TimeUnit.SECONDS)) { + throw new IllegalStateException("parked send was never released"); + } + } + } finally { + delegateOps.decrementAndGet(); + } + return null; + }; + Answer closeAnswer = inv -> { + enter(delegateOps, maxDelegateOps, raceDetected); + try { + isOpen.set(false); + } finally { + delegateOps.decrementAndGet(); + } + return null; + }; + Mockito.doAnswer(sendAnswer).when(raw).sendMessage(any(TextMessage.class)); + Mockito.doAnswer(closeAnswer).when(raw).close(); + Mockito.doAnswer(closeAnswer).when(raw).close(any(CloseStatus.class)); + + NostrRelayClient client = + NostrRelayClient.forTestWithDecoratedSession(raw, TEST_AWAIT_TIMEOUT_MS); + + AtomicReference subscribeFailure = new AtomicReference<>(); + Thread subscriber = new Thread(() -> { + try { + client.subscribe(REQ, ignored -> {}, ignored -> {}, null); + } catch (Throwable t) { + subscribeFailure.set(t); + } + }, "subscriber"); + subscriber.setDaemon(true); + subscriber.start(); + + // Wait until the subscribe write is parked in flight on the delegate. + assertTrue(sendInFlight.await(5, TimeUnit.SECONDS), + "subscribe() never reached the delegate sendMessage"); + + AtomicReference closeFailure = new AtomicReference<>(); + Thread closer = new Thread(() -> { + try { + client.close(); + } catch (Throwable t) { + closeFailure.set(t); + } + }, "closer"); + closer.setDaemon(true); + closer.start(); + + // With the fix, close() blocks on the sessionGate WRITE lock, waiting for the + // READ lock held by the parked subscribe to be released, so the delegate close + // has NOT run yet — exactly one op (the write) in flight. + awaitThreadParked(closer, 2_000); + assertEquals(1, delegateOps.get(), + "close() must not reach the delegate while a send is in flight on it"); + + // Release the parked send; close can now proceed in serial order. + releaseSend.countDown(); + subscriber.join(5_000); + closer.join(5_000); + + assertFalse(raceDetected.get(), + "delegate saw a write and a close concurrently — close-vs-write race"); + assertEquals(1, maxDelegateOps.get(), + "at most one delegate write/close may be in flight at any time, was " + + maxDelegateOps.get()); + assertTrue(subscribeFailure.get() == null + || !(subscribeFailure.get() instanceof IllegalStateException), + "subscribe() raised a concurrent-write IllegalStateException: " + subscribeFailure.get()); + assertEquals(null, closeFailure.get(), + "close() failed: " + closeFailure.get()); + } + + // ----------------------------------------------------------------------- + // Helpers + // ----------------------------------------------------------------------- + + private static void enter( + AtomicInteger delegateOps, AtomicInteger maxDelegateOps, AtomicBoolean raceDetected) { + int now = delegateOps.incrementAndGet(); + maxDelegateOps.updateAndGet(prev -> Math.max(prev, now)); + if (now > 1) { + raceDetected.set(true); + } + } + + /** + * Poll until {@code thread} is parked (BLOCKED / WAITING / TIMED_WAITING) — i.e. + * blocked acquiring the sessionGate write lock — or the timeout elapses. + */ + private static void awaitThreadParked(Thread thread, long timeoutMs) throws InterruptedException { + long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs); + while (System.nanoTime() < deadline) { + Thread.State s = thread.getState(); + if (s == Thread.State.BLOCKED || s == Thread.State.WAITING + || s == Thread.State.TIMED_WAITING) { + return; + } + Thread.sleep(10); + } + } +} diff --git a/nostr-java-client/src/test/java/nostr/client/springwebsocket/NostrRelayClientTimeoutTest.java b/nostr-java-client/src/test/java/nostr/client/springwebsocket/NostrRelayClientTimeoutTest.java index 12f85d72..21778e9f 100644 --- a/nostr-java-client/src/test/java/nostr/client/springwebsocket/NostrRelayClientTimeoutTest.java +++ b/nostr-java-client/src/test/java/nostr/client/springwebsocket/NostrRelayClientTimeoutTest.java @@ -2,6 +2,7 @@ import org.junit.jupiter.api.Test; import org.mockito.Mockito; +import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketSession; @@ -12,10 +13,17 @@ public class NostrRelayClientTimeoutTest { @Test public void testTimeoutThrowsRelayTimeoutExceptionAndClosesSession() throws Exception { WebSocketSession session = Mockito.mock(WebSocketSession.class); + // The session is still open when the response times out, so the timeout + // path proceeds to close it. + Mockito.when(session.isOpen()).thenReturn(true); try (NostrRelayClient client = new NostrRelayClient(session, 100)) { assertThrows(RelayTimeoutException.class, () -> client.send("test")); } Mockito.verify(session).sendMessage(Mockito.any(TextMessage.class)); - Mockito.verify(session).close(); + // The fix routes every close through close(CloseStatus) — never the no-arg + // close(), which the ConcurrentWebSocketSessionDecorator does not override + // and which would bypass its close/flush coordination (spec-026 US3). + Mockito.verify(session, Mockito.atLeastOnce()).close(Mockito.any(CloseStatus.class)); + Mockito.verify(session, Mockito.never()).close(); } } diff --git a/nostr-java-client/src/test/java/nostr/client/springwebsocket/SpringWebSocketClientTest.java b/nostr-java-client/src/test/java/nostr/client/springwebsocket/SpringWebSocketClientTest.java index 4fe48ea1..7281af30 100644 --- a/nostr-java-client/src/test/java/nostr/client/springwebsocket/SpringWebSocketClientTest.java +++ b/nostr-java-client/src/test/java/nostr/client/springwebsocket/SpringWebSocketClientTest.java @@ -2,6 +2,7 @@ import org.junit.jupiter.api.Test; import org.mockito.Mockito; +import org.springframework.web.socket.CloseStatus; import org.springframework.web.socket.TextMessage; import org.springframework.web.socket.WebSocketSession; @@ -64,7 +65,11 @@ void sendThrowsRelayTimeoutExceptionOnTimeout() throws Exception { assertThrows(RelayTimeoutException.class, () -> client.send("test")); } Mockito.verify(session).sendMessage(any(TextMessage.class)); - Mockito.verify(session, Mockito.atLeastOnce()).close(); + // The fix routes every close through close(CloseStatus) — never the no-arg + // close(), which the ConcurrentWebSocketSessionDecorator does not override + // and which would bypass its close/flush coordination (spec-026 US3). + Mockito.verify(session, Mockito.atLeastOnce()).close(any(CloseStatus.class)); + Mockito.verify(session, Mockito.never()).close(); } // Verifies subscription callbacks are delivered asynchronously and stop after unsubscribe. diff --git a/nostr-java-core/pom.xml b/nostr-java-core/pom.xml index aa188cba..d38bc5e9 100644 --- a/nostr-java-core/pom.xml +++ b/nostr-java-core/pom.xml @@ -4,7 +4,7 @@ xyz.tcheeric nostr-java - 2.0.3 + 2.0.5 ../pom.xml diff --git a/nostr-java-event/pom.xml b/nostr-java-event/pom.xml index 8e1e5371..ba383a77 100644 --- a/nostr-java-event/pom.xml +++ b/nostr-java-event/pom.xml @@ -4,7 +4,7 @@ xyz.tcheeric nostr-java - 2.0.3 + 2.0.5 ../pom.xml diff --git a/nostr-java-identity/pom.xml b/nostr-java-identity/pom.xml index 103a60e6..7001e999 100644 --- a/nostr-java-identity/pom.xml +++ b/nostr-java-identity/pom.xml @@ -4,7 +4,7 @@ xyz.tcheeric nostr-java - 2.0.3 + 2.0.5 ../pom.xml diff --git a/pom.xml b/pom.xml index eadff199..4b0f4314 100644 --- a/pom.xml +++ b/pom.xml @@ -3,7 +3,7 @@ xyz.tcheeric nostr-java - 2.0.3 + 2.0.5 pom nostr-java