Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
285 changes: 98 additions & 187 deletions contrib/pg_buffercache/pg_buffercache_pages.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include "port/pg_numa.h"
#include "storage/buf_internals.h"
#include "storage/bufmgr.h"
#include "utils/injection_point.h"
#include "utils/rel.h"
#include "utils/tuplestore.h"

Expand All @@ -37,39 +38,6 @@ PG_MODULE_MAGIC_EXT(
.version = PG_VERSION
);

/*
* Per-buffer record holding the cache data that is to be exposed.
*
* pg_buffercache_pages() fills one of these for each shared buffer while
* holding that buffer's header lock, and later turns each record into one
* result row.
*/
typedef struct
{
uint32 bufferid;	/* from BufferDescriptorGetBuffer() */
RelFileNumber relfilenumber;	/* from BufTagGetRelNumber() on the buffer tag */
Oid reltablespace;	/* buffer tag's spcOid */
Oid reldatabase;	/* buffer tag's dbOid */
ForkNumber forknum;	/* from BufTagGetForkNum() on the buffer tag */
BlockNumber blocknum;	/* buffer tag's blockNum */
bool isvalid;	/* true only if both BM_VALID and BM_TAG_VALID are set */
bool isdirty;	/* true if BM_DIRTY is set */
uint16 usagecount;	/* from BUF_STATE_GET_USAGECOUNT() */

/*
* Taken from BUF_STATE_GET_REFCOUNT().
*
* An int32 is sufficiently large, as MAX_BACKENDS prevents a buffer from
* being pinned by too many backends and each backend will only pin once
* because of bufmgr.c's PrivateRefCount infrastructure.
*/
int32 pinning_backends;
} BufferCachePagesRec;


/*
* Function context for data persisting over repeated calls.
*
* Allocated on the first call of pg_buffercache_pages() while the
* multi-call memory context is current, and stored in funcctx->user_fctx.
*/
typedef struct
{
/* Blessed descriptor of the result row, used when forming each tuple. */
TupleDesc tupdesc;
/* Array with one entry per buffer, indexed by the SRF call counter. */
BufferCachePagesRec *record;
} BufferCachePagesContext;

/*
* Record structure holding the to be exposed cache data for OS pages. This
* structure is used by pg_buffercache_os_pages(), where NUMA information may
Expand Down Expand Up @@ -118,153 +86,93 @@ static bool firstNumaTouch = true;
Datum
pg_buffercache_pages(PG_FUNCTION_ARGS)
{
FuncCallContext *funcctx;
Datum result;
MemoryContext oldcontext;
BufferCachePagesContext *fctx; /* User function context. */
TupleDesc tupledesc;
ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
TupleDesc expected_tupledesc;
HeapTuple tuple;
int currentNBuffers = pg_atomic_read_u32(&ShmemCtrl->currentNBuffers);
int i;

if (SRF_IS_FIRSTCALL())
{
int i;

funcctx = SRF_FIRSTCALL_INIT();

/* Switch context when allocating stuff to be used in later calls */
oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);

/* Create a user function context for cross-call persistence */
fctx = palloc_object(BufferCachePagesContext);

/*
* To smoothly support upgrades from version 1.0 of this extension
* transparently handle the (non-)existence of the pinning_backends
* column. We unfortunately have to get the result type for that... -
* we can't use the result type determined by the function definition
* without potentially crashing when somebody uses the old (or even
* wrong) function definition though.
*/
if (get_call_result_type(fcinfo, NULL, &expected_tupledesc) != TYPEFUNC_COMPOSITE)
elog(ERROR, "return type must be a row type");

if (expected_tupledesc->natts < NUM_BUFFERCACHE_PAGES_MIN_ELEM ||
expected_tupledesc->natts > NUM_BUFFERCACHE_PAGES_ELEM)
elog(ERROR, "incorrect number of output arguments");

/* Construct a tuple descriptor for the result rows. */
tupledesc = CreateTemplateTupleDesc(expected_tupledesc->natts);
TupleDescInitEntry(tupledesc, (AttrNumber) 1, "bufferid",
INT4OID, -1, 0);
TupleDescInitEntry(tupledesc, (AttrNumber) 2, "relfilenode",
OIDOID, -1, 0);
TupleDescInitEntry(tupledesc, (AttrNumber) 3, "reltablespace",
OIDOID, -1, 0);
TupleDescInitEntry(tupledesc, (AttrNumber) 4, "reldatabase",
OIDOID, -1, 0);
TupleDescInitEntry(tupledesc, (AttrNumber) 5, "relforknumber",
INT2OID, -1, 0);
TupleDescInitEntry(tupledesc, (AttrNumber) 6, "relblocknumber",
INT8OID, -1, 0);
TupleDescInitEntry(tupledesc, (AttrNumber) 7, "isdirty",
BOOLOID, -1, 0);
TupleDescInitEntry(tupledesc, (AttrNumber) 8, "usage_count",
INT2OID, -1, 0);

if (expected_tupledesc->natts == NUM_BUFFERCACHE_PAGES_ELEM)
TupleDescInitEntry(tupledesc, (AttrNumber) 9, "pinning_backends",
INT4OID, -1, 0);

fctx->tupdesc = BlessTupleDesc(tupledesc);

/* Allocate NBuffers worth of BufferCachePagesRec records. */
fctx->record = (BufferCachePagesRec *)
MemoryContextAllocHuge(CurrentMemoryContext,
sizeof(BufferCachePagesRec) * currentNBuffers);

/* Set max calls and remember the user function context. */
funcctx->max_calls = currentNBuffers;
funcctx->user_fctx = fctx;

/* Return to original context when allocating transient memory */
MemoryContextSwitchTo(oldcontext);

/*
* Scan through all the buffers, saving the relevant fields in the
* fctx->record structure.
*
* We don't hold the partition locks, so we don't get a consistent
* snapshot across all buffers, but we do grab the buffer header
* locks, so the information of each buffer is self-consistent.
*/
for (i = 0; i < currentNBuffers; i++)
{
BufferDesc *bufHdr;
uint64 buf_state;

CHECK_FOR_INTERRUPTS();

/*
* TODO: We should just scan the entire buffer descriptor array
* instead of relying on the current buffer pool size. But that is
* possible only if we set up the descriptor array large enough at
* server startup time.
*/
if (currentNBuffers != pg_atomic_read_u32(&ShmemCtrl->currentNBuffers))
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("number of shared buffers changed during scan of buffer cache")));

bufHdr = GetBufferDescriptor(i);
/* Lock each buffer header before inspecting. */
buf_state = LockBufHdr(bufHdr);
/*
* To smoothly support upgrades from version 1.0 of this extension,
* transparently handle the (non-)existence of the pinning_backends
* column. Unfortunately, we have to look up the actual result type for
* that: we can't use the result type determined by the function
* definition without potentially crashing when somebody uses the old (or
* even wrong) function definition.
*/
if (get_call_result_type(fcinfo, NULL, &expected_tupledesc) != TYPEFUNC_COMPOSITE)
elog(ERROR, "return type must be a row type");

fctx->record[i].bufferid = BufferDescriptorGetBuffer(bufHdr);
fctx->record[i].relfilenumber = BufTagGetRelNumber(&bufHdr->tag);
fctx->record[i].reltablespace = bufHdr->tag.spcOid;
fctx->record[i].reldatabase = bufHdr->tag.dbOid;
fctx->record[i].forknum = BufTagGetForkNum(&bufHdr->tag);
fctx->record[i].blocknum = bufHdr->tag.blockNum;
fctx->record[i].usagecount = BUF_STATE_GET_USAGECOUNT(buf_state);
fctx->record[i].pinning_backends = BUF_STATE_GET_REFCOUNT(buf_state);
if (expected_tupledesc->natts < NUM_BUFFERCACHE_PAGES_MIN_ELEM ||
expected_tupledesc->natts > NUM_BUFFERCACHE_PAGES_ELEM)
elog(ERROR, "incorrect number of output arguments");

if (buf_state & BM_DIRTY)
fctx->record[i].isdirty = true;
else
fctx->record[i].isdirty = false;
InitMaterializedSRF(fcinfo, 0);

/* Note if the buffer is valid, and has storage created */
if ((buf_state & BM_VALID) && (buf_state & BM_TAG_VALID))
fctx->record[i].isvalid = true;
else
fctx->record[i].isvalid = false;
/*
* Scan through all the buffers, adding one row for each of the buffers to
* the tuplestore.
*
* We don't hold the partition locks, so we don't get a consistent
* snapshot across all buffers, but we do grab the buffer header locks, so
* the information of each buffer is self-consistent.
*
* Use LocalCurrentNBuffers so the scan range tracks the current buffer
* pool size. If a resize barrier is processed mid-scan (via
* CHECK_FOR_INTERRUPTS or an injection point wait), LocalCurrentNBuffers
* changes, and because the loop condition re-reads it on every iteration
* the scan picks up the new size.
*/
INJECTION_POINT("pg-buffercache-scan-start", NULL);
for (i = 0; i < LocalCurrentNBuffers; i++)
{
BufferDesc *bufHdr;
uint64 buf_state;
uint32 bufferid;
RelFileNumber relfilenumber;
Oid reltablespace;
Oid reldatabase;
ForkNumber forknum;
BlockNumber blocknum;
bool isvalid;
bool isdirty;
uint16 usagecount;
int32 pinning_backends;
Datum values[NUM_BUFFERCACHE_PAGES_ELEM];
bool nulls[NUM_BUFFERCACHE_PAGES_ELEM];

UnlockBufHdr(bufHdr);
}
}
bufHdr = GetBufferDescriptor(i);
/* Lock each buffer header before inspecting. */
buf_state = LockBufHdr(bufHdr);

bufferid = BufferDescriptorGetBuffer(bufHdr);
relfilenumber = BufTagGetRelNumber(&bufHdr->tag);
reltablespace = bufHdr->tag.spcOid;
reldatabase = bufHdr->tag.dbOid;
forknum = BufTagGetForkNum(&bufHdr->tag);
blocknum = bufHdr->tag.blockNum;
usagecount = BUF_STATE_GET_USAGECOUNT(buf_state);
pinning_backends = BUF_STATE_GET_REFCOUNT(buf_state);

funcctx = SRF_PERCALL_SETUP();
if (buf_state & BM_DIRTY)
isdirty = true;
else
isdirty = false;

/* Get the saved state */
fctx = funcctx->user_fctx;
/* Note if the buffer is valid, and has storage created */
if ((buf_state & BM_VALID) && (buf_state & BM_TAG_VALID))
isvalid = true;
else
isvalid = false;

if (funcctx->call_cntr < funcctx->max_calls)
{
uint32 i = funcctx->call_cntr;
Datum values[NUM_BUFFERCACHE_PAGES_ELEM];
bool nulls[NUM_BUFFERCACHE_PAGES_ELEM];
UnlockBufHdr(bufHdr);

values[0] = Int32GetDatum(fctx->record[i].bufferid);
/* Build the tuple and add it to tuplestore */
values[0] = Int32GetDatum(bufferid);
nulls[0] = false;

/*
* Set all fields except the bufferid to null if the buffer is unused
* or not valid.
*/
if (fctx->record[i].blocknum == InvalidBlockNumber ||
fctx->record[i].isvalid == false)
if (blocknum == InvalidBlockNumber || isvalid == false)
{
nulls[1] = true;
nulls[2] = true;
Expand All @@ -278,33 +186,36 @@ pg_buffercache_pages(PG_FUNCTION_ARGS)
}
else
{
values[1] = ObjectIdGetDatum(fctx->record[i].relfilenumber);
values[1] = ObjectIdGetDatum(relfilenumber);
nulls[1] = false;
values[2] = ObjectIdGetDatum(fctx->record[i].reltablespace);
values[2] = ObjectIdGetDatum(reltablespace);
nulls[2] = false;
values[3] = ObjectIdGetDatum(fctx->record[i].reldatabase);
values[3] = ObjectIdGetDatum(reldatabase);
nulls[3] = false;
values[4] = Int16GetDatum(fctx->record[i].forknum);
values[4] = Int16GetDatum(forknum);
nulls[4] = false;
values[5] = Int64GetDatum((int64) fctx->record[i].blocknum);
values[5] = Int64GetDatum((int64) blocknum);
nulls[5] = false;
values[6] = BoolGetDatum(fctx->record[i].isdirty);
values[6] = BoolGetDatum(isdirty);
nulls[6] = false;
values[7] = UInt16GetDatum(fctx->record[i].usagecount);
values[7] = UInt16GetDatum(usagecount);
nulls[7] = false;
/* unused for v1.0 callers, but the array is always long enough */
values[8] = Int32GetDatum(fctx->record[i].pinning_backends);
values[8] = Int32GetDatum(pinning_backends);
nulls[8] = false;
}

/* Build and return the tuple. */
tuple = heap_form_tuple(fctx->tupdesc, values, nulls);
result = HeapTupleGetDatum(tuple);
tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);

SRF_RETURN_NEXT(funcctx, result);
/*
* Allow interrupts so that resize barriers can be processed
* mid-scan, updating LocalCurrentNBuffers.
*/
CHECK_FOR_INTERRUPTS();
INJECTION_POINT("pg-buffercache-scan-loop", NULL);
}
else
SRF_RETURN_DONE(funcctx);

return (Datum) 0;
}

/*
Expand Down Expand Up @@ -392,7 +303,7 @@ pg_buffercache_os_pages_internal(FunctionCallInfo fcinfo, bool include_numa)
startptr = (char *) TYPEALIGN_DOWN(os_page_size,
BufferGetBlock(1));
endptr = (char *) TYPEALIGN(os_page_size,
(char *) BufferGetBlock(NBuffers) + BLCKSZ);
(char *) BufferGetBlock(LocalCurrentNBuffers) + BLCKSZ);
os_page_count = (endptr - startptr) / os_page_size;

/* Used to determine the NUMA node for all OS pages at once */
Expand All @@ -417,8 +328,8 @@ pg_buffercache_os_pages_internal(FunctionCallInfo fcinfo, bool include_numa)

Assert(idx == os_page_count);

elog(DEBUG1, "NUMA: NBuffers=%d os_page_count=" UINT64_FORMAT " "
"os_page_size=%zu", NBuffers, os_page_count, os_page_size);
elog(DEBUG1, "NUMA: LocalCurrentNBuffers=%d os_page_count=" UINT64_FORMAT " "
"os_page_size=%zu", LocalCurrentNBuffers, os_page_count, os_page_size);

/*
* If we ever get 0xff back from kernel inquiry, then we probably
Expand Down Expand Up @@ -467,7 +378,7 @@ pg_buffercache_os_pages_internal(FunctionCallInfo fcinfo, bool include_numa)
* without reallocating memory.
*/
pages_per_buffer = Max(1, BLCKSZ / os_page_size) + 1;
max_entries = NBuffers * pages_per_buffer;
max_entries = LocalCurrentNBuffers * pages_per_buffer;

/* Allocate entries for BufferCacheOsPagesRec records. */
fctx->record = (BufferCacheOsPagesRec *)
Expand All @@ -490,7 +401,7 @@ pg_buffercache_os_pages_internal(FunctionCallInfo fcinfo, bool include_numa)
*/
startptr = (char *) TYPEALIGN_DOWN(os_page_size, (char *) BufferGetBlock(1));
idx = 0;
for (i = 0; i < NBuffers; i++)
for (i = 0; i < LocalCurrentNBuffers; i++)
{
char *buffptr = (char *) BufferGetBlock(i + 1);
BufferDesc *bufHdr;
Expand Down Expand Up @@ -636,7 +547,7 @@ pg_buffercache_summary(PG_FUNCTION_ARGS)
if (get_call_result_type(fcinfo, NULL, &tupledesc) != TYPEFUNC_COMPOSITE)
elog(ERROR, "return type must be a row type");

for (int i = 0; i < NBuffers; i++)
for (int i = 0; i < LocalCurrentNBuffers; i++)
{
BufferDesc *bufHdr;
uint64 buf_state;
Expand Down Expand Up @@ -697,7 +608,7 @@ pg_buffercache_usage_counts(PG_FUNCTION_ARGS)

InitMaterializedSRF(fcinfo, 0);

for (int i = 0; i < NBuffers; i++)
for (int i = 0; i < LocalCurrentNBuffers; i++)
{
BufferDesc *bufHdr = GetBufferDescriptor(i);
uint64 buf_state = pg_atomic_read_u64(&bufHdr->state);
Expand Down Expand Up @@ -761,7 +672,7 @@ pg_buffercache_evict(PG_FUNCTION_ARGS)

pg_buffercache_superuser_check("pg_buffercache_evict");

if (buf < 1 || buf > NBuffers)
if (buf < 1 || buf > LocalCurrentNBuffers)
elog(ERROR, "bad buffer ID: %d", buf);

values[0] = BoolGetDatum(EvictUnpinnedBuffer(buf, &buffer_flushed));
Expand Down Expand Up @@ -878,7 +789,7 @@ pg_buffercache_mark_dirty(PG_FUNCTION_ARGS)

pg_buffercache_superuser_check("pg_buffercache_mark_dirty");

if (buf < 1 || buf > NBuffers)
if (buf < 1 || buf > LocalCurrentNBuffers)
elog(ERROR, "bad buffer ID: %d", buf);

values[0] = BoolGetDatum(MarkDirtyUnpinnedBuffer(buf, &buffer_already_dirty));
Expand Down
Loading