From d1fc645ba3a9e45290c93e84432932f5581645d4 Mon Sep 17 00:00:00 2001 From: Palak Chaturvedi Date: Wed, 6 May 2026 13:02:16 +0000 Subject: [PATCH 1/2] WIP: Add per-process shadow variables for buffer pool synchronization Introduce LocalCurrentNBuffers and LocalActiveNBuffers as per-process shadow copies of ShmemCtrl->currentNBuffers and StrategyControl->activeNBuffers respectively. This allows backends to use local copies during normal operation, reducing contention on shared memory, and enables safe mid-operation updates when resize barriers are processed. Key changes: - Add LocalCurrentNBuffers and LocalActiveNBuffers globals, initialized from shared state at backend startup. - Replace direct NBuffers usage with LocalCurrentNBuffers/LocalActiveNBuffers in buffer manager, freelist, pg_buffercache, autoprewarm, and access methods (hash, heap, tableam, xlog). - Coordinator in pg_resize_shared_buffers() updates its own shadows at each phase and asserts consistency at the end. - Each ProcessBarrier*() handler updates the local shadows and logs the transition (old -> new) at DEBUG1 level. - Add PROCSIGNAL_BARRIER_SHBUF_SHMEM_RESIZE_SHRINK barrier so backends update LocalCurrentNBuffers after shared structures are resized during shrink. - Add StrategyGetActiveNBuffers() accessor for freelist.c-private state. --- contrib/pg_buffercache/pg_buffercache_pages.c | 285 ++++++------------ contrib/pg_prewarm/autoprewarm.c | 10 +- src/backend/access/hash/hash.c | 2 +- src/backend/access/heap/heapam.c | 2 +- src/backend/access/table/tableam.c | 2 +- src/backend/access/transam/xlog.c | 6 +- src/backend/storage/buffer/buf_resize.c | 91 +++++- src/backend/storage/buffer/bufmgr.c | 45 ++- src/backend/storage/buffer/freelist.c | 31 +- src/backend/storage/ipc/procsignal.c | 3 + src/backend/utils/init/globals.c | 10 + src/include/storage/buf_internals.h | 1 + src/include/storage/bufmgr.h | 10 +- src/include/storage/procsignal.h | 3 + 14 files changed, 273 insertions(+), 228 deletions(-) diff --git a/contrib/pg_buffercache/pg_buffercache_pages.c b/contrib/pg_buffercache/pg_buffercache_pages.c index 8a17319ff2a0a..ebc52fc0c6723 100644 --- a/contrib/pg_buffercache/pg_buffercache_pages.c +++ b/contrib/pg_buffercache/pg_buffercache_pages.c @@ -15,6 +15,7 @@ #include "port/pg_numa.h" #include "storage/buf_internals.h" #include "storage/bufmgr.h" +#include "utils/injection_point.h" #include "utils/rel.h" #include "utils/tuplestore.h" @@ -37,39 +38,6 @@ PG_MODULE_MAGIC_EXT( .version = PG_VERSION ); -/* - * Record structure holding the to be exposed cache data. - */ -typedef struct -{ - uint32 bufferid; - RelFileNumber relfilenumber; - Oid reltablespace; - Oid reldatabase; - ForkNumber forknum; - BlockNumber blocknum; - bool isvalid; - bool isdirty; - uint16 usagecount; - - /* - * An int32 is sufficiently large, as MAX_BACKENDS prevents a buffer from - * being pinned by too many backends and each backend will only pin once - * because of bufmgr.c's PrivateRefCount infrastructure. - */ - int32 pinning_backends; -} BufferCachePagesRec; - - -/* - * Function context for data persisting over repeated calls. - */ -typedef struct -{ - TupleDesc tupdesc; - BufferCachePagesRec *record; -} BufferCachePagesContext; - /* * Record structure holding the to be exposed cache data for OS pages. This * structure is used by pg_buffercache_os_pages(), where NUMA information may @@ -118,153 +86,93 @@ static bool firstNumaTouch = true; Datum pg_buffercache_pages(PG_FUNCTION_ARGS) { - FuncCallContext *funcctx; - Datum result; - MemoryContext oldcontext; - BufferCachePagesContext *fctx; /* User function context. */ - TupleDesc tupledesc; + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc expected_tupledesc; - HeapTuple tuple; - int currentNBuffers = pg_atomic_read_u32(&ShmemCtrl->currentNBuffers); + int i; - if (SRF_IS_FIRSTCALL()) - { - int i; - - funcctx = SRF_FIRSTCALL_INIT(); - - /* Switch context when allocating stuff to be used in later calls */ - oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); - - /* Create a user function context for cross-call persistence */ - fctx = palloc_object(BufferCachePagesContext); - - /* - * To smoothly support upgrades from version 1.0 of this extension - * transparently handle the (non-)existence of the pinning_backends - * column. We unfortunately have to get the result type for that... - - * we can't use the result type determined by the function definition - * without potentially crashing when somebody uses the old (or even - * wrong) function definition though. - */ - if (get_call_result_type(fcinfo, NULL, &expected_tupledesc) != TYPEFUNC_COMPOSITE) - elog(ERROR, "return type must be a row type"); - - if (expected_tupledesc->natts < NUM_BUFFERCACHE_PAGES_MIN_ELEM || - expected_tupledesc->natts > NUM_BUFFERCACHE_PAGES_ELEM) - elog(ERROR, "incorrect number of output arguments"); - - /* Construct a tuple descriptor for the result rows. */ - tupledesc = CreateTemplateTupleDesc(expected_tupledesc->natts); - TupleDescInitEntry(tupledesc, (AttrNumber) 1, "bufferid", - INT4OID, -1, 0); - TupleDescInitEntry(tupledesc, (AttrNumber) 2, "relfilenode", - OIDOID, -1, 0); - TupleDescInitEntry(tupledesc, (AttrNumber) 3, "reltablespace", - OIDOID, -1, 0); - TupleDescInitEntry(tupledesc, (AttrNumber) 4, "reldatabase", - OIDOID, -1, 0); - TupleDescInitEntry(tupledesc, (AttrNumber) 5, "relforknumber", - INT2OID, -1, 0); - TupleDescInitEntry(tupledesc, (AttrNumber) 6, "relblocknumber", - INT8OID, -1, 0); - TupleDescInitEntry(tupledesc, (AttrNumber) 7, "isdirty", - BOOLOID, -1, 0); - TupleDescInitEntry(tupledesc, (AttrNumber) 8, "usage_count", - INT2OID, -1, 0); - - if (expected_tupledesc->natts == NUM_BUFFERCACHE_PAGES_ELEM) - TupleDescInitEntry(tupledesc, (AttrNumber) 9, "pinning_backends", - INT4OID, -1, 0); - - fctx->tupdesc = BlessTupleDesc(tupledesc); - - /* Allocate NBuffers worth of BufferCachePagesRec records. */ - fctx->record = (BufferCachePagesRec *) - MemoryContextAllocHuge(CurrentMemoryContext, - sizeof(BufferCachePagesRec) * currentNBuffers); - - /* Set max calls and remember the user function context. */ - funcctx->max_calls = currentNBuffers; - funcctx->user_fctx = fctx; - - /* Return to original context when allocating transient memory */ - MemoryContextSwitchTo(oldcontext); - - /* - * Scan through all the buffers, saving the relevant fields in the - * fctx->record structure. - * - * We don't hold the partition locks, so we don't get a consistent - * snapshot across all buffers, but we do grab the buffer header - * locks, so the information of each buffer is self-consistent. - */ - for (i = 0; i < currentNBuffers; i++) - { - BufferDesc *bufHdr; - uint64 buf_state; - - CHECK_FOR_INTERRUPTS(); - - /* - * TODO: We should just scan the entire buffer descriptor array - * instead of relying on curent buffer pool size. But that can - * happen if only we setup the descriptor array large enough at - * the server startup time. - */ - if (currentNBuffers != pg_atomic_read_u32(&ShmemCtrl->currentNBuffers)) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("number of shared buffers changed during scan of buffer cache"))); - - bufHdr = GetBufferDescriptor(i); - /* Lock each buffer header before inspecting. */ - buf_state = LockBufHdr(bufHdr); + /* + * To smoothly support upgrades from version 1.0 of this extension + * transparently handle the (non-)existence of the pinning_backends + * column. We unfortunately have to get the result type for that... - we + * can't use the result type determined by the function definition without + * potentially crashing when somebody uses the old (or even wrong) + * function definition though. + */ + if (get_call_result_type(fcinfo, NULL, &expected_tupledesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); - fctx->record[i].bufferid = BufferDescriptorGetBuffer(bufHdr); - fctx->record[i].relfilenumber = BufTagGetRelNumber(&bufHdr->tag); - fctx->record[i].reltablespace = bufHdr->tag.spcOid; - fctx->record[i].reldatabase = bufHdr->tag.dbOid; - fctx->record[i].forknum = BufTagGetForkNum(&bufHdr->tag); - fctx->record[i].blocknum = bufHdr->tag.blockNum; - fctx->record[i].usagecount = BUF_STATE_GET_USAGECOUNT(buf_state); - fctx->record[i].pinning_backends = BUF_STATE_GET_REFCOUNT(buf_state); + if (expected_tupledesc->natts < NUM_BUFFERCACHE_PAGES_MIN_ELEM || + expected_tupledesc->natts > NUM_BUFFERCACHE_PAGES_ELEM) + elog(ERROR, "incorrect number of output arguments"); - if (buf_state & BM_DIRTY) - fctx->record[i].isdirty = true; - else - fctx->record[i].isdirty = false; + InitMaterializedSRF(fcinfo, 0); - /* Note if the buffer is valid, and has storage created */ - if ((buf_state & BM_VALID) && (buf_state & BM_TAG_VALID)) - fctx->record[i].isvalid = true; - else - fctx->record[i].isvalid = false; + /* + * Scan through all the buffers, adding one row for each of the buffers to + * the tuplestore. + * + * We don't hold the partition locks, so we don't get a consistent + * snapshot across all buffers, but we do grab the buffer header locks, so + * the information of each buffer is self-consistent. + * + * Use LocalCurrentNBuffers so the scan range tracks the current buffer + * pool size. If a resize barrier is processed mid-scan (via + * CHECK_FOR_INTERRUPTS or an injection point wait), LocalCurrentNBuffers + * changes and we detect it. + */ + INJECTION_POINT("pg-buffercache-scan-start", NULL); + for (i = 0; i < LocalCurrentNBuffers; i++) + { + BufferDesc *bufHdr; + uint64 buf_state; + uint32 bufferid; + RelFileNumber relfilenumber; + Oid reltablespace; + Oid reldatabase; + ForkNumber forknum; + BlockNumber blocknum; + bool isvalid; + bool isdirty; + uint16 usagecount; + int32 pinning_backends; + Datum values[NUM_BUFFERCACHE_PAGES_ELEM]; + bool nulls[NUM_BUFFERCACHE_PAGES_ELEM]; - UnlockBufHdr(bufHdr); - } - } + bufHdr = GetBufferDescriptor(i); + /* Lock each buffer header before inspecting. */ + buf_state = LockBufHdr(bufHdr); + + bufferid = BufferDescriptorGetBuffer(bufHdr); + relfilenumber = BufTagGetRelNumber(&bufHdr->tag); + reltablespace = bufHdr->tag.spcOid; + reldatabase = bufHdr->tag.dbOid; + forknum = BufTagGetForkNum(&bufHdr->tag); + blocknum = bufHdr->tag.blockNum; + usagecount = BUF_STATE_GET_USAGECOUNT(buf_state); + pinning_backends = BUF_STATE_GET_REFCOUNT(buf_state); - funcctx = SRF_PERCALL_SETUP(); + if (buf_state & BM_DIRTY) + isdirty = true; + else + isdirty = false; - /* Get the saved state */ - fctx = funcctx->user_fctx; + /* Note if the buffer is valid, and has storage created */ + if ((buf_state & BM_VALID) && (buf_state & BM_TAG_VALID)) + isvalid = true; + else + isvalid = false; - if (funcctx->call_cntr < funcctx->max_calls) - { - uint32 i = funcctx->call_cntr; - Datum values[NUM_BUFFERCACHE_PAGES_ELEM]; - bool nulls[NUM_BUFFERCACHE_PAGES_ELEM]; + UnlockBufHdr(bufHdr); - values[0] = Int32GetDatum(fctx->record[i].bufferid); + /* Build the tuple and add it to tuplestore */ + values[0] = Int32GetDatum(bufferid); nulls[0] = false; /* * Set all fields except the bufferid to null if the buffer is unused * or not valid. */ - if (fctx->record[i].blocknum == InvalidBlockNumber || - fctx->record[i].isvalid == false) + if (blocknum == InvalidBlockNumber || isvalid == false) { nulls[1] = true; nulls[2] = true; @@ -278,33 +186,36 @@ pg_buffercache_pages(PG_FUNCTION_ARGS) } else { - values[1] = ObjectIdGetDatum(fctx->record[i].relfilenumber); + values[1] = ObjectIdGetDatum(relfilenumber); nulls[1] = false; - values[2] = ObjectIdGetDatum(fctx->record[i].reltablespace); + values[2] = ObjectIdGetDatum(reltablespace); nulls[2] = false; - values[3] = ObjectIdGetDatum(fctx->record[i].reldatabase); + values[3] = ObjectIdGetDatum(reldatabase); nulls[3] = false; - values[4] = Int16GetDatum(fctx->record[i].forknum); + values[4] = Int16GetDatum(forknum); nulls[4] = false; - values[5] = Int64GetDatum((int64) fctx->record[i].blocknum); + values[5] = Int64GetDatum((int64) blocknum); nulls[5] = false; - values[6] = BoolGetDatum(fctx->record[i].isdirty); + values[6] = BoolGetDatum(isdirty); nulls[6] = false; - values[7] = UInt16GetDatum(fctx->record[i].usagecount); + values[7] = UInt16GetDatum(usagecount); nulls[7] = false; /* unused for v1.0 callers, but the array is always long enough */ - values[8] = Int32GetDatum(fctx->record[i].pinning_backends); + values[8] = Int32GetDatum(pinning_backends); nulls[8] = false; } - /* Build and return the tuple. */ - tuple = heap_form_tuple(fctx->tupdesc, values, nulls); - result = HeapTupleGetDatum(tuple); + tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls); - SRF_RETURN_NEXT(funcctx, result); + /* + * Allow interrupts so that resize barriers can be processed + * mid-scan, updating LocalCurrentNBuffers. + */ + CHECK_FOR_INTERRUPTS(); + INJECTION_POINT("pg-buffercache-scan-loop", NULL); } - else - SRF_RETURN_DONE(funcctx); + + return (Datum) 0; } /* @@ -392,7 +303,7 @@ pg_buffercache_os_pages_internal(FunctionCallInfo fcinfo, bool include_numa) startptr = (char *) TYPEALIGN_DOWN(os_page_size, BufferGetBlock(1)); endptr = (char *) TYPEALIGN(os_page_size, - (char *) BufferGetBlock(NBuffers) + BLCKSZ); + (char *) BufferGetBlock(LocalCurrentNBuffers) + BLCKSZ); os_page_count = (endptr - startptr) / os_page_size; /* Used to determine the NUMA node for all OS pages at once */ @@ -417,8 +328,8 @@ pg_buffercache_os_pages_internal(FunctionCallInfo fcinfo, bool include_numa) Assert(idx == os_page_count); - elog(DEBUG1, "NUMA: NBuffers=%d os_page_count=" UINT64_FORMAT " " - "os_page_size=%zu", NBuffers, os_page_count, os_page_size); + elog(DEBUG1, "NUMA: LocalCurrentNBuffers=%d os_page_count=" UINT64_FORMAT " " + "os_page_size=%zu", LocalCurrentNBuffers, os_page_count, os_page_size); /* * If we ever get 0xff back from kernel inquiry, then we probably @@ -467,7 +378,7 @@ pg_buffercache_os_pages_internal(FunctionCallInfo fcinfo, bool include_numa) * without reallocating memory. */ pages_per_buffer = Max(1, BLCKSZ / os_page_size) + 1; - max_entries = NBuffers * pages_per_buffer; + max_entries = LocalCurrentNBuffers * pages_per_buffer; /* Allocate entries for BufferCacheOsPagesRec records. */ fctx->record = (BufferCacheOsPagesRec *) @@ -490,7 +401,7 @@ pg_buffercache_os_pages_internal(FunctionCallInfo fcinfo, bool include_numa) */ startptr = (char *) TYPEALIGN_DOWN(os_page_size, (char *) BufferGetBlock(1)); idx = 0; - for (i = 0; i < NBuffers; i++) + for (i = 0; i < LocalCurrentNBuffers; i++) { char *buffptr = (char *) BufferGetBlock(i + 1); BufferDesc *bufHdr; @@ -636,7 +547,7 @@ pg_buffercache_summary(PG_FUNCTION_ARGS) if (get_call_result_type(fcinfo, NULL, &tupledesc) != TYPEFUNC_COMPOSITE) elog(ERROR, "return type must be a row type"); - for (int i = 0; i < NBuffers; i++) + for (int i = 0; i < LocalCurrentNBuffers; i++) { BufferDesc *bufHdr; uint64 buf_state; @@ -697,7 +608,7 @@ pg_buffercache_usage_counts(PG_FUNCTION_ARGS) InitMaterializedSRF(fcinfo, 0); - for (int i = 0; i < NBuffers; i++) + for (int i = 0; i < LocalCurrentNBuffers; i++) { BufferDesc *bufHdr = GetBufferDescriptor(i); uint64 buf_state = pg_atomic_read_u64(&bufHdr->state); @@ -761,7 +672,7 @@ pg_buffercache_evict(PG_FUNCTION_ARGS) pg_buffercache_superuser_check("pg_buffercache_evict"); - if (buf < 1 || buf > NBuffers) + if (buf < 1 || buf > LocalCurrentNBuffers) elog(ERROR, "bad buffer ID: %d", buf); values[0] = BoolGetDatum(EvictUnpinnedBuffer(buf, &buffer_flushed)); @@ -878,7 +789,7 @@ pg_buffercache_mark_dirty(PG_FUNCTION_ARGS) pg_buffercache_superuser_check("pg_buffercache_mark_dirty"); - if (buf < 1 || buf > NBuffers) + if (buf < 1 || buf > LocalCurrentNBuffers) elog(ERROR, "bad buffer ID: %d", buf); values[0] = BoolGetDatum(MarkDirtyUnpinnedBuffer(buf, &buffer_already_dirty)); diff --git a/contrib/pg_prewarm/autoprewarm.c b/contrib/pg_prewarm/autoprewarm.c index 89e187425cc77..dbc2a6df113e1 100644 --- a/contrib/pg_prewarm/autoprewarm.c +++ b/contrib/pg_prewarm/autoprewarm.c @@ -371,12 +371,12 @@ apw_load_buffers(void) apw_state->prewarmed_blocks = 0; /* Don't prewarm more than we can fit. */ - if (num_elements > NBuffers) + if (num_elements > LocalCurrentNBuffers) { - num_elements = NBuffers; + num_elements = LocalCurrentNBuffers; ereport(LOG, (errmsg("autoprewarm capping prewarmed blocks to %d (shared_buffers size)", - NBuffers))); + LocalCurrentNBuffers))); } /* Get the info position of the first block of the next database. */ @@ -699,9 +699,9 @@ apw_dump_now(bool is_bgworker, bool dump_unlogged) * memory-efficient data structure.) */ block_info_array = (BlockInfoRecord *) - palloc_extended((sizeof(BlockInfoRecord) * NBuffers), MCXT_ALLOC_HUGE); + palloc_extended((sizeof(BlockInfoRecord) * LocalCurrentNBuffers), MCXT_ALLOC_HUGE); - for (num_blocks = 0, i = 0; i < NBuffers; i++) + for (num_blocks = 0, i = 0; i < LocalCurrentNBuffers; i++) { uint64 buf_state; diff --git a/src/backend/access/hash/hash.c b/src/backend/access/hash/hash.c index e88ddb32a054c..72621e8fdbb60 100644 --- a/src/backend/access/hash/hash.c +++ b/src/backend/access/hash/hash.c @@ -164,7 +164,7 @@ hashbuild(Relation heap, Relation index, IndexInfo *indexInfo) */ sort_threshold = (maintenance_work_mem * (Size) 1024) / BLCKSZ; if (index->rd_rel->relpersistence != RELPERSISTENCE_TEMP) - sort_threshold = Min(sort_threshold, NBuffers); + sort_threshold = Min(sort_threshold, LocalCurrentNBuffers); else sort_threshold = Min(sort_threshold, NLocBuffer); diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index f30a56ecf5539..8291937d8babc 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -390,7 +390,7 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock) * if you change this, consider changing that one, too. */ if (!RelationUsesLocalBuffers(scan->rs_base.rs_rd) && - scan->rs_nblocks > NBuffers / 4) + scan->rs_nblocks > LocalCurrentNBuffers / 4) { allow_strat = (scan->rs_base.rs_flags & SO_ALLOW_STRAT) != 0; allow_sync = (scan->rs_base.rs_flags & SO_ALLOW_SYNC) != 0; diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c index 87491796523e8..10e82ba41b195 100644 --- a/src/backend/access/table/tableam.c +++ b/src/backend/access/table/tableam.c @@ -426,7 +426,7 @@ table_block_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan) /* compare phs_syncscan initialization to similar logic in initscan */ bpscan->base.phs_syncscan = synchronize_seqscans && !RelationUsesLocalBuffers(rel) && - bpscan->phs_nblocks > NBuffers / 4; + bpscan->phs_nblocks > LocalCurrentNBuffers / 4; SpinLockInit(&bpscan->phs_mutex); bpscan->phs_startblock = InvalidBlockNumber; bpscan->phs_numblock = InvalidBlockNumber; diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index c6a64a129c114..f0acc6218c818 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -6861,7 +6861,7 @@ LogCheckpointEnd(bool restartpoint) "longest=%ld.%03d s, average=%ld.%03d s; distance=%d kB, " "estimate=%d kB; lsn=%X/%08X, redo lsn=%X/%08X", CheckpointStats.ckpt_bufs_written, - (double) CheckpointStats.ckpt_bufs_written * 100 / NBuffers, + (double) CheckpointStats.ckpt_bufs_written * 100 / LocalCurrentNBuffers, CheckpointStats.ckpt_slru_written, CheckpointStats.ckpt_segs_added, CheckpointStats.ckpt_segs_removed, @@ -6885,7 +6885,7 @@ LogCheckpointEnd(bool restartpoint) "longest=%ld.%03d s, average=%ld.%03d s; distance=%d kB, " "estimate=%d kB; lsn=%X/%08X, redo lsn=%X/%08X", CheckpointStats.ckpt_bufs_written, - (double) CheckpointStats.ckpt_bufs_written * 100 / NBuffers, + (double) CheckpointStats.ckpt_bufs_written * 100 / LocalCurrentNBuffers, CheckpointStats.ckpt_slru_written, CheckpointStats.ckpt_segs_added, CheckpointStats.ckpt_segs_removed, @@ -7486,7 +7486,7 @@ CreateCheckPoint(int flags) update_checkpoint_display(flags, false, true); TRACE_POSTGRESQL_CHECKPOINT_DONE(CheckpointStats.ckpt_bufs_written, - NBuffers, + LocalCurrentNBuffers, CheckpointStats.ckpt_segs_added, CheckpointStats.ckpt_segs_removed, CheckpointStats.ckpt_segs_recycled); diff --git a/src/backend/storage/buffer/buf_resize.c b/src/backend/storage/buffer/buf_resize.c index 836811356725b..3f6914181b331 100644 --- a/src/backend/storage/buffer/buf_resize.c +++ b/src/backend/storage/buffer/buf_resize.c @@ -58,7 +58,7 @@ MarkBufferResizingEnd(int newNBuffers) /* * TODO: should we leave targetNBuffers as is? We are setting it to - * NBuffers in BufferManagerShmemInit(). + * the shared_buffers GUC value in BufferManagerShmemInit(). */ pg_atomic_write_u32(&ShmemCtrl->targetNBuffers, 0); ShmemCtrl->coordinator = -1; @@ -84,6 +84,9 @@ SharedBufferResizeBarrier(ProcSignalBarrierType barrier, const char *barrier_nam case PROCSIGNAL_BARRIER_SHBUF_SHRINK: INJECTION_POINT("pgrsb-shrink-barrier-sent", NULL); break; + case PROCSIGNAL_BARRIER_SHBUF_SHMEM_RESIZE_SHRINK: + INJECTION_POINT("pgrsb-shmem-resize-shrink-barrier-sent", NULL); + break; case PROCSIGNAL_BARRIER_SHBUF_RESIZE_MAP_AND_MEM: INJECTION_POINT("pgrsb-resize-barrier-sent", NULL); break; @@ -211,15 +214,20 @@ pg_resize_shared_buffers(PG_FUNCTION_ARGS) if (targetNBuffers < currentNBuffers) { /* - * Phase 1: Shrinking - send SHBUF_SHRINK barrier Every backend sets - * activeNBuffers = NewNBuffers to restrict buffer pool allocations to - * the new size + * Phase 1: Shrinking - send SHBUF_SHRINK barrier. Backends update + * LocalActiveNBuffers to restrict buffer pool allocations to the + * new size. */ elog(LOG, "Phase 1: Shrinking buffer pool, restricting allocations to %d buffers", targetNBuffers); StrategyReset(targetNBuffers); SharedBufferResizeBarrier(PROCSIGNAL_BARRIER_SHBUF_SHRINK, CppAsString(PROCSIGNAL_BARRIER_SHBUF_SHRINK)); + /* Coordinator updates its own shadow */ + elog(LOG, "coordinator %d: LocalActiveNBuffers %d -> %d (shrink)", + MyProcPid, LocalActiveNBuffers, targetNBuffers); + LocalActiveNBuffers = targetNBuffers; + /* Evict buffers in the area being shrunk */ elog(LOG, "evicting buffers %u..%u", targetNBuffers + 1, currentNBuffers); if (!EvictExtraBuffers(targetNBuffers, currentNBuffers)) @@ -227,6 +235,12 @@ pg_resize_shared_buffers(PG_FUNCTION_ARGS) elog(WARNING, "failed to evict extra buffers during shrinking"); StrategyReset(currentNBuffers); SharedBufferResizeBarrier(PROCSIGNAL_BARRIER_SHBUF_RESIZE_FAILED, CppAsString(PROCSIGNAL_BARRIER_SHBUF_RESIZE_FAILED)); + + /* Restore coordinator shadows */ + elog(LOG, "coordinator %d: LocalActiveNBuffers %d -> %d (shrink failed, restoring)", + MyProcPid, LocalActiveNBuffers, currentNBuffers); + LocalActiveNBuffers = currentNBuffers; + MarkBufferResizingEnd(currentNBuffers); pg_atomic_clear_flag(&ShmemCtrl->resize_in_progress); PG_RETURN_BOOL(false); @@ -240,6 +254,19 @@ pg_resize_shared_buffers(PG_FUNCTION_ARGS) /* Update the current NBuffers. */ pg_atomic_write_u32(&ShmemCtrl->currentNBuffers, targetNBuffers); + + /* + * Emit SHMEM_RESIZE barrier so backends validate the resized + * structures and update LocalCurrentNBuffers after the shared + * currentNBuffers changes. + */ + SharedBufferResizeBarrier(PROCSIGNAL_BARRIER_SHBUF_SHMEM_RESIZE_SHRINK, CppAsString(PROCSIGNAL_BARRIER_SHBUF_SHMEM_RESIZE_SHRINK)); + + /* Coordinator updates its own shadow */ + elog(LOG, "coordinator %d: LocalCurrentNBuffers %d -> %d (shrink shmem resize)", + MyProcPid, LocalCurrentNBuffers, targetNBuffers); + LocalCurrentNBuffers = targetNBuffers; + } /* Phase 2: SHBUF_RESIZE_MAP_AND_MEM - Both expanding and shrinking */ @@ -276,6 +303,14 @@ pg_resize_shared_buffers(PG_FUNCTION_ARGS) * expanded range */ elog(LOG, "Phase 3: Expanding buffer pool, enabling allocations up to %d buffers", targetNBuffers); + + /* Coordinator updates its own shadows */ + elog(LOG, "coordinator %d: LocalCurrentNBuffers %d -> %d, LocalActiveNBuffers %d -> %d (expand)", + MyProcPid, LocalCurrentNBuffers, targetNBuffers, LocalActiveNBuffers, targetNBuffers); + LocalCurrentNBuffers = targetNBuffers; + LocalActiveNBuffers = targetNBuffers; + + /* Now safe to update shared values */ StrategyReset(targetNBuffers); pg_atomic_write_u32(&ShmemCtrl->currentNBuffers, targetNBuffers); @@ -289,6 +324,10 @@ pg_resize_shared_buffers(PG_FUNCTION_ARGS) pg_atomic_clear_flag(&ShmemCtrl->resize_in_progress); + /* Final consistency check */ + Assert(LocalCurrentNBuffers == targetNBuffers); + Assert(LocalActiveNBuffers == targetNBuffers); + elog(LOG, "successfully resized shared buffers to %d", targetNBuffers); PG_RETURN_BOOL(result); @@ -315,6 +354,36 @@ ProcessBarrierShmemShrink(void) elog(LOG, "Phase 1: Processing SHBUF_SHRINK barrier - target buffer pool size = %d, coordinator is %d", targetNBuffers, ShmemCtrl->coordinator); + + /* Update per-process shadow to reflect the restricted allocation range */ + elog(DEBUG1, "backend %d acknowledged SHBUF_SHRINK: LocalActiveNBuffers %d -> %d", + MyProcPid, LocalActiveNBuffers, targetNBuffers); + LocalActiveNBuffers = targetNBuffers; + + return true; +} + +bool +ProcessBarrierShmemResizeShrink(void) +{ + int targetNBuffers = pg_atomic_read_u32(&ShmemCtrl->targetNBuffers); + + Assert(!pg_atomic_unlocked_test_flag(&ShmemCtrl->resize_in_progress)); + + /* + * Delay adjusting the new active size of buffer pool till this process + * becomes ready to resize buffers. + */ + if (delay_shmem_resize) + { + elog(LOG, "delaying SHMEM_RESIZE_SHRINK barrier acknowledgment for %d buffers, coordinator is %d", + targetNBuffers, ShmemCtrl->coordinator); + return false; + } + + elog(DEBUG1, "backend %d acknowledged SHMEM_RESIZE_SHRINK: LocalCurrentNBuffers %d -> %d", + MyProcPid, LocalCurrentNBuffers, targetNBuffers); + LocalCurrentNBuffers = targetNBuffers; return true; } @@ -389,6 +458,12 @@ ProcessBarrierShmemExpand(void) elog(LOG, "Phase 3: Processing SHBUF_EXPAND barrier - targetNBuffers = %d, ShmemCtrl->coordinator = %d", targetNBuffers, ShmemCtrl->coordinator); + /* Update the per-process shadow copies of the current and active buffer counts */ + elog(DEBUG1, "backend %d acknowledged SHBUF_EXPAND: LocalCurrentNBuffers %d -> %d, LocalActiveNBuffers %d -> %d", + MyProcPid, LocalCurrentNBuffers, targetNBuffers, LocalActiveNBuffers, targetNBuffers); + LocalCurrentNBuffers = targetNBuffers; + LocalActiveNBuffers = targetNBuffers; + return true; } @@ -403,6 +478,14 @@ ProcessBarrierShmemResizeFailed(void) elog(LOG, "received proc signal indicating failure to resize shared buffers from %d to %d, restoring to %d, coordinator is %d", currentNBuffers, targetNBuffers, currentNBuffers, ShmemCtrl->coordinator); + /* + * Update per-process shadow to reflect that the resize failed and we are + * restoring the active buffer pool size to the previous value. + */ + elog(DEBUG1, "backend %d acknowledged RESIZE_FAILED: LocalActiveNBuffers %d -> %d", + MyProcPid, LocalActiveNBuffers, currentNBuffers); + LocalActiveNBuffers = currentNBuffers; + return true; } diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index d39e65ef811d0..f9b0a4b5dcaa0 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -91,7 +91,7 @@ * being dropped. For the relations with size below this threshold, we find * the buffers by doing lookups in BufMapping table. */ -#define BUF_DROP_FULL_SCAN_THRESHOLD (uint64) (NBuffers / 32) +#define BUF_DROP_FULL_SCAN_THRESHOLD (uint64) (LocalCurrentNBuffers / 32) /* * This is separated out from PrivateRefCountEntry to allow for copying all @@ -3496,7 +3496,7 @@ BufferSync(int flags) * certainly need to be written for the next checkpoint attempt, too. */ num_to_scan = 0; - for (buf_id = 0; buf_id < NBuffers; buf_id++) + for (buf_id = 0; buf_id < LocalCurrentNBuffers; buf_id++) { BufferDesc *bufHdr = GetBufferDescriptor(buf_id); uint64 set_bits = 0; @@ -3538,7 +3538,7 @@ BufferSync(int flags) WritebackContextInit(&wb_context, &checkpoint_flush_after); - TRACE_POSTGRESQL_BUFFER_SYNC_START(NBuffers, num_to_scan); + TRACE_POSTGRESQL_BUFFER_SYNC_START(LocalCurrentNBuffers, num_to_scan); /* * Sort buffers that need to be written to reduce the likelihood of random @@ -3722,7 +3722,7 @@ BufferSync(int flags) */ CheckpointStats.ckpt_bufs_written += num_written; - TRACE_POSTGRESQL_BUFFER_SYNC_DONE(NBuffers, num_written, num_to_scan); + TRACE_POSTGRESQL_BUFFER_SYNC_DONE(LocalCurrentNBuffers, num_written, num_to_scan); } #define BGW_DEBUG 1 @@ -4156,6 +4156,21 @@ InitBufferManagerAccess(void) { HASHCTL hash_ctl; + /* + * Initialize per-process shadow copies of the shared buffer pool + * dimensions. These are kept in sync with shared memory values during + * buffer pool resize operations via barrier processing. + */ + LocalCurrentNBuffers = pg_atomic_read_u32(&ShmemCtrl->currentNBuffers); + LocalActiveNBuffers = StrategyGetActiveNBuffers(); + + elog(DEBUG1, "InitBufferManagerAccess: backend %d LocalCurrentNBuffers = %d, LocalActiveNBuffers = %d", + MyProcPid, LocalCurrentNBuffers, LocalActiveNBuffers); + + Assert(LocalCurrentNBuffers > 0); + Assert(LocalActiveNBuffers > 0); + Assert(LocalActiveNBuffers <= LocalCurrentNBuffers); + /* * An advisory limit on the number of pins each backend should hold, based * on shared_buffers and the maximum number of connections possible. @@ -4163,7 +4178,7 @@ InitBufferManagerAccess(void) * allow plenty of pins. LimitAdditionalPins() and * GetAdditionalPinLimit() can be used to check the remaining balance. */ - MaxProportionalPins = NBuffers / (MaxBackends + NUM_AUXILIARY_PROCS); + MaxProportionalPins = LocalCurrentNBuffers / (MaxBackends + NUM_AUXILIARY_PROCS); memset(&PrivateRefCountArray, 0, sizeof(PrivateRefCountArray)); memset(&PrivateRefCountArrayKeys, 0, sizeof(PrivateRefCountArrayKeys)); @@ -4805,7 +4820,7 @@ DropRelationBuffers(SMgrRelation smgr_reln, ForkNumber *forkNum, return; } - for (i = 0; i < NBuffers; i++) + for (i = 0; i < LocalCurrentNBuffers; i++) { BufferDesc *bufHdr = GetBufferDescriptor(i); @@ -4966,7 +4981,7 @@ DropRelationsAllBuffers(SMgrRelation *smgr_reln, int nlocators) if (use_bsearch) qsort(locators, n, sizeof(RelFileLocator), rlocator_comparator); - for (i = 0; i < NBuffers; i++) + for (i = 0; i < LocalCurrentNBuffers; i++) { RelFileLocator *rlocator = NULL; BufferDesc *bufHdr = GetBufferDescriptor(i); @@ -5093,7 +5108,7 @@ DropDatabaseBuffers(Oid dbid) * database isn't our own. */ - for (i = 0; i < NBuffers; i++) + for (i = 0; i < LocalCurrentNBuffers; i++) { BufferDesc *bufHdr = GetBufferDescriptor(i); @@ -5179,7 +5194,7 @@ FlushRelationBuffers(Relation rel) return; } - for (i = 0; i < NBuffers; i++) + for (i = 0; i < LocalCurrentNBuffers; i++) { uint64 buf_state; @@ -5249,7 +5264,7 @@ FlushRelationsAllBuffers(SMgrRelation *smgrs, int nrels) if (use_bsearch) qsort(srels, nrels, sizeof(SMgrSortArray), rlocator_comparator); - for (i = 0; i < NBuffers; i++) + for (i = 0; i < LocalCurrentNBuffers; i++) { SMgrSortArray *srelent = NULL; BufferDesc *bufHdr = GetBufferDescriptor(i); @@ -5500,7 +5515,7 @@ FlushDatabaseBuffers(Oid dbid) int i; BufferDesc *bufHdr; - for (i = 0; i < NBuffers; i++) + for (i = 0; i < LocalCurrentNBuffers; i++) { uint64 buf_state; @@ -7622,7 +7637,7 @@ EvictAllUnpinnedBuffers(int32 *buffers_evicted, int32 *buffers_flushed, *buffers_skipped = 0; *buffers_flushed = 0; - for (int buf = 1; buf <= NBuffers; buf++) + for (int buf = 1; buf <= LocalCurrentNBuffers; buf++) { BufferDesc *desc = GetBufferDescriptor(buf - 1); uint64 buf_state; @@ -7674,7 +7689,7 @@ EvictRelUnpinnedBuffers(Relation rel, int32 *buffers_evicted, *buffers_evicted = 0; *buffers_flushed = 0; - for (int buf = 1; buf <= NBuffers; buf++) + for (int buf = 1; buf <= LocalCurrentNBuffers; buf++) { BufferDesc *desc = GetBufferDescriptor(buf - 1); uint64 buf_state = pg_atomic_read_u64(&(desc->state)); @@ -7819,7 +7834,7 @@ MarkDirtyRelUnpinnedBuffers(Relation rel, *buffers_already_dirty = 0; *buffers_skipped = 0; - for (int buf = 1; buf <= NBuffers; buf++) + for (int buf = 1; buf <= LocalCurrentNBuffers; buf++) { BufferDesc *desc = GetBufferDescriptor(buf - 1); uint64 buf_state = pg_atomic_read_u64(&(desc->state)); @@ -7873,7 +7888,7 @@ MarkDirtyAllUnpinnedBuffers(int32 *buffers_dirtied, *buffers_already_dirty = 0; *buffers_skipped = 0; - for (int buf = 1; buf <= NBuffers; buf++) + for (int buf = 1; buf <= LocalCurrentNBuffers; buf++) { BufferDesc *desc = GetBufferDescriptor(buf - 1); uint64 buf_state; diff --git a/src/backend/storage/buffer/freelist.c b/src/backend/storage/buffer/freelist.c index 0aae2c08bc41d..40ee642766164 100644 --- a/src/backend/storage/buffer/freelist.c +++ b/src/backend/storage/buffer/freelist.c @@ -121,12 +121,12 @@ ClockSweepTick(void) victim = pg_atomic_fetch_add_u32(&StrategyControl->nextVictimBuffer, 1); - if (victim >= pg_atomic_read_u32(&StrategyControl->activeNBuffers)) + if (victim >= (uint32) LocalActiveNBuffers) { uint32 originalVictim = victim; /* always wrap what we look up in BufferDescriptors */ - victim = victim % pg_atomic_read_u32(&StrategyControl->activeNBuffers); + victim = victim % (uint32) LocalActiveNBuffers; /* * If we're the one that just caused a wraparound, force @@ -154,7 +154,7 @@ ClockSweepTick(void) */ SpinLockAcquire(&StrategyControl->buffer_strategy_lock); - wrapped = expected % pg_atomic_read_u32(&StrategyControl->activeNBuffers); + wrapped = expected % (uint32) LocalActiveNBuffers; success = pg_atomic_compare_exchange_u32(&StrategyControl->nextVictimBuffer, &expected, wrapped); @@ -239,7 +239,7 @@ StrategyGetBuffer(BufferAccessStrategy strategy, uint64 *buf_state, bool *from_r pg_atomic_fetch_add_u32(&StrategyControl->numBufferAllocs, 1); /* Use the "clock sweep" algorithm to find a free buffer */ - trycounter = pg_atomic_read_u32(&StrategyControl->activeNBuffers); + trycounter = LocalActiveNBuffers; for (;;) { @@ -293,7 +293,7 @@ StrategyGetBuffer(BufferAccessStrategy strategy, uint64 *buf_state, bool *from_r if (pg_atomic_compare_exchange_u64(&buf->state, &old_buf_state, local_buf_state)) { - trycounter = pg_atomic_read_u32(&StrategyControl->activeNBuffers); + trycounter = LocalActiveNBuffers; break; } } @@ -339,7 +339,7 @@ StrategySyncStart(uint32 *complete_passes, uint32 *num_buf_alloc, uint32 *active SpinLockAcquire(&StrategyControl->buffer_strategy_lock); nextVictimBuffer = pg_atomic_read_u32(&StrategyControl->nextVictimBuffer); - activeNBuffers = pg_atomic_read_u32(&StrategyControl->activeNBuffers); + activeNBuffers = (uint32) LocalActiveNBuffers; result = nextVictimBuffer % activeNBuffers; if (complete_passes) @@ -450,6 +450,17 @@ StrategyReset(int activeNBuffers) SpinLockRelease(&StrategyControl->buffer_strategy_lock); } +/* + * StrategyGetActiveNBuffers -- return the current active buffer count. + * + * This is a simple accessor since StrategyControl is private to this file. + */ +int +StrategyGetActiveNBuffers(void) +{ + return (int) pg_atomic_read_u32(&StrategyControl->activeNBuffers); +} + /* * StrategyInitialize -- initialize the buffer cache replacement * strategy. @@ -629,9 +640,9 @@ GetAccessStrategyWithSize(BufferAccessStrategyType btype, int ring_size_kb) return NULL; /* Cap to 1/8th of shared_buffers */ - ring_buffers = Min(NBuffers / 8, ring_buffers); + ring_buffers = Min(LocalCurrentNBuffers / 8, ring_buffers); - /* NBuffers should never be less than 16, so this shouldn't happen */ + /* LocalCurrentNBuffers should never be less than 16, so this shouldn't happen */ Assert(ring_buffers > 0); /* Allocate the object and initialize all elements to zeroes */ @@ -680,7 +691,7 @@ int GetAccessStrategyPinLimit(BufferAccessStrategy strategy) { if (strategy == NULL) - return NBuffers; + return LocalCurrentNBuffers; switch (strategy->btype) { @@ -757,7 +768,7 @@ GetBufferFromRing(BufferAccessStrategy strategy, uint64 *buf_state) */ bufnum = strategy->buffers[strategy->current]; if (bufnum == InvalidBuffer || - bufnum > pg_atomic_read_u32(&StrategyControl->activeNBuffers)) + bufnum > (uint32) LocalActiveNBuffers) return NULL; buf = GetBufferDescriptor(bufnum - 1); diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c index df9bb7a0f1a91..eec0666227bf3 100644 --- a/src/backend/storage/ipc/procsignal.c +++ b/src/backend/storage/ipc/procsignal.c @@ -626,6 +626,9 @@ ProcessProcSignalBarrier(void) case PROCSIGNAL_BARRIER_SHBUF_SHRINK: processed = ProcessBarrierShmemShrink(); break; + case PROCSIGNAL_BARRIER_SHBUF_SHMEM_RESIZE_SHRINK: + processed = ProcessBarrierShmemResizeShrink(); + break; case PROCSIGNAL_BARRIER_SHBUF_RESIZE_MAP_AND_MEM: processed = ProcessBarrierShmemResizeMapAndMem(); break; diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c index be9d3909c0f5e..83d36989adeb4 100644 --- a/src/backend/utils/init/globals.c +++ b/src/backend/utils/init/globals.c @@ -143,6 +143,16 @@ int NBuffers = 0; int NBuffersPending = 16384; bool finalMaxNBuffers = false; int MaxNBuffers = 0; + +/* + * Per-process shadow copies of shared buffer pool dimensions, kept in sync + * with the shared memory values during buffer pool resize operations via + * barrier processing. Initialized at backend startup in + * InitBufferManagerAccess(). + */ +int LocalCurrentNBuffers = 0; +int LocalActiveNBuffers = 0; + int MaxConnections = 100; int max_worker_processes = 8; int max_parallel_workers = 8; diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h index c3daf4e2b3b25..1516661de0058 100644 --- a/src/include/storage/buf_internals.h +++ b/src/include/storage/buf_internals.h @@ -576,6 +576,7 @@ extern void StrategyNotifyBgWriter(int bgwprocno); extern Size StrategyShmemSize(void); extern void StrategyInitialize(bool init); extern void StrategyReset(int activeNBuffers); +extern int StrategyGetActiveNBuffers(void); /* buf_table.c */ extern Size BufTableShmemSize(int size); diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index 9ced25c77e366..74f74acab66b9 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -162,6 +162,13 @@ typedef struct WritebackContext WritebackContext; extern PGDLLIMPORT int NBuffers; extern PGDLLIMPORT int NBuffersPending; +/* + * Per-process shadow copies of shared buffer pool dimensions. + * Updated during buffer pool resize via barrier handlers. + */ +extern PGDLLIMPORT int LocalCurrentNBuffers; +extern PGDLLIMPORT int LocalActiveNBuffers; + /* in bufmgr.c */ extern PGDLLIMPORT bool zero_damaged_pages; extern PGDLLIMPORT int bgwriter_lru_maxpages; @@ -427,7 +434,7 @@ extern void FreeAccessStrategy(BufferAccessStrategy strategy); static inline bool BufferIsValid(Buffer bufnum) { - Assert(bufnum <= (Buffer) pg_atomic_read_u32(&ShmemCtrl->currentNBuffers)); + Assert(bufnum <= (Buffer) LocalCurrentNBuffers); Assert(bufnum >= -NLocBuffer); return bufnum != InvalidBuffer; @@ -484,6 +491,7 @@ BufferGetPage(Buffer buffer) /* buf_resize.c */ extern Datum pg_resize_shared_buffers(PG_FUNCTION_ARGS); extern bool ProcessBarrierShmemShrink(void); +extern bool ProcessBarrierShmemResizeShrink(void); extern bool ProcessBarrierShmemResizeMapAndMem(void); extern bool ProcessBarrierShmemExpand(void); extern bool ProcessBarrierShmemResizeFailed(void); diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h index 9dc9819a72bc9..0b8ec0a04dddb 100644 --- a/src/include/storage/procsignal.h +++ b/src/include/storage/procsignal.h @@ -58,6 +58,9 @@ typedef enum * XLogLogicalInfo */ PROCSIGNAL_BARRIER_SHBUF_SHRINK, /* shrink buffer pool - restrict * allocations to new size */ + PROCSIGNAL_BARRIER_SHBUF_SHMEM_RESIZE_SHRINK, /* buffer structures resized - + * validate and update + * LocalCurrentNBuffers */ PROCSIGNAL_BARRIER_SHBUF_RESIZE_MAP_AND_MEM, /* remap shared memory * segments and update * structure pointers */ From 2789ff51c11d389ddd13cd617fd835e4ebfc53ec Mon Sep 17 00:00:00 2001 From: Palak Chaturvedi Date: Wed, 6 May 2026 13:41:31 +0000 Subject: [PATCH 2/2] Fix review issues in shadow variable synchronization - Revert ClockSweepTick() and StrategySyncStart() to use shared atomic reads of activeNBuffers instead of LocalActiveNBuffers. All backends in the CAS loop on nextVictimBuffer must use the same modulus to avoid inconsistent wrap-around during the barrier propagation window. LocalActiveNBuffers is still used for the purely-local trycounter in StrategyGetBuffer() and the bounds check in GetBufferFromRing(). - Fix expand path ordering: update shared state (StrategyReset and ShmemCtrl->currentNBuffers) before coordinator's local shadows, so there is no window where the local view is ahead of shared state. - Fix trailing whitespace in ProcessBarrierShmemShrink(). - Fix comment alignment for PROCSIGNAL_BARRIER_SHBUF_SHMEM_RESIZE_SHRINK in procsignal.h. - Add detailed comment in GetBufferFromRing() explaining why LocalActiveNBuffers is safe for the ring buffer bounds check. - Update ClockSweepTick() comment to explain why the shared atomic is used instead of LocalActiveNBuffers. --- src/backend/storage/buffer/buf_resize.c | 10 +++++----- src/backend/storage/buffer/freelist.c | 24 +++++++++++++++++------- src/include/storage/procsignal.h | 4 ++-- 3 files changed, 24 insertions(+), 14 deletions(-) diff --git a/src/backend/storage/buffer/buf_resize.c b/src/backend/storage/buffer/buf_resize.c index 3f6914181b331..f889a4d4c9e31 100644 --- a/src/backend/storage/buffer/buf_resize.c +++ b/src/backend/storage/buffer/buf_resize.c @@ -304,16 +304,16 @@ pg_resize_shared_buffers(PG_FUNCTION_ARGS) */ elog(LOG, "Phase 3: Expanding buffer pool, enabling allocations up to %d buffers", targetNBuffers); + /* Update shared values first, then coordinator's shadows */ + StrategyReset(targetNBuffers); + pg_atomic_write_u32(&ShmemCtrl->currentNBuffers, targetNBuffers); + /* Coordinator updates its own shadows */ elog(LOG, "coordinator %d: LocalCurrentNBuffers %d -> %d, LocalActiveNBuffers %d -> %d (expand)", MyProcPid, LocalCurrentNBuffers, targetNBuffers, LocalActiveNBuffers, targetNBuffers); LocalCurrentNBuffers = targetNBuffers; LocalActiveNBuffers = targetNBuffers; - /* Now safe to update shared values */ - StrategyReset(targetNBuffers); - pg_atomic_write_u32(&ShmemCtrl->currentNBuffers, targetNBuffers); - SharedBufferResizeBarrier(PROCSIGNAL_BARRIER_SHBUF_EXPAND, CppAsString(PROCSIGNAL_BARRIER_SHBUF_EXPAND)); } @@ -354,7 +354,7 @@ ProcessBarrierShmemShrink(void) elog(LOG, "Phase 1: Processing SHBUF_SHRINK barrier - target buffer pool size = %d, coordinator is %d", targetNBuffers, ShmemCtrl->coordinator); - + /* Update per-process shadow to reflect the restricted allocation range */ elog(DEBUG1, "backend %d acknowledged SHBUF_SHRINK: LocalActiveNBuffers %d -> %d", MyProcPid, LocalActiveNBuffers, targetNBuffers); diff --git a/src/backend/storage/buffer/freelist.c b/src/backend/storage/buffer/freelist.c index 40ee642766164..2f950cc6bb705 100644 --- a/src/backend/storage/buffer/freelist.c +++ b/src/backend/storage/buffer/freelist.c @@ -114,19 +114,20 @@ ClockSweepTick(void) /* * Atomically move hand ahead one buffer - if there's several processes * doing this, this can lead to buffers being returned slightly out of - * apparent order. We need to read both the current position of hand and - * the current buffer allocation limit together consistently. They may be - * reset by concurrent resize. + * apparent order. We read the shared activeNBuffers here (not the + * per-process LocalActiveNBuffers) because all backends participating + * in the CAS loop on nextVictimBuffer must use the same modulus to + * avoid inconsistent wrap-around. */ victim = pg_atomic_fetch_add_u32(&StrategyControl->nextVictimBuffer, 1); - if (victim >= (uint32) LocalActiveNBuffers) + if (victim >= pg_atomic_read_u32(&StrategyControl->activeNBuffers)) { uint32 originalVictim = victim; /* always wrap what we look up in BufferDescriptors */ - victim = victim % (uint32) LocalActiveNBuffers; + victim = victim % pg_atomic_read_u32(&StrategyControl->activeNBuffers); /* * If we're the one that just caused a wraparound, force @@ -154,7 +155,7 @@ ClockSweepTick(void) */ SpinLockAcquire(&StrategyControl->buffer_strategy_lock); - wrapped = expected % (uint32) LocalActiveNBuffers; + wrapped = expected % pg_atomic_read_u32(&StrategyControl->activeNBuffers); success = pg_atomic_compare_exchange_u32(&StrategyControl->nextVictimBuffer, &expected, wrapped); @@ -339,7 +340,7 @@ StrategySyncStart(uint32 *complete_passes, uint32 *num_buf_alloc, uint32 *active SpinLockAcquire(&StrategyControl->buffer_strategy_lock); nextVictimBuffer = pg_atomic_read_u32(&StrategyControl->nextVictimBuffer); - activeNBuffers = (uint32) LocalActiveNBuffers; + activeNBuffers = pg_atomic_read_u32(&StrategyControl->activeNBuffers); result = nextVictimBuffer % activeNBuffers; if (complete_passes) @@ -766,6 +767,15 @@ GetBufferFromRing(BufferAccessStrategy strategy, uint64 *buf_state) * possible to notice when we touch the first of those objects and the * last of objects. See if this can fixed. */ + /* + * During a shrink, a ring buffer might hold a buffer ID from the old + * (larger) range. Using LocalActiveNBuffers (updated via barrier) is + * safe here: if this backend hasn't acknowledged SHBUF_SHRINK yet, + * LocalActiveNBuffers is still the old size and the stale entry passes; + * the buffer descriptor is still valid since LocalCurrentNBuffers is + * also still old. Once the barrier is acknowledged, LocalActiveNBuffers + * reflects the new size and stale entries are correctly rejected. + */ bufnum = strategy->buffers[strategy->current]; if (bufnum == InvalidBuffer || bufnum > (uint32) LocalActiveNBuffers) diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h index 0b8ec0a04dddb..63a278c4b0823 100644 --- a/src/include/storage/procsignal.h +++ b/src/include/storage/procsignal.h @@ -59,8 +59,8 @@ typedef enum PROCSIGNAL_BARRIER_SHBUF_SHRINK, /* shrink buffer pool - restrict * allocations to new size */ PROCSIGNAL_BARRIER_SHBUF_SHMEM_RESIZE_SHRINK, /* buffer structures resized - - * validate and update - * LocalCurrentNBuffers */ + * validate and update + * LocalCurrentNBuffers */ PROCSIGNAL_BARRIER_SHBUF_RESIZE_MAP_AND_MEM, /* remap shared memory * segments and update * structure pointers */