From: Amit Kapila Date: Mon, 15 Dec 2025 02:50:21 +0000 (+0000) Subject: Add retry logic to pg_sync_replication_slots(). X-Git-Url: http://git.postgresql.org/gitweb/?a=commitdiff_plain;h=refs%2Fremotes%2Fgithub%2Fmaster;p=postgresql.git Add retry logic to pg_sync_replication_slots(). Previously, pg_sync_replication_slots() would finish without synchronizing slots that didn't meet requirements, rather than failing outright. This could leave some failover slots unsynchronized if required catalog rows or WAL segments were missing or at risk of removal, while the standby continued removing needed data. To address this, the function now waits for the primary slot to advance to a position where all required data is available on the standby before completing synchronization. It retries cyclically until all failover slots that existed on the primary at the start of the call are synchronized. Slots created after the function begins are not included. If the standby is promoted during this wait, the function exits gracefully and the temporary slots will be removed. Author: Ajin Cherian Author: Hou Zhijie Reviewed-by: Shveta Malik Reviewed-by: Japin Li Reviewed-by: Ashutosh Bapat Reviewed-by: Ashutosh Sharma Reviewed-by: Chao Li Reviewed-by: Yilin Zhang Reviewed-by: Amit Kapila Discussion: https://postgr.es/m/CAFPTHDZAA%2BgWDntpa5ucqKKba41%3DtXmoXqN3q4rpjO9cdxgQrw%40mail.gmail.com --- diff --git a/doc/src/sgml/func/func-admin.sgml b/doc/src/sgml/func/func-admin.sgml index 1b465bc8ba7..2896cd9e429 100644 --- a/doc/src/sgml/func/func-admin.sgml +++ b/doc/src/sgml/func/func-admin.sgml @@ -1497,9 +1497,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset standby server. Temporary synced slots, if any, cannot be used for logical decoding and must be dropped after promotion. See for details. 
- Note that this function is primarily intended for testing and - debugging purposes and should be used with caution. Additionally, - this function cannot be executed if + Note that this function cannot be executed if sync_replication_slots is enabled and the slotsync worker is already running to perform the synchronization of slots. diff --git a/doc/src/sgml/logicaldecoding.sgml b/doc/src/sgml/logicaldecoding.sgml index d5a5e22fe2c..cae8a376c3b 100644 --- a/doc/src/sgml/logicaldecoding.sgml +++ b/doc/src/sgml/logicaldecoding.sgml @@ -405,15 +405,13 @@ postgres=# SELECT * from pg_logical_slot_get_changes('regression_slot', NULL, NU periodic synchronization of failover slots, they can also be manually synchronized using the pg_sync_replication_slots function on the standby. - However, this function is primarily intended for testing and debugging and - should be used with caution. Unlike automatic synchronization, it does not - include cyclic retries, making it more prone to synchronization failures, - particularly during initial sync scenarios where the required WAL files - or catalog rows for the slot might have already been removed or are at risk - of being removed on the standby. In contrast, automatic synchronization - via sync_replication_slots provides continuous slot - updates, enabling seamless failover and supporting high availability. - Therefore, it is the recommended method for synchronizing slots. + However, unlike automatic synchronization, it does not perform incremental + updates. It retries cyclically until all the failover slots that existed on + primary at the start of the function call are synchronized. Any slots created + after the function begins will not be synchronized. In contrast, automatic + synchronization via sync_replication_slots provides + continuous slot updates, enabling seamless failover and supporting high + availability. Therefore, it is the recommended method for synchronizing slots. 
diff --git a/src/backend/replication/logical/slotsync.c b/src/backend/replication/logical/slotsync.c index 873aa003eec..bf50317b443 100644 --- a/src/backend/replication/logical/slotsync.c +++ b/src/backend/replication/logical/slotsync.c @@ -39,6 +39,13 @@ * the last cycle. Refer to the comments above wait_for_slot_activity() for * more details. * + * If the SQL function pg_sync_replication_slots() is used to sync the slots, + * and if the slots are not ready to be synced and are marked as RS_TEMPORARY + * because of any of the reasons mentioned above, then the SQL function also + * waits and retries until the slots are marked as RS_PERSISTENT (which means + * sync-ready). Refer to the comments in SyncReplicationSlots() for more + * details. + * * Any standby synchronized slots will be dropped if they no longer need * to be synchronized. See comment atop drop_local_obsolete_slots() for more * details. @@ -64,6 +71,7 @@ #include "storage/procarray.h" #include "tcop/tcopprot.h" #include "utils/builtins.h" +#include "utils/memutils.h" #include "utils/pg_lsn.h" #include "utils/ps_status.h" #include "utils/timeout.h" @@ -599,11 +607,15 @@ reserve_wal_for_local_slot(XLogRecPtr restart_lsn) * local ones, then update the LSNs and persist the local synced slot for * future synchronization; otherwise, do nothing. * + * *slot_persistence_pending is set to true if any of the slots fail to + * persist. + * * Return true if the slot is marked as RS_PERSISTENT (sync-ready), otherwise * false. */ static bool -update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) +update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid, + bool *slot_persistence_pending) { ReplicationSlot *slot = MyReplicationSlot; bool found_consistent_snapshot = false; @@ -627,7 +639,13 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) * current location when recreating the slot in the next cycle. 
It may * take more time to create such a slot. Therefore, we keep this slot * and attempt the synchronization in the next cycle. + * + * We also update the slot_persistence_pending parameter, so the SQL + * function can retry. */ + if (slot_persistence_pending) + *slot_persistence_pending = true; + return false; } @@ -642,6 +660,10 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) errdetail("Synchronization could lead to data loss, because the standby could not build a consistent snapshot to decode WALs at LSN %X/%08X.", LSN_FORMAT_ARGS(slot->data.restart_lsn))); + /* Set this, so that SQL function can retry */ + if (slot_persistence_pending) + *slot_persistence_pending = true; + return false; } @@ -665,10 +687,14 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid) * updated. The slot is then persisted and is considered as sync-ready for * periodic syncs. * + * *slot_persistence_pending is set to true if any of the slots fail to + * persist. + * * Returns TRUE if the local slot is updated. */ static bool -synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) +synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid, + bool *slot_persistence_pending) { ReplicationSlot *slot; XLogRecPtr latestFlushPtr = GetStandbyFlushRecPtr(NULL); @@ -770,7 +796,8 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) if (slot->data.persistency == RS_TEMPORARY) { slot_updated = update_and_persist_local_synced_slot(remote_slot, - remote_dbid); + remote_dbid, + slot_persistence_pending); } /* Slot ready for sync, so sync it. 
*/ @@ -867,7 +894,8 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) return false; } - update_and_persist_local_synced_slot(remote_slot, remote_dbid); + update_and_persist_local_synced_slot(remote_slot, remote_dbid, + slot_persistence_pending); slot_updated = true; } @@ -878,15 +906,16 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid) } /* - * Synchronize slots. + * Fetch remote slots. * - * Gets the failover logical slots info from the primary server and updates - * the slots locally. Creates the slots if not present on the standby. + * If slot_names is NIL, fetches all failover logical slots from the + * primary server, otherwise fetches only the ones with names in slot_names. * - * Returns TRUE if any of the slots gets updated in this sync-cycle. + * Returns a list of remote slot information structures, or NIL if none + * are found. */ -static bool -synchronize_slots(WalReceiverConn *wrconn) +static List * +fetch_remote_slots(WalReceiverConn *wrconn, List *slot_names) { #define SLOTSYNC_COLUMN_COUNT 10 Oid slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID, @@ -895,29 +924,45 @@ synchronize_slots(WalReceiverConn *wrconn) WalRcvExecResult *res; TupleTableSlot *tupslot; List *remote_slot_list = NIL; - bool some_slot_updated = false; - bool started_tx = false; - const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn," - " restart_lsn, catalog_xmin, two_phase, two_phase_at, failover," - " database, invalidation_reason" - " FROM pg_catalog.pg_replication_slots" - " WHERE failover and NOT temporary"; - - /* The syscache access in walrcv_exec() needs a transaction env. 
*/ - if (!IsTransactionState()) + StringInfoData query; + + initStringInfo(&query); + appendStringInfoString(&query, + "SELECT slot_name, plugin, confirmed_flush_lsn," + " restart_lsn, catalog_xmin, two_phase," + " two_phase_at, failover," + " database, invalidation_reason" + " FROM pg_catalog.pg_replication_slots" + " WHERE failover and NOT temporary"); + + if (slot_names != NIL) { - StartTransactionCommand(); - started_tx = true; + bool first_slot = true; + + /* + * Construct the query to fetch only the specified slots + */ + appendStringInfoString(&query, " AND slot_name IN ("); + + foreach_ptr(char, slot_name, slot_names) + { + if (!first_slot) + appendStringInfoString(&query, ", "); + + appendStringInfo(&query, "%s", quote_literal_cstr(slot_name)); + first_slot = false; + } + appendStringInfoChar(&query, ')'); } /* Execute the query */ - res = walrcv_exec(wrconn, query, SLOTSYNC_COLUMN_COUNT, slotRow); + res = walrcv_exec(wrconn, query.data, SLOTSYNC_COLUMN_COUNT, slotRow); + pfree(query.data); if (res->status != WALRCV_OK_TUPLES) ereport(ERROR, errmsg("could not fetch failover logical slots info from the primary server: %s", res->err)); - /* Construct the remote_slot tuple and synchronize each slot locally */ tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple); while (tuplestore_gettupleslot(res->tuplestore, true, false, tupslot)) { @@ -994,6 +1039,29 @@ synchronize_slots(WalReceiverConn *wrconn) ExecClearTuple(tupslot); } + walrcv_clear_result(res); + + return remote_slot_list; +} + +/* + * Synchronize slots. + * + * This function takes a list of remote slots and synchronizes them locally. It + * creates the slots if not present on the standby and updates existing ones. + * + * If slot_persistence_pending is not NULL, it will be set to true if one or + * more slots could not be persisted. This allows callers such as + * SyncReplicationSlots() to retry those slots. + * + * Returns TRUE if any of the slots gets updated in this sync-cycle. 
+ */ +static bool +synchronize_slots(WalReceiverConn *wrconn, List *remote_slot_list, + bool *slot_persistence_pending) +{ + bool some_slot_updated = false; + /* Drop local slots that no longer need to be synced. */ drop_local_obsolete_slots(remote_slot_list); @@ -1009,19 +1077,12 @@ synchronize_slots(WalReceiverConn *wrconn) */ LockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock); - some_slot_updated |= synchronize_one_slot(remote_slot, remote_dbid); + some_slot_updated |= synchronize_one_slot(remote_slot, remote_dbid, + slot_persistence_pending); UnlockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock); } - /* We are done, free remote_slot_list elements */ - list_free_deep(remote_slot_list); - - walrcv_clear_result(res); - - if (started_tx) - CommitTransactionCommand(); - return some_slot_updated; } @@ -1460,6 +1521,9 @@ reset_syncing_flag(void) * * It connects to the primary server, fetches logical failover slots * information periodically in order to create and sync the slots. + * + * Note: If any changes are made here, check if the corresponding SQL + * function logic in SyncReplicationSlots() also needs to be changed. */ void ReplSlotSyncWorkerMain(const void *startup_data, size_t startup_data_len) @@ -1621,10 +1685,27 @@ ReplSlotSyncWorkerMain(const void *startup_data, size_t startup_data_len) for (;;) { bool some_slot_updated = false; + bool started_tx = false; + List *remote_slots; ProcessSlotSyncInterrupts(); - some_slot_updated = synchronize_slots(wrconn); + /* + * The syscache access in fetch_remote_slots() needs a transaction + * env. 
+ */ + if (!IsTransactionState()) + { + StartTransactionCommand(); + started_tx = true; + } + + remote_slots = fetch_remote_slots(wrconn, NIL); + some_slot_updated = synchronize_slots(wrconn, remote_slots, NULL); + list_free_deep(remote_slots); + + if (started_tx) + CommitTransactionCommand(); wait_for_slot_activity(some_slot_updated); } @@ -1864,15 +1945,43 @@ slotsync_failure_callback(int code, Datum arg) walrcv_disconnect(wrconn); } +/* + * Helper function to extract slot names from a list of remote slots + */ +static List * +extract_slot_names(List *remote_slots) +{ + List *slot_names = NIL; + + foreach_ptr(RemoteSlot, remote_slot, remote_slots) + { + char *slot_name; + + slot_name = pstrdup(remote_slot->name); + slot_names = lappend(slot_names, slot_name); + } + + return slot_names; +} + /* * Synchronize the failover enabled replication slots using the specified * primary server connection. + * + * Repeatedly fetches and updates replication slot information from the + * primary until all slots are at least "sync ready". + * + * Exits early if promotion is triggered or certain critical + * configuration parameters have changed. */ void SyncReplicationSlots(WalReceiverConn *wrconn) { PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn)); { + List *remote_slots = NIL; + List *slot_names = NIL; /* List of slot names to track */ + check_and_set_sync_info(MyProcPid); /* Check for interrupts and config changes */ @@ -1880,7 +1989,54 @@ SyncReplicationSlots(WalReceiverConn *wrconn) validate_remote_info(wrconn); - synchronize_slots(wrconn); + /* Retry until all the slots are sync-ready */ + for (;;) + { + bool slot_persistence_pending = false; + bool some_slot_updated = false; + + /* Check for interrupts and config changes */ + ProcessSlotSyncInterrupts(); + + /* We must be in a valid transaction state */ + Assert(IsTransactionState()); + + /* + * Fetch remote slot info for the given slot_names. 
If slot_names + * is NIL, fetch all failover-enabled slots. Note that we reuse + * slot_names from the first iteration; re-fetching all failover + * slots each time could cause an endless loop. Instead of + * reprocessing only the pending slots in each iteration, it's + * better to process all the slots received in the first + * iteration. This ensures that by the time we're done, all slots + * reflect the latest values. + */ + remote_slots = fetch_remote_slots(wrconn, slot_names); + + /* Attempt to synchronize slots */ + some_slot_updated = synchronize_slots(wrconn, remote_slots, + &slot_persistence_pending); + + /* + * If slot_persistence_pending is true, extract slot names for + * future iterations (only needed if we haven't done it yet) + */ + if (slot_names == NIL && slot_persistence_pending) + slot_names = extract_slot_names(remote_slots); + + /* Free the current remote_slots list */ + list_free_deep(remote_slots); + + /* Done if all slots are persisted i.e are sync-ready */ + if (!slot_persistence_pending) + break; + + /* wait before retrying again */ + wait_for_slot_activity(some_slot_updated); + } + + if (slot_names) + list_free_deep(slot_names); /* Cleanup the synced temporary slots */ ReplicationSlotCleanup(true); diff --git a/src/backend/utils/activity/wait_event_names.txt b/src/backend/utils/activity/wait_event_names.txt index f39830dbb34..c0632bf901a 100644 --- a/src/backend/utils/activity/wait_event_names.txt +++ b/src/backend/utils/activity/wait_event_names.txt @@ -62,7 +62,7 @@ LOGICAL_APPLY_MAIN "Waiting in main loop of logical replication apply process." LOGICAL_LAUNCHER_MAIN "Waiting in main loop of logical replication launcher process." LOGICAL_PARALLEL_APPLY_MAIN "Waiting in main loop of logical replication parallel apply process." RECOVERY_WAL_STREAM "Waiting in main loop of startup process for WAL to arrive, during streaming recovery." -REPLICATION_SLOTSYNC_MAIN "Waiting in main loop of slot sync worker." 
+REPLICATION_SLOTSYNC_MAIN "Waiting in main loop of slot synchronization." REPLICATION_SLOTSYNC_SHUTDOWN "Waiting for slot sync worker to shut down." SYSLOGGER_MAIN "Waiting in main loop of syslogger process." WAL_RECEIVER_MAIN "Waiting in main loop of WAL receiver process." diff --git a/src/test/recovery/t/040_standby_failover_slots_sync.pl b/src/test/recovery/t/040_standby_failover_slots_sync.pl index 25777fa188c..20f942cfd14 100644 --- a/src/test/recovery/t/040_standby_failover_slots_sync.pl +++ b/src/test/recovery/t/040_standby_failover_slots_sync.pl @@ -1000,6 +1000,13 @@ $primary->psql( )); $subscriber2->safe_psql('postgres', 'DROP SUBSCRIPTION regress_mysub2;'); +$subscriber1->safe_psql('postgres', 'DROP SUBSCRIPTION regress_mysub1;'); +$subscriber1->safe_psql('postgres', 'TRUNCATE tab_int;'); + +# Remove the dropped sb1_slot from the synchronized_standby_slots list and reload the +# configuration. +$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', "''"); +$primary->reload; # Verify that all slots have been removed except the one necessary for standby2, # which is needed for further testing. @@ -1016,34 +1023,46 @@ $primary->safe_psql('postgres', "COMMIT PREPARED 'test_twophase_slotsync';"); $primary->wait_for_replay_catchup($standby2); ################################################## -# Verify that slotsync skip statistics are correctly updated when the +# Test that pg_sync_replication_slots() on the standby skips and retries +# until the slot becomes sync-ready (when the remote slot catches up with +# the locally reserved position). +# Also verify that slotsync skip statistics are correctly updated when the # slotsync operation is skipped. ################################################## -# Create a logical replication slot and create some DDL on the primary so -# that the slot lags behind the standby. 
-$primary->safe_psql( - 'postgres', qq( - SELECT pg_create_logical_replication_slot('lsub1_slot', 'pgoutput', false, false, true); - CREATE TABLE wal_push(a int); -)); +# Recreate the slot by creating a subscription on the subscriber, keep it disabled. +$subscriber1->safe_psql('postgres', qq[ + CREATE TABLE push_wal (a int); + CREATE SUBSCRIPTION regress_mysub1 CONNECTION '$publisher_connstr' PUBLICATION regress_mypub WITH (slot_name = lsub1_slot, failover = true, enabled = false);]); + +# Create some DDL on the primary so that the slot lags behind the standby +$primary->safe_psql('postgres', "CREATE TABLE push_wal (a int);"); + +# Make sure the DDL changes are synced to the standby $primary->wait_for_replay_catchup($standby2); $log_offset = -s $standby2->logfile; -# Enable slot sync worker +# Enable standby for slot synchronization $standby2->append_conf( 'postgresql.conf', qq( hot_standby_feedback = on primary_conninfo = '$connstr_1 dbname=postgres' log_min_messages = 'debug2' -sync_replication_slots = on )); $standby2->reload; -# Confirm that the slot sync worker is able to start. -$standby2->wait_for_log(qr/slot sync worker started/, $log_offset); +# Attempt to synchronize slots using API. The API will continue retrying +# synchronization until the remote slot catches up. +# The API will not return until this happens, to be able to make +# further calls, call the API in a background process. 
+my $h = $standby2->background_psql('postgres', on_error_stop => 0); + +$h->query_until(qr/start/, q( + \echo start + SELECT pg_sync_replication_slots(); + )); # Confirm that the slot sync is skipped due to the remote slot lagging behind $standby2->wait_for_log( @@ -1061,4 +1080,18 @@ $result = $standby2->safe_psql('postgres', ); is($result, 't', "check slot sync skip count increments"); +# Enable the Subscription, so that the remote slot catches up +$subscriber1->safe_psql('postgres', "ALTER SUBSCRIPTION regress_mysub1 ENABLE"); +$subscriber1->wait_for_subscription_sync; + +# Create xl_running_xacts on the primary to speed up restart_lsn advancement. +$primary->safe_psql('postgres', "SELECT pg_log_standby_snapshot();"); + +# Confirm from the log that the slot is sync-ready now. +$standby2->wait_for_log( + qr/newly created replication slot \"lsub1_slot\" is sync-ready now/, + $log_offset); + +$h->quit; + done_testing();