diff --git a/contrib/pg_buffercache/pg_buffercache_pages.c b/contrib/pg_buffercache/pg_buffercache_pages.c index 8a17319ff2a0a..ebc52fc0c6723 100644 --- a/contrib/pg_buffercache/pg_buffercache_pages.c +++ b/contrib/pg_buffercache/pg_buffercache_pages.c @@ -15,6 +15,7 @@ #include "port/pg_numa.h" #include "storage/buf_internals.h" #include "storage/bufmgr.h" +#include "utils/injection_point.h" #include "utils/rel.h" #include "utils/tuplestore.h" @@ -37,39 +38,6 @@ PG_MODULE_MAGIC_EXT( .version = PG_VERSION ); -/* - * Record structure holding the to be exposed cache data. - */ -typedef struct -{ - uint32 bufferid; - RelFileNumber relfilenumber; - Oid reltablespace; - Oid reldatabase; - ForkNumber forknum; - BlockNumber blocknum; - bool isvalid; - bool isdirty; - uint16 usagecount; - - /* - * An int32 is sufficiently large, as MAX_BACKENDS prevents a buffer from - * being pinned by too many backends and each backend will only pin once - * because of bufmgr.c's PrivateRefCount infrastructure. - */ - int32 pinning_backends; -} BufferCachePagesRec; - - -/* - * Function context for data persisting over repeated calls. - */ -typedef struct -{ - TupleDesc tupdesc; - BufferCachePagesRec *record; -} BufferCachePagesContext; - /* * Record structure holding the to be exposed cache data for OS pages. This * structure is used by pg_buffercache_os_pages(), where NUMA information may @@ -118,153 +86,93 @@ static bool firstNumaTouch = true; Datum pg_buffercache_pages(PG_FUNCTION_ARGS) { - FuncCallContext *funcctx; - Datum result; - MemoryContext oldcontext; - BufferCachePagesContext *fctx; /* User function context. */ - TupleDesc tupledesc; + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; TupleDesc expected_tupledesc; - HeapTuple tuple; - int currentNBuffers = pg_atomic_read_u32(&ShmemCtrl->currentNBuffers); + int i; - if (SRF_IS_FIRSTCALL()) - { - int i; - - funcctx = SRF_FIRSTCALL_INIT(); - - /* Switch context when allocating stuff to be used in later calls */ - oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx); - - /* Create a user function context for cross-call persistence */ - fctx = palloc_object(BufferCachePagesContext); - - /* - * To smoothly support upgrades from version 1.0 of this extension - * transparently handle the (non-)existence of the pinning_backends - * column. We unfortunately have to get the result type for that... - - * we can't use the result type determined by the function definition - * without potentially crashing when somebody uses the old (or even - * wrong) function definition though. - */ - if (get_call_result_type(fcinfo, NULL, &expected_tupledesc) != TYPEFUNC_COMPOSITE) - elog(ERROR, "return type must be a row type"); - - if (expected_tupledesc->natts < NUM_BUFFERCACHE_PAGES_MIN_ELEM || - expected_tupledesc->natts > NUM_BUFFERCACHE_PAGES_ELEM) - elog(ERROR, "incorrect number of output arguments"); - - /* Construct a tuple descriptor for the result rows. */ - tupledesc = CreateTemplateTupleDesc(expected_tupledesc->natts); - TupleDescInitEntry(tupledesc, (AttrNumber) 1, "bufferid", - INT4OID, -1, 0); - TupleDescInitEntry(tupledesc, (AttrNumber) 2, "relfilenode", - OIDOID, -1, 0); - TupleDescInitEntry(tupledesc, (AttrNumber) 3, "reltablespace", - OIDOID, -1, 0); - TupleDescInitEntry(tupledesc, (AttrNumber) 4, "reldatabase", - OIDOID, -1, 0); - TupleDescInitEntry(tupledesc, (AttrNumber) 5, "relforknumber", - INT2OID, -1, 0); - TupleDescInitEntry(tupledesc, (AttrNumber) 6, "relblocknumber", - INT8OID, -1, 0); - TupleDescInitEntry(tupledesc, (AttrNumber) 7, "isdirty", - BOOLOID, -1, 0); - TupleDescInitEntry(tupledesc, (AttrNumber) 8, "usage_count", - INT2OID, -1, 0); - - if (expected_tupledesc->natts == NUM_BUFFERCACHE_PAGES_ELEM) - TupleDescInitEntry(tupledesc, (AttrNumber) 9, "pinning_backends", - INT4OID, -1, 0); - - fctx->tupdesc = BlessTupleDesc(tupledesc); - - /* Allocate NBuffers worth of BufferCachePagesRec records. */ - fctx->record = (BufferCachePagesRec *) - MemoryContextAllocHuge(CurrentMemoryContext, - sizeof(BufferCachePagesRec) * currentNBuffers); - - /* Set max calls and remember the user function context. */ - funcctx->max_calls = currentNBuffers; - funcctx->user_fctx = fctx; - - /* Return to original context when allocating transient memory */ - MemoryContextSwitchTo(oldcontext); - - /* - * Scan through all the buffers, saving the relevant fields in the - * fctx->record structure. - * - * We don't hold the partition locks, so we don't get a consistent - * snapshot across all buffers, but we do grab the buffer header - * locks, so the information of each buffer is self-consistent. - */ - for (i = 0; i < currentNBuffers; i++) - { - BufferDesc *bufHdr; - uint64 buf_state; - - CHECK_FOR_INTERRUPTS(); - - /* - * TODO: We should just scan the entire buffer descriptor array - * instead of relying on curent buffer pool size. But that can - * happen if only we setup the descriptor array large enough at - * the server startup time. - */ - if (currentNBuffers != pg_atomic_read_u32(&ShmemCtrl->currentNBuffers)) - ereport(ERROR, - (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("number of shared buffers changed during scan of buffer cache"))); - - bufHdr = GetBufferDescriptor(i); - /* Lock each buffer header before inspecting. */ - buf_state = LockBufHdr(bufHdr); + /* + * To smoothly support upgrades from version 1.0 of this extension + * transparently handle the (non-)existence of the pinning_backends + * column. We unfortunately have to get the result type for that... - we + * can't use the result type determined by the function definition without + * potentially crashing when somebody uses the old (or even wrong) + * function definition though. + */ + if (get_call_result_type(fcinfo, NULL, &expected_tupledesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); - fctx->record[i].bufferid = BufferDescriptorGetBuffer(bufHdr); - fctx->record[i].relfilenumber = BufTagGetRelNumber(&bufHdr->tag); - fctx->record[i].reltablespace = bufHdr->tag.spcOid; - fctx->record[i].reldatabase = bufHdr->tag.dbOid; - fctx->record[i].forknum = BufTagGetForkNum(&bufHdr->tag); - fctx->record[i].blocknum = bufHdr->tag.blockNum; - fctx->record[i].usagecount = BUF_STATE_GET_USAGECOUNT(buf_state); - fctx->record[i].pinning_backends = BUF_STATE_GET_REFCOUNT(buf_state); + if (expected_tupledesc->natts < NUM_BUFFERCACHE_PAGES_MIN_ELEM || + expected_tupledesc->natts > NUM_BUFFERCACHE_PAGES_ELEM) + elog(ERROR, "incorrect number of output arguments"); - if (buf_state & BM_DIRTY) - fctx->record[i].isdirty = true; - else - fctx->record[i].isdirty = false; + InitMaterializedSRF(fcinfo, 0); - /* Note if the buffer is valid, and has storage created */ - if ((buf_state & BM_VALID) && (buf_state & BM_TAG_VALID)) - fctx->record[i].isvalid = true; - else - fctx->record[i].isvalid = false; + /* + * Scan through all the buffers, adding one row for each of the buffers to + * the tuplestore. + * + * We don't hold the partition locks, so we don't get a consistent + * snapshot across all buffers, but we do grab the buffer header locks, so + * the information of each buffer is self-consistent. + * + * Use LocalCurrentNBuffers so the scan range tracks the current buffer + * pool size. If a resize barrier is processed mid-scan (via + * CHECK_FOR_INTERRUPTS or an injection point wait), LocalCurrentNBuffers + * changes and we detect it. + */ + INJECTION_POINT("pg-buffercache-scan-start", NULL); + for (i = 0; i < LocalCurrentNBuffers; i++) + { + BufferDesc *bufHdr; + uint64 buf_state; + uint32 bufferid; + RelFileNumber relfilenumber; + Oid reltablespace; + Oid reldatabase; + ForkNumber forknum; + BlockNumber blocknum; + bool isvalid; + bool isdirty; + uint16 usagecount; + int32 pinning_backends; + Datum values[NUM_BUFFERCACHE_PAGES_ELEM]; + bool nulls[NUM_BUFFERCACHE_PAGES_ELEM]; - UnlockBufHdr(bufHdr); - } - } + bufHdr = GetBufferDescriptor(i); + /* Lock each buffer header before inspecting. */ + buf_state = LockBufHdr(bufHdr); + + bufferid = BufferDescriptorGetBuffer(bufHdr); + relfilenumber = BufTagGetRelNumber(&bufHdr->tag); + reltablespace = bufHdr->tag.spcOid; + reldatabase = bufHdr->tag.dbOid; + forknum = BufTagGetForkNum(&bufHdr->tag); + blocknum = bufHdr->tag.blockNum; + usagecount = BUF_STATE_GET_USAGECOUNT(buf_state); + pinning_backends = BUF_STATE_GET_REFCOUNT(buf_state); - funcctx = SRF_PERCALL_SETUP(); + if (buf_state & BM_DIRTY) + isdirty = true; + else + isdirty = false; - /* Get the saved state */ - fctx = funcctx->user_fctx; + /* Note if the buffer is valid, and has storage created */ + if ((buf_state & BM_VALID) && (buf_state & BM_TAG_VALID)) + isvalid = true; + else + isvalid = false; - if (funcctx->call_cntr < funcctx->max_calls) - { - uint32 i = funcctx->call_cntr; - Datum values[NUM_BUFFERCACHE_PAGES_ELEM]; - bool nulls[NUM_BUFFERCACHE_PAGES_ELEM]; + UnlockBufHdr(bufHdr); - values[0] = Int32GetDatum(fctx->record[i].bufferid); + /* Build the tuple and add it to tuplestore */ + values[0] = Int32GetDatum(bufferid); nulls[0] = false; /* * Set all fields except the bufferid to null if the buffer is unused * or not valid. */ - if (fctx->record[i].blocknum == InvalidBlockNumber || - fctx->record[i].isvalid == false) + if (blocknum == InvalidBlockNumber || isvalid == false) { nulls[1] = true; nulls[2] = true; @@ -278,33 +186,36 @@ pg_buffercache_pages(PG_FUNCTION_ARGS) } else { - values[1] = ObjectIdGetDatum(fctx->record[i].relfilenumber); + values[1] = ObjectIdGetDatum(relfilenumber); nulls[1] = false; - values[2] = ObjectIdGetDatum(fctx->record[i].reltablespace); + values[2] = ObjectIdGetDatum(reltablespace); nulls[2] = false; - values[3] = ObjectIdGetDatum(fctx->record[i].reldatabase); + values[3] = ObjectIdGetDatum(reldatabase); nulls[3] = false; - values[4] = Int16GetDatum(fctx->record[i].forknum); + values[4] = Int16GetDatum(forknum); nulls[4] = false; - values[5] = Int64GetDatum((int64) fctx->record[i].blocknum); + values[5] = Int64GetDatum((int64) blocknum); nulls[5] = false; - values[6] = BoolGetDatum(fctx->record[i].isdirty); + values[6] = BoolGetDatum(isdirty); nulls[6] = false; - values[7] = UInt16GetDatum(fctx->record[i].usagecount); + values[7] = UInt16GetDatum(usagecount); nulls[7] = false; /* unused for v1.0 callers, but the array is always long enough */ - values[8] = Int32GetDatum(fctx->record[i].pinning_backends); + values[8] = Int32GetDatum(pinning_backends); nulls[8] = false; } - /* Build and return the tuple. */ - tuple = heap_form_tuple(fctx->tupdesc, values, nulls); - result = HeapTupleGetDatum(tuple); + tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls); - SRF_RETURN_NEXT(funcctx, result); + /* + * Allow interrupts so that resize barriers can be processed + * mid-scan, updating LocalCurrentNBuffers. + */ + CHECK_FOR_INTERRUPTS(); + INJECTION_POINT("pg-buffercache-scan-loop", NULL); } - else - SRF_RETURN_DONE(funcctx); + + return (Datum) 0; } /* @@ -392,7 +303,7 @@ pg_buffercache_os_pages_internal(FunctionCallInfo fcinfo, bool include_numa) startptr = (char *) TYPEALIGN_DOWN(os_page_size, BufferGetBlock(1)); endptr = (char *) TYPEALIGN(os_page_size, - (char *) BufferGetBlock(NBuffers) + BLCKSZ); + (char *) BufferGetBlock(LocalCurrentNBuffers) + BLCKSZ); os_page_count = (endptr - startptr) / os_page_size; /* Used to determine the NUMA node for all OS pages at once */ @@ -417,8 +328,8 @@ pg_buffercache_os_pages_internal(FunctionCallInfo fcinfo, bool include_numa) Assert(idx == os_page_count); - elog(DEBUG1, "NUMA: NBuffers=%d os_page_count=" UINT64_FORMAT " " - "os_page_size=%zu", NBuffers, os_page_count, os_page_size); + elog(DEBUG1, "NUMA: LocalCurrentNBuffers=%d os_page_count=" UINT64_FORMAT " " + "os_page_size=%zu", LocalCurrentNBuffers, os_page_count, os_page_size); /* * If we ever get 0xff back from kernel inquiry, then we probably @@ -467,7 +378,7 @@ pg_buffercache_os_pages_internal(FunctionCallInfo fcinfo, bool include_numa) * without reallocating memory. */ pages_per_buffer = Max(1, BLCKSZ / os_page_size) + 1; - max_entries = NBuffers * pages_per_buffer; + max_entries = LocalCurrentNBuffers * pages_per_buffer; /* Allocate entries for BufferCacheOsPagesRec records. */ fctx->record = (BufferCacheOsPagesRec *) @@ -490,7 +401,7 @@ pg_buffercache_os_pages_internal(FunctionCallInfo fcinfo, bool include_numa) */ startptr = (char *) TYPEALIGN_DOWN(os_page_size, (char *) BufferGetBlock(1)); idx = 0; - for (i = 0; i < NBuffers; i++) + for (i = 0; i < LocalCurrentNBuffers; i++) { char *buffptr = (char *) BufferGetBlock(i + 1); BufferDesc *bufHdr; @@ -636,7 +547,7 @@ pg_buffercache_summary(PG_FUNCTION_ARGS) if (get_call_result_type(fcinfo, NULL, &tupledesc) != TYPEFUNC_COMPOSITE) elog(ERROR, "return type must be a row type"); - for (int i = 0; i < NBuffers; i++) + for (int i = 0; i < LocalCurrentNBuffers; i++) { BufferDesc *bufHdr; uint64 buf_state; @@ -697,7 +608,7 @@ pg_buffercache_usage_counts(PG_FUNCTION_ARGS) InitMaterializedSRF(fcinfo, 0); - for (int i = 0; i < NBuffers; i++) + for (int i = 0; i < LocalCurrentNBuffers; i++) { BufferDesc *bufHdr = GetBufferDescriptor(i); uint64 buf_state = pg_atomic_read_u64(&bufHdr->state); @@ -761,7 +672,7 @@ pg_buffercache_evict(PG_FUNCTION_ARGS) pg_buffercache_superuser_check("pg_buffercache_evict"); - if (buf < 1 || buf > NBuffers) + if (buf < 1 || buf > LocalCurrentNBuffers) elog(ERROR, "bad buffer ID: %d", buf); values[0] = BoolGetDatum(EvictUnpinnedBuffer(buf, &buffer_flushed)); @@ -878,7 +789,7 @@ pg_buffercache_mark_dirty(PG_FUNCTION_ARGS) pg_buffercache_superuser_check("pg_buffercache_mark_dirty"); - if (buf < 1 || buf > NBuffers) + if (buf < 1 || buf > LocalCurrentNBuffers) elog(ERROR, "bad buffer ID: %d", buf); values[0] = BoolGetDatum(MarkDirtyUnpinnedBuffer(buf, &buffer_already_dirty)); diff --git a/contrib/pg_prewarm/autoprewarm.c b/contrib/pg_prewarm/autoprewarm.c index 89e187425cc77..dbc2a6df113e1 100644 --- a/contrib/pg_prewarm/autoprewarm.c +++ b/contrib/pg_prewarm/autoprewarm.c @@ -371,12 +371,12 @@ apw_load_buffers(void) apw_state->prewarmed_blocks = 0; /* Don't prewarm more than we can fit. */ - if (num_elements > NBuffers) + if (num_elements > LocalCurrentNBuffers) { - num_elements = NBuffers; + num_elements = LocalCurrentNBuffers; ereport(LOG, (errmsg("autoprewarm capping prewarmed blocks to %d (shared_buffers size)", - NBuffers))); + LocalCurrentNBuffers))); } /* Get the info position of the first block of the next database. */ @@ -699,9 +699,9 @@ apw_dump_now(bool is_bgworker, bool dump_unlogged) * memory-efficient data structure.) */ block_info_array = (BlockInfoRecord *) - palloc_extended((sizeof(BlockInfoRecord) * NBuffers), MCXT_ALLOC_HUGE); + palloc_extended((sizeof(BlockInfoRecord) * LocalCurrentNBuffers), MCXT_ALLOC_HUGE); - for (num_blocks = 0, i = 0; i < NBuffers; i++) + for (num_blocks = 0, i = 0; i < LocalCurrentNBuffers; i++) { uint64 buf_state; diff --git a/src/backend/access/hash/hash.c b/src/backend/access/hash/hash.c index e88ddb32a054c..72621e8fdbb60 100644 --- a/src/backend/access/hash/hash.c +++ b/src/backend/access/hash/hash.c @@ -164,7 +164,7 @@ hashbuild(Relation heap, Relation index, IndexInfo *indexInfo) */ sort_threshold = (maintenance_work_mem * (Size) 1024) / BLCKSZ; if (index->rd_rel->relpersistence != RELPERSISTENCE_TEMP) - sort_threshold = Min(sort_threshold, NBuffers); + sort_threshold = Min(sort_threshold, LocalCurrentNBuffers); else sort_threshold = Min(sort_threshold, NLocBuffer); diff --git a/src/backend/access/heap/heapam.c b/src/backend/access/heap/heapam.c index f30a56ecf5539..8291937d8babc 100644 --- a/src/backend/access/heap/heapam.c +++ b/src/backend/access/heap/heapam.c @@ -390,7 +390,7 @@ initscan(HeapScanDesc scan, ScanKey key, bool keep_startblock) * if you change this, consider changing that one, too. */ if (!RelationUsesLocalBuffers(scan->rs_base.rs_rd) && - scan->rs_nblocks > NBuffers / 4) + scan->rs_nblocks > LocalCurrentNBuffers / 4) { allow_strat = (scan->rs_base.rs_flags & SO_ALLOW_STRAT) != 0; allow_sync = (scan->rs_base.rs_flags & SO_ALLOW_SYNC) != 0; diff --git a/src/backend/access/table/tableam.c b/src/backend/access/table/tableam.c index 87491796523e8..10e82ba41b195 100644 --- a/src/backend/access/table/tableam.c +++ b/src/backend/access/table/tableam.c @@ -426,7 +426,7 @@ table_block_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan) /* compare phs_syncscan initialization to similar logic in initscan */ bpscan->base.phs_syncscan = synchronize_seqscans && !RelationUsesLocalBuffers(rel) && - bpscan->phs_nblocks > NBuffers / 4; + bpscan->phs_nblocks > LocalCurrentNBuffers / 4; SpinLockInit(&bpscan->phs_mutex); bpscan->phs_startblock = InvalidBlockNumber; bpscan->phs_numblock = InvalidBlockNumber; diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index c6a64a129c114..f0acc6218c818 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -6861,7 +6861,7 @@ LogCheckpointEnd(bool restartpoint) "longest=%ld.%03d s, average=%ld.%03d s; distance=%d kB, " "estimate=%d kB; lsn=%X/%08X, redo lsn=%X/%08X", CheckpointStats.ckpt_bufs_written, - (double) CheckpointStats.ckpt_bufs_written * 100 / NBuffers, + (double) CheckpointStats.ckpt_bufs_written * 100 / LocalCurrentNBuffers, CheckpointStats.ckpt_slru_written, CheckpointStats.ckpt_segs_added, CheckpointStats.ckpt_segs_removed, @@ -6885,7 +6885,7 @@ LogCheckpointEnd(bool restartpoint) "longest=%ld.%03d s, average=%ld.%03d s; distance=%d kB, " "estimate=%d kB; lsn=%X/%08X, redo lsn=%X/%08X", CheckpointStats.ckpt_bufs_written, - (double) CheckpointStats.ckpt_bufs_written * 100 / NBuffers, + (double) CheckpointStats.ckpt_bufs_written * 100 / LocalCurrentNBuffers, CheckpointStats.ckpt_slru_written, CheckpointStats.ckpt_segs_added, CheckpointStats.ckpt_segs_removed, @@ -7486,7 +7486,7 @@ CreateCheckPoint(int flags) update_checkpoint_display(flags, false, true); TRACE_POSTGRESQL_CHECKPOINT_DONE(CheckpointStats.ckpt_bufs_written, - NBuffers, + LocalCurrentNBuffers, CheckpointStats.ckpt_segs_added, CheckpointStats.ckpt_segs_removed, CheckpointStats.ckpt_segs_recycled); diff --git a/src/backend/storage/buffer/buf_resize.c b/src/backend/storage/buffer/buf_resize.c index 836811356725b..f889a4d4c9e31 100644 --- a/src/backend/storage/buffer/buf_resize.c +++ b/src/backend/storage/buffer/buf_resize.c @@ -58,7 +58,7 @@ MarkBufferResizingEnd(int newNBuffers) /* * TODO: should we leave targetNBuffers as is? We are setting it to - * NBuffers in BufferManagerShmemInit(). + * the shared_buffers GUC value in BufferManagerShmemInit(). */ pg_atomic_write_u32(&ShmemCtrl->targetNBuffers, 0); ShmemCtrl->coordinator = -1; @@ -84,6 +84,9 @@ SharedBufferResizeBarrier(ProcSignalBarrierType barrier, const char *barrier_nam case PROCSIGNAL_BARRIER_SHBUF_SHRINK: INJECTION_POINT("pgrsb-shrink-barrier-sent", NULL); break; + case PROCSIGNAL_BARRIER_SHBUF_SHMEM_RESIZE_SHRINK: + INJECTION_POINT("pgrsb-shmem-resize-shrink-barrier-sent", NULL); + break; case PROCSIGNAL_BARRIER_SHBUF_RESIZE_MAP_AND_MEM: INJECTION_POINT("pgrsb-resize-barrier-sent", NULL); break; @@ -211,15 +214,20 @@ pg_resize_shared_buffers(PG_FUNCTION_ARGS) if (targetNBuffers < currentNBuffers) { /* - * Phase 1: Shrinking - send SHBUF_SHRINK barrier Every backend sets - * activeNBuffers = NewNBuffers to restrict buffer pool allocations to - * the new size + * Phase 1: Shrinking - send SHBUF_SHRINK barrier. Backends update + * LocalActiveNBuffers to restrict buffer pool allocations to the + * new size. */ elog(LOG, "Phase 1: Shrinking buffer pool, restricting allocations to %d buffers", targetNBuffers); StrategyReset(targetNBuffers); SharedBufferResizeBarrier(PROCSIGNAL_BARRIER_SHBUF_SHRINK, CppAsString(PROCSIGNAL_BARRIER_SHBUF_SHRINK)); + /* Coordinator updates its own shadow */ + elog(LOG, "coordinator %d: LocalActiveNBuffers %d -> %d (shrink)", + MyProcPid, LocalActiveNBuffers, targetNBuffers); + LocalActiveNBuffers = targetNBuffers; + /* Evict buffers in the area being shrunk */ elog(LOG, "evicting buffers %u..%u", targetNBuffers + 1, currentNBuffers); if (!EvictExtraBuffers(targetNBuffers, currentNBuffers)) @@ -227,6 +235,12 @@ pg_resize_shared_buffers(PG_FUNCTION_ARGS) elog(WARNING, "failed to evict extra buffers during shrinking"); StrategyReset(currentNBuffers); SharedBufferResizeBarrier(PROCSIGNAL_BARRIER_SHBUF_RESIZE_FAILED, CppAsString(PROCSIGNAL_BARRIER_SHBUF_RESIZE_FAILED)); + + /* Restore coordinator shadows */ + elog(LOG, "coordinator %d: LocalActiveNBuffers %d -> %d (shrink failed, restoring)", + MyProcPid, LocalActiveNBuffers, currentNBuffers); + LocalActiveNBuffers = currentNBuffers; + MarkBufferResizingEnd(currentNBuffers); pg_atomic_clear_flag(&ShmemCtrl->resize_in_progress); PG_RETURN_BOOL(false); @@ -240,6 +254,19 @@ pg_resize_shared_buffers(PG_FUNCTION_ARGS) /* Update the current NBuffers. */ pg_atomic_write_u32(&ShmemCtrl->currentNBuffers, targetNBuffers); + + /* + * Emit SHMEM_RESIZE barrier so backends validate the resized + * structures and update LocalCurrentNBuffers after the shared + * currentNBuffers changes. + */ + SharedBufferResizeBarrier(PROCSIGNAL_BARRIER_SHBUF_SHMEM_RESIZE_SHRINK, CppAsString(PROCSIGNAL_BARRIER_SHBUF_SHMEM_RESIZE_SHRINK)); + + /* Coordinator updates its own shadow */ + elog(LOG, "coordinator %d: LocalCurrentNBuffers %d -> %d (shrink shmem resize)", + MyProcPid, LocalCurrentNBuffers, targetNBuffers); + LocalCurrentNBuffers = targetNBuffers; + } /* Phase 2: SHBUF_RESIZE_MAP_AND_MEM - Both expanding and shrinking */ @@ -276,9 +303,17 @@ pg_resize_shared_buffers(PG_FUNCTION_ARGS) * expanded range */ elog(LOG, "Phase 3: Expanding buffer pool, enabling allocations up to %d buffers", targetNBuffers); + + /* Update shared values first, then coordinator's shadows */ StrategyReset(targetNBuffers); pg_atomic_write_u32(&ShmemCtrl->currentNBuffers, targetNBuffers); + /* Coordinator updates its own shadows */ + elog(LOG, "coordinator %d: LocalCurrentNBuffers %d -> %d, LocalActiveNBuffers %d -> %d (expand)", + MyProcPid, LocalCurrentNBuffers, targetNBuffers, LocalActiveNBuffers, targetNBuffers); + LocalCurrentNBuffers = targetNBuffers; + LocalActiveNBuffers = targetNBuffers; + SharedBufferResizeBarrier(PROCSIGNAL_BARRIER_SHBUF_EXPAND, CppAsString(PROCSIGNAL_BARRIER_SHBUF_EXPAND)); } @@ -289,6 +324,10 @@ pg_resize_shared_buffers(PG_FUNCTION_ARGS) pg_atomic_clear_flag(&ShmemCtrl->resize_in_progress); + /* Final consistency check */ + Assert(LocalCurrentNBuffers == targetNBuffers); + Assert(LocalActiveNBuffers == targetNBuffers); + elog(LOG, "successfully resized shared buffers to %d", targetNBuffers); PG_RETURN_BOOL(result); @@ -316,6 +355,36 @@ ProcessBarrierShmemShrink(void) elog(LOG, "Phase 1: Processing SHBUF_SHRINK barrier - target buffer pool size = %d, coordinator is %d", targetNBuffers, ShmemCtrl->coordinator); + /* Update per-process shadow to reflect the restricted allocation range */ + elog(DEBUG1, "backend %d acknowledged SHBUF_SHRINK: LocalActiveNBuffers %d -> %d", + MyProcPid, LocalActiveNBuffers, targetNBuffers); + LocalActiveNBuffers = targetNBuffers; + + return true; +} + +bool +ProcessBarrierShmemResizeShrink(void) +{ + int targetNBuffers = pg_atomic_read_u32(&ShmemCtrl->targetNBuffers); + + Assert(!pg_atomic_unlocked_test_flag(&ShmemCtrl->resize_in_progress)); + + /* + * Delay adjusting the new active size of buffer pool till this process + * becomes ready to resize buffers. + */ + if (delay_shmem_resize) + { + elog(LOG, "delaying SHMEM_RESIZE_SHRINK barrier acknowledgment for %d buffers, coordinator is %d", + targetNBuffers, ShmemCtrl->coordinator); + return false; + } + + elog(DEBUG1, "backend %d acknowledged SHMEM_RESIZE_SHRINK: LocalCurrentNBuffers %d -> %d", + MyProcPid, LocalCurrentNBuffers, targetNBuffers); + LocalCurrentNBuffers = targetNBuffers; + return true; } @@ -389,6 +458,12 @@ ProcessBarrierShmemExpand(void) elog(LOG, "Phase 3: Processing SHBUF_EXPAND barrier - targetNBuffers = %d, ShmemCtrl->coordinator = %d", targetNBuffers, ShmemCtrl->coordinator); + /* Update the per-process shadow copies of the current and active buffer counts */ + elog(DEBUG1, "backend %d acknowledged SHBUF_EXPAND: LocalCurrentNBuffers %d -> %d, LocalActiveNBuffers %d -> %d", + MyProcPid, LocalCurrentNBuffers, targetNBuffers, LocalActiveNBuffers, targetNBuffers); + LocalCurrentNBuffers = targetNBuffers; + LocalActiveNBuffers = targetNBuffers; + return true; } @@ -403,6 +478,14 @@ ProcessBarrierShmemResizeFailed(void) elog(LOG, "received proc signal indicating failure to resize shared buffers from %d to %d, restoring to %d, coordinator is %d", currentNBuffers, targetNBuffers, currentNBuffers, ShmemCtrl->coordinator); + /* + * Update per-process shadow to reflect that the resize failed and we are + * restoring the active buffer pool size to the previous value. + */ + elog(DEBUG1, "backend %d acknowledged RESIZE_FAILED: LocalActiveNBuffers %d -> %d", + MyProcPid, LocalActiveNBuffers, currentNBuffers); + LocalActiveNBuffers = currentNBuffers; + return true; } diff --git a/src/backend/storage/buffer/bufmgr.c b/src/backend/storage/buffer/bufmgr.c index d39e65ef811d0..f9b0a4b5dcaa0 100644 --- a/src/backend/storage/buffer/bufmgr.c +++ b/src/backend/storage/buffer/bufmgr.c @@ -91,7 +91,7 @@ * being dropped. For the relations with size below this threshold, we find * the buffers by doing lookups in BufMapping table. */ -#define BUF_DROP_FULL_SCAN_THRESHOLD (uint64) (NBuffers / 32) +#define BUF_DROP_FULL_SCAN_THRESHOLD (uint64) (LocalCurrentNBuffers / 32) /* * This is separated out from PrivateRefCountEntry to allow for copying all @@ -3496,7 +3496,7 @@ BufferSync(int flags) * certainly need to be written for the next checkpoint attempt, too. */ num_to_scan = 0; - for (buf_id = 0; buf_id < NBuffers; buf_id++) + for (buf_id = 0; buf_id < LocalCurrentNBuffers; buf_id++) { BufferDesc *bufHdr = GetBufferDescriptor(buf_id); uint64 set_bits = 0; @@ -3538,7 +3538,7 @@ BufferSync(int flags) WritebackContextInit(&wb_context, &checkpoint_flush_after); - TRACE_POSTGRESQL_BUFFER_SYNC_START(NBuffers, num_to_scan); + TRACE_POSTGRESQL_BUFFER_SYNC_START(LocalCurrentNBuffers, num_to_scan); /* * Sort buffers that need to be written to reduce the likelihood of random @@ -3722,7 +3722,7 @@ BufferSync(int flags) */ CheckpointStats.ckpt_bufs_written += num_written; - TRACE_POSTGRESQL_BUFFER_SYNC_DONE(NBuffers, num_written, num_to_scan); + TRACE_POSTGRESQL_BUFFER_SYNC_DONE(LocalCurrentNBuffers, num_written, num_to_scan); } #define BGW_DEBUG 1 @@ -4156,6 +4156,21 @@ InitBufferManagerAccess(void) { HASHCTL hash_ctl; + /* + * Initialize per-process shadow copies of the shared buffer pool + * dimensions. These are kept in sync with shared memory values during + * buffer pool resize operations via barrier processing. + */ + LocalCurrentNBuffers = pg_atomic_read_u32(&ShmemCtrl->currentNBuffers); + LocalActiveNBuffers = StrategyGetActiveNBuffers(); + + elog(DEBUG1, "InitBufferManagerAccess: backend %d LocalCurrentNBuffers = %d, LocalActiveNBuffers = %d", + MyProcPid, LocalCurrentNBuffers, LocalActiveNBuffers); + + Assert(LocalCurrentNBuffers > 0); + Assert(LocalActiveNBuffers > 0); + Assert(LocalActiveNBuffers <= LocalCurrentNBuffers); + /* * An advisory limit on the number of pins each backend should hold, based * on shared_buffers and the maximum number of connections possible. @@ -4163,7 +4178,7 @@ InitBufferManagerAccess(void) * allow plenty of pins. LimitAdditionalPins() and * GetAdditionalPinLimit() can be used to check the remaining balance. */ - MaxProportionalPins = NBuffers / (MaxBackends + NUM_AUXILIARY_PROCS); + MaxProportionalPins = LocalCurrentNBuffers / (MaxBackends + NUM_AUXILIARY_PROCS); memset(&PrivateRefCountArray, 0, sizeof(PrivateRefCountArray)); memset(&PrivateRefCountArrayKeys, 0, sizeof(PrivateRefCountArrayKeys)); @@ -4805,7 +4820,7 @@ DropRelationBuffers(SMgrRelation smgr_reln, ForkNumber *forkNum, return; } - for (i = 0; i < NBuffers; i++) + for (i = 0; i < LocalCurrentNBuffers; i++) { BufferDesc *bufHdr = GetBufferDescriptor(i); @@ -4966,7 +4981,7 @@ DropRelationsAllBuffers(SMgrRelation *smgr_reln, int nlocators) if (use_bsearch) qsort(locators, n, sizeof(RelFileLocator), rlocator_comparator); - for (i = 0; i < NBuffers; i++) + for (i = 0; i < LocalCurrentNBuffers; i++) { RelFileLocator *rlocator = NULL; BufferDesc *bufHdr = GetBufferDescriptor(i); @@ -5093,7 +5108,7 @@ DropDatabaseBuffers(Oid dbid) * database isn't our own. */ - for (i = 0; i < NBuffers; i++) + for (i = 0; i < LocalCurrentNBuffers; i++) { BufferDesc *bufHdr = GetBufferDescriptor(i); @@ -5179,7 +5194,7 @@ FlushRelationBuffers(Relation rel) return; } - for (i = 0; i < NBuffers; i++) + for (i = 0; i < LocalCurrentNBuffers; i++) { uint64 buf_state; @@ -5249,7 +5264,7 @@ FlushRelationsAllBuffers(SMgrRelation *smgrs, int nrels) if (use_bsearch) qsort(srels, nrels, sizeof(SMgrSortArray), rlocator_comparator); - for (i = 0; i < NBuffers; i++) + for (i = 0; i < LocalCurrentNBuffers; i++) { SMgrSortArray *srelent = NULL; BufferDesc *bufHdr = GetBufferDescriptor(i); @@ -5500,7 +5515,7 @@ FlushDatabaseBuffers(Oid dbid) int i; BufferDesc *bufHdr; - for (i = 0; i < NBuffers; i++) + for (i = 0; i < LocalCurrentNBuffers; i++) { uint64 buf_state; @@ -7622,7 +7637,7 @@ EvictAllUnpinnedBuffers(int32 *buffers_evicted, int32 *buffers_flushed, *buffers_skipped = 0; *buffers_flushed = 0; - for (int buf = 1; buf <= NBuffers; buf++) + for (int buf = 1; buf <= LocalCurrentNBuffers; buf++) { BufferDesc *desc = GetBufferDescriptor(buf - 1); uint64 buf_state; @@ -7674,7 +7689,7 @@ EvictRelUnpinnedBuffers(Relation rel, int32 *buffers_evicted, *buffers_evicted = 0; *buffers_flushed = 0; - for (int buf = 1; buf <= NBuffers; buf++) + for (int buf = 1; buf <= LocalCurrentNBuffers; buf++) { BufferDesc *desc = GetBufferDescriptor(buf - 1); uint64 buf_state = pg_atomic_read_u64(&(desc->state)); @@ -7819,7 +7834,7 @@ MarkDirtyRelUnpinnedBuffers(Relation rel, *buffers_already_dirty = 0; *buffers_skipped = 0; - for (int buf = 1; buf <= NBuffers; buf++) + for (int buf = 1; buf <= LocalCurrentNBuffers; buf++) { BufferDesc *desc = GetBufferDescriptor(buf - 1); uint64 buf_state = pg_atomic_read_u64(&(desc->state)); @@ -7873,7 +7888,7 @@ MarkDirtyAllUnpinnedBuffers(int32 *buffers_dirtied, *buffers_already_dirty = 0; *buffers_skipped = 0; - for (int buf = 1; buf <= NBuffers; buf++) + for (int buf = 1; buf <= LocalCurrentNBuffers; buf++) { BufferDesc *desc = GetBufferDescriptor(buf - 1); uint64 buf_state; diff --git a/src/backend/storage/buffer/freelist.c b/src/backend/storage/buffer/freelist.c index 0aae2c08bc41d..2f950cc6bb705 100644 --- a/src/backend/storage/buffer/freelist.c +++ b/src/backend/storage/buffer/freelist.c @@ -114,9 +114,10 @@ ClockSweepTick(void) /* * Atomically move hand ahead one buffer - if there's several processes * doing this, this can lead to buffers being returned slightly out of - * apparent order. We need to read both the current position of hand and - * the current buffer allocation limit together consistently. They may be - * reset by concurrent resize. + * apparent order. We read the shared activeNBuffers here (not the + * per-process LocalActiveNBuffers) because all backends participating + * in the CAS loop on nextVictimBuffer must use the same modulus to + * avoid inconsistent wrap-around. */ victim = pg_atomic_fetch_add_u32(&StrategyControl->nextVictimBuffer, 1); @@ -239,7 +240,7 @@ StrategyGetBuffer(BufferAccessStrategy strategy, uint64 *buf_state, bool *from_r pg_atomic_fetch_add_u32(&StrategyControl->numBufferAllocs, 1); /* Use the "clock sweep" algorithm to find a free buffer */ - trycounter = pg_atomic_read_u32(&StrategyControl->activeNBuffers); + trycounter = LocalActiveNBuffers; for (;;) { @@ -293,7 +294,7 @@ StrategyGetBuffer(BufferAccessStrategy strategy, uint64 *buf_state, bool *from_r if (pg_atomic_compare_exchange_u64(&buf->state, &old_buf_state, local_buf_state)) { - trycounter = pg_atomic_read_u32(&StrategyControl->activeNBuffers); + trycounter = LocalActiveNBuffers; break; } } @@ -450,6 +451,17 @@ StrategyReset(int activeNBuffers) SpinLockRelease(&StrategyControl->buffer_strategy_lock); } +/* + * StrategyGetActiveNBuffers -- return the current active buffer count. + * + * This is a simple accessor since StrategyControl is private to this file. + */ +int +StrategyGetActiveNBuffers(void) +{ + return (int) pg_atomic_read_u32(&StrategyControl->activeNBuffers); +} + /* * StrategyInitialize -- initialize the buffer cache replacement * strategy. @@ -629,9 +641,9 @@ GetAccessStrategyWithSize(BufferAccessStrategyType btype, int ring_size_kb) return NULL; /* Cap to 1/8th of shared_buffers */ - ring_buffers = Min(NBuffers / 8, ring_buffers); + ring_buffers = Min(LocalCurrentNBuffers / 8, ring_buffers); - /* NBuffers should never be less than 16, so this shouldn't happen */ + /* LocalCurrentNBuffers should never be less than 16, so this shouldn't happen */ Assert(ring_buffers > 0); /* Allocate the object and initialize all elements to zeroes */ @@ -680,7 +692,7 @@ int GetAccessStrategyPinLimit(BufferAccessStrategy strategy) { if (strategy == NULL) - return NBuffers; + return LocalCurrentNBuffers; switch (strategy->btype) { @@ -755,9 +767,18 @@ GetBufferFromRing(BufferAccessStrategy strategy, uint64 *buf_state) * possible to notice when we touch the first of those objects and the * last of objects. See if this can fixed. */ + /* + * During a shrink, a ring buffer might hold a buffer ID from the old + * (larger) range. Using LocalActiveNBuffers (updated via barrier) is + * safe here: if this backend hasn't acknowledged SHBUF_SHRINK yet, + * LocalActiveNBuffers is still the old size and the stale entry passes; + * the buffer descriptor is still valid since LocalCurrentNBuffers is + * also still old. Once the barrier is acknowledged, LocalActiveNBuffers + * reflects the new size and stale entries are correctly rejected. + */ bufnum = strategy->buffers[strategy->current]; if (bufnum == InvalidBuffer || - bufnum > pg_atomic_read_u32(&StrategyControl->activeNBuffers)) + bufnum > (uint32) LocalActiveNBuffers) return NULL; buf = GetBufferDescriptor(bufnum - 1); diff --git a/src/backend/storage/ipc/procsignal.c b/src/backend/storage/ipc/procsignal.c index df9bb7a0f1a91..eec0666227bf3 100644 --- a/src/backend/storage/ipc/procsignal.c +++ b/src/backend/storage/ipc/procsignal.c @@ -626,6 +626,9 @@ ProcessProcSignalBarrier(void) case PROCSIGNAL_BARRIER_SHBUF_SHRINK: processed = ProcessBarrierShmemShrink(); break; + case PROCSIGNAL_BARRIER_SHBUF_SHMEM_RESIZE_SHRINK: + processed = ProcessBarrierShmemResizeShrink(); + break; case PROCSIGNAL_BARRIER_SHBUF_RESIZE_MAP_AND_MEM: processed = ProcessBarrierShmemResizeMapAndMem(); break; diff --git a/src/backend/utils/init/globals.c b/src/backend/utils/init/globals.c index be9d3909c0f5e..83d36989adeb4 100644 --- a/src/backend/utils/init/globals.c +++ b/src/backend/utils/init/globals.c @@ -143,6 +143,16 @@ int NBuffers = 0; int NBuffersPending = 16384; bool finalMaxNBuffers = false; int MaxNBuffers = 0; + +/* + * Per-process shadow copies of shared buffer pool dimensions, kept in sync + * with the shared memory values during buffer pool resize operations via + * barrier processing. Initialized at backend startup in + * InitBufferManagerAccess(). + */ +int LocalCurrentNBuffers = 0; +int LocalActiveNBuffers = 0; + int MaxConnections = 100; int max_worker_processes = 8; int max_parallel_workers = 8; diff --git a/src/include/storage/buf_internals.h b/src/include/storage/buf_internals.h index c3daf4e2b3b25..1516661de0058 100644 --- a/src/include/storage/buf_internals.h +++ b/src/include/storage/buf_internals.h @@ -576,6 +576,7 @@ extern void StrategyNotifyBgWriter(int bgwprocno); extern Size StrategyShmemSize(void); extern void StrategyInitialize(bool init); extern void StrategyReset(int activeNBuffers); +extern int StrategyGetActiveNBuffers(void); /* buf_table.c */ extern Size BufTableShmemSize(int size); diff --git a/src/include/storage/bufmgr.h b/src/include/storage/bufmgr.h index 9ced25c77e366..74f74acab66b9 100644 --- a/src/include/storage/bufmgr.h +++ b/src/include/storage/bufmgr.h @@ -162,6 +162,13 @@ typedef struct WritebackContext WritebackContext; extern PGDLLIMPORT int NBuffers; extern PGDLLIMPORT int NBuffersPending; +/* + * Per-process shadow copies of shared buffer pool dimensions. + * Updated during buffer pool resize via barrier handlers. + */ +extern PGDLLIMPORT int LocalCurrentNBuffers; +extern PGDLLIMPORT int LocalActiveNBuffers; + /* in bufmgr.c */ extern PGDLLIMPORT bool zero_damaged_pages; extern PGDLLIMPORT int bgwriter_lru_maxpages; @@ -427,7 +434,7 @@ extern void FreeAccessStrategy(BufferAccessStrategy strategy); static inline bool BufferIsValid(Buffer bufnum) { - Assert(bufnum <= (Buffer) pg_atomic_read_u32(&ShmemCtrl->currentNBuffers)); + Assert(bufnum <= (Buffer) LocalCurrentNBuffers); Assert(bufnum >= -NLocBuffer); return bufnum != InvalidBuffer; @@ -484,6 +491,7 @@ BufferGetPage(Buffer buffer) /* buf_resize.c */ extern Datum pg_resize_shared_buffers(PG_FUNCTION_ARGS); extern bool ProcessBarrierShmemShrink(void); +extern bool ProcessBarrierShmemResizeShrink(void); extern bool ProcessBarrierShmemResizeMapAndMem(void); extern bool ProcessBarrierShmemExpand(void); extern bool ProcessBarrierShmemResizeFailed(void); diff --git a/src/include/storage/procsignal.h b/src/include/storage/procsignal.h index 9dc9819a72bc9..63a278c4b0823 100644 --- a/src/include/storage/procsignal.h +++ b/src/include/storage/procsignal.h @@ -58,6 +58,9 @@ typedef enum * XLogLogicalInfo */ PROCSIGNAL_BARRIER_SHBUF_SHRINK, /* shrink buffer pool - restrict * allocations to new size */ + PROCSIGNAL_BARRIER_SHBUF_SHMEM_RESIZE_SHRINK, /* buffer structures resized - + * validate and update + * LocalCurrentNBuffers */ PROCSIGNAL_BARRIER_SHBUF_RESIZE_MAP_AND_MEM, /* remap shared memory * segments and update * structure pointers */