ZK Watch-Based Dirty Flag Optimization #128
ngngwr
left a comment
Code Review: ZK Watch-Based Dirty Flag Optimization
This is a meaningful performance optimization that eliminates redundant getPropertyStats ZK round-trips per pipeline cycle. The overall structure is sound. I found several issues ranging from a silent stale-read correctness bug to watch lifecycle gaps; details are in the inline comments below.
| T property = cachedPropertyMap.get(key); | ||
| if (property != null && property.getBucketSize() == 0 && property.getStat().equals(stat)) { | ||
| if (property != null && property.getBucketSize() == 0) { |
[Critical] Stale-read: watched non-bucketed property is reused without verifying the cached stat is non-null
When a path is in _watchedPaths and no watch has fired, the code unconditionally reuses the cached property. But property.getStat() is never consulted: the cached stat could be null if the property was stored without one (e.g. created by a code path that does not populate it). The original code guarded against this via property.getStat().equals(stat); skipping that check entirely means a property with a null stat is silently returned as if it were valid.
Additionally, if cachedPropertyMap.get(key) returns null here (key in the cache map but value absent), property is null and the else branch sends the key to statCheckKeys. The key is then stat-checked and falls into the stat != null but property == null branch, which calls reloadKeys.add(key). That is actually correct, but the asymmetry is fragile. At minimum, the null-stat guard from the original code should be preserved:
| if (property != null && property.getBucketSize() == 0) { | |
| } else if (_watchedPaths.contains(path)) { | |
| T property = cachedPropertyMap.get(key); | |
| if (property != null && property.getBucketSize() == 0 && property.getStat() != null) { | |
| // Active watch, no change notification, non-bucketed → safe to reuse with no ZK call. | |
| refreshedPropertyMap.put(key, property); | |
| } else { | |
| // Bucketed property or null stat: fall through to stat check. | |
| statCheckKeys.add(key); | |
| } |
| List<PropertyKey> statCheckKeys = new ArrayList<>(); | ||
| for (PropertyKey key : cachedKeys) { | ||
| String path = key.getPath(); | ||
| if (_dirtyPaths.remove(path)) { |
[Critical] Race: dirty flag can be consumed before the watch is re-registered, causing a missed update window
When _dirtyPaths.remove(path) is true, the path is added to reloadKeys. The reload loop calls subscribeWatch only after accessor.getProperty returns (line 169). Between the _dirtyPaths.remove at line 119 and the subscribeWatch at line 169, there is a window in which a ZK change fires the watch, ZkClient calls handleDataChange, and the event is dropped because _dirtyPaths was already cleared and the new notification is added after the remove but before the re-subscribe.
Because ZkClient re-registers a one-shot watch automatically during fireDataChangedEvents → installWatchOnlyPathExist, the watch itself is not lost at the ZkClient layer. Moreover, _dirtyPaths is populated by the listener: if the ZK event thread fires while the controller thread is between lines 119 and 169, the dirty flag is re-added correctly to _dirtyPaths. The actual danger is the opposite direction: the path is removed from _watchedPaths implicitly after reload fails (znode was deleted), and the unsubscription loop at lines 181-188 may iterate _watchedPaths while the event thread concurrently adds to _dirtyPaths. This is safe with ConcurrentHashMap, but worth documenting explicitly.
More critically: if accessor.getProperty returns null for a reloaded key (line 171 warn path), subscribeWatch is not called, so _watchedPaths no longer contains this path. On the next cycle the key goes to statCheckKeys which is correct — this path is safe, but the comment at line 168 says "re-register a watch after a successful load" which implies this is intentional.
| // Single shared listener instance registered on every watched path. | ||
| // handleDataChange / handleDataDeleted are called on the ZkClient event thread; | ||
| // ConcurrentHashMap.newKeySet ensures the add is thread-safe. | ||
| private final IZkDataListener _watchListener = new IZkDataListener() { |
[Critical] ZK session expiry silently invalidates all watches without invalidating _watchedPaths
ZK watches are server-side and are discarded on session expiry. ZkClient re-registers its listener map automatically after reconnect (via fireAllEvents at ZkClient.java:1581 which calls all registered IZkDataListener callbacks to simulate a change event), so listeners in ZkClient's internal _dataListener map will fire. However, ZkClient.unsubscribeAll() at line 473 of ZkClient.java clears _dataListener — if anything calls that during reconnection, every path in _watchedPaths would be silently orphaned: the cache would treat them as clean (no dirty flag, path is in _watchedPaths) and reuse stale data indefinitely.
The safer pattern is to implement IZkStateListener.handleNewSession in AbstractDataCache and call _watchedPaths.clear() (and _dirtyPaths.clear()) on session establishment so that the next refreshProperties cycle stat-checks and re-watches everything from scratch. Without this, a session expiry + reconnect + clean cache state can silently serve stale data.
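A minimal sketch of the reset described above, assuming both sets are ConcurrentHashMap-backed key sets as in the PR. The class WatchState and the helpers markWatched, markDirty and isTrustedClean are hypothetical names used only for illustration; how clearWatchesOnSessionReset gets wired to the new-session callback (for example IZkStateListener.handleNewSession) is not shown and would depend on the cache's actual structure.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical holder for the watch bookkeeping; not the actual AbstractDataCache.
class WatchState {
  private final Set<String> watchedPaths = ConcurrentHashMap.newKeySet();
  private final Set<String> dirtyPaths = ConcurrentHashMap.newKeySet();

  void markWatched(String path) {
    watchedPaths.add(path);
  }

  void markDirty(String path) {
    dirtyPaths.add(path);
  }

  // True when the fast path would reuse the cached value without a ZK call.
  boolean isTrustedClean(String path) {
    return watchedPaths.contains(path) && !dirtyPaths.contains(path);
  }

  // Intended to run when a new ZK session is established: forget every watch so
  // the next refresh stat-checks and re-watches all paths from scratch.
  void clearWatchesOnSessionReset() {
    watchedPaths.clear();
    dirtyPaths.clear();
  }

  public static void main(String[] args) {
    WatchState state = new WatchState();
    state.markWatched("/cluster/CONFIGS/a");
    System.out.println(state.isTrustedClean("/cluster/CONFIGS/a")); // true
    state.clearWatchesOnSessionReset();
    System.out.println(state.isTrustedClean("/cluster/CONFIGS/a")); // false
  }
}
```

After the reset no path is treated as clean, so every key falls through to the stat-check path on the next cycle, which is the conservative behavior the comment asks for.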
| if (property != null) { | ||
| refreshedPropertyMap.put(key, property); | ||
| // Register (or re-register) a watch after a successful load so the next change is detected. | ||
| subscribeWatch(baseAccessor, key.getPath()); |
[Major] Stale data for one full cycle after a node deletion
When a ZK node is deleted and handleDataDeleted fires, the path is marked dirty. On the next refresh it is added to reloadKeys (line 121) and accessor.getProperty is called. If the node is still absent, getProperty returns null — the warn path at line 171 is taken and subscribeWatch is not called (correctly). The path is also not added to refreshedPropertyMap.
The unsubscription loop at lines 181-188 then removes the watch (correct). On the same refresh cycle, the caller receives a refreshedPropertyMap that does not contain the deleted key, which is correct, but reloadedKeys does contain the deleted key (added at line 121 before reloadedKeys.addAll). Depending on how CurrentStateSnapshot interprets a key present in reloadedKeys but absent from the returned map, this could cause an NPE or an incorrect delta in downstream pipeline stages. Validate that all callers handle this case.
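One way a caller could defend against this case is to separate reloaded keys that have a value from those that do not before computing a delta. The sketch below is only an illustration under that assumption: ReloadDelta and missingAfterReload are hypothetical names, plain strings stand in for PropertyKey, and nothing here reflects how CurrentStateSnapshot actually consumes reloadedKeys.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper; not part of the PR.
class ReloadDelta {
  // Returns the keys that were queued for reload but have no entry in the
  // refreshed map, e.g. because the znode was deleted before the reload ran.
  static <K, V> Set<K> missingAfterReload(Set<K> reloadedKeys, Map<K, V> refreshed) {
    Set<K> missing = new HashSet<>(reloadedKeys);
    missing.removeAll(refreshed.keySet());
    return missing;
  }

  public static void main(String[] args) {
    Set<String> reloaded = new HashSet<>(Arrays.asList("/a", "/b"));
    Map<String, String> refreshed = new HashMap<>();
    refreshed.put("/a", "value-a");
    System.out.println(missingAfterReload(reloaded, refreshed)); // [/b]
  }
}
```

A caller that treats the returned set as deletions, rather than looking those keys up in the refreshed map, avoids dereferencing a missing value.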
| Iterator<String> watchIter = _watchedPaths.iterator(); | ||
| while (watchIter.hasNext()) { | ||
| String watchedPath = watchIter.next(); | ||
| if (!activePaths.contains(watchedPath)) { |
[Major] Watch unsubscription loop iterates _watchedPaths while _dirtyPaths add can race on a path being removed
The ZkClient event thread can call handleDataChange(path, data) for a path that is simultaneously being unsubscribed by the controller thread in this loop. The sequence:
1. Controller thread: activePaths.contains(watchedPath) → false → proceeds to unsubscribe.
2. ZK event thread: handleDataChange(watchedPath) → _dirtyPaths.add(watchedPath).
3. Controller thread: baseAccessor.unsubscribeDataChanges + watchIter.remove() + _dirtyPaths.remove(watchedPath).
Step 3 removes the dirty flag that was just added in step 2, so the change is permanently lost. The path is no longer in _watchedPaths and no longer dirty, so it will never be reloaded. Since the path is also absent from activePaths it won't appear in the next refreshedPropertyMap — this is likely correct if the node was deleted. But if it was modified and happened to not be in activePaths for a transient reason, this would silently discard a valid change event.
Consider removing from _dirtyPaths only before unsubscribeDataChanges, not after, to avoid this race.
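The suggested ordering can be sketched as follows. This is a simplified stand-in, not the PR's code: WatchCleanup and cleanup are hypothetical names, and a Consumer<String> replaces the call to baseAccessor.unsubscribeDataChanges so the example can run without ZooKeeper. In main, the consumer also simulates a notification that arrives just before the unsubscribe takes effect.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical sketch of the cleanup loop with the dirty flag cleared first.
class WatchCleanup {
  static void cleanup(Set<String> watchedPaths, Set<String> dirtyPaths,
      Set<String> activePaths, Consumer<String> unsubscribe) {
    Iterator<String> it = watchedPaths.iterator();
    while (it.hasNext()) {
      String path = it.next();
      if (!activePaths.contains(path)) {
        dirtyPaths.remove(path);  // drop only flags raised before cleanup began
        unsubscribe.accept(path); // a notification may still land up to this point
        it.remove();
      }
    }
  }

  public static void main(String[] args) {
    Set<String> watched = ConcurrentHashMap.newKeySet();
    Set<String> dirty = ConcurrentHashMap.newKeySet();
    watched.add("/stale");
    // The "unsubscribe" stand-in re-adds the dirty flag to mimic a late event.
    cleanup(watched, dirty, Collections.<String>emptySet(), dirty::add);
    System.out.println(watched.isEmpty() + " " + dirty.contains("/stale")); // true true
  }
}
```

With this ordering a late notification leaves its dirty flag in place, so the path is reloaded if it becomes active again. The trade-off is that a flag for a path that never returns stays in the set until something else clears it.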
| * Subscribe a ZK data watch on {@code path} if one is not already registered. | ||
| * Idempotent: a second call for the same path is a no-op. | ||
| */ | ||
| private void subscribeWatch(BaseDataAccessor<ZNRecord> baseAccessor, String path) { |
[Major] subscribeWatch is not idempotent when subscribeDataChanges returns false
ZkClient.subscribeDataChanges(path, listener, false) can return false when the path does not exist on the ZK server at the time of subscription (lines 333-339 in ZkClient.java). In that case ZkClient removes the listener it just added and returns false. But this code calls the void-returning overload at line 352 of ZkClient.java (subscribeDataChanges(path, listener) → subscribeDataChanges(path, listener, false)), which discards that result, so the BaseDataAccessor.subscribeDataChanges signature gives the caller no way to see it.
If watch installation fails for a non-existent path, _watchedPaths.add(path) still returns true and the path is recorded as watched. On the next refresh cycle, the code at line 122 sees it in _watchedPaths, trusts it as clean, and returns the stale cached value — but there is actually no active watch on ZK.
Fix: check the return value of subscribeDataChanges and only call _watchedPaths.add when it returns true. This may require using the ZkClient API directly or checking via BaseDataAccessor if it exposes the return value.
| private void subscribeWatch(BaseDataAccessor<ZNRecord> baseAccessor, String path) { | |
| private void subscribeWatch(BaseDataAccessor<ZNRecord> baseAccessor, String path) { | |
| if (!_watchedPaths.contains(path)) { | |
| boolean installed = baseAccessor.subscribeDataChanges(path, _watchListener); | |
| if (installed) { | |
| _watchedPaths.add(path); | |
| } | |
| } | |
| } |
Code review
Found 1 issue:
After The code comment at line 120 even acknowledges this intent: Minimal fix: remove the path from
if (_dirtyPaths.remove(path)) {
  _watchedPaths.remove(path); // ← allow subscribeWatch to re-register below
  reloadKeys.add(key);
}
This is confirmed by the
| String path = key.getPath(); | ||
| if (_dirtyPaths.remove(path)) { | ||
| // Watch fired → unconditional reload; re-watch happens after successful getProperty. | ||
| reloadKeys.add(key); |
[Critical] Dirty-path branch: _watchedPaths not cleared — watch permanently lost after delete+recreate
After handleDataDeleted fires, _dirtyPaths.remove(path) is called and the key is queued for reload — but _watchedPaths still contains the path. After reload succeeds, subscribeWatch calls _watchedPaths.add(path) which returns false (path already present), so subscribeDataChanges is never called. ZK watches are one-shot; the deletion watch was consumed. Result: the path appears watched but has no active ZK watch, and any future write to that path is permanently missed.
Fix: Add _watchedPaths.remove(path) in the dirty-path branch:
if (_dirtyPaths.remove(path)) {
  _watchedPaths.remove(path); // add this
  reloadKeys.add(key);
}
(claude and codex review)
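The failure mode and the fix can be reproduced in a toy model. The sketch below assumes, as the comment does, that a watch is consumed when it fires and must be re-armed explicitly; it does not model any automatic re-registration inside ZkClient. OneShotWatchModel, armedWatches, fire and watchSurvives are hypothetical names, and the reload is assumed to succeed.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model only: a watch is consumed when it fires and must be re-armed explicitly.
class OneShotWatchModel {
  final Set<String> watchedPaths = new HashSet<>(); // cache-side bookkeeping
  final Set<String> dirtyPaths = new HashSet<>();   // set by the listener
  final Set<String> armedWatches = new HashSet<>(); // stand-in for server-side watches

  // Mirrors the idempotent subscribeWatch: arms a watch only for unrecorded paths.
  void subscribeWatch(String path) {
    if (watchedPaths.add(path)) {
      armedWatches.add(path);
    }
  }

  // Simulates a change or delete notification; the armed watch is consumed.
  void fire(String path) {
    if (armedWatches.remove(path)) {
      dirtyPaths.add(path);
    }
  }

  // Dirty-path branch followed by an (assumed successful) reload.
  void refresh(String path, boolean clearWatchedOnDirty) {
    if (dirtyPaths.remove(path)) {
      if (clearWatchedOnDirty) {
        watchedPaths.remove(path); // the proposed fix
      }
      subscribeWatch(path); // re-watch after reload
    }
  }

  static boolean watchSurvives(boolean clearWatchedOnDirty) {
    OneShotWatchModel model = new OneShotWatchModel();
    model.subscribeWatch("/p");
    model.fire("/p");
    model.refresh("/p", clearWatchedOnDirty);
    return model.armedWatches.contains("/p");
  }

  public static void main(String[] args) {
    System.out.println("without fix: " + watchSurvives(false)); // without fix: false
    System.out.println("with fix: " + watchSurvives(true));     // with fix: true
  }
}
```

Without the extra remove, the path stays recorded in watchedPaths, so the idempotence check in subscribeWatch skips re-arming and the model ends the cycle with no armed watch.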
🤖 Code Review Summary
Files reviewed: 1 (
Issue Breakdown
Key Risk
Issues 1–3 can cause the cache to serve permanently stale data in production, which is particularly dangerous during rolling restarts, ZK session timeouts, or instance removals.
Reviewed by Claude Code and Codex
- Clear _watchedPaths in dirty-path branch so subscribeWatch actually re-registers the ZK data watch after reload; without this, watches are permanently lost after a ZNode delete+recreate.
- Restore null-stat guard (property.getStat() != null) in the fast-path cache reuse condition to prevent returning a stale property whose stat was never populated.
- Move _dirtyPaths.remove before unsubscribeDataChanges in the cleanup loop to close the race where a concurrent ZK event thread notification is silently discarded.
- Add clearWatchesOnSessionReset() for callers to force a full re-watch on ZK session re-establishment.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>