Skip to content

fix: event source tracking#470

Draft
nogringo wants to merge 4 commits into
masterfrom
test/469-event-source-tracking
Draft

fix: event source tracking#470
nogringo wants to merge 4 commits into
masterfrom
test/469-event-source-tracking

Conversation

@nogringo
Copy link
Copy Markdown
Collaborator

@nogringo nogringo commented Feb 26, 2026

Tests to demonstrate #469

Summary by CodeRabbit

  • New Features

    • Relay content filtering with configurable banned-word blocking.
    • Persistent tracking and merging of event source lists across relays and broadcasts.
  • Improvements

    • Broadcast flow now updates cached events with per-relay success sources; stream handling supports multiple listeners and avoids duplicate sources.
    • Request/stream processing aggregates sources and re-emits only when source sets grow.
  • Tests

    • Added tests for broadcast source attribution and multi-relay source merging.

@nogringo nogringo force-pushed the test/469-event-source-tracking branch from ede4168 to 6172c98 Compare March 25, 2026 11:04
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented Mar 25, 2026

📝 Walkthrough

Walkthrough

Added relay-side content filtering via a configurable bannedWord, updated broadcast flow to track and merge per-relay sources into cached events, and introduced tests that validate broadcast-source tracking and source merging across multiple mock relays.

Changes

Cohort / File(s) Summary
MockRelay Content Filtering
packages/ndk/test/mocks/mock_relay.dart
Added String? bannedWord field and constructor parameter; relay now checks event.content for bannedWord after signature verification and replies OK(false, "blocked: content contains banned word"), aborting further processing for blocked events.
Broadcast logic & network reaction
packages/ndk/lib/domain_layer/usecases/broadcast/broadcast.dart, packages/ndk/lib/domain_layer/entities/broadcast_state.dart, packages/ndk/lib/domain_layer/usecases/relay_manager.dart
BroadcastState network controller made a broadcast stream; added listeners to update cached event.sources when a relay reports successful OK responses; relay-manager also merges/deduplicates relay URLs into event.sources for inbound EVENT and OK handling.
Stream response dedup & cache integration
packages/ndk/lib/domain_layer/usecases/stream_response_cleaner/stream_response_cleaner.dart, packages/ndk/lib/domain_layer/usecases/requests/requests.dart
Replaced global seen-set with per-event source sets (Map<id,Set>); StreamResponseCleaner now accepts optional CacheManager and updates cache when source sets grow; requests logic updated to aggregate and re-emit events when their source sets expand.
Integration tests
packages/ndk/test/usecases/broadcast_sources_test.dart, packages/ndk/test/usecases/stream_response_cleaner/event_sources_merge_test.dart
Added two tests: one confirming cached event has non-empty sources after broadcast; another validating source merging across three mock relays (one using banned-word filtering) and that final cached event has exactly two sources.
Minor formatting
packages/ndk/lib/presentation_layer/init.dart
Removed an extra blank line; no behavioral change.

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant NDK
    participant EventVerifier
    participant MockRelay
    participant Cache

    Client->>NDK: broadcast(event)
    NDK->>EventVerifier: verify(event)
    EventVerifier-->>NDK: verified
    NDK->>MockRelay: EVENT message
    MockRelay->>MockRelay: check bannedWord in content
    alt Content contains banned word
        MockRelay-->>NDK: OK(false, "blocked: content contains banned word")
        NDK-->>Client: broadcastDone (failed)
    else Content allowed
        MockRelay->>Cache: store event (relay-side)
        MockRelay-->>NDK: OK(true)
        NDK->>Cache: load event, merge sources, save updated event
        NDK-->>Client: broadcastDone (success)
    end
Loading
sequenceDiagram
    participant Client
    participant NDK
    participant RelayA
    participant RelayB
    participant RelayC
    participant Cache

    Client->>NDK: broadcast(event)
    NDK->>RelayA: EVENT
    NDK->>RelayB: EVENT
    NDK->>RelayC: EVENT
    Note over RelayA,RelayC: RelayA or RelayC may block via bannedWord
    RelayA-->>NDK: OK(false) or OK(true)
    RelayB-->>NDK: OK(true)
    RelayC-->>NDK: OK(false) or OK(true)
    NDK->>Cache: for each OK(true), load event, merge relay URL into event.sources, save
    Client->>NDK: request(event_id)
    Cache-->>NDK: event with merged sources
    NDK-->>Client: event (sources show only relays that accepted it)
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Suggested reviewers

  • frnandu
  • 1-leo

Poem

🐰 A little relay hops with care,
Blocking words that shouldn't fare,
Events leap out, then find their tracks,
Sources merged and stored in stacks,
Hooray — the network hums, safe and fair!

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately summarizes the main objective of the pull request: implementing event source tracking across multiple relays.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch test/469-event-source-tracking

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🧹 Nitpick comments (2)
packages/ndk/test/usecases/stream_response_cleaner/event_sources_merge_test.dart (1)

48-52: Consider using addTearDown for robust cleanup.

If the assertion on line 46 fails, ndk.destroy() and the relay stopServer() calls won't execute, potentially leaving orphan servers. Using addTearDown ensures cleanup runs regardless of test outcome.

Proposed refactor using addTearDown
     await relay1.startServer();
     await relay2.startServer();
     await relay3.startServer();
+    addTearDown(() async {
+      await relay1.stopServer();
+      await relay2.stopServer();
+      await relay3.stopServer();
+    });

     final ndk = Ndk(NdkConfig(
       eventVerifier: MockEventVerifier(),
       cache: MemCacheManager(),
       bootstrapRelays: [relay1.url, relay2.url, relay3.url],
     ));
+    addTearDown(() => ndk.destroy());

     // ... rest of test ...

     expect(events.first.sources.length, equals(2));
-
-    await ndk.destroy();
-    await relay1.stopServer();
-    await relay2.stopServer();
-    await relay3.stopServer();
   });
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@packages/ndk/test/usecases/stream_response_cleaner/event_sources_merge_test.dart`
around lines 48 - 52, Replace the direct cleanup calls at the end of the test
with an addTearDown registration so cleanup always runs even if assertions fail:
register an async tearDown via addTearDown(() async { await ndk.destroy(); await
relay1.stopServer(); await relay2.stopServer(); await relay3.stopServer(); }); —
locate the cleanup calls around ndk.destroy() and relay*.stopServer() and move
them into that addTearDown callback (ensuring the callback is registered before
assertions run).
packages/ndk/test/usecases/broadcast_sources_test.dart (1)

41-43: Consider using addTearDown for robust cleanup.

Same as the other test file—if assertions fail, cleanup won't execute. Consider wrapping ndk.destroy() and relay.stopServer() in addTearDown callbacks.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/ndk/test/usecases/broadcast_sources_test.dart` around lines 41 - 43,
The test currently calls await ndk.destroy() and await relay.stopServer()
directly at the end, which can be skipped if assertions fail; instead register
cleanup with addTearDown so they always run — e.g., in the test body call
addTearDown(() async => await ndk.destroy()) and addTearDown(() async => await
relay.stopServer()), ensuring you reference the existing ndk.destroy() and
relay.stopServer() methods so teardown always executes even on failures.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@packages/ndk/test/usecases/broadcast_sources_test.dart`:
- Line 9: The test name string has a typo: change the test declaration
test("braodcast should update source", ...) to test("broadcast should update
source", ...) so the test description reads correctly; update the string in the
test function in broadcast_sources_test.dart (the test call named test(...)
containing "braodcast should update source") to "broadcast should update
source".
- Around line 34-39: The test fails because relay OK responses for broadcasts
aren't updating the cached event's sources; modify the relay response handling
in RelayManager (the code path handling RelayBroadcastResponse in
_processDecodedMessage) to load the event from cache, append the relay URL to
its sources (if not present), and call cacheManager.saveEvent(updatedEvent) so
broadcasts update sources similar to _handleIncomingEvent; ensure you reference
the same event.id and avoid duplicating sources when saving.

In
`@packages/ndk/test/usecases/stream_response_cleaner/event_sources_merge_test.dart`:
- Line 14: The test has a copy-paste error where MockRelay instantiation for the
variable relay3 uses the name "relay 2"; update the MockRelay call for relay3 to
use the correct name (e.g., "relay 3") so the MockRelay(name: ...) for relay3 is
distinct from relay2 and improves test clarity and debugging.

---

Nitpick comments:
In `@packages/ndk/test/usecases/broadcast_sources_test.dart`:
- Around line 41-43: The test currently calls await ndk.destroy() and await
relay.stopServer() directly at the end, which can be skipped if assertions fail;
instead register cleanup with addTearDown so they always run — e.g., in the test
body call addTearDown(() async => await ndk.destroy()) and addTearDown(() async
=> await relay.stopServer()), ensuring you reference the existing ndk.destroy()
and relay.stopServer() methods so teardown always executes even on failures.

In
`@packages/ndk/test/usecases/stream_response_cleaner/event_sources_merge_test.dart`:
- Around line 48-52: Replace the direct cleanup calls at the end of the test
with an addTearDown registration so cleanup always runs even if assertions fail:
register an async tearDown via addTearDown(() async { await ndk.destroy(); await
relay1.stopServer(); await relay2.stopServer(); await relay3.stopServer(); }); —
locate the cleanup calls around ndk.destroy() and relay*.stopServer() and move
them into that addTearDown callback (ensuring the callback is registered before
assertions run).

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 52333132-7d7d-46f2-a5b9-c09c5b9229e2

📥 Commits

Reviewing files that changed from the base of the PR and between b9db040 and 6172c98.

📒 Files selected for processing (3)
  • packages/ndk/test/mocks/mock_relay.dart
  • packages/ndk/test/usecases/broadcast_sources_test.dart
  • packages/ndk/test/usecases/stream_response_cleaner/event_sources_merge_test.dart

Comment thread packages/ndk/test/usecases/broadcast_sources_test.dart Outdated
Comment thread packages/ndk/test/usecases/broadcast_sources_test.dart
Comment thread packages/ndk/test/usecases/stream_response_cleaner/event_sources_merge_test.dart Outdated
@nogringo nogringo changed the title test: event source tracking fix: event source tracking Mar 25, 2026
@nogringo nogringo requested review from 1-leo and frnandu March 25, 2026 13:19
@codecov
Copy link
Copy Markdown

codecov Bot commented Mar 25, 2026

Codecov Report

❌ Patch coverage is 92.15686% with 4 lines in your changes missing coverage. Please review.
✅ Project coverage is 73.37%. Comparing base (b9db040) to head (9c7d8f0).
⚠️ Report is 173 commits behind head on master.

Files with missing lines Patch % Lines
...k/lib/domain_layer/usecases/requests/requests.dart 75.00% 4 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master     #470      +/-   ##
==========================================
+ Coverage   73.36%   73.37%   +0.01%     
==========================================
  Files         195      195              
  Lines        8969     9012      +43     
==========================================
+ Hits         6580     6613      +33     
- Misses       2389     2399      +10     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Comment thread packages/ndk/lib/domain_layer/usecases/relay_manager.dart Outdated
Comment thread packages/ndk/test/usecases/stream_response_cleaner/event_sources_merge_test.dart Outdated
Comment thread packages/ndk/lib/domain_layer/usecases/relay_manager.dart
@nogringo nogringo requested review from 1-leo and frnandu March 28, 2026 23:32
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@packages/ndk/lib/domain_layer/usecases/broadcast/broadcast.dart`:
- Around line 74-91: The listener on broadcastState.networkController.stream
currently does a read-modify-write using the cached event per-relay, causing
races; instead, when response.broadcastSuccessful is true use the in-memory
broadcastState.event.sources (the monotonic union maintained by relay_manager)
as the source of truth: load the cached event via
_cacheManager.loadEvent(currentEvent.id), merge its sources with
broadcastState.event.sources (ensuring uniqueness), create an updatedEvent via
cachedEvent.copyWith(sources: mergedSources) and call
_cacheManager.saveEvent(updatedEvent); keep checks for currentEvent and
cachedEvent but stop merging only response.relayUrl so the cache reflects
broadcastState.event.sources.

In `@packages/ndk/lib/domain_layer/usecases/requests/requests.dart`:
- Line 278: The code unconditionally passes _cacheWrite.cacheManager (via the
cacheManager parameter) which allows cache mutations even when cacheWrite is
false; change the call site that currently sets cacheManager:
_cacheWrite.cacheManager to pass the cache manager only when caching is enabled
(e.g., cacheManager: _cacheWrite.cacheWrite ? _cacheWrite.cacheManager : null),
so that downstream saveEvent() in stream_response_cleaner (which calls
saveEvent() at lines 86-95) will not mutate sources for requests with
cacheWrite: false.
- Around line 377-390: The code mutates the stored Set by doing
existingSources..addAll(event.sources), so the size comparison always fails;
instead create a new Set clone when merging sources (e.g., newSources =
{...existingSources, ...event.sources}) and compare newSources.length to
existingSources.length, and only update seenEvents[event.id] and call
aggregatedController.add(event.copyWith(...)) when newSources is larger; apply
this change for both occurrences handling seenEvents, existingSources,
aggregatedController, and event.copyWith in the file.

In
`@packages/ndk/lib/domain_layer/usecases/stream_response_cleaner/stream_response_cleaner.dart`:
- Around line 14-15: The async cache merge in StreamResponseCleaner currently
fires unordered updates using _cacheManager which can race with the initial
cache write in the Requests use-case; implement a per-event-ID queue (map of
eventId -> Future) inside StreamResponseCleaner to chain cache update operations
so each event's writes run sequentially, on each queued turn read the current
cache entry, merge its sources with the incoming event's sources to produce a
merged snapshot, then write that merged snapshot back via _cacheManager; ensure
the queue entry is replaced with the next Future only after the write completes
or errors so subsequent updates for the same eventId never run concurrently or
lose sources.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 658e055b-17bd-4060-b722-4c03beddfc9b

📥 Commits

Reviewing files that changed from the base of the PR and between 6172c98 and 9c7d8f0.

📒 Files selected for processing (8)
  • packages/ndk/lib/domain_layer/entities/broadcast_state.dart
  • packages/ndk/lib/domain_layer/usecases/broadcast/broadcast.dart
  • packages/ndk/lib/domain_layer/usecases/relay_manager.dart
  • packages/ndk/lib/domain_layer/usecases/requests/requests.dart
  • packages/ndk/lib/domain_layer/usecases/stream_response_cleaner/stream_response_cleaner.dart
  • packages/ndk/lib/presentation_layer/init.dart
  • packages/ndk/test/usecases/broadcast_sources_test.dart
  • packages/ndk/test/usecases/stream_response_cleaner/event_sources_merge_test.dart
💤 Files with no reviewable changes (1)
  • packages/ndk/lib/presentation_layer/init.dart
🚧 Files skipped from review as they are similar to previous changes (2)
  • packages/ndk/test/usecases/stream_response_cleaner/event_sources_merge_test.dart
  • packages/ndk/test/usecases/broadcast_sources_test.dart

Comment on lines +74 to +91
// Listen for broadcast responses to update cache sources
broadcastState.networkController.stream.listen((response) async {
if (response.broadcastSuccessful) {
final currentEvent = broadcastState.event;
if (currentEvent != null) {
final cachedEvent = await _cacheManager.loadEvent(currentEvent.id);
if (cachedEvent != null) {
final updatedSources = {
...cachedEvent.sources,
response.relayUrl
}.toList();
final updatedEvent =
cachedEvent.copyWith(sources: updatedSources);
await _cacheManager.saveEvent(updatedEvent);
}
}
}
});
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

find . -type f -name "broadcast.dart" | grep -E "domain_layer|usecases"

Repository: relaystr/ndk

Length of output: 121


🏁 Script executed:

find . -type f -name "relay_manager.dart" | grep -E "domain_layer|usecases"

Repository: relaystr/ndk

Length of output: 115


🏁 Script executed:

cat -n packages/ndk/lib/domain_layer/usecases/broadcast/broadcast.dart | sed -n '50,110p'

Repository: relaystr/ndk

Length of output: 2890


🏁 Script executed:

cat -n packages/ndk/lib/domain_layer/usecases/relay_manager.dart | sed -n '490,530p'

Repository: relaystr/ndk

Length of output: 2053


🏁 Script executed:

cat -n packages/ndk/lib/domain_layer/usecases/broadcast/broadcast.dart | sed -n '1,20p'

Repository: relaystr/ndk

Length of output: 844


🏁 Script executed:

wc -l packages/ndk/lib/domain_layer/usecases/broadcast/broadcast.dart

Repository: relaystr/ndk

Length of output: 123


🏁 Script executed:

cat -n packages/ndk/lib/domain_layer/usecases/broadcast/broadcast.dart | sed -n '93,100p'

Repository: relaystr/ndk

Length of output: 333


🏁 Script executed:

cat -n packages/ndk/lib/domain_layer/usecases/relay_manager.dart | sed -n '500,530p'

Repository: relaystr/ndk

Length of output: 1503


🏁 Script executed:

cat -n packages/ndk/lib/domain_layer/entities/broadcast_state.dart | head -30

Repository: relaystr/ndk

Length of output: 1066


🏁 Script executed:

rg -A 3 "class BroadcastState" packages/ndk/lib/domain_layer/entities/broadcast_state.dart

Repository: relaystr/ndk

Length of output: 219


Use the in-memory broadcast state to avoid losing relay sources to cache read-modify-write races.

Lines 75–91 listen to broadcast responses and load the cached event once per relay, merge only that relay's URL, and save. If two relays respond before the first cache write completes, both listeners read the same stale cached snapshot, and the second save loses the first relay's source. Since relay_manager.dart (lines 504–517) already maintains broadcastState.event with the monotonic union of all successful relays before emitting the response, merge the cached sources with broadcastState.event.sources instead of patching the cache one relay at a time.

Suggested fix
-    broadcastState.networkController.stream.listen((response) async {
-      if (response.broadcastSuccessful) {
-        final currentEvent = broadcastState.event;
-        if (currentEvent != null) {
-          final cachedEvent = await _cacheManager.loadEvent(currentEvent.id);
-          if (cachedEvent != null) {
-            final updatedSources = {
-              ...cachedEvent.sources,
-              response.relayUrl
-            }.toList();
-            final updatedEvent =
-                cachedEvent.copyWith(sources: updatedSources);
-            await _cacheManager.saveEvent(updatedEvent);
-          }
-        }
-      }
-    });
+    broadcastState.networkController.stream.listen((response) async {
+      if (!mySaveToCache || !response.broadcastSuccessful) {
+        return;
+      }
+
+      final eventId = broadcastState.event?.id;
+      if (eventId == null) {
+        return;
+      }
+
+      final cachedEvent = await _cacheManager.loadEvent(eventId);
+      final latestEvent = broadcastState.event;
+      if (latestEvent == null) {
+        return;
+      }
+
+      final updatedSources = {
+        ...?cachedEvent?.sources,
+        ...latestEvent.sources,
+      }.toList();
+
+      await _cacheManager.saveEvent(
+        latestEvent.copyWith(sources: updatedSources),
+      );
+    });
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/ndk/lib/domain_layer/usecases/broadcast/broadcast.dart` around lines
74 - 91, The listener on broadcastState.networkController.stream currently does
a read-modify-write using the cached event per-relay, causing races; instead,
when response.broadcastSuccessful is true use the in-memory
broadcastState.event.sources (the monotonic union maintained by relay_manager)
as the source of truth: load the cached event via
_cacheManager.loadEvent(currentEvent.id), merge its sources with
broadcastState.event.sources (ensuring uniqueness), create an updatedEvent via
cachedEvent.copyWith(sources: mergedSources) and call
_cacheManager.saveEvent(updatedEvent); keep checks for currentEvent and
cachedEvent but stop merging only response.relayUrl so the cache reflects
broadcastState.event.sources.

trackingSet: state.returnedIds,
outController: state.controller,
eventOutFilters: _eventOutFilters,
cacheManager: _cacheWrite.cacheManager,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Honor cacheWrite at Line 278.

packages/ndk/lib/domain_layer/usecases/stream_response_cleaner/stream_response_cleaner.dart Lines 86-95 now call saveEvent(). Passing _cacheWrite.cacheManager unconditionally here means a request with cacheWrite: false can still mutate cached sources when the same event arrives again.

💡 Suggested fix
-      cacheManager: _cacheWrite.cacheManager,
+      cacheManager: request.cacheWrite ? _cacheWrite.cacheManager : null,
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/ndk/lib/domain_layer/usecases/requests/requests.dart` at line 278,
The code unconditionally passes _cacheWrite.cacheManager (via the cacheManager
parameter) which allows cache mutations even when cacheWrite is false; change
the call site that currently sets cacheManager: _cacheWrite.cacheManager to pass
the cache manager only when caching is enabled (e.g., cacheManager:
_cacheWrite.cacheWrite ? _cacheWrite.cacheManager : null), so that downstream
saveEvent() in stream_response_cleaner (which calls saveEvent() at lines 86-95)
will not mutate sources for requests with cacheWrite: false.

Comment on lines +377 to +390
final existingSources = seenEvents[event.id];
if (existingSources == null) {
// First time seeing this event
seenEvents[event.id] = event.sources.toSet();
aggregatedController.add(event);
} else {
// Merge sources if this event has new sources
if (event.sources.isNotEmpty) {
final newSources = existingSources..addAll(event.sources);
if (newSources.length > (seenEvents[event.id]?.length ?? 0)) {
seenEvents[event.id] = newSources;
aggregatedController.add(event.copyWith(sources: newSources.toList()));
}
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

fd -t f -i 'requests.dart' | head -5

Repository: relaystr/ndk

Length of output: 191


🏁 Script executed:

wc -l packages/ndk/lib/domain_layer/usecases/requests/requests.dart

Repository: relaystr/ndk

Length of output: 121


🏁 Script executed:

sed -n '370,400p' packages/ndk/lib/domain_layer/usecases/requests/requests.dart | cat -n

Repository: relaystr/ndk

Length of output: 1470


🏁 Script executed:

sed -n '460,485p' packages/ndk/lib/domain_layer/usecases/requests/requests.dart | cat -n

Repository: relaystr/ndk

Length of output: 1413


🏁 Script executed:

# Let me verify the exact context around these lines and understand the data flow
sed -n '375,405p' packages/ndk/lib/domain_layer/usecases/requests/requests.dart | cat -n

Repository: relaystr/ndk

Length of output: 1385


🏁 Script executed:

cat > /tmp/test_dart.txt << 'EOF'
In Dart:
- The cascade operator (..) mutates the object and returns self
- existingSources is a reference to the Set in seenEvents
- existingSources..addAll(event.sources) mutates that Set and returns it
- So seenEvents[event.id] points to the same mutated Set
- Comparison happens after mutation, so both sides have same length
- This means the condition is always false when sources are added
EOF
cat /tmp/test_dart.txt

Repository: relaystr/ndk

Length of output: 450


🏁 Script executed:

# Let me check the actual line numbers in the original review to map them correctly
sed -n '377,390p' packages/ndk/lib/domain_layer/usecases/requests/requests.dart | cat -n

Repository: relaystr/ndk

Length of output: 810


🏁 Script executed:

sed -n '468,480p' packages/ndk/lib/domain_layer/usecases/requests/requests.dart | cat -n

Repository: relaystr/ndk

Length of output: 843


🏁 Script executed:

cat << 'EOF'
Dart Set behavior with cascade operator:

Given:
  final existingSources = seenEvents[event.id];  // reference to Set in map

When executing:
  final newSources = existingSources..addAll(event.sources);

The cascade operator (..) on existingSources:
1. Calls addAll() which MUTATES existingSources in-place
2. Returns the mutated existingSources (same object reference)
3. Assigns it to newSources

Result: newSources and existingSources reference the SAME Set object

When checking:
  if (newSources.length > (seenEvents[event.id]?.length ?? 0))

Both sides reference the SAME mutated Set, so:
- newSources.length == seenEvents[event.id].length (always)
- Condition is always FALSE
- Event is never emitted with merged sources

The suggested fix {...existingSources, ...event.sources} creates a NEW Set
via spread syntax, avoiding mutation and allowing proper comparison.
EOF

Repository: relaystr/ndk

Length of output: 928


Clone the tracked set before comparing sizes.

existingSources..addAll(event.sources) mutates the same Set stored in seenEvents, causing the length comparison on line 386 to always be false since both sides reference the identical mutated object. This prevents merged events from being emitted and blocks relay discovery from expanding when a new relay appears through duplicates.

Replace with final newSources = {...existingSources, ...event.sources}; to create a new Set, and simplify the comparison to if (newSources.length > existingSources.length). This pattern appears identically at both locations (lines 377-390 and 468-480).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@packages/ndk/lib/domain_layer/usecases/requests/requests.dart` around lines
377 - 390, The code mutates the stored Set by doing
existingSources..addAll(event.sources), so the size comparison always fails;
instead create a new Set clone when merging sources (e.g., newSources =
{...existingSources, ...event.sources}) and compare newSources.length to
existingSources.length, and only update seenEvents[event.id] and call
aggregatedController.add(event.copyWith(...)) when newSources is larger; apply
this change for both occurrences handling seenEvents, existingSources,
aggregatedController, and event.copyWith in the file.

Comment on lines +14 to 15
final CacheManager? _cacheManager;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

fd -t f "stream_response_cleaner.dart"

Repository: relaystr/ndk

Length of output: 147


🏁 Script executed:

fd -t f "requests.dart" packages/ndk/lib/domain_layer/usecases/

Repository: relaystr/ndk

Length of output: 117


🏁 Script executed:

cat -n packages/ndk/lib/domain_layer/usecases/stream_response_cleaner/stream_response_cleaner.dart

Repository: relaystr/ndk

Length of output: 4389


🏁 Script executed:

sed -n '260,285p' packages/ndk/lib/domain_layer/usecases/requests/requests.dart

Repository: relaystr/ndk

Length of output: 756


🏁 Script executed:

# Also check the CacheManager interface to understand the contract
fd -t f "cache_manager.dart" packages/ndk/lib

Repository: relaystr/ndk

Length of output: 195


🏁 Script executed:

cat -n packages/ndk/lib/domain_layer/repositories/cache_manager.dart

Repository: relaystr/ndk

Length of output: 8290


Queue cache writes per event ID to ensure sources never shrink.

Line 61 fires an unordered async cache update that can silently drop on cache miss (line 91) or cause earlier writes to complete last and shrink stored sources. In packages/ndk/lib/domain_layer/usecases/requests/requests.dart lines 264-279, the initial cache write and this merge run independently off the same verified stream, enabling out-of-order completion. Add a per-event-ID queue to chain updates sequentially and save the merged event snapshot (sources from both cache and current event).

Suggested fix
   final List<EventFilter> _eventOutFilters;
   final CacheManager? _cacheManager;
+  final Map<String, Future<void>> _pendingCacheUpdates = {};
@@
-            final mergedEvent = event.copyWith(sources: newSources.toList());
-            _outController.add(mergedEvent);
-            // Update cache with merged sources
-            _updateCacheSources(event.id, newSources);
+            final mergedEvent = event.copyWith(sources: newSources.toList());
+            _outController.add(mergedEvent);
+            _updateCacheSources(mergedEvent);
@@
-  void _updateCacheSources(String eventId, Set<String> sources) {
-    if (_cacheManager == null) return;
-    
-    // Load existing event from cache and update sources
-    _cacheManager.loadEvent(eventId).then((cachedEvent) {
-      if (cachedEvent != null) {
-        final updatedEvent = cachedEvent.copyWith(sources: sources.toList());
-        _cacheManager.saveEvent(updatedEvent);
-      }
-    });
+  Future<void> _updateCacheSources(Nip01Event event) {
+    if (_cacheManager == null) return Future.value();
+
+    final previous = _pendingCacheUpdates[event.id] ?? Future.value();
+    final queued = previous.then((_) async {
+      final cachedEvent = await _cacheManager!.loadEvent(event.id);
+      final mergedSources = {
+        ...?cachedEvent?.sources,
+        ...event.sources,
+      }.toList();
+
+      await _cacheManager!.saveEvent(
+        (cachedEvent ?? event).copyWith(sources: mergedSources),
+      );
+    });
+
+    _pendingCacheUpdates[event.id] = queued;
+    return queued;
+  }

Also applies to: 58-61, 85-95

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In
`@packages/ndk/lib/domain_layer/usecases/stream_response_cleaner/stream_response_cleaner.dart`
around lines 14 - 15, The async cache merge in StreamResponseCleaner currently
fires unordered updates using _cacheManager which can race with the initial
cache write in the Requests use-case; implement a per-event-ID queue (map of
eventId -> Future) inside StreamResponseCleaner to chain cache update operations
so each event's writes run sequentially, on each queued turn read the current
cache entry, merge its sources with the incoming event's sources to produce a
merged snapshot, then write that merged snapshot back via _cacheManager; ensure
the queue entry is replaced with the next Future only after the write completes
or errors so subsequent updates for the same eventId never run concurrently or
lose sources.

@1-leo
Copy link
Copy Markdown
Contributor

1-leo commented Apr 1, 2026

Focus on query broadcast is not as important (for now)

  • write test to measure performance impact
  • => if impact significant store source tracking in separate db schema
  • race conditions concerns with multiple relays writing to cache at the same time
  • maybe feature flag to turn this feature off

Idea:
Cache buffer for source tracking storing every ~1m to persistence layer

@frnandu frnandu marked this pull request as draft May 6, 2026 08:13
@frnandu
Copy link
Copy Markdown
Collaborator

frnandu commented May 6, 2026

because of the race conditions concerns I'm changing this to a draft, low priority so we can re-discuss possible solutions later

Copy link
Copy Markdown
Collaborator

@frnandu frnandu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

race conditions concerns, we need to re-think solution

@1-leo
Copy link
Copy Markdown
Contributor

1-leo commented May 27, 2026

could use MutexSimple for race condition

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants