Skip to content

Commit d1a9289

Browse files
badrishcCopilot
andauthored
Fix two rare CI failures: ListPushPopStressTest host crash and VectorManager cleanup vs Reset() AVE (#1765)
Two independent rare CI failures, both surfacing as `Test host process crashed` and aborting the whole test run. ## 1. `ClusterVectorSetTests.MigrateVectorSetWhileModifyingAsync` — fatal `AccessViolationException` in `VectorManager` cleanup task ### Symptom ``` Passed Garnet.test.cluster.ClusterVectorSetTests.MigrateVectorSetWhileModifyingAsync [12 s] Fatal error. System.AccessViolationException: Attempted to read or write protected memory. at Tsavorite.core.LogRecord.get_Info() at Tsavorite.core.LogRecord.get_AllocatedSize() at Tsavorite.core.ObjectScanIterator`2[...].GetPhysicalAddressAndAllocatedSize(...) at Tsavorite.core.ObjectScanIterator`2[...].GetNext() at Tsavorite.core.TsavoriteKVIterator`6[...].PushNext[...](...) at Tsavorite.core.TsavoriteKV`2[...].Iterate[...](MainSessionFunctions, ...) at Garnet.server.VectorManager+<RunCleanupTaskAsync>d__24.MoveNext() The active test run was aborted. Reason: Test host process crashed ``` The AVE is a Corrupted-State Exception — `catch (Exception)` in `RunCleanupTaskAsync` cannot suppress it; the runtime fails fast and the test host crashes. ### Root cause `Recovery.Reset()` → `hlogBase.Reset()` (in `AllocatorBase` and the per-allocator overrides `SpanByte` / `Object` / `TsavoriteLog`) frees pages by synchronously invoking `OnPagesClosed(...)` and a `for (i in BufferSize) FreePage(i)` loop. Both paths ultimately call `ReturnPage(index)`, which sets: ```csharp pageArrays[index] = default; pagePointers[index] = default; // ★ becomes 0 ``` `Reset()`'s docstring promised *"WARNING: assumes that threads have drained out at this point."* But Garnet's cluster re-attach paths invoke it on a running store: * `libs/cluster/Server/Replication/ReplicaOps/ReplicaDisklessSync.cs:100` * `libs/cluster/Server/Replication/ReplicaOps/ReplicaDiskbasedSync.cs:136` In both files `storeWrapper.Reset()` is called **before** `SuspendPrimaryOnlyTasksAsync()`, and even that suspend only drains `TaskManager` tasks — `VectorManager.cleanupTask` is independent and never drained. Once `pagePointers[i] = 0`, the iterator's `GetPhysicalAddress` returns `0 + offset` — a tiny kernel-page address — and dereferencing it in `*(RecordInfo*)physicalAddress` raises a fatal AVE. ### The exact interleaving Production scenario in `MigrateVectorSetWhileModifyingAsync`: 1. Source primary migrates a slot containing a vector set → drops the index → `CleanupDroppedIndex` queues a cleanup-task scan on the source primary. 2. The drop AOF entry replicates to the source's replica, which replays it and **also** queues a cleanup-task scan on the replica. 3. Cluster topology change (post-migration, gossip, or any reason) triggers a replica re-attach → `ReplicaDisklessSync.ReplicateAttachAsync` / `ReplicaDiskbasedSync.ReplicateAttachAsync` calls `storeWrapper.Reset()`. 4. The replica's cleanup task is still mid-iterate over the main store → AVE. Thread-level interleaving: ``` Thread A: VectorManager cleanup task Thread B: storeWrapper.Reset() ───────────────────────────────────────── ───────────────────────────────── loop session.Iterate(callbacks) PushNext → ObjectScanIterator.GetNext() epoch.Resume() ◄── enter at epoch E headAddress = HeadAddress (still old value) LoadPageIfNeeded(...) (cur >= head → in-mem) physicalAddress = pagePointers[pageIdx] + offset Recovery.Reset() hlogBase.Reset() HeadAddress ← TailAddress OnPagesClosed(...) FreePage(p) ReturnPage(p) pagePointers[p] = 0 ◄── ★ // override loop: for i in BufferSize: FreePage(i) ReturnPage(i) pagePointers[i] = 0 *(RecordInfo*)physicalAddress ◄── ☠ AVE (LogRecord.GetInfo / LogRecord.AllocatedSize) ``` ### Why epoch protection didn't catch this Tsavorite's normal eviction path defers page-freeing through: ```csharp epoch.BumpCurrentEpoch(() => OnPagesClosed(newAddr)); ``` `BumpCurrentEpoch` queues the action and only fires it after `SafeToReclaimEpoch` has advanced past the prior epoch — i.e. after every thread that was holding the prior epoch has either suspended or moved on. That's why scan iterators are safe against normal eviction. `Reset()` skipped that mechanism in two places: 1. `AllocatorBase.Reset()` invoked `OnPagesClosed(newBeginAddress)` directly. 2. The per-allocator overrides had a `for (i in BufferSize) FreePage(i)` loop that ran **after** `base.Reset()` returned — also without epoch protection. **This second loop is the actual point of failure**: even if `OnPagesClosed` were deferred, the leftover (tail) page is freed by the override loop while a reader could still be reading it. ### The fix (Tsavorite layer) `AllocatorBase.Reset()` defers ALL page-close + page-free work through `BumpCurrentEpoch` and waits on a `ManualResetEventSlim` signalled by the deferred action — no polling: ```csharp using var resetComplete = new ManualResetEventSlim(initialState: false); // If caller was already epoch-protected, our prior epoch is what the action // will be waiting on — release it before waiting and re-acquire after. var wasProtected = epoch.ThisInstanceProtected(); if (!wasProtected) epoch.Resume(); // BumpCurrentEpoch requires a protected caller try { epoch.BumpCurrentEpoch(() => { try { if (headShifted) OnPagesClosed(newBeginAddress); FreeAllAllocatedPages(); } finally { resetComplete.Set(); } // never deadlock if action throws }); } finally { epoch.Suspend(); } // unconditionally so the action can fire resetComplete.Wait(); if (wasProtected) epoch.Resume(); ``` Each per-allocator override (`SpanByte` / `Object` / `TsavoriteLog`) moves its `FreePage(i)` loop into a new `FreeAllAllocatedPages()` virtual so the loop runs inside the deferred action: ```csharp public override void Reset() { base.Reset(); Initialize(); } protected override void FreeAllAllocatedPages() { for (int index = 0; index < BufferSize; index++) if (IsAllocated(index)) FreePage(index); } ``` ### Why this is safe * The deferred action runs only after `SafeToReclaimEpoch ≥ priorEpoch`, i.e. after every iterator that was inside `GetNext` at the moment `Reset()` was called has either suspended or advanced. By the time `pagePointers[i] = 0` executes, no thread is reading `pagePointers[i]`. * Iterators that re-enter `GetNext` after `HeadAddress` was shifted see `currentAddress < headAddress` and route through the buffered disk frame instead of `pagePointers` — so they don't touch the cleared array. * `Reset()` blocks until the deferred work has actually run, preserving its synchronous contract (the override's `Initialize()` after `Reset()` observes a fully freed page set). ### Test vs. product Strictly, `Reset()`'s docstring put the burden on callers. The cluster re-attach paths violate that — they call `Reset()` before draining the `VectorManager` cleanup task, and `SuspendPrimaryOnlyTasksAsync()` doesn't cover it. The alternative would be to drain every background reader at every `Reset()` callsite, but we chose to make `Reset()` itself epoch-safe because the contract was implicit, callsites are scattered, and Tsavorite already has the right primitive (`epoch.BumpCurrentEpoch`) — the normal eviction path uses it. This makes the safety property **enforced** rather than **assumed**, and protects any future caller / background reader. ### Repro `test/Garnet.test/VectorCleanupVsResetRaceTests.cs` — adds 4 000 vectors, drops the set (queues a full-keyspace cleanup scan), then spams `storeWrapper.Reset()` for 5 s. * **Without the fix:** crashes the host on every iteration with the exact production stack (`LogRecord.get_Info` → `ObjectScanIterator.GetNext` → `VectorManager.RunCleanupTaskAsync`). * **With the fix:** all 5 `[Repeat]` iterations pass (~2 700 resets per iteration concurrent with the cleanup iterator), no AVE. ## 2. `RespListTests.ListPushPopStressTest` — host crash on rare `RedisTimeoutException` ### Symptom ``` Unhandled exception. StackExchange.Redis.RedisTimeoutException: Timeout performing LPUSH (30000ms) at StackExchange.Redis.ConnectionMultiplexer.ExecuteSyncImpl[T](...) at StackExchange.Redis.RedisDatabase.ListLeftPush(...) at Garnet.test.RespListTests.<>c__DisplayClass39_1.<ListPushPopStressTest>b__0() The active test run was aborted. Reason: Test host process crashed ``` ### Root cause (two compounding issues) 1. **Worker threads created via `new Thread(() => ...)` had no try/catch.** In modern .NET an unhandled exception in a manually-created `Thread` terminates the process, so a single transient `RedisTimeoutException` aborted the entire test run. 2. **All 20 sync workers shared a single `ConnectionMultiplexer`.** Every command went through one socket and one background writer. Under CI load + lowMemory eviction overhead the writer falls behind and accumulates queued messages until SyncTimeout (30s) trips. The failure diagnostics confirmed this: `mc: 1/1, qs: 20, bw: SpinningDown`. ### Fix * Pre-create one `ConnectionMultiplexer` per worker on the main thread. Each thread now owns its own socket, eliminating the single-writer bottleneck. Pre-creating also avoids a 20-way connect storm racing `ConnectTimeout`. * Wrap each worker body in try/catch; capture exceptions into a `ConcurrentBag`, signal stop, exit cleanly. No more host crash. * Throw the aggregate **before** the post-checks so a real timeout isn't masked by secondary "list not empty" assertion noise. * Route the deadline-exceeded path through the failure bag too. ## Files ``` libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs | 76 +++++++++++++++++++++++-- libs/storage/Tsavorite/cs/src/core/Allocator/ObjectAllocatorImpl.cs | 7 ++- libs/storage/Tsavorite/cs/src/core/Allocator/SpanByteAllocatorImpl.cs | 7 ++- libs/storage/Tsavorite/cs/src/core/Allocator/TsavoriteLogAllocatorImpl.cs | 7 ++- test/Garnet.test/RespListTests.cs | 124 +++++++++++++++++++++++++-------------- test/Garnet.test/VectorCleanupVsResetRaceTests.cs | new ``` Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent eaa5734 commit d1a9289

7 files changed

Lines changed: 382 additions & 59 deletions

File tree

.github/copilot-instructions.md

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,73 @@ To add a new Garnet server setting:
205205
- `LightEpoch` instances track ownership — only dispose if owned
206206
- In parallel tests, share a `LightEpoch` instance across `GarnetClient` instances
207207

208+
### Epoch Protection and Log Address Invariants
209+
210+
Tsavorite uses **epoch-based memory reclamation** (`LightEpoch`) so writers can publish new values and reclaim old memory only after every reader has moved past it. Any change to the allocator, recovery, scan iterators, transient locking, or callbacks fired from the drain list must respect the rules below.
211+
212+
**Key files**: `libs/storage/Tsavorite/cs/src/core/Epochs/LightEpoch.cs`, `libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs`, `libs/storage/Tsavorite/cs/src/core/ClientSession/ClientSession.cs`.
213+
214+
#### Epoch protection model (`LightEpoch`)
215+
216+
- Each thread acquires a per-instance entry via `epoch.Resume()` (calls `Acquire`) and releases it via `epoch.Suspend()` (calls `Release`). Inside the protected region, the thread's `localCurrentEpoch` is advanced on every `ProtectAndDrain()` call and on entry.
217+
- `Resume()` is **non-reentrant**`Acquire` asserts if the thread is already protected on this instance. Use `ResumeIfNotProtected()` (returns `true` if it acquired) when code may be entered under an existing hold; pair with a matching `Suspend()` only on the path that took it.
218+
- `BasicContext.{RMW, Upsert, Read, Delete}` wrap the call in `UnsafeResumeThread()` / `UnsafeSuspendThread()` (in `ClientSession`) via try/finally. Custom code that calls `epoch.ProtectAndDrain()` (e.g., spin-waiters in `EpochOperations.SpinWaitUntilClosed`/`SpinWaitUntilRecordIsClosed`, `TransientLocking.LockForScan`) **must already hold the epoch** — the `Debug.Assert(entry > 0, "Trying to refresh unacquired epoch")` in `LightEpoch.ProtectAndDrain` fires otherwise.
219+
- `BumpCurrentEpoch(Action)` increments the global epoch and queues `Action` against the *prior* epoch. The action fires on whatever thread next observes that epoch as safe-to-reclaim — typically inside `ProtectAndDrain``Drain`. Therefore actions must be **thread-agnostic** (no thread-affine state) and **safe to fire synchronously** from the bumping thread itself: `BumpCurrentEpoch(Action)` calls `ProtectAndDrain` internally and may execute the action it just queued.
220+
221+
#### Log address layout and invariants
222+
223+
The seven log addresses on `AllocatorBase` advance monotonically and obey:
224+
225+
```
226+
BeginAddress <= ClosedUntilAddress <= SafeHeadAddress <= HeadAddress
227+
<= FlushedUntilAddress
228+
<= SafeReadOnlyAddress <= ReadOnlyAddress <= TailAddress
229+
```
230+
231+
| Address | Meaning |
232+
|---------|---------|
233+
| `BeginAddress` | Lowest valid address. Advancing it logically retires older addresses but **does not delete on-disk files** by itself; physical truncation only happens when a `ShiftBeginAddress` caller passes `truncateLog: true` (typically a checkpoint commit), and even then the device may defer file removal. |
234+
| `ClosedUntilAddress` | Highest address whose page buffer has been freed (`pagePointers[idx] = 0`). |
235+
| `SafeHeadAddress` | High-water set by `OnPagesClosed` *before* freeing — readers see it lead `ClosedUntilAddress`. |
236+
| `HeadAddress` | Lowest in-memory address. May advance while you hold the epoch, **but any address that was `>= HeadAddress` at any point during your protected region cannot be evicted until you `Suspend`**. Capped at `FlushedUntilAddress` — eviction never gets ahead of disk durability. |
237+
| `FlushedUntilAddress` | All bytes below have been written to disk. Updated by flush completion callbacks invoked from `AsyncFlushPagesForReadOnly`. Lags `SafeReadOnlyAddress` (a page is only flushed once it has become safely read-only). |
238+
| `SafeReadOnlyAddress` | Below this, no writer can be in-place mutating. Set by `OnPagesMarkedReadOnly` after writers have drained; same call also kicks off the flush that will later advance `FlushedUntilAddress`. |
239+
| `ReadOnlyAddress` | Maximum address of the immutable region. Records below are flushed/in-flush. |
240+
| `TailAddress` | Next address to allocate; published via the `PageOffset` CAS in `HandlePageOverflow`. |
241+
242+
#### Cascade pattern: publish → epoch barrier → post-drain action
243+
244+
Address advancement uses a **publish → bump → action** cascade so that the post-barrier work runs only after every prior holder has observed the new value:
245+
246+
1. **Publish** the new address into the visible field via `MonotonicUpdate`.
247+
2. **`BumpCurrentEpoch(Action)`** queues the post-barrier work against the prior epoch; it fires once every thread that observed the old value has either `Suspend`ed or `ProtectAndDrain`ed.
248+
3. The **action** does the work that requires "all prior holders have moved past": flush pages, advance the `Safe*` companion, close pages, free buffers, truncate disk segments.
249+
250+
The two cascades you encounter on the runtime hot path:
251+
252+
- **Read-only / flush cycle**`ShiftReadOnlyAddress(newRO)` publishes `ReadOnlyAddress`, then `BumpCurrentEpoch(OnPagesMarkedReadOnly)`. The action advances `SafeReadOnlyAddress` and issues `AsyncFlushPagesForReadOnly`; flush completion later advances `FlushedUntilAddress` via `FlushCallback`. Triggered by `PageAlignedShiftReadOnlyAddress` whenever the tail moves far enough past the read-only region.
253+
- **Eviction / close cycle**`ShiftHeadAddress(desiredHA)` publishes `HeadAddress`, then `BumpCurrentEpoch(OnPagesClosed)`. The action advances `SafeHeadAddress` and `ClosedUntilAddress`, and frees page buffers via the per-allocator `FreePage` (defined in `SpanByteAllocatorImpl` / `ObjectAllocatorImpl`). Triggered when `FlushedUntilAddress` moves past `HeadAddress + (some delta)`, or explicitly via `ShiftHeadAddressToBlocking`.
254+
255+
Other cascades:
256+
257+
- **`ShiftBeginAddress(newBA, truncateLog)`** — publishes `BeginAddress` (and cascades through `ShiftReadOnlyAddress` + `ShiftHeadAddress` if needed). When `truncateLog: true`, also bumps with `TruncateUntilAddress` to drop on-disk segments below the new begin; when `false` (the common case) on-disk segments are left in place to be reclaimed at the next checkpoint commit. Disk file removal itself is asynchronous — even after `TruncateUntilAddress` returns, the device may defer the actual unlink.
258+
- **`ShiftReadOnlyAddressWithWait(newRO, wait)`** — convenience wrapper that uses `ResumeIfNotProtected`/`Suspend` to launch the shift and (optionally) blocks the caller on `FlushedUntilAddress < newRO`.
259+
260+
#### Rules when changing allocator/iterator/callback code
261+
262+
1. **Holding the epoch implies stability**: an address observed `>= HeadAddress` during the protected region cannot be evicted before `Suspend()`. Re-acquire after suspend and re-validate.
263+
2. **`Suspend` and `Resume` must be balanced** on every code path. The only suspend inside the basic op path is the `ALLOCATE_FAILED` retry in `HandleRetryStatus`, balanced via try/finally.
264+
3. **Drain-list actions run on arbitrary threads** that hold the epoch. Do not capture thread-static state; do not call code that asserts on a specific thread.
265+
4. **Multi-phase mutations** that need to advance several addresses with barriers between them should use one `BumpCurrentEpoch(Action)` per phase with a `ManualResetEventSlim` to wait. **Drop the prior epoch before waiting** on the MRE — otherwise the drain list cannot make progress (the action you queued cannot fire while you hold the epoch it is gating on). Re-acquire to issue the next bump. `AllocatorBase.Reset` is an example: phase 1 publishes `ReadOnlyAddress` and waits for writers to drain before advancing `SafeReadOnlyAddress`/`FlushedUntilAddress`; phase 2 publishes `HeadAddress` and waits for readers to drain before closing/freeing pages.
266+
5. **Address publication ordering**: when one operation advances multiple addresses, advance the more permissive ones (`HeadAddress`, `ReadOnlyAddress`) before the more restrictive ones (`BeginAddress`). The full invariant `BeginAddress <= ClosedUntilAddress <= SafeHeadAddress <= HeadAddress <= FlushedUntilAddress <= SafeReadOnlyAddress <= ReadOnlyAddress <= TailAddress` must hold throughout, and stale readers caching the older value will route through safer paths (e.g., disk-frame branch in `LoadPageIfNeeded` rather than dereferencing freed `pagePointers`). `AllocatorBase.Reset` publishes `BeginAddress` last for this reason — an iterator with a stale `nextAddress` then routes through the disk-frame path instead of the in-memory page that has just been freed.
267+
6. **Page pointers**: after `OnPagesClosed``FreePage`, `pagePointers[idx] = 0`. Iterators must not dereference a page pointer outside the epoch protection that observed `addr >= HeadAddress`.
268+
7. **Scan iterators and `BufferAndLoad`**: `ScanIteratorBase.BufferAndLoad` may internally call `BumpCurrentEpoch`, `ProtectAndDrain`, or `Suspend`+`Resume` on IO, any of which advances the iterator thread's `localCurrentEpoch` and may synchronously fire deferred drain-list actions. Reads stay safe because the IO frame is iterator-owned (allocated in the iterator's constructor) and `headAddress` advances monotonically — `LoadPageIfNeeded` only routes a record to the in-log path when it was `>= HeadAddress` at the time of sampling, so the snapshot's routing decision is always conservative.
269+
270+
#### Tests that exercise these paths
271+
272+
- `BasicLockTests.FunctionsLockTest` (in `libs/storage/Tsavorite/cs/test/BasicLockTests.cs`) — multi-threaded RMW/Upsert under contention; exercises Resume/Suspend balance and `ProtectAndDrain`.
273+
- Cluster checkpoint/flush tests under `test/Garnet.test.cluster/` — exercise the full address cascade with live clients.
274+
208275
### Scratch Buffer Conventions
209276

210277
`StorageSession` has two scratch buffer types — use the right one:

libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs

Lines changed: 95 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -243,32 +243,107 @@ protected abstract void ReadAsync<TContext>(ulong alignedSourceAddress, IntPtr d
243243
/// <summary>Write page to device (async)</summary>
244244
protected abstract void WriteAsync<TContext>(long flushPage, DeviceIOCompletionCallback callback, PageAsyncFlushResult<TContext> asyncResult);
245245

246-
/// <summary>Reset the hybrid log. WARNING: assumes that threads have drained out at this point.</summary>
246+
/// <summary>
247+
/// Reset the hybrid log. Safe against concurrent iterators / readers / writers via a
248+
/// two-phase epoch cascade that mirrors the normal flush + close paths:
249+
///
250+
/// Phase 1: publish new ReadOnlyAddress synchronously, then under
251+
/// BumpCurrentEpoch — i.e. after writers caching the OLD ReadOnlyAddress
252+
/// have drained — publish SafeReadOnlyAddress and FlushedUntilAddress.
253+
/// Mirrors OnPagesMarkedReadOnly's invariant that "by the time
254+
/// SafeReadOnlyAddress advances, no thread is mutating below it".
255+
///
256+
/// Phase 2: publish new HeadAddress synchronously (now safe — writers have observed
257+
/// the new ReadOnlyAddress, so no writer holds a cached old ReadOnlyAddress
258+
/// that would leave HeadAddress > cached ReadOnlyAddress). Then under
259+
/// BumpCurrentEpoch — i.e. after readers caching the OLD HeadAddress have
260+
/// drained — close pages (advancing SafeHeadAddress and ClosedUntilAddress)
261+
/// and free pages. Mirrors OnPagesClosed's invariant.
262+
///
263+
/// Final: publish new BeginAddress synchronously. Publishing it last (rather than
264+
/// up front) means an iterator with a stale nextAddress sees
265+
/// currentAddress &gt; OLD BeginAddress and does not snap forward into the
266+
/// just-freed in-memory range — instead the currentAddress &lt; NEW HeadAddress
267+
/// check routes it through LoadPageIfNeeded's disk-frame branch (frame is
268+
/// iterator-owned, disk segment is intact). The invariant
269+
/// BeginAddress &lt;= HeadAddress holds throughout.
270+
/// </summary>
247271
[MethodImpl(MethodImplOptions.NoInlining)]
248272
public virtual void Reset()
249273
{
250274
var newBeginAddress = GetTailAddress();
251275

252-
// Shift read-only addresses to tail without flushing
276+
// To use BumpCurrentEpoch we must be epoch-protected; conversely to wait for the
277+
// queued action to drain we must NOT be holding the prior epoch. We toggle the
278+
// protection per phase. If the caller arrived already protected, restore at the end.
279+
var wasProtected = epoch.ThisInstanceProtected();
280+
if (wasProtected)
281+
epoch.Suspend();
282+
283+
// -------- Phase 1: ReadOnly -> wait for writer drain -> SafeReadOnly + FlushedUntil --------
253284
_ = MonotonicUpdate(ref ReadOnlyAddress, newBeginAddress, out _);
254-
_ = MonotonicUpdate(ref SafeReadOnlyAddress, newBeginAddress, out _);
255285

256-
// Shift head address to tail
257-
if (MonotonicUpdate(ref HeadAddress, newBeginAddress, out _))
286+
using (var phase1Done = new ManualResetEventSlim(initialState: false))
258287
{
259-
// Close addresses
260-
OnPagesClosed(newBeginAddress);
288+
epoch.Resume();
289+
try
290+
{
291+
epoch.BumpCurrentEpoch(() =>
292+
{
293+
try
294+
{
295+
_ = MonotonicUpdate(ref SafeReadOnlyAddress, newBeginAddress, out _);
296+
_ = MonotonicUpdate(ref FlushedUntilAddress, newBeginAddress, out _);
297+
}
298+
finally { phase1Done.Set(); }
299+
});
300+
}
301+
finally { epoch.Suspend(); }
302+
phase1Done.Wait();
303+
}
304+
305+
// -------- Phase 2: HeadAddress -> wait for reader drain -> OnPagesClosed + FreeAllPages --------
306+
var headShifted = MonotonicUpdate(ref HeadAddress, newBeginAddress, out _);
261307

262-
// Wait for pages to get closed
263-
while (ClosedUntilAddress < newBeginAddress)
308+
using (var phase2Done = new ManualResetEventSlim(initialState: false))
309+
{
310+
epoch.Resume();
311+
try
264312
{
265-
_ = Thread.Yield();
266-
if (epoch.ThisInstanceProtected())
267-
epoch.ProtectAndDrain();
313+
epoch.BumpCurrentEpoch(() =>
314+
{
315+
try
316+
{
317+
if (headShifted)
318+
OnPagesClosed(newBeginAddress);
319+
320+
// Wait for ClosedUntilAddress to catch up to newBeginAddress before
321+
// freeing remaining pages. Two scenarios make this necessary:
322+
// (a) headShifted==true: OnPagesClosed may have returned immediately
323+
// because another thread already owned OnPagesClosedWorker for our
324+
// range — that worker is still freeing pages on the other thread.
325+
// (b) headShifted==false: a concurrent Reset (or other ShiftHeadAddress
326+
// caller) already advanced HeadAddress past newBeginAddress and its
327+
// OnPagesClosedWorker may still be running.
328+
// In both cases, calling FreeAllAllocatedPages while the worker is mid-flight
329+
// would race with its FreePage calls and corrupt page state.
330+
while (ClosedUntilAddress < newBeginAddress)
331+
_ = Thread.Yield();
332+
333+
FreeAllAllocatedPages();
334+
}
335+
finally { phase2Done.Set(); }
336+
});
268337
}
338+
finally { epoch.Suspend(); }
339+
phase2Done.Wait();
269340
}
270341

271-
// Update begin address to tail
342+
// Restore caller's epoch state if they were protected on entry.
343+
if (wasProtected)
344+
epoch.Resume();
345+
346+
// -------- Final: publish BeginAddress (see XML doc on Reset for why this happens last) --------
272347
_ = MonotonicUpdate(ref BeginAddress, newBeginAddress, out _);
273348

274349
flushEvent.Initialize();
@@ -281,6 +356,13 @@ public virtual void Reset()
281356
device.Reset();
282357
}
283358

359+
/// <summary>
360+
/// Free any pages still allocated after <see cref="OnPagesClosed"/> has run. Subclasses
361+
/// override to call their per-allocator FreePage. Invoked from inside Reset's
362+
/// epoch.BumpCurrentEpoch action so it is safe against concurrent iterators.
363+
/// </summary>
364+
protected virtual void FreeAllAllocatedPages() { }
365+
284366
/// <summary>Asynchronously wraps <see cref="TruncateUntilAddressBlocking(long)"/>.</summary>
285367
internal void TruncateUntilAddress(long toAddress) => _ = Task.Run(() => TruncateUntilAddressBlocking(toAddress));
286368

libs/storage/Tsavorite/cs/src/core/Allocator/ObjectAllocatorImpl.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,12 +117,17 @@ protected internal override void Initialize()
117117
public override void Reset()
118118
{
119119
base.Reset();
120+
Initialize();
121+
}
122+
123+
/// <inheritdoc />
124+
protected override void FreeAllAllocatedPages()
125+
{
120126
for (var index = 0; index < BufferSize; index++)
121127
{
122128
if (IsAllocated(index))
123129
FreePage(index);
124130
}
125-
Initialize();
126131
}
127132

128133
/// <summary>Allocate memory page, pinned in memory, and in sector aligned form, if possible</summary>

libs/storage/Tsavorite/cs/src/core/Allocator/SpanByteAllocatorImpl.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,17 @@ public SpanByteAllocatorImpl(AllocatorSettings settings, TStoreFunctions storeFu
2626
public override void Reset()
2727
{
2828
base.Reset();
29+
Initialize();
30+
}
31+
32+
/// <inheritdoc />
33+
protected override void FreeAllAllocatedPages()
34+
{
2935
for (int index = 0; index < BufferSize; index++)
3036
{
3137
if (IsAllocated(index))
3238
FreePage(index);
3339
}
34-
Initialize();
3540
}
3641

3742
/// <summary>Allocate memory page, pinned in memory, and in sector aligned form, if possible</summary>

libs/storage/Tsavorite/cs/src/core/Allocator/TsavoriteLogAllocatorImpl.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,17 @@ public TsavoriteLogAllocatorImpl(AllocatorSettings settings)
2727
public override void Reset()
2828
{
2929
base.Reset();
30+
Initialize();
31+
}
32+
33+
/// <inheritdoc />
34+
protected override void FreeAllAllocatedPages()
35+
{
3036
for (var index = 0; index < BufferSize; index++)
3137
{
3238
if (IsAllocated(index))
3339
FreePage(index);
3440
}
35-
Initialize();
3641
}
3742

3843
/// <summary>

0 commit comments

Comments
 (0)