Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 28 additions & 4 deletions src/backend/access/transam/xlogrecovery.c
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,24 @@ const char *recoveryTargetName;
XLogRecPtr recoveryTargetLSN;
int recovery_min_apply_delay = 0;

/*
* If true, when WAL replay on a standby is about to invalidate an otherwise-
* active logical replication slot because a catalog PRUNE_ON_ACCESS record's
* snapshotConflictHorizon has overtaken the slot's catalog_xmin, pause replay
* instead. Replay auto-resumes once the consumer has drained the slot past
* the pause point (or the slot is dropped, advanced, or otherwise no longer
* blocking); pg_wal_replay_resume() also forces continuation. See
* MaybePauseOnLogicalSlotConflict() in standby.c.
*
* Motivated by blueprints/LOGICAL_DECODING_ARCHIVED_WALS.md §4.2.3 / US-4:
* an archive-only logical-decoding standby cannot feed hot_standby_feedback
* to the primary, so it has no natural way to keep the primary's catalog
* horizon pinned. Without this GUC, any logical slot created on such a
* standby is invalidated the first time replay applies a catalog vacuum
* record whose horizon exceeds the slot's catalog_xmin.
*/
bool recovery_pause_on_logical_slot_conflict = false;

/* options formerly taken from recovery.conf for XLOG streaming */
char *PrimaryConnInfo = NULL;
char *PrimarySlotName = NULL;
Expand Down Expand Up @@ -363,7 +381,8 @@ static bool recoveryStopsAfter(XLogReaderState *record);
static char *getRecoveryStopReason(void);
static void recoveryPausesHere(bool endOfRecovery);
static bool recoveryApplyDelay(XLogReaderState *record);
static void ConfirmRecoveryPaused(void);
/* Exposed for the logical-slot-conflict recovery-pause logic in standby.c. */
void ConfirmRecoveryPaused(void);

static XLogRecord *ReadRecord(XLogPrefetcher *xlogprefetcher,
int emode, bool fetching_ckpt,
Expand All @@ -386,7 +405,8 @@ static int XLogFileRead(XLogSegNo segno, TimeLineID tli,
XLogSource source, bool notfoundOk);
static int XLogFileReadAnyTLI(XLogSegNo segno, XLogSource source);

static bool CheckForStandbyTrigger(void);
/* Exposed for the logical-slot-conflict recovery-pause logic in standby.c. */
bool CheckForStandbyTrigger(void);
static void SetPromoteIsTriggered(void);
static bool HotStandbyActiveInReplay(void);

Expand Down Expand Up @@ -3083,7 +3103,7 @@ SetRecoveryPause(bool recoveryPause)
* Confirm the recovery pause by setting the recovery pause state to
* RECOVERY_PAUSED.
*/
static void
void
ConfirmRecoveryPaused(void)
{
/* If recovery pause is requested then set it paused */
Expand Down Expand Up @@ -4438,7 +4458,11 @@ SetPromoteIsTriggered(void)
/*
* Check whether a promote request has arrived.
*/
static bool
/*
* Non-static: MaybePauseOnLogicalSlotConflict needs this to break its wait
* loop on promotion, same as recoveryPausesHere does.
*/
bool
CheckForStandbyTrigger(void)
{
if (LocalPromoteIsTriggered)
Expand Down
285 changes: 285 additions & 0 deletions src/backend/storage/ipc/standby.c
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@
#include "access/xlogutils.h"
#include "miscadmin.h"
#include "pgstat.h"
#include "postmaster/startup.h"
#include "replication/slot.h"
#include "storage/condition_variable.h"
#include "storage/bufmgr.h"
#include "storage/lwlock.h"
#include "storage/proc.h"
#include "storage/procarray.h"
#include "storage/sinvaladt.h"
Expand Down Expand Up @@ -503,8 +506,290 @@ ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
* reached, e.g. due to using a physical replication slot.
*/
if (IsLogicalDecodingEnabled() && isCatalogRel)
{
MaybePauseOnLogicalSlotConflict(locator.dbOid, snapshotConflictHorizon);
InvalidateObsoleteReplicationSlots(RS_INVAL_HORIZON, 0, locator.dbOid,
snapshotConflictHorizon);
}
}

/*
 * Returns true if at least one non-synced logical slot that this conflict can
 * affect still blocks replay past snapshotConflictHorizon.
 *
 * dboid selects which slots are in scope.  A valid OID restricts the scan to
 * logical slots of that database.  InvalidOid is what the caller passes for a
 * conflict on a shared catalog relation (locator.dbOid is InvalidOid there),
 * and such a conflict can hit logical slots in every database, so no
 * database filter is applied.  This has to mirror the database filter that
 * the fall-through InvalidateObsoleteReplicationSlots(RS_INVAL_HORIZON, ...)
 * applies (see the RS_INVAL_HORIZON handling in slot.c); otherwise a
 * shared-catalog conflict would never pause and the slots would simply be
 * invalidated.
 *
 * "Blocks" means: the slot is in use, not invalidated, snapbuild-consistent
 * (effective_catalog_xmin is valid — skipping in-progress slots avoids a
 * deadlock with DecodingContextFindStartpoint), and its catalog_xmin
 * precedes-or-equals the horizon.
 *
 * Use PrecedesOrEquals (not Precedes) to match DetermineSlotInvalidationCause.
 * Otherwise a slot whose catalog_xmin was just advanced to exactly horizon by
 * a previous pause-and-advance cycle fails to re-pause on the next prune
 * record with the same horizon, yet would still be invalidated by the
 * fall-through InvalidateObsoleteReplicationSlots call.
 *
 * Synced slots are skipped: writing their fields from the startup process
 * would race the slot-sync worker, and ALTER / DROP_REPLICATION_SLOT errors
 * out on a synced slot so the operator-facing recipe does not apply.
 *
 * When conflict_lsn is valid (in-wait auto-resume check), slots whose
 * confirmed_flush_lsn has reached conflict_lsn are treated as not blocking:
 * the consumer has caught up to the pause point and the post-wait advance
 * code will bump their catalog_xmin past the horizon.  Pass InvalidXLogRecPtr
 * for the initial pause-or-not decision (we don't yet have a pause point).
 */
static bool
AnySlotStillBlocksConflict(Oid dboid, TransactionId snapshotConflictHorizon,
						   XLogRecPtr conflict_lsn)
{
	int			i;
	bool		blocking = false;

	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
		Oid			slot_db;
		TransactionId slot_xmin;
		TransactionId slot_effective_xmin;
		XLogRecPtr	slot_confirmed;
		bool		is_candidate;

		if (!s->in_use)
			continue;

		/* Take a consistent snapshot of the fields we need under the mutex. */
		SpinLockAcquire(&s->mutex);
		slot_db = s->data.database;
		slot_xmin = s->data.catalog_xmin;
		slot_effective_xmin = s->effective_catalog_xmin;
		slot_confirmed = s->data.confirmed_flush;
		is_candidate = (s->data.invalidated == RS_INVAL_NONE &&
						slot_db != InvalidOid &&
						TransactionIdIsValid(slot_effective_xmin) &&
						!s->data.synced);
		SpinLockRelease(&s->mutex);

		if (!is_candidate)
			continue;

		/*
		 * A valid dboid limits the conflict to that database's slots; an
		 * invalid dboid (shared catalog) puts every logical slot in scope.
		 */
		if (dboid != InvalidOid && slot_db != dboid)
			continue;
		if (!TransactionIdIsValid(slot_xmin))
			continue;
		if (!TransactionIdPrecedesOrEquals(slot_xmin, snapshotConflictHorizon))
			continue;

		/* In-wait check only: a slot drained past the pause point is done. */
		if (conflict_lsn != InvalidXLogRecPtr &&
			slot_confirmed >= conflict_lsn)
			continue;

		blocking = true;
		break;
	}
	LWLockRelease(ReplicationSlotControlLock);

	return blocking;
}

/*
 * If recovery_pause_on_logical_slot_conflict is enabled, and replay is about
 * to apply a catalog PRUNE_ON_ACCESS record whose snapshotConflictHorizon
 * would cause the invalidation of at least one non-invalidated logical slot
 * in scope, request a recovery pause and wait until the conflict is resolved.
 *
 * The wait exits in any of:
 *	 - Auto-resume: a periodic re-scan finds no slot still blocking.  Any of
 *	   draining past the pause LSN, dropping the slot, pg_replication_slot_
 *	   advance(), or out-of-band invalidation (e.g. max_slot_wal_keep_size
 *	   applied by the checkpointer, which runs even while startup is paused
 *	   here) will satisfy this.  The post-wait advance then bumps catalog_xmin
 *	   on drained slots so the fall-through InvalidateObsoleteReplicationSlots()
 *	   is a no-op.
 *	 - Manual resume: pg_wal_replay_resume() flips the state to NOT_PAUSED.
 *	   Any slot still blocking at that point is invalidated by the
 *	   fall-through — the "give up on this slot" escape hatch.
 *	 - Promote: CheckForStandbyTrigger() consumes PROMOTE_SIGNAL_FILE and we
 *	   return early so the startup process can finish promotion.
 *
 * The two parameters identify which slots, if any, this prune record can
 * conflict with:
 *	 - dboid: database of the catalog relation being pruned.  When valid,
 *	   only logical slots belonging to this database can be invalidated and
 *	   slots in other databases are ignored.  When InvalidOid, the relation
 *	   is a shared catalog and logical slots in every database are in scope,
 *	   matching what the fall-through invalidation does.
 *	 - snapshotConflictHorizon: the xid threshold carried by the
 *	   PRUNE_ON_ACCESS record.  A slot conflicts iff its catalog_xmin
 *	   precedes-or-equals this horizon (i.e. it still needs catalog rows the
 *	   prune is about to remove).
 *
 * Only invoked from ResolveRecoveryConflictWithSnapshot(), before any buffer
 * locks are taken, so pausing here does not deadlock with anything.
 */
void
MaybePauseOnLogicalSlotConflict(Oid dboid, TransactionId snapshotConflictHorizon)
{
	XLogRecPtr	conflict_lsn;
	bool		user_requested_pause;
	bool		any_advanced = false;
	TransactionId new_xmin;
	int			j;

	if (!recovery_pause_on_logical_slot_conflict)
		return;
	if (!TransactionIdIsValid(snapshotConflictHorizon))
		return;

	if (!AnySlotStillBlocksConflict(dboid, snapshotConflictHorizon,
									InvalidXLogRecPtr))
		return;

	/*
	 * Remember whether an operator had already paused recovery (e.g. via
	 * pg_wal_replay_pause()) before this conflict fired.  If so, our
	 * auto-resume below must not clear that pause out from under them — the
	 * user's pause wins.
	 */
	user_requested_pause = (GetRecoveryPauseState() != RECOVERY_NOT_PAUSED);

	conflict_lsn = GetXLogReplayRecPtr(NULL);

	/*
	 * A shared-catalog conflict has no single database to report, so use a
	 * dedicated detail message instead of printing "database 0".
	 */
	ereport(LOG,
			(errmsg("recovery paused: WAL redo at %X/%X would invalidate a logical replication slot",
					LSN_FORMAT_ARGS(conflict_lsn)),
			 (dboid != InvalidOid) ?
			 errdetail("snapshotConflictHorizon %u exceeds catalog_xmin of at least one active logical slot in database %u.",
					   snapshotConflictHorizon, dboid) :
			 errdetail("snapshotConflictHorizon %u on a shared catalog exceeds catalog_xmin of at least one active logical slot.",
					   snapshotConflictHorizon),
			 errhint("Recovery will resume automatically once the slot is drained past %X/%X, dropped, advanced, or invalidated for another reason; pg_wal_replay_resume() forces continuation (invalidating any remaining blocking slot).",
					 LSN_FORMAT_ARGS(conflict_lsn))));

	SetRecoveryPause(true);

	while (GetRecoveryPauseState() != RECOVERY_NOT_PAUSED)
	{
		ProcessStartupProcInterrupts();

		/*
		 * If the operator gave up on the slot and triggered a promotion
		 * instead, bail out of the wait so the startup process can proceed
		 * with the promotion path.  Must use CheckForStandbyTrigger (which
		 * actually consumes PROMOTE_SIGNAL_FILE), not PromoteIsTriggered
		 * (which only reads a flag populated by the former).  Mirrors the
		 * same escape in recoveryPausesHere().
		 */
		if (CheckForStandbyTrigger())
		{
			ConditionVariableCancelSleep();
			return;
		}

		/*
		 * Auto-resume: if nothing is still blocking this conflict, clear the
		 * pause and let the loop condition exit.  The post-wait advance will
		 * bump catalog_xmin on any slot that drained past conflict_lsn so
		 * the fall-through InvalidateObsoleteReplicationSlots() is a no-op.
		 * Slots invalidated out of band (dropped, WAL-removed, etc.) are
		 * simply not in the scan anymore.
		 */
		if (!AnySlotStillBlocksConflict(dboid, snapshotConflictHorizon,
										conflict_lsn))
		{
			/*
			 * Only clear the pause we set ourselves.  If the operator had
			 * already paused recovery before the conflict fired, leave their
			 * pause in place — auto-resume must not silently override an
			 * explicit pg_wal_replay_pause().  Either way, exit the
			 * conflict-wait loop now that nothing is blocking.
			 */
			if (!user_requested_pause)
				SetRecoveryPause(false);
			break;
		}

		/*
		 * Promote RECOVERY_PAUSE_REQUESTED to RECOVERY_PAUSED so that
		 * observers (pg_get_wal_replay_pause_state() / monitoring) see the
		 * pause as actually taken, not just requested.
		 */
		ConfirmRecoveryPaused();
		ConditionVariableTimedSleep(&XLogRecoveryCtl->recoveryNotPausedCV,
									1000, WAIT_EVENT_RECOVERY_PAUSE);
	}
	ConditionVariableCancelSleep();

	/*
	 * Wait is over.  For any slot whose consumer drained up to (or past)
	 * conflict_lsn, advance catalog_xmin past the horizon so the subsequent
	 * InvalidateObsoleteReplicationSlots() fall-through is a no-op.  Slots
	 * that did not drain are left alone and get invalidated normally — the
	 * "I didn't act, just let the slot die" path that runs when an operator
	 * manually resumed without draining.
	 */

	/* The replacement xmin is the same for every slot: strictly > horizon. */
	new_xmin = snapshotConflictHorizon;
	TransactionIdAdvance(new_xmin);

	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
	for (j = 0; j < max_replication_slots; j++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[j];
		XLogRecPtr	slot_confirmed;
		bool		bump_catalog_xmin;
		bool		bump_xmin;
		bool		advance;

		if (!s->in_use)
			continue;

		SpinLockAcquire(&s->mutex);

		/* Captured under the mutex so the log line below reads no racy state. */
		slot_confirmed = s->data.confirmed_flush;

		bump_catalog_xmin = (TransactionIdIsValid(s->data.catalog_xmin) &&
							 TransactionIdPrecedesOrEquals(s->data.catalog_xmin,
														   snapshotConflictHorizon));
		bump_xmin = (TransactionIdIsValid(s->data.xmin) &&
					 TransactionIdPrecedesOrEquals(s->data.xmin,
												   snapshotConflictHorizon));

		/*
		 * Same scope rules as the pause-check scan: only logical slots, all
		 * databases when dboid is invalid (shared catalog), otherwise only
		 * the conflicting database.  Skip synced slots — writing their
		 * fields would race the slot-sync worker.
		 */
		advance = (s->data.invalidated == RS_INVAL_NONE &&
				   s->data.database != InvalidOid &&
				   (dboid == InvalidOid || s->data.database == dboid) &&
				   !s->data.synced &&
				   slot_confirmed >= conflict_lsn &&
				   (bump_catalog_xmin || bump_xmin));
		if (advance)
		{
			if (bump_catalog_xmin)
			{
				s->data.catalog_xmin = new_xmin;
				s->effective_catalog_xmin = new_xmin;
			}
			if (bump_xmin)
			{
				s->data.xmin = new_xmin;
				s->effective_xmin = new_xmin;
			}
			s->just_dirtied = true;
			s->dirty = true;
			any_advanced = true;
		}
		SpinLockRelease(&s->mutex);

		if (advance)
			ereport(LOG,
					(errmsg("advanced catalog_xmin of logical slot \"%s\" past conflict horizon %u",
							NameStr(s->data.name), snapshotConflictHorizon),
					 errdetail("Slot's confirmed_flush_lsn %X/%X reached the conflict record at %X/%X; consumer drained past the pause point.",
							   LSN_FORMAT_ARGS(slot_confirmed),
							   LSN_FORMAT_ARGS(conflict_lsn))));
	}
	LWLockRelease(ReplicationSlotControlLock);

	/*
	 * Flush dirty slots to disk immediately — do not defer to the next
	 * restartpoint.  The normal streaming path (LogicalConfirmReceivedLocation
	 * in logical.c) saves slot state to disk *before* updating the in-memory
	 * effective_catalog_xmin.  We must uphold the same invariant: if we crash
	 * between the in-memory advance above and the next restartpoint, the
	 * on-disk catalog_xmin must reflect the advance so that vacuum cannot
	 * reclaim catalog tuples the slot still needs.
	 *
	 * Nothing to flush if we dirtied no slot (e.g. the operator resumed
	 * manually without draining), so skip the I/O in that case.
	 */
	if (any_advanced)
		CheckPointReplicationSlots(false);
}

/*
Expand Down
8 changes: 8 additions & 0 deletions src/backend/utils/misc/guc_parameters.dat
Original file line number Diff line number Diff line change
Expand Up @@ -2441,6 +2441,14 @@
max => 'INT_MAX',
},

{ name => 'recovery_pause_on_logical_slot_conflict', type => 'bool',
context => 'PGC_SIGHUP', group => 'REPLICATION_STANDBY',
short_desc => 'Pauses recovery instead of invalidating an active logical slot on catalog conflict.',
long_desc => 'When WAL replay on a standby is about to invalidate a logical replication slot because a catalog PRUNE_ON_ACCESS record has overtaken the slot\'s catalog_xmin, pause recovery instead. Recovery resumes automatically once the slot has been drained past the pause point, dropped, advanced, or invalidated for another reason (e.g. max_slot_wal_keep_size). pg_wal_replay_resume() also forces continuation, invalidating any remaining blocking slot.',
variable => 'recovery_pause_on_logical_slot_conflict',
boot_val => 'false',
},

{ name => 'recovery_prefetch', type => 'enum', context => 'PGC_SIGHUP', group => 'WAL_RECOVERY',
short_desc => 'Prefetch referenced blocks during recovery.',
long_desc => 'Look ahead in the WAL to find references to uncached data.',
Expand Down
4 changes: 4 additions & 0 deletions src/backend/utils/misc/postgresql.conf.sample
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,10 @@
#wal_retrieve_retry_interval = 5s # time to wait before retrying to
# retrieve WAL after a failed attempt
#recovery_min_apply_delay = 0 # minimum delay for applying changes during recovery
#recovery_pause_on_logical_slot_conflict = off # pause recovery instead of invalidating
# a logical slot on catalog conflict;
# auto-resumes once the slot is drained,
# dropped, or otherwise unblocks
#sync_replication_slots = off # enables slot synchronization on the physical standby from the primary

# - Subscribers -
Expand Down
Loading