Add retry logic to pg_sync_replication_slots(). master github/master
authorAmit Kapila <[email protected]>
Mon, 15 Dec 2025 02:50:21 +0000 (02:50 +0000)
committerAmit Kapila <[email protected]>
Mon, 15 Dec 2025 02:50:21 +0000 (02:50 +0000)
Previously, pg_sync_replication_slots() would finish without synchronizing
slots that didn't meet requirements, rather than failing outright. This
could leave some failover slots unsynchronized if required catalog rows or
WAL segments were missing or at risk of removal, while the standby
continued removing needed data.

To address this, the function now waits for the primary slot to advance to
a position where all required data is available on the standby before
completing synchronization. It retries cyclically until all failover slots
that existed on the primary at the start of the call are synchronized.
Slots created after the function begins are not included. If the standby
is promoted during this wait, the function exits gracefully and the
temporary slots will be removed.

Author: Ajin Cherian <[email protected]>
Author: Hou Zhijie <[email protected]>
Reviewed-by: Shveta Malik <[email protected]>
Reviewed-by: Japin Li <[email protected]>
Reviewed-by: Ashutosh Bapat <[email protected]>
Reviewed-by: Ashutosh Sharma <[email protected]>
Reviewed-by: Chao Li <[email protected]>
Reviewed-by: Yilin Zhang <[email protected]>
Reviewed-by: Amit Kapila <[email protected]>
Discussion: https://postgr.es/m/CAFPTHDZAA%2BgWDntpa5ucqKKba41%3DtXmoXqN3q4rpjO9cdxgQrw%40mail.gmail.com

doc/src/sgml/func/func-admin.sgml
doc/src/sgml/logicaldecoding.sgml
src/backend/replication/logical/slotsync.c
src/backend/utils/activity/wait_event_names.txt
src/test/recovery/t/040_standby_failover_slots_sync.pl

index 1b465bc8ba71ca319845f5b372b9a5038d522246..2896cd9e4290901fbf6c6e21fbeb96e371501f16 100644 (file)
@@ -1497,9 +1497,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
         standby server. Temporary synced slots, if any, cannot be used for
         logical decoding and must be dropped after promotion. See
         <xref linkend="logicaldecoding-replication-slots-synchronization"/> for details.
-        Note that this function is primarily intended for testing and
-        debugging purposes and should be used with caution. Additionally,
-        this function cannot be executed if
+        Note that this function cannot be executed if
         <link linkend="guc-sync-replication-slots"><varname>
         sync_replication_slots</varname></link> is enabled and the slotsync
         worker is already running to perform the synchronization of slots.
index d5a5e22fe2c2e87eecea2c550cf3d1139fd44a16..cae8a376c3ba769cf8c6f9dd443053dc383fcbb6 100644 (file)
@@ -405,15 +405,13 @@ postgres=# SELECT * from pg_logical_slot_get_changes('regression_slot', NULL, NU
       periodic synchronization of failover slots, they can also be manually
       synchronized using the <link linkend="pg-sync-replication-slots">
       <function>pg_sync_replication_slots</function></link> function on the standby.
-      However, this function is primarily intended for testing and debugging and
-      should be used with caution. Unlike automatic synchronization, it does not
-      include cyclic retries, making it more prone to synchronization failures,
-      particularly during initial sync scenarios where the required WAL files
-      or catalog rows for the slot might have already been removed or are at risk
-      of being removed on the standby. In contrast, automatic synchronization
-      via <varname>sync_replication_slots</varname> provides continuous slot
-      updates, enabling seamless failover and supporting high availability.
-      Therefore, it is the recommended method for synchronizing slots.
+      However, unlike automatic synchronization, it does not perform incremental
+      updates. It retries cyclically until all the failover slots that existed on
+      primary at the start of the function call are synchronized. Any slots created
+      after the function begins will not be synchronized. In contrast, automatic
+      synchronization via <varname>sync_replication_slots</varname> provides
+      continuous slot updates, enabling seamless failover and supporting high
+      availability. Therefore, it is the recommended method for synchronizing slots.
      </para>
     </note>
 
index 873aa003eec351bbe6c11cac592da70b70721400..bf50317b4436e198ffae4e57cb6555c4e661fae4 100644 (file)
  * the last cycle. Refer to the comments above wait_for_slot_activity() for
  * more details.
  *
+ * If the SQL function pg_sync_replication_slots() is used to sync the slots,
+ * and if the slots are not ready to be synced and are marked as RS_TEMPORARY
+ * because of any of the reasons mentioned above, then the SQL function also
+ * waits and retries until the slots are marked as RS_PERSISTENT (which means
+ * sync-ready). Refer to the comments in SyncReplicationSlots() for more
+ * details.
+ *
  * Any standby synchronized slots will be dropped if they no longer need
  * to be synchronized. See comment atop drop_local_obsolete_slots() for more
  * details.
@@ -64,6 +71,7 @@
 #include "storage/procarray.h"
 #include "tcop/tcopprot.h"
 #include "utils/builtins.h"
+#include "utils/memutils.h"
 #include "utils/pg_lsn.h"
 #include "utils/ps_status.h"
 #include "utils/timeout.h"
@@ -599,11 +607,15 @@ reserve_wal_for_local_slot(XLogRecPtr restart_lsn)
  * local ones, then update the LSNs and persist the local synced slot for
  * future synchronization; otherwise, do nothing.
  *
+ * *slot_persistence_pending is set to true if any of the slots fail to
+ * persist.
+ *
  * Return true if the slot is marked as RS_PERSISTENT (sync-ready), otherwise
  * false.
  */
 static bool
-update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
+update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid,
+                                    bool *slot_persistence_pending)
 {
    ReplicationSlot *slot = MyReplicationSlot;
    bool        found_consistent_snapshot = false;
@@ -627,7 +639,13 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
         * current location when recreating the slot in the next cycle. It may
         * take more time to create such a slot. Therefore, we keep this slot
         * and attempt the synchronization in the next cycle.
+        *
+        * We also update the slot_persistence_pending parameter, so the SQL
+        * function can retry.
         */
+       if (slot_persistence_pending)
+           *slot_persistence_pending = true;
+
        return false;
    }
 
@@ -642,6 +660,10 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
                errdetail("Synchronization could lead to data loss, because the standby could not build a consistent snapshot to decode WALs at LSN %X/%08X.",
                          LSN_FORMAT_ARGS(slot->data.restart_lsn)));
 
+       /* Set this, so that SQL function can retry */
+       if (slot_persistence_pending)
+           *slot_persistence_pending = true;
+
        return false;
    }
 
@@ -665,10 +687,14 @@ update_and_persist_local_synced_slot(RemoteSlot *remote_slot, Oid remote_dbid)
  * updated. The slot is then persisted and is considered as sync-ready for
  * periodic syncs.
  *
+ * *slot_persistence_pending is set to true if any of the slots fail to
+ * persist.
+ *
  * Returns TRUE if the local slot is updated.
  */
 static bool
-synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
+synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid,
+                    bool *slot_persistence_pending)
 {
    ReplicationSlot *slot;
    XLogRecPtr  latestFlushPtr = GetStandbyFlushRecPtr(NULL);
@@ -770,7 +796,8 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
        if (slot->data.persistency == RS_TEMPORARY)
        {
            slot_updated = update_and_persist_local_synced_slot(remote_slot,
-                                                               remote_dbid);
+                                                               remote_dbid,
+                                                               slot_persistence_pending);
        }
 
        /* Slot ready for sync, so sync it. */
@@ -867,7 +894,8 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
            return false;
        }
 
-       update_and_persist_local_synced_slot(remote_slot, remote_dbid);
+       update_and_persist_local_synced_slot(remote_slot, remote_dbid,
+                                            slot_persistence_pending);
 
        slot_updated = true;
    }
@@ -878,15 +906,16 @@ synchronize_one_slot(RemoteSlot *remote_slot, Oid remote_dbid)
 }
 
 /*
- * Synchronize slots.
+ * Fetch remote slots.
  *
- * Gets the failover logical slots info from the primary server and updates
- * the slots locally. Creates the slots if not present on the standby.
+ * If slot_names is NIL, fetches all failover logical slots from the
+ * primary server, otherwise fetches only the ones with names in slot_names.
  *
- * Returns TRUE if any of the slots gets updated in this sync-cycle.
+ * Returns a list of remote slot information structures, or NIL if none
+ * are found.
  */
-static bool
-synchronize_slots(WalReceiverConn *wrconn)
+static List *
+fetch_remote_slots(WalReceiverConn *wrconn, List *slot_names)
 {
 #define SLOTSYNC_COLUMN_COUNT 10
    Oid         slotRow[SLOTSYNC_COLUMN_COUNT] = {TEXTOID, TEXTOID, LSNOID,
@@ -895,29 +924,45 @@ synchronize_slots(WalReceiverConn *wrconn)
    WalRcvExecResult *res;
    TupleTableSlot *tupslot;
    List       *remote_slot_list = NIL;
-   bool        some_slot_updated = false;
-   bool        started_tx = false;
-   const char *query = "SELECT slot_name, plugin, confirmed_flush_lsn,"
-       " restart_lsn, catalog_xmin, two_phase, two_phase_at, failover,"
-       " database, invalidation_reason"
-       " FROM pg_catalog.pg_replication_slots"
-       " WHERE failover and NOT temporary";
-
-   /* The syscache access in walrcv_exec() needs a transaction env. */
-   if (!IsTransactionState())
+   StringInfoData query;
+
+   initStringInfo(&query);
+   appendStringInfoString(&query,
+                          "SELECT slot_name, plugin, confirmed_flush_lsn,"
+                          " restart_lsn, catalog_xmin, two_phase,"
+                          " two_phase_at, failover,"
+                          " database, invalidation_reason"
+                          " FROM pg_catalog.pg_replication_slots"
+                          " WHERE failover and NOT temporary");
+
+   if (slot_names != NIL)
    {
-       StartTransactionCommand();
-       started_tx = true;
+       bool        first_slot = true;
+
+       /*
+        * Construct the query to fetch only the specified slots
+        */
+       appendStringInfoString(&query, " AND slot_name IN (");
+
+       foreach_ptr(char, slot_name, slot_names)
+       {
+           if (!first_slot)
+               appendStringInfoString(&query, ", ");
+
+           appendStringInfo(&query, "%s", quote_literal_cstr(slot_name));
+           first_slot = false;
+       }
+       appendStringInfoChar(&query, ')');
    }
 
    /* Execute the query */
-   res = walrcv_exec(wrconn, query, SLOTSYNC_COLUMN_COUNT, slotRow);
+   res = walrcv_exec(wrconn, query.data, SLOTSYNC_COLUMN_COUNT, slotRow);
+   pfree(query.data);
    if (res->status != WALRCV_OK_TUPLES)
        ereport(ERROR,
                errmsg("could not fetch failover logical slots info from the primary server: %s",
                       res->err));
 
-   /* Construct the remote_slot tuple and synchronize each slot locally */
    tupslot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
    while (tuplestore_gettupleslot(res->tuplestore, true, false, tupslot))
    {
@@ -994,6 +1039,29 @@ synchronize_slots(WalReceiverConn *wrconn)
        ExecClearTuple(tupslot);
    }
 
+   walrcv_clear_result(res);
+
+   return remote_slot_list;
+}
+
+/*
+ * Synchronize slots.
+ *
+ * This function takes a list of remote slots and synchronizes them locally. It
+ * creates the slots if not present on the standby and updates existing ones.
+ *
+ * If slot_persistence_pending is not NULL, it will be set to true if one or
+ * more slots could not be persisted. This allows callers such as
+ * SyncReplicationSlots() to retry those slots.
+ *
+ * Returns TRUE if any of the slots gets updated in this sync-cycle.
+ */
+static bool
+synchronize_slots(WalReceiverConn *wrconn, List *remote_slot_list,
+                 bool *slot_persistence_pending)
+{
+   bool        some_slot_updated = false;
+
    /* Drop local slots that no longer need to be synced. */
    drop_local_obsolete_slots(remote_slot_list);
 
@@ -1009,19 +1077,12 @@ synchronize_slots(WalReceiverConn *wrconn)
         */
        LockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
 
-       some_slot_updated |= synchronize_one_slot(remote_slot, remote_dbid);
+       some_slot_updated |= synchronize_one_slot(remote_slot, remote_dbid,
+                                                 slot_persistence_pending);
 
        UnlockSharedObject(DatabaseRelationId, remote_dbid, 0, AccessShareLock);
    }
 
-   /* We are done, free remote_slot_list elements */
-   list_free_deep(remote_slot_list);
-
-   walrcv_clear_result(res);
-
-   if (started_tx)
-       CommitTransactionCommand();
-
    return some_slot_updated;
 }
 
@@ -1460,6 +1521,9 @@ reset_syncing_flag(void)
  *
  * It connects to the primary server, fetches logical failover slots
  * information periodically in order to create and sync the slots.
+ *
+ * Note: If any changes are made here, check if the corresponding SQL
+ * function logic in SyncReplicationSlots() also needs to be changed.
  */
 void
 ReplSlotSyncWorkerMain(const void *startup_data, size_t startup_data_len)
@@ -1621,10 +1685,27 @@ ReplSlotSyncWorkerMain(const void *startup_data, size_t startup_data_len)
    for (;;)
    {
        bool        some_slot_updated = false;
+       bool        started_tx = false;
+       List       *remote_slots;
 
        ProcessSlotSyncInterrupts();
 
-       some_slot_updated = synchronize_slots(wrconn);
+       /*
+        * The syscache access in fetch_remote_slots() needs a transaction
+        * env.
+        */
+       if (!IsTransactionState())
+       {
+           StartTransactionCommand();
+           started_tx = true;
+       }
+
+       remote_slots = fetch_remote_slots(wrconn, NIL);
+       some_slot_updated = synchronize_slots(wrconn, remote_slots, NULL);
+       list_free_deep(remote_slots);
+
+       if (started_tx)
+           CommitTransactionCommand();
 
        wait_for_slot_activity(some_slot_updated);
    }
@@ -1864,15 +1945,43 @@ slotsync_failure_callback(int code, Datum arg)
    walrcv_disconnect(wrconn);
 }
 
+/*
+ * Helper function to extract slot names from a list of remote slots
+ */
+static List *
+extract_slot_names(List *remote_slots)
+{
+   List       *slot_names = NIL;
+
+   foreach_ptr(RemoteSlot, remote_slot, remote_slots)
+   {
+       char       *slot_name;
+
+       slot_name = pstrdup(remote_slot->name);
+       slot_names = lappend(slot_names, slot_name);
+   }
+
+   return slot_names;
+}
+
 /*
  * Synchronize the failover enabled replication slots using the specified
  * primary server connection.
+ *
+ * Repeatedly fetches and updates replication slot information from the
+ * primary until all slots are at least "sync ready".
+ *
+ * Exits early if promotion is triggered or certain critical
+ * configuration parameters have changed.
  */
 void
 SyncReplicationSlots(WalReceiverConn *wrconn)
 {
    PG_ENSURE_ERROR_CLEANUP(slotsync_failure_callback, PointerGetDatum(wrconn));
    {
+       List       *remote_slots = NIL;
+       List       *slot_names = NIL;   /* List of slot names to track */
+
        check_and_set_sync_info(MyProcPid);
 
        /* Check for interrupts and config changes */
@@ -1880,7 +1989,54 @@ SyncReplicationSlots(WalReceiverConn *wrconn)
 
        validate_remote_info(wrconn);
 
-       synchronize_slots(wrconn);
+       /* Retry until all the slots are sync-ready */
+       for (;;)
+       {
+           bool        slot_persistence_pending = false;
+           bool        some_slot_updated = false;
+
+           /* Check for interrupts and config changes */
+           ProcessSlotSyncInterrupts();
+
+           /* We must be in a valid transaction state */
+           Assert(IsTransactionState());
+
+           /*
+            * Fetch remote slot info for the given slot_names. If slot_names
+            * is NIL, fetch all failover-enabled slots. Note that we reuse
+            * slot_names from the first iteration; re-fetching all failover
+            * slots each time could cause an endless loop. Instead of
+            * reprocessing only the pending slots in each iteration, it's
+            * better to process all the slots received in the first
+            * iteration. This ensures that by the time we're done, all slots
+            * reflect the latest values.
+            */
+           remote_slots = fetch_remote_slots(wrconn, slot_names);
+
+           /* Attempt to synchronize slots */
+           some_slot_updated = synchronize_slots(wrconn, remote_slots,
+                                                 &slot_persistence_pending);
+
+           /*
+            * If slot_persistence_pending is true, extract slot names for
+            * future iterations (only needed if we haven't done it yet)
+            */
+           if (slot_names == NIL && slot_persistence_pending)
+               slot_names = extract_slot_names(remote_slots);
+
+           /* Free the current remote_slots list */
+           list_free_deep(remote_slots);
+
+           /* Done if all slots are persisted i.e are sync-ready */
+           if (!slot_persistence_pending)
+               break;
+
+           /* wait before retrying again */
+           wait_for_slot_activity(some_slot_updated);
+       }
+
+       if (slot_names)
+           list_free_deep(slot_names);
 
        /* Cleanup the synced temporary slots */
        ReplicationSlotCleanup(true);
index f39830dbb344d217b9da4e7ca692343ab5515236..c0632bf901ad2f87f6442e9c7de2ab0bddc2f30e 100644 (file)
@@ -62,7 +62,7 @@ LOGICAL_APPLY_MAIN    "Waiting in main loop of logical replication apply process."
 LOGICAL_LAUNCHER_MAIN  "Waiting in main loop of logical replication launcher process."
 LOGICAL_PARALLEL_APPLY_MAIN    "Waiting in main loop of logical replication parallel apply process."
 RECOVERY_WAL_STREAM    "Waiting in main loop of startup process for WAL to arrive, during streaming recovery."
-REPLICATION_SLOTSYNC_MAIN  "Waiting in main loop of slot sync worker."
+REPLICATION_SLOTSYNC_MAIN  "Waiting in main loop of slot synchronization."
 REPLICATION_SLOTSYNC_SHUTDOWN  "Waiting for slot sync worker to shut down."
 SYSLOGGER_MAIN "Waiting in main loop of syslogger process."
 WAL_RECEIVER_MAIN  "Waiting in main loop of WAL receiver process."
index 25777fa188c08eef370a3257abc0487e23b163d6..20f942cfd14581aed775fdf77598d4bb989b801f 100644 (file)
@@ -1000,6 +1000,13 @@ $primary->psql(
 ));
 
 $subscriber2->safe_psql('postgres', 'DROP SUBSCRIPTION regress_mysub2;');
+$subscriber1->safe_psql('postgres', 'DROP SUBSCRIPTION regress_mysub1;');
+$subscriber1->safe_psql('postgres', 'TRUNCATE tab_int;');
+
+# Remove the dropped sb1_slot from the synchronized_standby_slots list and reload the
+# configuration.
+$primary->adjust_conf('postgresql.conf', 'synchronized_standby_slots', "''");
+$primary->reload;
 
 # Verify that all slots have been removed except the one necessary for standby2,
 # which is needed for further testing.
@@ -1016,34 +1023,46 @@ $primary->safe_psql('postgres', "COMMIT PREPARED 'test_twophase_slotsync';");
 $primary->wait_for_replay_catchup($standby2);
 
 ##################################################
-# Verify that slotsync skip statistics are correctly updated when the
+# Test that pg_sync_replication_slots() on the standby skips and retries
+# until the slot becomes sync-ready (when the remote slot catches up with
+# the locally reserved position).
+# Also verify that slotsync skip statistics are correctly updated when the
 # slotsync operation is skipped.
 ##################################################
 
-# Create a logical replication slot and create some DDL on the primary so
-# that the slot lags behind the standby.
-$primary->safe_psql(
-   'postgres', qq(
-   SELECT pg_create_logical_replication_slot('lsub1_slot', 'pgoutput', false, false, true);
-   CREATE TABLE wal_push(a int);
-));
+# Recreate the slot by creating a subscription on the subscriber, keep it disabled.
+$subscriber1->safe_psql('postgres', qq[
+   CREATE TABLE push_wal (a int);
+   CREATE SUBSCRIPTION regress_mysub1 CONNECTION '$publisher_connstr' PUBLICATION regress_mypub WITH (slot_name = lsub1_slot, failover = true, enabled = false);]);
+
+# Create some DDL on the primary so that the slot lags behind the standby
+$primary->safe_psql('postgres', "CREATE TABLE push_wal (a int);");
+
+# Make sure the DDL changes are synced to the standby
 $primary->wait_for_replay_catchup($standby2);
 
 $log_offset = -s $standby2->logfile;
 
-# Enable slot sync worker
+# Enable standby for slot synchronization
 $standby2->append_conf(
    'postgresql.conf', qq(
 hot_standby_feedback = on
 primary_conninfo = '$connstr_1 dbname=postgres'
 log_min_messages = 'debug2'
-sync_replication_slots = on
 ));
 
 $standby2->reload;
 
-# Confirm that the slot sync worker is able to start.
-$standby2->wait_for_log(qr/slot sync worker started/, $log_offset);
+# Attempt to synchronize slots using API. The API will continue retrying
+# synchronization until the remote slot catches up.
+# The API will not return until this happens, to be able to make
+# further calls, call the API in a background process.
+my $h = $standby2->background_psql('postgres', on_error_stop => 0);
+
+$h->query_until(qr/start/, q(
+   \echo start
+   SELECT pg_sync_replication_slots();
+   ));
 
 # Confirm that the slot sync is skipped due to the remote slot lagging behind
 $standby2->wait_for_log(
@@ -1061,4 +1080,18 @@ $result = $standby2->safe_psql('postgres',
 );
 is($result, 't', "check slot sync skip count increments");
 
+# Enable the Subscription, so that the remote slot catches up
+$subscriber1->safe_psql('postgres', "ALTER SUBSCRIPTION regress_mysub1 ENABLE");
+$subscriber1->wait_for_subscription_sync;
+
+# Create xl_running_xacts on the primary to speed up restart_lsn advancement.
+$primary->safe_psql('postgres', "SELECT pg_log_standby_snapshot();");
+
+# Confirm from the log that the slot is sync-ready now.
+$standby2->wait_for_log(
+    qr/newly created replication slot \"lsub1_slot\" is sync-ready now/,
+    $log_offset);
+
+$h->quit;
+
 done_testing();