Toggle logical decoding dynamically based on logical slot presence.
authorMasahiko Sawada <[email protected]>
Tue, 23 Dec 2025 18:13:16 +0000 (10:13 -0800)
committerMasahiko Sawada <[email protected]>
Tue, 23 Dec 2025 18:13:16 +0000 (10:13 -0800)
Previously logical decoding required wal_level to be set to 'logical'
at server start. This meant that users had to incur the overhead of
logical-level WAL logging even when no logical replication slots were
in use.

This commit adds functionality to automatically control logical
decoding availability based on logical replication slot presence. The
newly introduced module logicalctl.c allows logical decoding to be
dynamically activated when needed when wal_level is set to
'replica'.

When the first logical replication slot is created, the system
automatically increases the effective WAL level to maintain
logical-level WAL records. Conversely, after the last logical slot is
dropped or invalidated, it decreases back to 'replica' WAL level.

While activation occurs synchronously right after creating the first
logical slot, deactivation happens asynchronously through the
checkpointer process. This design avoids a race condition at the end
of recovery; a concurrent deactivation could happen while the startup
process enables logical decoding at the end of recovery, but WAL
writes are still not permitted until recovery fully completes. The
checkpointer will handle it after recovery is done. Asynchronous
deactivation also avoids excessive toggling of the logical decoding
status in workloads that repeatedly create and drop a single logical
slot. On the other hand, this lazy approach can delay changes to
effective_wal_level and the disabling logical decoding, especially
when the checkpointer is busy with other tasks. We chose this lazy
approach in all deactivation paths to keep the implementation simple,
even though laziness is strictly required only for end-of-recovery
cases. Future work might address this limitation either by using a
dedicated worker instead of the checkpointer, or by implementing
synchronous waiting during slot drops if workloads are significantly
affected by the lazy deactivation of logical decoding.

The effective WAL level, determined internally by XLogLogicalInfo, is
allowed to change within a transaction until an XID is assigned. Once
an XID is assigned, the value becomes fixed for the remainder of the
transaction. This behavior ensures that the logging mode remains
consistent within a writing transaction, similar to the behavior of
GUC parameters.

A new read-only GUC parameter effective_wal_level is introduced to
monitor the actual WAL level in effect. This parameter reflects the
current operational WAL level, which may differ from the configured
wal_level setting.

Bump PG_CONTROL_VERSION as it adds a new field to CheckPoint struct.

Reviewed-by: Shveta Malik <[email protected]>
Reviewed-by: Amit Kapila <[email protected]>
Reviewed-by: Hayato Kuroda <[email protected]>
Reviewed-by: Bertrand Drouvot <[email protected]>
Reviewed-by: Peter Smith <[email protected]>
Reviewed-by: Shlok Kyal <[email protected]>
Reviewed-by: Ashutosh Bapat <[email protected]>
Discussion: https://postgr.es/m/CAD21AoCVLeLYq09pQPaWs+Jwdni5FuJ8v2jgq-u9_uFbcp6UbA@mail.gmail.com

47 files changed:
doc/src/sgml/config.sgml
doc/src/sgml/logical-replication.sgml
doc/src/sgml/logicaldecoding.sgml
doc/src/sgml/ref/pg_createsubscriber.sgml
doc/src/sgml/system-views.sgml
src/backend/access/heap/heapam.c
src/backend/access/rmgrdesc/xlogdesc.c
src/backend/access/transam/xact.c
src/backend/access/transam/xlog.c
src/backend/commands/publicationcmds.c
src/backend/commands/tablecmds.c
src/backend/postmaster/checkpointer.c
src/backend/postmaster/postmaster.c
src/backend/replication/logical/Makefile
src/backend/replication/logical/decode.c
src/backend/replication/logical/logical.c
src/backend/replication/logical/logicalctl.c [new file with mode: 0644]
src/backend/replication/logical/meson.build
src/backend/replication/logical/slotsync.c
src/backend/replication/slot.c
src/backend/replication/slotfuncs.c
src/backend/replication/walsender.c
src/backend/storage/ipc/ipci.c
src/backend/storage/ipc/procsignal.c
src/backend/storage/ipc/standby.c
src/backend/utils/activity/wait_event_names.txt
src/backend/utils/cache/inval.c
src/backend/utils/init/postinit.c
src/backend/utils/misc/guc_parameters.dat
src/backend/utils/misc/guc_tables.c
src/bin/pg_basebackup/pg_createsubscriber.c
src/bin/pg_basebackup/t/040_pg_createsubscriber.pl
src/bin/pg_upgrade/check.c
src/bin/pg_upgrade/t/002_pg_upgrade.pl
src/include/access/xlog.h
src/include/catalog/pg_control.h
src/include/replication/logicalctl.h [new file with mode: 0644]
src/include/replication/slot.h
src/include/storage/lwlocklist.h
src/include/storage/procsignal.h
src/include/utils/guc_hooks.h
src/test/recovery/meson.build
src/test/recovery/t/035_standby_logical_decoding.pl
src/test/recovery/t/051_effective_wal_level.pl [new file with mode: 0644]
src/test/regress/expected/publication.out
src/test/subscription/t/001_rep_changes.pl
src/tools/pgindent/typedefs.list

index 1c23538d3c5d2667eca9c088dbcb38e8288b5521..cdfe8e376f0c4bc1bdffa91428bf2aedf41e52eb 100644 (file)
@@ -3046,6 +3046,17 @@ include_dir 'conf.d'
         many <command>UPDATE</command> and <command>DELETE</command> statements are
         executed.
        </para>
+       <para>
+        It is important to note that when <varname>wal_level</varname> is set to
+        <literal>replica</literal>, the effective WAL level can automatically change
+        based on the presence of <link linkend="logicaldecoding-replication-slots">
+        logical replication slots</link>. The system automatically increases the
+        effective WAL level to <literal>logical</literal> when creating the first
+        logical replication slot, and decreases it back to <literal>replica</literal>
+        when dropping or invalidating the last logical replication slot. The current
+        effective WAL level can be monitored through
+        <xref linkend="guc-effective-wal-level"/> parameter.
+       </para>
        <para>
         In releases prior to 9.6, this parameter also allowed the
         values <literal>archive</literal> and <literal>hot_standby</literal>.
@@ -11851,6 +11862,38 @@ dynamic_library_path = '/usr/local/lib/postgresql:$libdir'
       </listitem>
      </varlistentry>
 
+     <varlistentry id="guc-effective-wal-level" xreflabel="effective_wal_level">
+      <term><varname>effective_wal_level</varname> (<type>enum</type>)
+      <indexterm>
+       <primary><varname>effective_wal_level</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Reports the actual WAL logging level currently in effect in the
+        system. This parameter shares the same set of values as
+        <xref linkend="guc-wal-level"/>, but reflects the operational WAL
+        level rather than the configured setting. For descriptions of
+        possible values, refer to the <varname>wal_level</varname>
+        parameter documentation.
+       </para>
+       <para>
+        The effective WAL level can differ from the configured
+        <varname>wal_level</varname> in certain situations. For example,
+        when <varname>wal_level</varname> is set to <literal>replica</literal>
+        and the system has one or more logical replication slots,
+        <varname>effective_wal_level</varname> will show <literal>logical</literal>
+        to indicate that the system is maintaining WAL records at
+        <literal>logical</literal> level equivalent.
+       </para>
+       <para>
+        On standby servers, <varname>effective_wal_level</varname> matches
+        the value of <varname>effective_wal_level</varname> from the most
+        upstream server in the replication chain.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-huge-pages-status" xreflabel="huge_pages_status">
       <term><varname>huge_pages_status</varname> (<type>enum</type>)
       <indexterm>
index b3faaa675ef33b7f4053e6cc8b49614d8e225944..f47b7378397814aac851725e3e058d3888d3c408 100644 (file)
@@ -2629,7 +2629,7 @@ CONTEXT:  processing remote data for replication origin "pg_16395" during "INSER
 
    <para>
     <link linkend="guc-wal-level"><varname>wal_level</varname></link> must be
-    set to <literal>logical</literal>.
+    set to <literal>replica</literal> or <literal>logical</literal>.
    </para>
 
    <para>
@@ -2751,7 +2751,7 @@ CONTEXT:  processing remote data for replication origin "pg_16395" during "INSER
      <para>
       The new cluster must have
       <link linkend="guc-wal-level"><varname>wal_level</varname></link> as
-      <literal>logical</literal>.
+      <literal>replica</literal> or <literal>logical</literal>.
      </para>
     </listitem>
     <listitem>
index 6368e46ce932f39aebd7cdbf6b78e18486e5aa3f..f36bf9462fa0da9a7b339565f4839fc2e76dedb4 100644 (file)
@@ -47,7 +47,7 @@
 
    <para>
     Before you can use logical decoding, you must set
-    <xref linkend="guc-wal-level"/> to <literal>logical</literal> and
+    <xref linkend="guc-wal-level"/> to <literal>replica</literal> or higher and
     <xref linkend="guc-max-replication-slots"/> to at least 1.  Then, you
     should connect to the target database (in the example
     below, <literal>postgres</literal>) as a superuser.
@@ -257,6 +257,47 @@ postgres=# SELECT * from pg_logical_slot_get_changes('regression_slot', NULL, NU
      log</link>, which describe changes on a storage level, into an
      application-specific form such as a stream of tuples or SQL statements.
     </para>
+
+    <para>
+     Logical decoding becomes available in two conditions:
+    </para>
+    <itemizedlist>
+     <listitem>
+      <para>
+       When <xref linkend="guc-wal-level"/> is set to <literal>logical</literal>.
+      </para>
+     </listitem>
+     <listitem>
+      <para>
+       When <xref linkend="guc-wal-level"/> is set to <literal>replica</literal>
+       and at least one valid logical replication slot exists on the system.
+      </para>
+     </listitem>
+    </itemizedlist>
+    <para>
+     If either condition is met, the operational WAL level becomes equivalent
+     to <literal>logical</literal>, which can be monitored through the
+     <xref linkend="guc-effective-wal-level"/> parameter.
+    </para>
+    <para>
+     When <varname>wal_level</varname> is set to <literal>replica</literal>,
+     logical decoding is automatically activated upon creation of the first
+     logical replication slot. This activation process involves several steps
+     and requires synchronization among processes, ensuring system-wide
+     consistency. Conversely, if <varname>wal_level</varname> is set to
+     <literal>replica</literal> and the last logical replication slot is dropped
+     or invalidated, logical decoding is automatically disabled. Note that the
+     deactivation of logical decoding might take some time as it is performed
+     asynchronously by the checkpointer process.
+    </para>
+
+    <caution>
+     <para>
+      When <varname>wal_level</varname> is set to <literal>replica</literal>,
+      dropping or invalidating the last logical slot disables logical decoding
+      on the primary, resulting in slots on standbys being invalidated.
+     </para>
+    </caution>
    </sect2>
 
    <sect2 id="logicaldecoding-replication-slots">
@@ -328,7 +369,7 @@ postgres=# SELECT * from pg_logical_slot_get_changes('regression_slot', NULL, NU
      that could be needed by the logical decoding on the standby (as it does
      not know about the <literal>catalog_xmin</literal> on the standby).
      Existing logical slots on standby also get invalidated if
-     <varname>wal_level</varname> on the primary is reduced to less than
+     <varname>effective_wal_level</varname> on the primary is reduced to less than
      <literal>logical</literal>.
      This is done as soon as the standby detects such a change in the WAL stream.
      It means that, for walsenders that are lagging (if any), some WAL records up
index 5a62187b189e6d6d6bc719c70ec1c259423edb87..e450c6a5b376944ab1e9f315b8c8cd98ff7677dc 100644 (file)
@@ -387,12 +387,12 @@ PostgreSQL documentation
    <para>
     The source server must accept connections from the target server.  The
     source server must not be in recovery. The source server must have <xref
-    linkend="guc-wal-level"/> as <literal>logical</literal>.  The source server
-    must have <xref linkend="guc-max-replication-slots"/> configured to a value
-    greater than or equal to the number of specified databases plus existing
-    replication slots.  The source server must have <xref
-    linkend="guc-max-wal-senders"/> configured to a value greater than or equal
-    to the number of specified databases and existing WAL sender processes.
+    linkend="guc-wal-level"/> as <literal>replica</literal> or <literal>logical</literal>.
+    The source server must have <xref linkend="guc-max-replication-slots"/>
+    configured to a value greater than or equal to the number of specified
+    databases plus existing replication slots.  The source server must have
+    <xref linkend="guc-max-wal-senders"/> configured to a value greater than or
+    equal to the number of specified databases and existing WAL sender processes.
    </para>
   </refsect2>
 
index 162c76b729ac54a5b6f484527066044b48afd50b..7ff7ca4f7193d5e46eaf6007423dab623f7f599b 100644 (file)
@@ -3062,8 +3062,9 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
         <listitem>
          <para>
           <literal>wal_level_insufficient</literal> means that the
-          primary doesn't have a <xref linkend="guc-wal-level"/> sufficient to
-          perform logical decoding.  It is set only for logical slots.
+          primary doesn't have an <xref linkend="guc-effective-wal-level"/>
+          sufficient to perform logical decoding. It is set only for logical
+          slots.
          </para>
         </listitem>
         <listitem>
index 98759e3bf70cda65e625065babb7623fcf5d2e4f..469397e73443964864279d1946f013cd27382c03 100644 (file)
@@ -8925,8 +8925,8 @@ log_heap_update(Relation reln, Buffer oldbuf,
     *
     * Skip this if we're taking a full-page image of the new page, as we
     * don't include the new tuple in the WAL record in that case.  Also
-    * disable if wal_level='logical', as logical decoding needs to be able to
-    * read the new tuple in whole from the WAL record alone.
+    * disable if effective_wal_level='logical', as logical decoding needs to
+    * be able to read the new tuple in whole from the WAL record alone.
     */
    if (oldbuf == newbuf && !need_tuple_data &&
        !XLogCheckBufferNeedsBackup(newbuf))
@@ -9098,8 +9098,8 @@ log_heap_update(Relation reln, Buffer oldbuf,
 /*
  * Perform XLogInsert of an XLOG_HEAP2_NEW_CID record
  *
- * This is only used in wal_level >= WAL_LEVEL_LOGICAL, and only for catalog
- * tuples.
+ * This is only used when effective_wal_level is logical, and only for
+ * catalog tuples.
  */
 static XLogRecPtr
 log_heap_new_cid(Relation relation, HeapTuple tup)
index 441034f5929cf1de544c07686a0512d6b3cb4eac..cd8edf5cc49029b95e3b0eeeba0c17bd57545942 100644 (file)
@@ -66,7 +66,7 @@ xlog_desc(StringInfo buf, XLogReaderState *record)
        CheckPoint *checkpoint = (CheckPoint *) rec;
 
        appendStringInfo(buf, "redo %X/%08X; "
-                        "tli %u; prev tli %u; fpw %s; wal_level %s; xid %u:%u; oid %u; multi %u; offset %" PRIu64 "; "
+                        "tli %u; prev tli %u; fpw %s; wal_level %s; logical decoding %s; xid %u:%u; oid %u; multi %u; offset %" PRIu64 "; "
                         "oldest xid %u in DB %u; oldest multi %u in DB %u; "
                         "oldest/newest commit timestamp xid: %u/%u; "
                         "oldest running xid %u; %s",
@@ -75,6 +75,7 @@ xlog_desc(StringInfo buf, XLogReaderState *record)
                         checkpoint->PrevTimeLineID,
                         checkpoint->fullPageWrites ? "true" : "false",
                         get_wal_level_string(checkpoint->wal_level),
+                        checkpoint->logicalDecodingEnabled ? "true" : "false",
                         EpochFromFullTransactionId(checkpoint->nextXid),
                         XidFromFullTransactionId(checkpoint->nextXid),
                         checkpoint->nextOid,
@@ -167,6 +168,13 @@ xlog_desc(StringInfo buf, XLogReaderState *record)
        memcpy(&wal_level, rec, sizeof(int));
        appendStringInfo(buf, "wal_level %s", get_wal_level_string(wal_level));
    }
+   else if (info == XLOG_LOGICAL_DECODING_STATUS_CHANGE)
+   {
+       bool        enabled;
+
+       memcpy(&enabled, rec, sizeof(bool));
+       appendStringInfoString(buf, enabled ? "true" : "false");
+   }
 }
 
 const char *
@@ -218,6 +226,9 @@ xlog_identify(uint8 info)
        case XLOG_CHECKPOINT_REDO:
            id = "CHECKPOINT_REDO";
            break;
+       case XLOG_LOGICAL_DECODING_STATUS_CHANGE:
+           id = "LOGICAL_DECODING_STATUS_CHANGE";
+           break;
    }
 
    return id;
index b69d452551ac1df787ea5c2e4c80bc0c324f95ba..1b5c1f6b7637c725fd758e2def5ec3b722d63881 100644 (file)
@@ -552,9 +552,9 @@ MarkCurrentTransactionIdLoggedIfAny(void)
  * operation in a subtransaction.  We require that for logical decoding, see
  * LogicalDecodingProcessRecord.
  *
- * This returns true if wal_level >= logical and we are inside a valid
- * subtransaction, for which the assignment was not yet written to any WAL
- * record.
+ * This returns true if effective_wal_level is logical and we are inside
+ * a valid subtransaction, for which the assignment was not yet written to
+ * any WAL record.
  */
 bool
 IsSubxactTopXidLogPending(void)
@@ -563,7 +563,7 @@ IsSubxactTopXidLogPending(void)
    if (CurrentTransactionState->topXidLogged)
        return false;
 
-   /* wal_level has to be logical */
+   /* effective_wal_level has to be logical */
    if (!XLogLogicalInfoActive())
        return false;
 
@@ -682,14 +682,14 @@ AssignTransactionId(TransactionState s)
    }
 
    /*
-    * When wal_level=logical, guarantee that a subtransaction's xid can only
-    * be seen in the WAL stream if its toplevel xid has been logged before.
-    * If necessary we log an xact_assignment record with fewer than
-    * PGPROC_MAX_CACHED_SUBXIDS. Note that it is fine if didLogXid isn't set
-    * for a transaction even though it appears in a WAL record, we just might
-    * superfluously log something. That can happen when an xid is included
-    * somewhere inside a wal record, but not in XLogRecord->xl_xid, like in
-    * xl_standby_locks.
+    * When effective_wal_level is logical, guarantee that a subtransaction's
+    * xid can only be seen in the WAL stream if its toplevel xid has been
+    * logged before. If necessary we log an xact_assignment record with fewer
+    * than PGPROC_MAX_CACHED_SUBXIDS. Note that it is fine if didLogXid isn't
+    * set for a transaction even though it appears in a WAL record, we just
+    * might superfluously log something. That can happen when an xid is
+    * included somewhere inside a wal record, but not in XLogRecord->xl_xid,
+    * like in xl_standby_locks.
     */
    if (isSubXact && XLogLogicalInfoActive() &&
        !TopTransactionStateData.didLogXid)
@@ -2489,6 +2489,7 @@ CommitTransaction(void)
    AtEOXact_Snapshot(true, false);
    AtEOXact_ApplyLauncher(true);
    AtEOXact_LogicalRepWorkers(true);
+   AtEOXact_LogicalCtl();
    pgstat_report_xact_timestamp(0);
 
    ResourceOwnerDelete(TopTransactionResourceOwner);
@@ -2784,6 +2785,7 @@ PrepareTransaction(void)
    /* we treat PREPARE as ROLLBACK so far as waking workers goes */
    AtEOXact_ApplyLauncher(false);
    AtEOXact_LogicalRepWorkers(false);
+   AtEOXact_LogicalCtl();
    pgstat_report_xact_timestamp(0);
 
    CurrentResourceOwner = NULL;
@@ -3011,6 +3013,7 @@ AbortTransaction(void)
        AtEOXact_PgStat(false, is_parallel_worker);
        AtEOXact_ApplyLauncher(false);
        AtEOXact_LogicalRepWorkers(false);
+       AtEOXact_LogicalCtl();
        pgstat_report_xact_timestamp(0);
    }
 
index 430a38b1a216acff593235544eca44693ce28f08..1b7ef589fc09737d453f43035b7acd6f12bd0e83 100644 (file)
@@ -80,6 +80,7 @@
 #include "postmaster/walwriter.h"
 #include "replication/origin.h"
 #include "replication/slot.h"
+#include "replication/slotsync.h"
 #include "replication/snapbuild.h"
 #include "replication/walreceiver.h"
 #include "replication/walsender.h"
@@ -4888,6 +4889,25 @@ show_in_hot_standby(void)
    return RecoveryInProgress() ? "on" : "off";
 }
 
+/*
+ * GUC show_hook for effective_wal_level
+ */
+const char *
+show_effective_wal_level(void)
+{
+   if (wal_level == WAL_LEVEL_MINIMAL)
+       return "minimal";
+
+   /*
+    * During recovery, effective_wal_level reflects the primary's
+    * configuration rather than the local wal_level value.
+    */
+   if (RecoveryInProgress())
+       return IsXLogLogicalInfoEnabled() ? "logical" : "replica";
+
+   return XLogLogicalInfoActive() ? "logical" : "replica";
+}
+
 /*
  * Read the control file, set respective GUCs.
  *
@@ -5134,6 +5154,7 @@ BootStrapXLOG(uint32 data_checksum_version)
    checkPoint.ThisTimeLineID = BootstrapTimeLineID;
    checkPoint.PrevTimeLineID = BootstrapTimeLineID;
    checkPoint.fullPageWrites = fullPageWrites;
+   checkPoint.logicalDecodingEnabled = (wal_level == WAL_LEVEL_LOGICAL);
    checkPoint.wal_level = wal_level;
    checkPoint.nextXid =
        FullTransactionIdFromEpochAndXid(0, FirstNormalTransactionId);
@@ -5658,6 +5679,12 @@ StartupXLOG(void)
     */
    StartupReplicationSlots();
 
+   /*
+    * Startup the logical decoding status with the last status stored in the
+    * checkpoint record.
+    */
+   StartupLogicalDecodingStatus(checkPoint.logicalDecodingEnabled);
+
    /*
     * Startup logical state, needs to be setup now so we have proper data
     * during crash recovery.
@@ -6206,6 +6233,12 @@ StartupXLOG(void)
     */
    CompleteCommitTsInitialization();
 
+   /*
+    * Update logical decoding status in shared memory and write an
+    * XLOG_LOGICAL_DECODING_STATUS_CHANGE, if necessary.
+    */
+   UpdateLogicalDecodingStatusEndOfRecovery();
+
    /* Clean up EndOfWalRecoveryInfo data to appease Valgrind leak checking */
    if (endOfRecoveryInfo->lastPage)
        pfree(endOfRecoveryInfo->lastPage);
@@ -6237,6 +6270,12 @@ StartupXLOG(void)
    UpdateControlFile();
    LWLockRelease(ControlFileLock);
 
+   /*
+    * Wake up the checkpointer process as there might be a request to disable
+    * logical decoding by concurrent slot drop.
+    */
+   WakeupCheckpointer();
+
    /*
     * Wake up all waiters for replay LSN.  They need to report an error that
     * recovery was ended before reaching the target LSN.
@@ -7187,6 +7226,8 @@ CreateCheckPoint(int flags)
        checkPoint.nextOid += TransamVariables->oidCount;
    LWLockRelease(OidGenLock);
 
+   checkPoint.logicalDecodingEnabled = IsLogicalDecodingEnabled();
+
    MultiXactGetCheckptMulti(shutdown,
                             &checkPoint.nextMulti,
                             &checkPoint.nextMultiOffset,
@@ -8581,21 +8622,6 @@ xlog_redo(XLogReaderState *record)
        /* Update our copy of the parameters in pg_control */
        memcpy(&xlrec, XLogRecGetData(record), sizeof(xl_parameter_change));
 
-       /*
-        * Invalidate logical slots if we are in hot standby and the primary
-        * does not have a WAL level sufficient for logical decoding. No need
-        * to search for potentially conflicting logically slots if standby is
-        * running with wal_level lower than logical, because in that case, we
-        * would have either disallowed creation of logical slots or
-        * invalidated existing ones.
-        */
-       if (InRecovery && InHotStandby &&
-           xlrec.wal_level < WAL_LEVEL_LOGICAL &&
-           wal_level >= WAL_LEVEL_LOGICAL)
-           InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_LEVEL,
-                                              0, InvalidOid,
-                                              InvalidTransactionId);
-
        LWLockAcquire(ControlFileLock, LW_EXCLUSIVE);
        ControlFile->MaxConnections = xlrec.MaxConnections;
        ControlFile->max_worker_processes = xlrec.max_worker_processes;
@@ -8663,6 +8689,55 @@ xlog_redo(XLogReaderState *record)
    {
        /* nothing to do here, just for informational purposes */
    }
+   else if (info == XLOG_LOGICAL_DECODING_STATUS_CHANGE)
+   {
+       bool        status;
+
+       memcpy(&status, XLogRecGetData(record), sizeof(bool));
+
+       /*
+        * We need to toggle the logical decoding status and update the
+        * XLogLogicalInfo cache of processes synchronously because
+        * XLogLogicalInfoActive() is used even during read-only queries
+        * (e.g., via RelationIsAccessibleInLogicalDecoding()). In the
+        * 'disable' case, it is safe to invalidate existing slots after
+        * disabling logical decoding because logical decoding cannot process
+        * subsequent WAL records, which may not contain logical information.
+        */
+       if (status)
+           EnableLogicalDecoding();
+       else
+           DisableLogicalDecoding();
+
+       elog(DEBUG1, "update logical decoding status to %d during recovery",
+            status);
+
+       if (InRecovery && InHotStandby)
+       {
+           if (!status)
+           {
+               /*
+                * Invalidate logical slots if we are in hot standby and the
+                * primary disabled logical decoding.
+                */
+               InvalidateObsoleteReplicationSlots(RS_INVAL_WAL_LEVEL,
+                                                  0, InvalidOid,
+                                                  InvalidTransactionId);
+           }
+           else if (sync_replication_slots)
+           {
+               /*
+                * Signal the postmaster to launch the slotsync worker.
+                *
+                * XXX: For simplicity, we keep the slotsync worker running
+                * even after logical decoding is disabled. A future
+                * improvement can consider starting and stopping the worker
+                * based on logical decoding status change.
+                */
+               kill(PostmasterPid, SIGUSR1);
+           }
+       }
+   }
 }
 
 /*
index a198350895051053039cf2e820a72a3836624bb2..40a4efd7390c0ca7b4985b2659c87dcca8383b59 100644 (file)
@@ -975,11 +975,16 @@ CreatePublication(ParseState *pstate, CreatePublicationStmt *stmt)
 
    InvokeObjectPostCreateHook(PublicationRelationId, puboid, 0);
 
-   if (wal_level != WAL_LEVEL_LOGICAL)
+   /*
+    * We don't need this warning message when wal_level >= 'replica' since
+    * logical decoding is automatically enabled up on a logical slot
+    * creation.
+    */
+   if (wal_level < WAL_LEVEL_REPLICA)
        ereport(WARNING,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-                errmsg("\"wal_level\" is insufficient to publish logical changes"),
-                errhint("Set \"wal_level\" to \"logical\" before creating subscriptions.")));
+                errmsg("logical decoding must be enabled to publish logical changes"),
+                errhint("Before creating subscriptions, ensure that \"wal_level\" is set to \"replica\" or higher.")));
 
    return myself;
 }
index 6b1a00ed477d3af8062469abbda0b8c855903541..1d9565b09fcd79034e42f05429d19afb874a1678 100644 (file)
@@ -2299,7 +2299,7 @@ ExecuteTruncateGuts(List *explicit_rels,
        xl_heap_truncate xlrec;
        int         i = 0;
 
-       /* should only get here if wal_level >= logical */
+       /* should only get here if effective_wal_level is 'logical' */
        Assert(XLogLogicalInfoActive());
 
        logrelids = palloc(list_length(relids_logged) * sizeof(Oid));
index 7f8cf1fa2ec8d7d44d059a11b4f21c16ca16473d..2eac8ac30d3284d810f0e5aca81c805a66dde628 100644 (file)
@@ -559,6 +559,12 @@ CheckpointerMain(const void *startup_data, size_t startup_data_len)
                break;
        }
 
+       /*
+        * Disable logical decoding if someone requested it. See comments atop
+        * logicalctl.c.
+        */
+       DisableLogicalDecodingIfNecessary();
+
        /* Check for archive_timeout and switch xlog files if necessary. */
        CheckArchiveTimeout();
 
@@ -1535,3 +1541,16 @@ FirstCallSinceLastCheckpoint(void)
 
    return FirstCall;
 }
+
+/*
+ * Wake up the checkpointer process.
+ */
+void
+WakeupCheckpointer(void)
+{
+   volatile PROC_HDR *procglobal = ProcGlobal;
+   ProcNumber  checkpointerProc = procglobal->checkpointerProc;
+
+   if (checkpointerProc != INVALID_PROC_NUMBER)
+       SetLatch(&GetPGProcByNumber(checkpointerProc)->procLatch);
+}
index 7dd3a201b1c99c111d7f1af04de275609148a459..cf44a677187186ee946f9a7f198c9faee96d862c 100644 (file)
@@ -854,9 +854,9 @@ PostmasterMain(int argc, char *argv[])
    if (summarize_wal && wal_level == WAL_LEVEL_MINIMAL)
        ereport(ERROR,
                (errmsg("WAL cannot be summarized when \"wal_level\" is \"minimal\"")));
-   if (sync_replication_slots && wal_level < WAL_LEVEL_LOGICAL)
+   if (sync_replication_slots && wal_level == WAL_LEVEL_MINIMAL)
        ereport(ERROR,
-               (errmsg("replication slot synchronization (\"sync_replication_slots\" = on) requires \"wal_level\" >= \"logical\"")));
+               (errmsg("replication slot synchronization (\"sync_replication_slots\" = on) requires \"wal_level\" to be \"replica\" or \"logical\"")));
 
    /*
     * Other one-time internal sanity checks can go here, if they are fast.
index c719af1f8a94e507357b736cfdcbb271364ae577..455768a57f0f32d079ddc771ebed9bad42706be5 100644 (file)
@@ -20,6 +20,7 @@ OBJS = \
    decode.o \
    launcher.o \
    logical.o \
+   logicalctl.o \
    logicalfuncs.o \
    message.o \
    origin.o \
index 5e15cb1825ed6f38ccd8261d975530d2b5048787..a1df8e1d6460bdcc03948aac5b5970182e2cd742 100644 (file)
@@ -149,39 +149,34 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
             * can restart from there.
             */
            break;
-       case XLOG_PARAMETER_CHANGE:
+       case XLOG_LOGICAL_DECODING_STATUS_CHANGE:
            {
-               xl_parameter_change *xlrec =
-                   (xl_parameter_change *) XLogRecGetData(buf->record);
+               bool        logical_decoding;
+
+               memcpy(&logical_decoding, XLogRecGetData(buf->record), sizeof(bool));
 
                /*
-                * If wal_level on the primary is reduced to less than
-                * logical, we want to prevent existing logical slots from
-                * being used.  Existing logical slots on the standby get
-                * invalidated when this WAL record is replayed; and further,
-                * slot creation fails when wal_level is not sufficient; but
-                * all these operations are not synchronized, so a logical
-                * slot may creep in while the wal_level is being reduced.
-                * Hence this extra check.
+                * Error out as we should not decode this WAL record.
+                *
+                * Logical decoding is disabled, and existing logical slots on
+                * the standby are invalidated when this WAL record is
+                * replayed. No logical decoder can process this WAL record
+                * until replay completes, and by then the slots are already
+                * invalidated. Furthermore, no new logical slots can be
+                * created while logical decoding is disabled. This cannot
+                * occur even on primary either, since it will not restart
+                * with wal_level < replica if any logical slots exist.
                 */
-               if (xlrec->wal_level < WAL_LEVEL_LOGICAL)
-               {
-                   /*
-                    * This can occur only on a standby, as a primary would
-                    * not allow to restart after changing wal_level < logical
-                    * if there is pre-existing logical slot.
-                    */
-                   Assert(RecoveryInProgress());
-                   ereport(ERROR,
-                           (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-                            errmsg("logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary")));
-               }
+               elog(ERROR, "unexpected logical decoding status change %d",
+                    logical_decoding);
+
                break;
            }
        case XLOG_NOOP:
        case XLOG_NEXTOID:
        case XLOG_SWITCH:
        case XLOG_BACKUP_END:
+       case XLOG_PARAMETER_CHANGE:
        case XLOG_RESTORE_POINT:
        case XLOG_FPW_CHANGE:
        case XLOG_FPI_FOR_HINT:
index 1b11ed63dc69b3c1dd1e7993cf20f362919e0d67..c8858e06616766b7c98d4482e92b602a249113c1 100644 (file)
@@ -117,31 +117,20 @@ CheckLogicalDecodingRequirements(void)
     * needs the same check.
     */
 
-   if (wal_level < WAL_LEVEL_LOGICAL)
-       ereport(ERROR,
-               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-                errmsg("logical decoding requires \"wal_level\" >= \"logical\"")));
-
    if (MyDatabaseId == InvalidOid)
        ereport(ERROR,
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("logical decoding requires a database connection")));
 
-   if (RecoveryInProgress())
-   {
-       /*
-        * This check may have race conditions, but whenever
-        * XLOG_PARAMETER_CHANGE indicates that wal_level has changed, we
-        * verify that there are no existing logical replication slots. And to
-        * avoid races around creating a new slot,
-        * CheckLogicalDecodingRequirements() is called once before creating
-        * the slot, and once when logical decoding is initially starting up.
-        */
-       if (GetActiveWalLevelOnStandby() < WAL_LEVEL_LOGICAL)
-           ereport(ERROR,
-                   (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-                    errmsg("logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary")));
-   }
+   /* CheckSlotRequirements() has already checked if wal_level >= 'replica' */
+   Assert(wal_level >= WAL_LEVEL_REPLICA);
+
+   /* Check if logical decoding is available on standby */
+   if (RecoveryInProgress() && !IsLogicalDecodingEnabled())
+       ereport(ERROR,
+               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                errmsg("logical decoding on standby requires \"effective_wal_level\" >= \"logical\" on the primary"),
+                errhint("Set \"wal_level\" >= \"logical\" or create at least one logical slot when \"wal_level\" = \"replica\".")));
 }
 
 /*
diff --git a/src/backend/replication/logical/logicalctl.c b/src/backend/replication/logical/logicalctl.c
new file mode 100644 (file)
index 0000000..5a0ddf3
--- /dev/null
@@ -0,0 +1,639 @@
+/*-------------------------------------------------------------------------
+ * logicalctl.c
+ *     Functionality to control logical decoding status online.
+ *
+ * This module enables dynamic control of logical decoding availability.
+ * Logical decoding becomes active under two conditions: when the wal_level
+ * parameter is set to 'logical', or when at least one valid logical replication
+ * slot exists with wal_level set to 'replica'. The system disables logical
+ * decoding when neither condition is met. Therefore, the dynamic control
+ * of logical decoding availability is required only when wal_level is set
+ * to 'replica'. Logical decoding is always enabled when wal_level='logical'
+ * and always disabled when wal_level='minimal'.
+ *
+ * The core concept of dynamically enabling and disabling logical decoding
+ * is to separately control two aspects: writing information required for
+ * logical decoding to WAL records, and using logical decoding itself. During
+ * activation, we first enable logical WAL writing while keeping logical
+ * decoding disabled. This change is reflected in the read-only
+ * effective_wal_level GUC parameter. Once we ensure that all processes have
+ * updated to the latest effective_wal_level value, we then enable logical
+ * decoding. Deactivation follows a similar careful, multi-step process
+ * in reverse order.
+ *
+ * While activation occurs synchronously right after creating the first
+ * logical slot, deactivation happens asynchronously through the checkpointer
+ * process. This design avoids a race condition at the end of recovery; see
+ * the comments in UpdateLogicalDecodingStatusEndOfRecovery() for details.
+ * Asynchronous deactivation also avoids excessive toggling of the logical
+ * decoding status in workloads that repeatedly create and drop a single
+ * logical slot. On the other hand, this lazy approach can delay changes
+ * to effective_wal_level and the disabling logical decoding, especially
+ * when the checkpointer is busy with other tasks. We chose this lazy approach
+ * in all deactivation paths to keep the implementation simple, even though
+ * laziness is strictly required only for end-of-recovery cases. Future work
+ * might address this limitation either by using a dedicated worker instead
+ * of the checkpointer, or by implementing synchronous waiting during slot
+ * drops if workloads are significantly affected by the lazy deactivation
+ * of logical decoding.
+ *
+ * Standby servers use the primary server's effective_wal_level and logical
+ * decoding status. Unlike normal activation and deactivation, these
+ * are updated simultaneously without status change coordination, solely by
+ * replaying XLOG_LOGICAL_DECODING_STATUS_CHANGE records. The local wal_level
+ * setting has no effect during this time. Upon promotion, we update the
+ * logical decoding status based on local conditions: the wal_level value and
+ * the presence of logical slots.
+ *
+ * In the future, we could extend support to include automatic transitions
+ * of effective_wal_level between 'minimal' and 'logical' WAL levels. However,
+ * this enhancement would require additional coordination mechanisms and
+ * careful implementation of operations such as terminating walsenders and
+ * archiver processes while carefully considering the sequence of operations
+ * to ensure system stability during these transitions.
+ *
+ * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *       src/backend/replication/logical/logicalctl.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "access/xloginsert.h"
+#include "catalog/pg_control.h"
+#include "miscadmin.h"
+#include "replication/slot.h"
+#include "storage/ipc.h"
+#include "storage/lmgr.h"
+#include "storage/proc.h"
+#include "storage/procarray.h"
+#include "utils/injection_point.h"
+
+/*
+ * Struct for controlling the logical decoding status.
+ *
+ * This struct is protected by LogicalDecodingControlLock.
+ */
+typedef struct LogicalDecodingCtlData
+{
+   /*
+    * This is the authoritative value used by all processes to determine
+    * whether to write additional information required by logical decoding to
+    * WAL. Since this information could be checked frequently, each process
+    * caches this value in XLogLogicalInfo for better performance.
+    */
+   bool        xlog_logical_info;
+
+   /* True if logical decoding is available in the system */
+   bool        logical_decoding_enabled;
+
+   /* True if logical decoding might need to be disabled */
+   bool        pending_disable;
+} LogicalDecodingCtlData;
+
+static LogicalDecodingCtlData *LogicalDecodingCtl = NULL;
+
+/*
+ * A process-local cache of LogicalDecodingCtl->xlog_logical_info. This is
+ * initialized at process startup, and updated when processing the process
+ * barrier signal in ProcessBarrierUpdateXLogLogicalInfo(). If the process
+ * is in an XID-assigned transaction, the cache update is delayed until the
+ * transaction ends. See the comments for XLogLogicalInfoUpdatePending for details.
+ */
+bool       XLogLogicalInfo = false;
+
+/*
+ * When receiving the PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO signal, if
+ * an XID is assigned to the current transaction, the process sets this flag and
+ * delays the XLogLogicalInfo update until the transaction ends. This ensures
+ * that the XLogLogicalInfo value (typically accessed via XLogLogicalInfoActive)
+ * remains consistent throughout the transaction.
+ */
+static bool XLogLogicalInfoUpdatePending = false;
+
+static void update_xlog_logical_info(void);
+static void abort_logical_decoding_activation(int code, Datum arg);
+static void write_logical_decoding_status_update_record(bool status);
+
+Size
+LogicalDecodingCtlShmemSize(void)
+{
+   return sizeof(LogicalDecodingCtlData);
+}
+
+void
+LogicalDecodingCtlShmemInit(void)
+{
+   bool        found;
+
+   LogicalDecodingCtl = ShmemInitStruct("Logical decoding control",
+                                        LogicalDecodingCtlShmemSize(),
+                                        &found);
+
+   if (!found)
+       MemSet(LogicalDecodingCtl, 0, LogicalDecodingCtlShmemSize());
+}
+
+/*
+ * Initialize the logical decoding status in shmem at server startup. This
+ * must be called ONCE during postmaster or standalone-backend startup.
+ */
+void
+StartupLogicalDecodingStatus(bool last_status)
+{
+   /* Logical decoding is always disabled when 'minimal' WAL level */
+   if (wal_level == WAL_LEVEL_MINIMAL)
+       return;
+
+   /*
+    * Set the initial logical decoding status based on the last status. If
+    * logical decoding was enabled before the last shutdown, it remains
+    * enabled as we might have set wal_level='logical' or have at least one
+    * logical slot.
+    */
+   LogicalDecodingCtl->xlog_logical_info = last_status;
+   LogicalDecodingCtl->logical_decoding_enabled = last_status;
+}
+
+/*
+ * Update the XLogLogicalInfo cache.
+ */
+static inline void
+update_xlog_logical_info(void)
+{
+   XLogLogicalInfo = IsXLogLogicalInfoEnabled();
+}
+
+/*
+ * Initialize XLogLogicalInfo backend-private cache. This routine is called
+ * during process initialization.
+ */
+void
+InitializeProcessXLogLogicalInfo(void)
+{
+   update_xlog_logical_info();
+}
+
+/*
+ * This routine is called when we are told to update XLogLogicalInfo
+ * by a ProcSignalBarrier.
+ */
+bool
+ProcessBarrierUpdateXLogLogicalInfo(void)
+{
+   if (GetTopTransactionIdIfAny() != InvalidTransactionId)
+   {
+       /* Delay updating XLogLogicalInfo until the transaction end */
+       XLogLogicalInfoUpdatePending = true;
+   }
+   else
+       update_xlog_logical_info();
+
+   return true;
+}
+
+/*
+ * Check the shared memory state and return true if logical decoding is
+ * enabled on the system.
+ */
+bool
+IsLogicalDecodingEnabled(void)
+{
+   bool        enabled;
+
+   LWLockAcquire(LogicalDecodingControlLock, LW_SHARED);
+   enabled = LogicalDecodingCtl->logical_decoding_enabled;
+   LWLockRelease(LogicalDecodingControlLock);
+
+   return enabled;
+}
+
+/*
+ * Returns true if logical WAL logging is enabled based on the shared memory
+ * status.
+ */
+bool
+IsXLogLogicalInfoEnabled(void)
+{
+   bool        xlog_logical_info;
+
+   LWLockAcquire(LogicalDecodingControlLock, LW_SHARED);
+   xlog_logical_info = LogicalDecodingCtl->xlog_logical_info;
+   LWLockRelease(LogicalDecodingControlLock);
+
+   return xlog_logical_info;
+}
+
+/*
+ * Reset the local cache at end of the transaction.
+ */
+void
+AtEOXact_LogicalCtl(void)
+{
+   /* Update the local cache if there is a pending update */
+   if (XLogLogicalInfoUpdatePending)
+   {
+       update_xlog_logical_info();
+       XLogLogicalInfoUpdatePending = false;
+   }
+}
+
+/*
+ * Writes an XLOG_LOGICAL_DECODING_STATUS_CHANGE WAL record with the given
+ * status.
+ */
+static void
+write_logical_decoding_status_update_record(bool status)
+{
+   XLogRecPtr  recptr;
+
+   XLogBeginInsert();
+   XLogRegisterData(&status, sizeof(bool));
+   recptr = XLogInsert(RM_XLOG_ID, XLOG_LOGICAL_DECODING_STATUS_CHANGE);
+   XLogFlush(recptr);
+}
+
+/*
+ * A PG_ENSURE_ERROR_CLEANUP callback for activating logical decoding, resetting
+ * the shared flags to revert the logical decoding activation process.
+ */
+static void
+abort_logical_decoding_activation(int code, Datum arg)
+{
+   Assert(MyReplicationSlot);
+   Assert(!LogicalDecodingCtl->logical_decoding_enabled);
+
+   elog(DEBUG1, "aborting logical decoding activation process");
+
+   /*
+    * Abort the change to xlog_logical_info. We don't need to check
+    * CheckLogicalSlotExists() as we're still holding a logical slot.
+    */
+   LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
+   LogicalDecodingCtl->xlog_logical_info = false;
+   LWLockRelease(LogicalDecodingControlLock);
+
+   /*
+    * Some processes might have already started logical info WAL logging, so
+    * tell all running processes to update their caches. We don't need to
+    * wait for all processes to disable xlog_logical_info locally as it's
+    * always safe to write logical information to WAL records, even when not
+    * strictly required.
+    */
+   EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO);
+}
+
+/*
+ * Enable logical decoding if disabled.
+ *
+ * If this function is called during recovery, it simply returns without
+ * action since the logical decoding status change is not allowed during
+ * this time. The logical decoding status depends on the status on the primary.
+ * The caller should use CheckLogicalDecodingRequirements() before calling this
+ * function to make sure that the logical decoding status can be modified.
+ *
+ * Note that there is no interlock between logical decoding activation
+ * and slot creation. To ensure enabling logical decoding, the caller
+ * needs to call this function after creating a logical slot before
+ * initializing the logical decoding context.
+ */
+void
+EnsureLogicalDecodingEnabled(void)
+{
+   Assert(MyReplicationSlot);
+   Assert(wal_level >= WAL_LEVEL_REPLICA);
+
+   /* Logical decoding is always enabled */
+   if (wal_level >= WAL_LEVEL_LOGICAL)
+       return;
+
+   if (RecoveryInProgress())
+   {
+       /*
+        * CheckLogicalDecodingRequirements() must have already errored out if
+        * logical decoding is not enabled since we cannot enable the logical
+        * decoding status during recovery.
+        */
+       Assert(IsLogicalDecodingEnabled());
+       return;
+   }
+
+   /*
+    * Ensure to abort the activation process in cases where there in an
+    * interruption during the wait.
+    */
+   PG_ENSURE_ERROR_CLEANUP(abort_logical_decoding_activation, (Datum) 0);
+   {
+       EnableLogicalDecoding();
+   }
+   PG_END_ENSURE_ERROR_CLEANUP(abort_logical_decoding_activation, (Datum) 0);
+}
+
+/*
+ * A workhorse function to enable logical decoding.
+ */
+void
+EnableLogicalDecoding(void)
+{
+   bool        in_recovery;
+
+   LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
+
+   /* Return if it is already enabled */
+   if (LogicalDecodingCtl->logical_decoding_enabled)
+   {
+       LogicalDecodingCtl->pending_disable = false;
+       LWLockRelease(LogicalDecodingControlLock);
+       return;
+   }
+
+   /*
+    * Set logical info WAL logging in shmem. All process starts after this
+    * point will include the information required by logical decoding to WAL
+    * records.
+    */
+   LogicalDecodingCtl->xlog_logical_info = true;
+
+   LWLockRelease(LogicalDecodingControlLock);
+
+   /*
+    * Tell all running processes to reflect the xlog_logical_info update, and
+    * wait. This ensures that all running processes have enabled logical
+    * information WAL logging.
+    */
+   WaitForProcSignalBarrier(
+                            EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO));
+
+   INJECTION_POINT("logical-decoding-activation", NULL);
+
+   in_recovery = RecoveryInProgress();
+
+   /*
+    * There could be some transactions that might have started with the old
+    * status, but we don't need to wait for these transactions to complete as
+    * long as they have valid XIDs. These transactions will appear in the
+    * xl_running_xacts record and therefore the snapshot builder will not try
+    * to decode the transaction during the logical decoding initialization.
+    *
+    * There is a theoretical case where a transaction decides whether to
+    * include logical-info to WAL records before getting an XID. In this
+    * case, the transaction won't appear in xl_running_xacts.
+    *
+    * For operations that do not require an XID assignment, the process
+    * starts including logical-info immediately upon receiving the signal
+    * (barrier). If such an operation checks the effective_wal_level multiple
+    * times within a single execution, the resulting WAL records might be
+    * inconsistent (i.e., logical-info is included in some records but not in
+    * others). However, this is harmless because logical decoding generally
+    * ignores WAL records that are not associated with an assigned XID.
+    *
+    * One might think we need to wait for all running transactions, including
+    * those without XIDs and read-only transactions, to finish before
+    * enabling logical decoding. However, such a requirement would force the
+    * slot creation to wait for a potentially very long time due to
+    * long-running read queries, which is practically unacceptable.
+    */
+
+   START_CRIT_SECTION();
+
+   /*
+    * We enable logical decoding first, followed by writing the WAL record.
+    * This sequence ensures logical decoding becomes available on the primary
+    * first.
+    */
+   LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
+
+   LogicalDecodingCtl->logical_decoding_enabled = true;
+
+   if (!in_recovery)
+       write_logical_decoding_status_update_record(true);
+
+   LogicalDecodingCtl->pending_disable = false;
+
+   LWLockRelease(LogicalDecodingControlLock);
+
+   END_CRIT_SECTION();
+
+   if (!in_recovery)
+       ereport(LOG,
+               errmsg("logical decoding is enabled upon creating a new logical replication slot"));
+}
+
+/*
+ * Initiate a request for disabling logical decoding.
+ *
+ * Note that this function does not verify whether logical slots exist. The
+ * checkpointer will verify if logical decoding should actually be disabled.
+ */
+void
+RequestDisableLogicalDecoding(void)
+{
+   if (wal_level != WAL_LEVEL_REPLICA)
+       return;
+
+   /*
+    * It's possible that we might not actually need to disable logical
+    * decoding if someone creates a new logical slot concurrently. We set the
+    * flag anyway and the checkpointer will check it and disable logical
+    * decoding if necessary.
+    */
+   LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
+   LogicalDecodingCtl->pending_disable = true;
+   LWLockRelease(LogicalDecodingControlLock);
+
+   WakeupCheckpointer();
+
+   elog(DEBUG1, "requested disabling logical decoding");
+}
+
+/*
+ * Disable logical decoding if necessary.
+ *
+ * This function disables logical decoding upon a request initiated by
+ * RequestDisableLogicalDecoding(). Otherwise, it performs no action.
+ */
+void
+DisableLogicalDecodingIfNecessary(void)
+{
+   bool        pending_disable;
+
+   if (wal_level != WAL_LEVEL_REPLICA)
+       return;
+
+   /*
+    * Sanity check as we cannot disable logical decoding while holding a
+    * logical slot.
+    */
+   Assert(!MyReplicationSlot);
+
+   if (RecoveryInProgress())
+       return;
+
+   LWLockAcquire(LogicalDecodingControlLock, LW_SHARED);
+   pending_disable = LogicalDecodingCtl->pending_disable;
+   LWLockRelease(LogicalDecodingControlLock);
+
+   /* Quick return if no pending disable request */
+   if (!pending_disable)
+       return;
+
+   DisableLogicalDecoding();
+}
+
+/*
+ * A workhorse function to disable logical decoding.
+ */
+void
+DisableLogicalDecoding(void)
+{
+   bool        in_recovery = RecoveryInProgress();
+
+   LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
+
+   /*
+    * Check if we can disable logical decoding.
+    *
+    * Skip CheckLogicalSlotExists() check during recovery because the
+    * existing slots will be invalidated after disabling logical decoding.
+    */
+   if (!LogicalDecodingCtl->logical_decoding_enabled ||
+       (!in_recovery && CheckLogicalSlotExists()))
+   {
+       LogicalDecodingCtl->pending_disable = false;
+       LWLockRelease(LogicalDecodingControlLock);
+       return;
+   }
+
+   START_CRIT_SECTION();
+
+   /*
+    * We need to disable logical decoding first and then disable logical
+    * information WAL logging in order to ensure that no logical decoding
+    * processes WAL records with insufficient information.
+    */
+   LogicalDecodingCtl->logical_decoding_enabled = false;
+
+   /* Write the WAL to disable logical decoding on standbys too */
+   if (!in_recovery)
+       write_logical_decoding_status_update_record(false);
+
+   /* Now disable logical information WAL logging */
+   LogicalDecodingCtl->xlog_logical_info = false;
+   LogicalDecodingCtl->pending_disable = false;
+
+   END_CRIT_SECTION();
+
+   if (!in_recovery)
+       ereport(LOG,
+               errmsg("logical decoding is disabled because there are no valid logical replication slots"));
+
+   LWLockRelease(LogicalDecodingControlLock);
+
+   /*
+    * Tell all running processes to reflect the xlog_logical_info update.
+    * Unlike when enabling logical decoding, we don't need to wait for all
+    * processes to complete it in this case. We already disabled logical
+    * decoding and it's always safe to write logical information to WAL
+    * records, even when not strictly required. Therefore, we don't need to
+    * wait for all running transactions to finish either.
+    */
+   EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO);
+}
+
+/*
+ * Updates the logical decoding status at end of recovery, and ensures that
+ * all running processes have the updated XLogLogicalInfo status. This
+ * function must be called before accepting writes.
+ */
+void
+UpdateLogicalDecodingStatusEndOfRecovery(void)
+{
+   bool        new_status = false;
+
+   Assert(RecoveryInProgress());
+
+   /*
+    * With 'minimal' WAL level, there are no logical replication slots during
+    * recovery. Logical decoding is always disabled, so there is no need to
+    * synchronize XLogLogicalInfo.
+    */
+   if (wal_level == WAL_LEVEL_MINIMAL)
+   {
+       Assert(!IsXLogLogicalInfoEnabled() && !IsLogicalDecodingEnabled());
+       return;
+   }
+
+   LWLockAcquire(LogicalDecodingControlLock, LW_EXCLUSIVE);
+
+   if (wal_level == WAL_LEVEL_LOGICAL || CheckLogicalSlotExists())
+       new_status = true;
+
+   /*
+    * When recovery ends, we need to either enable or disable logical
+    * decoding based on the wal_level setting and the presence of logical
+    * slots. We need to note that concurrent slot creation and deletion could
+    * happen but WAL writes are still not permitted until recovery fully
+    * completes. Here's how we handle concurrent toggling of logical
+    * decoding:
+    *
+    * For 'enable' case, if there's a concurrent disable request before
+    * recovery fully completes, the checkpointer will handle it after
+    * recovery is done. This means there might be a brief period after
+    * recovery where logical decoding remains enabled even with no logical
+    * replication slots present. This temporary state is not new - it can
+    * already occur due to the checkpointer's asynchronous deactivation
+    * process.
+    *
+    * For 'disable' case, backend cannot create logical replication slots
+    * during recovery (see checks in CheckLogicalDecodingRequirements()),
+    * which prevents a race condition between disabling logical decoding and
+    * concurrent slot creation.
+    */
+   if (new_status != LogicalDecodingCtl->logical_decoding_enabled)
+   {
+       /*
+        * Update both the logical decoding status and logical WAL logging
+        * status. Unlike toggling these status during non-recovery, we don't
+        * need to worry about the operation order as WAL writes are still not
+        * permitted.
+        */
+       LogicalDecodingCtl->xlog_logical_info = new_status;
+       LogicalDecodingCtl->logical_decoding_enabled = new_status;
+
+       elog(DEBUG1,
+            "update logical decoding status to %d at the end of recovery",
+            new_status);
+
+       /*
+        * Now that we updated the logical decoding status, clear the pending
+        * disable flag. It's possible that a concurrent process drops the
+        * last logical slot and initiates the pending disable again. The
+        * checkpointer process will check it.
+        */
+       LogicalDecodingCtl->pending_disable = false;
+
+       LWLockRelease(LogicalDecodingControlLock);
+
+       write_logical_decoding_status_update_record(new_status);
+   }
+   else
+       LWLockRelease(LogicalDecodingControlLock);
+
+   /*
+    * Ensure all running processes have the updated status. We don't need to
+    * wait for running transactions to finish as we don't accept any writes
+    * yet. On the other hand, we need to wait for synchronizing
+    * XLogLogicalInfo even if we've not updated the status above as the
+    * status have been turned on and off during recovery, having running
+    * processes have different status on their local caches.
+    */
+   if (IsUnderPostmaster)
+       WaitForProcSignalBarrier(
+                                EmitProcSignalBarrier(PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO));
+
+   INJECTION_POINT("startup-logical-decoding-status-change-end-of-recovery", NULL);
+}
index a2268d8361eee0d0b314895c3e243ac576311402..928b503addf5414ea8b958b70f50b10e5e44bf81 100644 (file)
@@ -6,6 +6,7 @@ backend_sources += files(
   'decode.c',
   'launcher.c',
   'logical.c',
+  'logicalctl.c',
   'logicalfuncs.c',
   'message.c',
   'origin.c',
index bf50317b4436e198ffae4e57cb6555c4e661fae4..2aea776352d949c398343871078940ced452e9b2 100644 (file)
@@ -1201,13 +1201,15 @@ bool
 ValidateSlotSyncParams(int elevel)
 {
    /*
-    * Logical slot sync/creation requires wal_level >= logical.
+    * Logical slot sync/creation requires logical decoding to be enabled.
     */
-   if (wal_level < WAL_LEVEL_LOGICAL)
+   if (!IsLogicalDecodingEnabled())
    {
        ereport(elevel,
                errcode(ERRCODE_INVALID_PARAMETER_VALUE),
-               errmsg("replication slot synchronization requires \"wal_level\" >= \"logical\""));
+               errmsg("replication slot synchronization requires \"effective_wal_level\" >= \"logical\" on the primary"),
+               errhint("To enable logical decoding on primary, set \"wal_level\" >= \"logical\" or create at least one logical slot when \"wal_level\" = \"replica\"."));
+
        return false;
    }
 
index 682eccd116c1cbc3e2b67629100ecf7014f3dd57..58c41d455167844956b1394b61ddc5e9245274ba 100644 (file)
@@ -765,16 +765,15 @@ ReplicationSlotRelease(void)
 {
    ReplicationSlot *slot = MyReplicationSlot;
    char       *slotname = NULL;    /* keep compiler quiet */
-   bool        is_logical = false; /* keep compiler quiet */
+   bool        is_logical;
    TimestampTz now = 0;
 
    Assert(slot != NULL && slot->active_pid != 0);
 
+   is_logical = SlotIsLogical(slot);
+
    if (am_walsender)
-   {
        slotname = pstrdup(NameStr(slot->data.name));
-       is_logical = SlotIsLogical(slot);
-   }
 
    if (slot->data.persistency == RS_EPHEMERAL)
    {
@@ -784,6 +783,14 @@ ReplicationSlotRelease(void)
         * data.
         */
        ReplicationSlotDropAcquired();
+
+       /*
+        * Request to disable logical decoding, even though this slot may not
+        * have been the last logical slot. The checkpointer will verify if
+        * logical decoding should actually be disabled.
+        */
+       if (is_logical)
+           RequestDisableLogicalDecoding();
    }
 
    /*
@@ -848,15 +855,21 @@ ReplicationSlotRelease(void)
  *
  * Cleanup only synced temporary slots if 'synced_only' is true, else
  * cleanup all temporary slots.
+ *
+ * If it drops the last logical slot in the cluster, requests to disable
+ * logical decoding.
  */
 void
 ReplicationSlotCleanup(bool synced_only)
 {
    int         i;
+   bool        found_valid_logicalslot;
+   bool        dropped_logical = false;
 
    Assert(MyReplicationSlot == NULL);
 
 restart:
+   found_valid_logicalslot = false;
    LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    for (i = 0; i < max_replication_slots; i++)
    {
@@ -866,6 +879,10 @@ restart:
            continue;
 
        SpinLockAcquire(&s->mutex);
+
+       found_valid_logicalslot |=
+           (SlotIsLogical(s) && s->data.invalidated == RS_INVAL_NONE);
+
        if ((s->active_pid == MyProcPid &&
             (!synced_only || s->data.synced)))
        {
@@ -873,6 +890,9 @@ restart:
            SpinLockRelease(&s->mutex);
            LWLockRelease(ReplicationSlotControlLock);  /* avoid deadlock */
 
+           if (SlotIsLogical(s))
+               dropped_logical = true;
+
            ReplicationSlotDropPtr(s);
 
            ConditionVariableBroadcast(&s->active_cv);
@@ -883,6 +903,9 @@ restart:
    }
 
    LWLockRelease(ReplicationSlotControlLock);
+
+   if (dropped_logical && !found_valid_logicalslot)
+       RequestDisableLogicalDecoding();
 }
 
 /*
@@ -891,6 +914,8 @@ restart:
 void
 ReplicationSlotDrop(const char *name, bool nowait)
 {
+   bool        is_logical;
+
    Assert(MyReplicationSlot == NULL);
 
    ReplicationSlotAcquire(name, nowait, false);
@@ -905,7 +930,12 @@ ReplicationSlotDrop(const char *name, bool nowait)
                errmsg("cannot drop replication slot \"%s\"", name),
                errdetail("This replication slot is being synchronized from the primary server."));
 
+   is_logical = SlotIsLogical(MyReplicationSlot);
+
    ReplicationSlotDropAcquired();
+
+   if (is_logical)
+       RequestDisableLogicalDecoding();
 }
 
 /*
@@ -1436,16 +1466,22 @@ ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
  *
  * This routine isn't as efficient as it could be - but we don't drop
  * databases often, especially databases with lots of slots.
+ *
+ * If it drops the last logical slot in the cluster, it requests to disable
+ * logical decoding.
  */
 void
 ReplicationSlotsDropDBSlots(Oid dboid)
 {
    int         i;
+   bool        found_valid_logicalslot;
+   bool        dropped = false;
 
    if (max_replication_slots <= 0)
        return;
 
 restart:
+   found_valid_logicalslot = false;
    LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    for (i = 0; i < max_replication_slots; i++)
    {
@@ -1463,11 +1499,19 @@ restart:
        if (!SlotIsLogical(s))
            continue;
 
+       /*
+        * Check logical slots on other databases too so we can disable
+        * logical decoding only if no slots in the cluster.
+        */
+       SpinLockAcquire(&s->mutex);
+       found_valid_logicalslot |= (s->data.invalidated == RS_INVAL_NONE);
+       SpinLockRelease(&s->mutex);
+
        /* not our database, skip */
        if (s->data.database != dboid)
            continue;
 
-       /* NB: intentionally including invalidated slots */
+       /* NB: intentionally including invalidated slots to drop */
 
        /* acquire slot, so ReplicationSlotDropAcquired can be reused  */
        SpinLockAcquire(&s->mutex);
@@ -1519,11 +1563,55 @@ restart:
         */
        LWLockRelease(ReplicationSlotControlLock);
        ReplicationSlotDropAcquired();
+       dropped = true;
        goto restart;
    }
    LWLockRelease(ReplicationSlotControlLock);
+
+   if (dropped && !found_valid_logicalslot)
+       RequestDisableLogicalDecoding();
 }
 
+/*
+ * Returns true if there is at least one in-use valid logical replication slot.
+ */
+bool
+CheckLogicalSlotExists(void)
+{
+   bool        found = false;
+
+   if (max_replication_slots <= 0)
+       return false;
+
+   LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+   for (int i = 0; i < max_replication_slots; i++)
+   {
+       ReplicationSlot *s;
+       bool        invalidated;
+
+       s = &ReplicationSlotCtl->replication_slots[i];
+
+       /* cannot change while ReplicationSlotCtlLock is held */
+       if (!s->in_use)
+           continue;
+
+       if (SlotIsPhysical(s))
+           continue;
+
+       SpinLockAcquire(&s->mutex);
+       invalidated = s->data.invalidated != RS_INVAL_NONE;
+       SpinLockRelease(&s->mutex);
+
+       if (invalidated)
+           continue;
+
+       found = true;
+       break;
+   }
+   LWLockRelease(ReplicationSlotControlLock);
+
+   return found;
+}
 
 /*
  * Check whether the server's configuration supports using replication
@@ -1686,7 +1774,7 @@ ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
            break;
 
        case RS_INVAL_WAL_LEVEL:
-           appendStringInfoString(&err_detail, _("Logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary server."));
+           appendStringInfoString(&err_detail, _("Logical decoding on standby requires the primary server to either set \"wal_level\" >= \"logical\" or have at least one logical slot when \"wal_level\" = \"replica\"."));
            break;
 
        case RS_INVAL_IDLE_TIMEOUT:
@@ -1828,10 +1916,11 @@ DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s,
  *
  * Acquires the given slot and mark it invalid, if necessary and possible.
  *
- * Returns whether ReplicationSlotControlLock was released in the interim (and
- * in that case we're not holding the lock at return, otherwise we are).
+ * Returns true if the slot was invalidated.
  *
- * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.)
+ * Set *released_lock_out if ReplicationSlotControlLock was released in the
+ * interim (and in that case we're not holding the lock at return, otherwise
+ * we are).
  *
  * This is inherently racy, because we release the LWLock
  * for syscalls, so caller must restart if we return true.
@@ -1841,10 +1930,11 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
                               ReplicationSlot *s,
                               XLogRecPtr oldestLSN,
                               Oid dboid, TransactionId snapshotConflictHorizon,
-                              bool *invalidated)
+                              bool *released_lock_out)
 {
    int         last_signaled_pid = 0;
    bool        released_lock = false;
+   bool        invalidated = false;
    TimestampTz inactive_since = 0;
 
    for (;;)
@@ -1933,7 +2023,7 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
            }
 
            /* Let caller know */
-           *invalidated = true;
+           invalidated = true;
        }
 
        SpinLockRelease(&s->mutex);
@@ -2041,7 +2131,8 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
 
    Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));
 
-   return released_lock;
+   *released_lock_out = released_lock;
+   return invalidated;
 }
 
 /*
@@ -2054,7 +2145,8 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
  * - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment
  * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
  *   db; dboid may be InvalidOid for shared relations
- * - RS_INVAL_WAL_LEVEL: is logical and wal_level is insufficient
+ * - RS_INVAL_WAL_LEVEL: is a logical slot and effective_wal_level is not
+ *   logical.
  * - RS_INVAL_IDLE_TIMEOUT: has been idle longer than the configured
  *   "idle_replication_slot_timeout" duration.
  *
@@ -2062,6 +2154,9 @@ InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
  * causes in a single pass, minimizing redundant iterations. The "cause"
  * parameter can be a MASK representing one or more of the defined causes.
  *
+ * If it invalidates the last logical slot in the cluster, it requests to
+ * disable logical decoding.
+ *
  * NB - this runs as part of checkpoint, so avoid raising errors if possible.
  */
 bool
@@ -2071,6 +2166,8 @@ InvalidateObsoleteReplicationSlots(uint32 possible_causes,
 {
    XLogRecPtr  oldestLSN;
    bool        invalidated = false;
+   bool        invalidated_logical = false;
+   bool        found_valid_logicalslot;
 
    Assert(!(possible_causes & RS_INVAL_HORIZON) || TransactionIdIsValid(snapshotConflictHorizon));
    Assert(!(possible_causes & RS_INVAL_WAL_REMOVED) || oldestSegno > 0);
@@ -2082,25 +2179,58 @@ InvalidateObsoleteReplicationSlots(uint32 possible_causes,
    XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);
 
 restart:
+   found_valid_logicalslot = false;
    LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
    for (int i = 0; i < max_replication_slots; i++)
    {
        ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
+       bool        released_lock = false;
 
        if (!s->in_use)
            continue;
 
        /* Prevent invalidation of logical slots during binary upgrade */
        if (SlotIsLogical(s) && IsBinaryUpgrade)
+       {
+           SpinLockAcquire(&s->mutex);
+           found_valid_logicalslot |= (s->data.invalidated == RS_INVAL_NONE);
+           SpinLockRelease(&s->mutex);
+
            continue;
+       }
 
-       if (InvalidatePossiblyObsoleteSlot(possible_causes, s, oldestLSN, dboid,
-                                          snapshotConflictHorizon,
-                                          &invalidated))
+       if (InvalidatePossiblyObsoleteSlot(possible_causes, s, oldestLSN,
+                                          dboid, snapshotConflictHorizon,
+                                          &released_lock))
        {
-           /* if the lock was released, start from scratch */
-           goto restart;
+           Assert(released_lock);
+
+           /* Remember we have invalidated a physical or logical slot */
+           invalidated = true;
+
+           /*
+            * Additionally, remember we have invalidated a logical slot as we
+            * can request disabling logical decoding later.
+            */
+           if (SlotIsLogical(s))
+               invalidated_logical = true;
+       }
+       else
+       {
+           /*
+            * We need to check if the slot is invalidated here since
+            * InvalidatePossiblyObsoleteSlot() returns false also if the slot
+            * is already invalidated.
+            */
+           SpinLockAcquire(&s->mutex);
+           found_valid_logicalslot |=
+               (SlotIsLogical(s) && (s->data.invalidated == RS_INVAL_NONE));
+           SpinLockRelease(&s->mutex);
        }
+
+       /* if the lock was released, start from scratch */
+       if (released_lock)
+           goto restart;
    }
    LWLockRelease(ReplicationSlotControlLock);
 
@@ -2113,6 +2243,15 @@ restart:
        ReplicationSlotsComputeRequiredLSN();
    }
 
+   /*
+    * Request the checkpointer to disable logical decoding if no valid
+    * logical slots remain. If called by the checkpointer during a
+    * checkpoint, only the request is initiated; actual deactivation is
+    * deferred until after the checkpoint completes.
+    */
+   if (invalidated_logical && !found_valid_logicalslot)
+       RequestDisableLogicalDecoding();
+
    return invalidated;
 }
 
@@ -2648,19 +2787,20 @@ RestoreSlotFromDisk(const char *name)
     */
    if (cp.slotdata.database != InvalidOid)
    {
-       if (wal_level < WAL_LEVEL_LOGICAL)
+       if (wal_level < WAL_LEVEL_REPLICA)
            ereport(FATAL,
                    (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-                    errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"logical\"",
+                    errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
                            NameStr(cp.slotdata.name)),
-                    errhint("Change \"wal_level\" to be \"logical\" or higher.")));
+                    errhint("Change \"wal_level\" to be \"replica\" or higher.")));
 
        /*
         * In standby mode, the hot standby must be enabled. This check is
         * necessary to ensure logical slots are invalidated when they become
         * incompatible due to insufficient wal_level. Otherwise, if the
-        * primary reduces wal_level < logical while hot standby is disabled,
-        * logical slots would remain valid even after promotion.
+        * primary reduces effective_wal_level < logical while hot standby is
+        * disabled, primary disable logical decoding while hot standby is
+        * disabled, logical slots would remain valid even after promotion.
         */
        if (StandbyMode && !EnableHotStandby)
            ereport(FATAL,
index 7647f0515816085fd5fa9a3c611b0a3793be599f..70a27f83f2931f8db39b92b513a849d775b38720 100644 (file)
@@ -147,6 +147,13 @@ create_logical_replication_slot(char *name, char *plugin,
                          temporary ? RS_TEMPORARY : RS_EPHEMERAL, two_phase,
                          failover, false);
 
+   /*
+    * Ensure the logical decoding is enabled before initializing the logical
+    * decoding context.
+    */
+   EnsureLogicalDecodingEnabled();
+   Assert(IsLogicalDecodingEnabled());
+
    /*
     * Create logical decoding context to find start point or, if we don't
     * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity.
index 449632ad1aa3dc4d13f6c6151165a6ce678fed3a..96cede8f45a94588f12668cf7b9c2fde258f3c88 100644 (file)
@@ -1297,6 +1297,13 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
            need_full_snapshot = true;
        }
 
+       /*
+        * Ensure the logical decoding is enabled before initializing the
+        * logical decoding context.
+        */
+       EnsureLogicalDecodingEnabled();
+       Assert(IsLogicalDecodingEnabled());
+
        ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
                                        InvalidXLogRecPtr,
                                        XL_ROUTINE(.page_read = logical_read_xlog_page,
index b23d0c19360ade87942466412ba410f6216ec110..adebba625e69103dec07e9eec3ff01bf3947fecd 100644 (file)
@@ -140,6 +140,7 @@ CalculateShmemSize(void)
    size = add_size(size, SlotSyncShmemSize());
    size = add_size(size, AioShmemSize());
    size = add_size(size, WaitLSNShmemSize());
+   size = add_size(size, LogicalDecodingCtlShmemSize());
 
    /* include additional requested shmem from preload libraries */
    size = add_size(size, total_addin_request);
@@ -328,6 +329,7 @@ CreateOrAttachShmemStructs(void)
    InjectionPointShmemInit();
    AioShmemInit();
    WaitLSNShmemInit();
+   LogicalDecodingCtlShmemInit();
 }
 
 /*
index 087821311cceb4d0445aa341b7d7d924fc703fec..b0b93d9609184ce407f46cafecdcd94d0c1f6ee7 100644 (file)
@@ -576,6 +576,9 @@ ProcessProcSignalBarrier(void)
                    case PROCSIGNAL_BARRIER_SMGRRELEASE:
                        processed = ProcessBarrierSmgrRelease();
                        break;
+                   case PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO:
+                       processed = ProcessBarrierUpdateXLogLogicalInfo();
+                       break;
                }
 
                /*
index fc45d72c79bcea4ba13c74e2651f8eac58588e71..773832c3a36cbd43c16aeb6d07da3f0415c25e9a 100644 (file)
@@ -499,7 +499,7 @@ ResolveRecoveryConflictWithSnapshot(TransactionId snapshotConflictHorizon,
     * seems OK, given that this kind of conflict should not normally be
     * reached, e.g. due to using a physical replication slot.
     */
-   if (wal_level >= WAL_LEVEL_LOGICAL && isCatalogRel)
+   if (IsLogicalDecodingEnabled() && isCatalogRel)
        InvalidateObsoleteReplicationSlots(RS_INVAL_HORIZON, 0, locator.dbOid,
                                           snapshotConflictHorizon);
 }
@@ -1285,6 +1285,7 @@ LogStandbySnapshot(void)
    RunningTransactions running;
    xl_standby_lock *locks;
    int         nlocks;
+   bool        logical_decoding_enabled = IsLogicalDecodingEnabled();
 
    Assert(XLogStandbyInfoActive());
 
@@ -1325,13 +1326,13 @@ LogStandbySnapshot(void)
     * record. Fortunately this routine isn't executed frequently, and it's
     * only a shared lock.
     */
-   if (wal_level < WAL_LEVEL_LOGICAL)
+   if (!logical_decoding_enabled)
        LWLockRelease(ProcArrayLock);
 
    recptr = LogCurrentRunningXacts(running);
 
    /* Release lock if we kept it longer ... */
-   if (wal_level >= WAL_LEVEL_LOGICAL)
+   if (logical_decoding_enabled)
        LWLockRelease(ProcArrayLock);
 
    /* GetRunningTransactionData() acquired XidGenLock, we must release it */
index c0632bf901ad2f87f6442e9c7de2ab0bddc2f30e..dcfadbd5aaec053be9c19f61b6f3c3b69ee7d56d 100644 (file)
@@ -359,6 +359,7 @@ InjectionPoint  "Waiting to read or update information related to injection point
 SerialControl  "Waiting to read or update shared <filename>pg_serial</filename> state."
 AioWorkerSubmissionQueue   "Waiting to access AIO worker submission queue."
 WaitLSN    "Waiting to read or update shared Wait-for-LSN state."
+LogicalDecodingControl "Waiting to read or update logical decoding status information."
 
 #
 # END OF PREDEFINED LWLOCKS (DO NOT CHANGE THIS LINE)
index 8f7a56d0f2c583ae0a76a3ef6be74cc0b6ba201a..11ed876264c11c4d775ff2c2227ba101f177a985 100644 (file)
@@ -98,9 +98,9 @@
  * likewise send the invalidation immediately, before ending the change's
  * critical section.  This includes inplace heap updates, relmap, and smgr.
  *
- * When wal_level=logical, write invalidations into WAL at each command end to
- * support the decoding of the in-progress transactions.  See
- * CommandEndInvalidationMessages.
+ * When effective_wal_level is 'logical', write invalidations into WAL at
+ * each command end to support the decoding of the in-progress transactions.
+ * See CommandEndInvalidationMessages.
  *
  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
@@ -1419,7 +1419,7 @@ CommandEndInvalidationMessages(void)
    ProcessInvalidationMessages(&transInvalInfo->ii.CurrentCmdInvalidMsgs,
                                LocalExecuteInvalidationMessage);
 
-   /* WAL Log per-command invalidation messages for wal_level=logical */
+   /* WAL Log per-command invalidation messages for logical decoding */
    if (XLogLogicalInfoActive())
        LogLogicalInvalidations();
 
index 4ed69ac7ba2960929b2ccf5b166e5269efa27629..b7e94ca45bdb1c189b6ad4ca5b44aaa7550edbf0 100644 (file)
@@ -653,6 +653,9 @@ BaseInit(void)
    /* Initialize lock manager's local structs */
    InitLockManagerAccess();
 
+   /* Initialize logical info WAL logging state */
+   InitializeProcessXLogLogicalInfo();
+
    /*
     * Initialize replication slots after pgstat. The exit hook might need to
     * drop ephemeral slots, which in turn triggers stats reporting.
index 3b9d8349078b693125aa4704fbc4b7693837197d..ac0c7c36c56179ca396fce7f04b8e51b14ecaeb4 100644 (file)
   max => 'MAX_IO_CONCURRENCY',
 },
 
+{ name => 'effective_wal_level', type => 'enum', context => 'PGC_INTERNAL', group => 'PRESET_OPTIONS',
+  short_desc => 'Show effective WAL level.',
+  flags => 'GUC_NOT_IN_SAMPLE | GUC_DISALLOW_IN_FILE',
+  variable => 'effective_wal_level',
+  boot_val => 'WAL_LEVEL_REPLICA',
+  options => 'wal_level_options',
+  show_hook => 'show_effective_wal_level',
+},
+
 { name => 'enable_async_append', type => 'bool', context => 'PGC_USERSET', group => 'QUERY_TUNING_METHOD',
   short_desc => 'Enables the planner\'s use of async append plans.',
   flags => 'GUC_EXPLAIN',
index f87b558c2c66c1d103f0cd5d2e3bead7a90f8ff8..04ab0a266088a25865a12d9ae02d35b74f50deef 100644 (file)
@@ -617,6 +617,7 @@ static int  shared_memory_size_mb;
 static int shared_memory_size_in_huge_pages;
 static int wal_block_size;
 static int num_os_semaphores;
+static int effective_wal_level = WAL_LEVEL_REPLICA;
 static bool data_checksums;
 static bool integer_datetimes;
 
index 41a649297c738357f2615f97e2d7755170c3a7c6..dab4dfb3a52d65fd1f6b8dcb6a5646aaad0e7fe5 100644 (file)
@@ -953,7 +953,7 @@ check_publisher(const struct LogicalRepInfo *dbinfo)
     * Since these parameters are not a requirement for physical replication,
     * we should check it to make sure it won't fail.
     *
-    * - wal_level = logical
+    * - wal_level >= replica
     * - max_replication_slots >= current + number of dbs to be converted
     * - max_wal_senders >= current + number of dbs to be converted
     * - max_slot_wal_keep_size = -1 (to prevent deletion of required WAL files)
@@ -997,9 +997,9 @@ check_publisher(const struct LogicalRepInfo *dbinfo)
 
    disconnect_database(conn, false);
 
-   if (strcmp(wal_level, "logical") != 0)
+   if (strcmp(wal_level, "minimal") == 0)
    {
-       pg_log_error("publisher requires \"wal_level\" >= \"logical\"");
+       pg_log_error("publisher requires \"wal_level\" >= \"replica\"");
        failed = true;
    }
 
index 9e0db6cd09914bcd74f5c9081fc8c26ea046c4fa..4657172c9ac790e752affbb3ce9bf0651b81c848 100644 (file)
@@ -240,7 +240,6 @@ command_fails(
 # Check some unmet conditions on node P
 $node_p->append_conf(
    'postgresql.conf', q{
-wal_level = replica
 max_replication_slots = 1
 max_wal_senders = 1
 max_worker_processes = 2
@@ -265,7 +264,6 @@ command_fails(
 # standby settings should not be a lower setting than on the primary.
 $node_p->append_conf(
    'postgresql.conf', q{
-wal_level = logical
 max_replication_slots = 10
 max_wal_senders = 10
 max_worker_processes = 8
index 1e17d64b3ec633eab6c9d61ca7d15f6e46ee536e..9cdeb15bd51f67bbcb82a7121ac1d63097fc8a8c 100644 (file)
@@ -2131,11 +2131,7 @@ check_new_cluster_replication_slots(void)
 
    wal_level = PQgetvalue(res, 0, 0);
 
-   if (nslots_on_old > 0 && strcmp(wal_level, "logical") != 0)
-       pg_fatal("\"wal_level\" must be \"logical\" but is set to \"%s\"",
-                wal_level);
-
-   if (old_cluster.sub_retain_dead_tuples &&
+   if ((nslots_on_old > 0 || old_cluster.sub_retain_dead_tuples) &&
        strcmp(wal_level, "minimal") == 0)
        pg_fatal("\"wal_level\" must be \"replica\" or \"logical\" but is set to \"%s\"",
                 wal_level);
index 823f41e754ced43859ccffddc62bf1d6fb77902a..587b683aec12e43a93061fa784f6f45a96480a0e 100644 (file)
@@ -225,6 +225,10 @@ $oldnode->init(%old_node_params);
 # Override log_statement=all set by Cluster.pm.  This avoids large amounts
 # of log traffic that slow this test down even more when run under valgrind.
 $oldnode->append_conf('postgresql.conf', 'log_statement = none');
+
+# Set wal_level = replica to run the regression tests in the same
+# wal_level as when 'make check' runs.
+$oldnode->append_conf('postgresql.conf', 'wal_level = replica');
 $oldnode->start;
 
 my $result;
index 605280ed8fb62b735f53427c7f0557a2f52caecd..9af3318ef252efc4cdc9b4b69ab0de8b40229b41 100644 (file)
@@ -13,6 +13,7 @@
 
 #include "access/xlogbackup.h"
 #include "access/xlogdefs.h"
+#include "replication/logicalctl.h"
 #include "datatype/timestamp.h"
 #include "lib/stringinfo.h"
 #include "nodes/pg_list.h"
@@ -94,6 +95,7 @@ typedef enum RecoveryState
 } RecoveryState;
 
 extern PGDLLIMPORT int wal_level;
+extern PGDLLIMPORT bool XLogLogicalInfo;
 
 /* Is WAL archiving enabled (always or only while server is running normally)? */
 #define XLogArchivingActive() \
@@ -122,8 +124,17 @@ extern PGDLLIMPORT int wal_level;
 /* Do we need to WAL-log information required only for Hot Standby and logical replication? */
 #define XLogStandbyInfoActive() (wal_level >= WAL_LEVEL_REPLICA)
 
-/* Do we need to WAL-log information required only for logical replication? */
-#define XLogLogicalInfoActive() (wal_level >= WAL_LEVEL_LOGICAL)
+/*
+ * Do we need to WAL-log information required only for logical replication?
+ *
+ * When XLogLogicalInfoActive() returns true, it enables logical-decoding-related
+ * WAL logging as if wal_level were set to 'logical', even if it's actually set
+ * to 'replica'. Note that XLogLogicalInfo is a process-local cache and can
+ * change until an XID is assigned to the transaction. In other words, it
+ * ensures that the same result is returned within an XID-assigned transaction.
+ */
+#define XLogLogicalInfoActive() \
+    (wal_level >= WAL_LEVEL_LOGICAL || XLogLogicalInfo)
 
 #ifdef WAL_DEBUG
 extern PGDLLIMPORT bool XLOG_DEBUG;
@@ -257,6 +268,8 @@ extern XLogRecPtr GetLastImportantRecPtr(void);
 
 extern void SetWalWriterSleeping(bool sleeping);
 
+extern void WakeupCheckpointer(void);
+
 extern Size WALReadFromBuffers(char *dstbuf, XLogRecPtr startptr, Size count,
                               TimeLineID tli);
 
index 293e9e03f599f4fe6cb4390a3709a1b62d95d351..742b152b51bfdca991b25cba22b6467aca8902dd 100644 (file)
@@ -22,7 +22,7 @@
 
 
 /* Version identifier for this pg_control format */
-#define PG_CONTROL_VERSION 1900
+#define PG_CONTROL_VERSION 1901
 
 /* Nonce key length, see below */
 #define MOCK_AUTH_NONCE_LEN        32
@@ -41,6 +41,7 @@ typedef struct CheckPoint
                                 * timeline (equals ThisTimeLineID otherwise) */
    bool        fullPageWrites; /* current full_page_writes */
    int         wal_level;      /* current wal_level */
+   bool        logicalDecodingEnabled; /* current logical decoding status */
    FullTransactionId nextXid;  /* next free transaction ID */
    Oid         nextOid;        /* next free OID */
    MultiXactId nextMulti;      /* next free MultiXactId */
@@ -80,6 +81,7 @@ typedef struct CheckPoint
 /* 0xC0 is used in Postgres 9.5-11 */
 #define XLOG_OVERWRITE_CONTRECORD      0xD0
 #define XLOG_CHECKPOINT_REDO           0xE0
+#define XLOG_LOGICAL_DECODING_STATUS_CHANGE    0xF0
 
 
 /*
diff --git a/src/include/replication/logicalctl.h b/src/include/replication/logicalctl.h
new file mode 100644 (file)
index 0000000..fbe4fa5
--- /dev/null
@@ -0,0 +1,33 @@
+/*-------------------------------------------------------------------------
+ *
+ * logicalctl.h
+ *     Definitions for logical decoding status control facility.
+ *
+ * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *     src/include/replication/logicalctl.h
+ *
+ *-------------------------------------------------------------------------
+ */
+#ifndef LOGICALCTL_H
+#define LOGICALCTL_H
+
+extern Size LogicalDecodingCtlShmemSize(void);
+extern void LogicalDecodingCtlShmemInit(void);
+extern void StartupLogicalDecodingStatus(bool status_in_control_file);
+extern void InitializeProcessXLogLogicalInfo(void);
+extern bool ProcessBarrierUpdateXLogLogicalInfo(void);
+extern bool IsLogicalDecodingEnabled(void);
+extern bool IsXLogLogicalInfoEnabled(void);
+extern bool CheckXLogLogicalInfo(void);
+extern void AtEOXact_LogicalCtl(void);
+extern void EnsureLogicalDecodingEnabled(void);
+extern void EnableLogicalDecoding(void);
+extern void RequestDisableLogicalDecoding(void);
+extern void DisableLogicalDecodingIfNecessary(void);
+extern void DisableLogicalDecoding(void);
+extern void UpdateLogicalDecodingStatusEndOfRecovery(void);
+
+#endif
index 28251d866388bcf12064d5c195868eed2fefca38..87b0ee8856d912246ef06503517664080f94c4de 100644 (file)
@@ -359,6 +359,7 @@ extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
 extern void ReplicationSlotsComputeRequiredLSN(void);
 extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
 extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
+extern bool CheckLogicalSlotExists(void);
 extern void ReplicationSlotsDropDBSlots(Oid dboid);
 extern bool InvalidateObsoleteReplicationSlots(uint32 possible_causes,
                                               XLogSegNo oldestSegno,
index 5b0ce383408c16f3ff67464042ed937f0e1d4612..533344509e980062a677975418b24c1ed60dfec2 100644 (file)
@@ -86,6 +86,7 @@ PG_LWLOCK(51, InjectionPoint)
 PG_LWLOCK(52, SerialControl)
 PG_LWLOCK(53, AioWorkerSubmissionQueue)
 PG_LWLOCK(54, WaitLSN)
+PG_LWLOCK(55, LogicalDecodingControl)
 
 /*
  * There also exist several built-in LWLock tranches.  As with the predefined
index afeeb1ca019f8ea35958e009bc685e1dbd009c98..8e428f298c669c9367e61a7572136aba20d4745c 100644 (file)
@@ -54,6 +54,8 @@ typedef enum
 typedef enum
 {
    PROCSIGNAL_BARRIER_SMGRRELEASE, /* ask smgr to close files */
+   PROCSIGNAL_BARRIER_UPDATE_XLOG_LOGICAL_INFO,    /* ask to update
+                                                    * XLogLogicalInfo */
 } ProcSignalBarrierType;
 
 /*
index 82ac8646a8d43d05bf577b22d98811cc3c008b84..fbe0b1e2e3dc31dc077704eebace2003b78d85a3 100644 (file)
@@ -61,6 +61,7 @@ extern bool check_default_text_search_config(char **newval, void **extra, GucSou
 extern void assign_default_text_search_config(const char *newval, void *extra);
 extern bool check_default_with_oids(bool *newval, void **extra,
                                    GucSource source);
+extern const char *show_effective_wal_level(void);
 extern bool check_huge_page_size(int *newval, void **extra, GucSource source);
 extern void assign_io_method(int newval, void *extra);
 extern bool check_io_max_concurrency(int *newval, void **extra, GucSource source);
index e93248bd66e22893cbb194f3db8efa3176596f45..9cc057909e209df565fa1791811b1c54e91fe923 100644 (file)
@@ -59,6 +59,7 @@ tests += {
       't/048_vacuum_horizon_floor.pl',
       't/049_wait_for_lsn.pl',
       't/050_redo_segment_missing.pl',
+      't/051_effective_wal_level.pl',
     ],
   },
 }
index ebe2fae178981aa3368759cf47a8ad92c9a23956..49d9ea4d09690abe9a936710df664f02fbdbddaa 100644 (file)
@@ -878,9 +878,10 @@ check_slots_conflict_reason('wal_level_', 'wal_level_insufficient');
 
 $handle =
   make_slot_active($node_standby, 'wal_level_', 0, \$stdout, \$stderr);
-# We are not able to read from the slot as it requires wal_level >= logical on the primary server
+# We are not able to read from the slot as it requires effective_wal_level >= logical on
+# the primary server
 check_pg_recvlogical_stderr($handle,
-   "logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary"
+   "logical decoding on standby requires \"effective_wal_level\" >= \"logical\" on the primary"
 );
 
 # Restore primary wal_level
diff --git a/src/test/recovery/t/051_effective_wal_level.pl b/src/test/recovery/t/051_effective_wal_level.pl
new file mode 100644 (file)
index 0000000..d245359
--- /dev/null
@@ -0,0 +1,404 @@
+# Copyright (c) 2025, PostgreSQL Global Development Group
+#
+# Test that effective_wal_level changes upon logical replication slot creation
+# and deletion.
+
+use strict;
+use warnings FATAL => 'all';
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+# Check both wal_level and effective_wal_level values on the given node
+# are expected.
+sub test_wal_level
+{
+   my ($node, $expected, $msg) = @_;
+
+   is( $node->safe_psql(
+           'postgres',
+           qq[select current_setting('wal_level'), current_setting('effective_wal_level');]
+       ),
+       "$expected",
+       "$msg");
+}
+
+# Wait for the checkpointer to decrease effective_wal_level to 'replica'.
+sub wait_for_logical_decoding_disabled
+{
+   my ($node) = @_;
+
+   $node->poll_query_until('postgres',
+       qq[select current_setting('effective_wal_level') = 'replica';]);
+}
+
+# Initialize the primary server with wal_level = 'replica'.
+my $primary = PostgreSQL::Test::Cluster->new('primary');
+$primary->init(allows_streaming => 1);
+$primary->append_conf('postgresql.conf', "log_min_messages = debug1");
+$primary->start();
+
+# Check both initial wal_level and effective_wal_level values.
+test_wal_level($primary, "replica|replica",
+   "wal_level and effective_wal_level start with the same value 'replica'");
+
+# Create a physical slot and verify that it doesn't affect effective_wal_level.
+$primary->safe_psql('postgres',
+   qq[select pg_create_physical_replication_slot('test_phy_slot', false, false)]
+);
+test_wal_level($primary, "replica|replica",
+   "effective_wal_level doesn't change with a new physical slot");
+$primary->safe_psql('postgres',
+   qq[select pg_drop_replication_slot('test_phy_slot')]);
+
+# Create a temporary logical slot but exit without releasing it explicitly.
+# This enables logical decoding but skips disabling it and delegates to the
+# checkpointer.
+$primary->safe_psql('postgres',
+   qq[select pg_create_logical_replication_slot('test_tmp_slot', 'test_decoding', true)]
+);
+ok( $primary->log_contains(
+       "logical decoding is enabled upon creating a new logical replication slot"
+   ),
+   "logical decoding has been enabled upon creating a temp slot");
+
+# Wait for the checkpointer to disable logical decoding.
+wait_for_logical_decoding_disabled($primary);
+
+# Create a new logical slot and check that effective_wal_level must be increased
+# to 'logical'.
+$primary->safe_psql('postgres',
+   qq[select pg_create_logical_replication_slot('test_slot', 'pgoutput')]);
+test_wal_level($primary, "replica|logical",
+   "effective_wal_level increased to 'logical' upon a logical slot creation"
+);
+
+# Restart the server and check again.
+$primary->restart();
+test_wal_level($primary, "replica|logical",
+   "effective_wal_level remains 'logical' even after a server restart");
+
+# Create and drop another logical slot, then verify that effective_wal_level remains
+# 'logical'.
+$primary->safe_psql('postgres',
+   qq[select pg_create_logical_replication_slot('test_slot2', 'pgoutput')]);
+$primary->safe_psql('postgres',
+   qq[select pg_drop_replication_slot('test_slot2')]);
+test_wal_level($primary, "replica|logical",
+   "effective_wal_level stays 'logical' as one slot remains");
+
+# Verify that the server cannot start with wal_level='minimal' when there is
+# at least one replication slot.
+$primary->adjust_conf('postgresql.conf', 'wal_level', 'minimal');
+$primary->adjust_conf('postgresql.conf', 'max_wal_senders', '0');
+$primary->stop;
+
+command_fails(
+   [
+       'pg_ctl',
+       '--pgdata' => $primary->data_dir,
+       '--log' => $primary->logfile,
+       'start',
+   ],
+   "cannot start server with wal_level='minimal' as there is in-use logical slot");
+
+my $logfile = slurp_file($primary->logfile());
+like(
+   $logfile,
+   qr/logical replication slot "test_slot" exists, but "wal_level" < "replica"/,
+   'logical slots requires logical decoding enabled at server startup');
+
+# Revert the modified settings.
+$primary->adjust_conf('postgresql.conf', 'wal_level', 'replica');
+$primary->adjust_conf('postgresql.conf', 'max_wal_senders', '10');
+
+# Add other settings to test if we disable logical decoding when invalidating the last
+# logical slot.
+$primary->append_conf(
+   'postgresql.conf',
+   qq[
+min_wal_size = 32MB
+max_wal_size = 32MB
+max_slot_wal_keep_size = 16MB
+]);
+$primary->start;
+
+# Advance WAL and verify that the slot gets invalidated.
+$primary->advance_wal(2);
+$primary->safe_psql('postgres', qq[CHECKPOINT]);
+is( $primary->safe_psql(
+       'postgres',
+       qq[
+select invalidation_reason = 'wal_removed' from pg_replication_slots where slot_name = 'test_slot';
+                ]),
+   't',
+   'test_slot gets invalidated due to wal_removed');
+
+# Verify that logical decoding is disabled after invalidating the last logical slot.
+wait_for_logical_decoding_disabled($primary);
+test_wal_level($primary, "replica|replica",
+   "effective_wal_level got decreased to 'replica' after invalidating the last logical slot"
+);
+
+# Revert the modified settings, and restart the server.
+$primary->adjust_conf('postgresql.conf', 'max_slot_wal_keep_size', undef);
+$primary->adjust_conf('postgresql.conf', 'min_wal_size', undef);
+$primary->adjust_conf('postgresql.conf', 'max_wal_size', undef);
+$primary->restart;
+
+# Recreate the logical slot to enable logical decoding again.
+$primary->safe_psql('postgres',
+   qq[select pg_drop_replication_slot('test_slot')]);
+$primary->safe_psql('postgres',
+   qq[select pg_create_logical_replication_slot('test_slot', 'pgoutput')]);
+
+# Take backup during the effective_wal_level being 'logical'. But note that
+# replication slots are not included in the backup.
+$primary->backup('my_backup');
+
+# Initialize standby1 node.
+my $standby1 = PostgreSQL::Test::Cluster->new('standby1');
+$standby1->init_from_backup($primary, 'my_backup', has_streaming => 1);
+$standby1->start;
+
+# Creating a logical slot on standby should succeed as the primary enables
+# it.
+$primary->wait_for_replay_catchup($standby1);
+$standby1->create_logical_slot_on_standby($primary, 'standby1_slot',
+   'postgres');
+
+# Promote the standby1 node that has one logical slot. So effective_wal_level
+# remains 'logical' even after the promotion.
+$standby1->promote;
+test_wal_level($standby1, "replica|logical",
+   "effective_wal_level remains 'logical' even after the promotion");
+
+# Confirm if we can create a logical slot after the promotion.
+$standby1->safe_psql('postgres',
+   qq[select pg_create_logical_replication_slot('standby1_slot2', 'pgoutput')]
+);
+$standby1->stop;
+
+# Initialize standby2 node and start it with wal_level = 'logical'.
+my $standby2 = PostgreSQL::Test::Cluster->new('standby2');
+$standby2->init_from_backup($primary, 'my_backup', has_streaming => 1);
+$standby2->append_conf('postgresql.conf', qq[wal_level = 'logical']);
+$standby2->start();
+$standby2->backup('my_backup3');
+
+# Initialize cascade standby and start with wal_level = 'replica'.
+my $cascade = PostgreSQL::Test::Cluster->new('cascade');
+$cascade->init_from_backup($standby2, 'my_backup3', has_streaming => 1);
+$cascade->adjust_conf('postgresql.conf', 'wal_level', 'replica');
+$cascade->start();
+
+# Regardless of their wal_level values, effective_wal_level values on the
+# standby and the cascaded standby depend on the primary's value, 'logical'.
+test_wal_level($standby2, "logical|logical",
+   "check wal_level and effective_wal_level on standby");
+test_wal_level($cascade, "replica|logical",
+   "check wal_level and effective_wal_level on cascaded standby");
+
+# Drop the primary's last logical slot, decreasing effective_wal_level to
+# 'replica' on all nodes.
+$primary->safe_psql('postgres',
+   qq[select pg_drop_replication_slot('test_slot')]);
+wait_for_logical_decoding_disabled($primary);
+
+$primary->wait_for_replay_catchup($standby2);
+$standby2->wait_for_replay_catchup($cascade, $primary);
+
+test_wal_level($primary, "replica|replica",
+   "effective_wal_level got decreased to 'replica' on primary");
+test_wal_level($standby2, "logical|replica",
+   "effective_wal_level got decreased to 'replica' on standby");
+test_wal_level($cascade, "replica|replica",
+   "effective_wal_level got decreased to 'replica' on cascaded standby");
+
+# Promote standby2, increasing effective_wal_level to 'logical' as its wal_level
+# is set to 'logical'.
+$standby2->promote;
+
+# Verify that effective_wal_level is increased to 'logical' on the cascaded standby.
+$standby2->wait_for_replay_catchup($cascade);
+test_wal_level($cascade, "replica|logical",
+   "effective_wal_level got increased to 'logical' on standby as the new primary has wal_level='logical'"
+);
+
+$standby2->stop;
+$cascade->stop;
+
+# Initialize standby3 node and start it.
+my $standby3 = PostgreSQL::Test::Cluster->new('standby3');
+$standby3->init_from_backup($primary, 'my_backup', has_streaming => 1);
+$standby3->start;
+
+# Create logical slots on both nodes.
+$primary->safe_psql('postgres',
+   qq[select pg_create_logical_replication_slot('test_slot', 'pgoutput')]);
+$primary->wait_for_replay_catchup($standby3);
+$standby3->create_logical_slot_on_standby($primary, 'standby3_slot',
+   'postgres');
+
+# Drop the logical slot from the primary, decreasing effective_wal_level to
+# 'replica' on the primary, which leads to invalidating the logical slot on the
+# standby due to 'wal_level_insufficient'.
+$primary->safe_psql('postgres',
+   qq[select pg_drop_replication_slot('test_slot')]);
+wait_for_logical_decoding_disabled($primary);
+test_wal_level($primary, "replica|replica",
+   "effective_wal_level got decreased to 'replica' on the primary to invalidate standby's slots"
+);
+$standby3->poll_query_until(
+   'postgres', qq[
+select invalidation_reason = 'wal_level_insufficient' from pg_replication_slots where slot_name = 'standby3_slot'
+               ]);
+
+# Restart the server to verify that the slot is successfully restored during
+# startup.
+$standby3->restart;
+
+# Check that the logical decoding is not enabled on the standby3. Note that it still has
+# the invalidated logical slot.
+test_wal_level($standby3, "replica|replica",
+   "effective_wal_level got decreased to 'replica' on standby");
+
+my ($result, $stdout, $stderr) = $standby3->psql('postgres',
+   qq[select pg_logical_slot_get_changes('standby3_slot', null, null)]);
+like(
+   $stderr,
+   qr/ERROR:  logical decoding on standby requires "effective_wal_level" >= "logical" on the primary/,
+   "cannot use logical decoding on standby as it is disabled on primary");
+
+# Restart the primary with setting wal_level = 'logical' and create a new logical
+# slot.
+$primary->append_conf('postgresql.conf', qq[wal_level = 'logical']);
+$primary->restart;
+$primary->safe_psql('postgres',
+   qq[select pg_create_logical_replication_slot('test_slot', 'pgoutput')]);
+
+# effective_wal_level should be 'logical' on both nodes.
+$primary->wait_for_replay_catchup($standby3);
+test_wal_level($primary, "logical|logical",
+   "check WAL levels on the primary node");
+test_wal_level($standby3, "replica|logical",
+   "effective_wal_level got increased to 'logical' again on standby");
+
+# Set wal_level to 'replica' and restart the primary. Since one logical slot
+# is still present on the primary, effective_wal_level remains 'logical' even
+# if wal_level got decreased to 'replica'.
+$primary->adjust_conf('postgresql.conf', 'wal_level', 'replica');
+$primary->restart;
+$primary->wait_for_replay_catchup($standby3);
+
+# Verify that the effective_wal_level remains 'logical' on both nodes
+test_wal_level($primary, "replica|logical",
+   "effective_wal_level remains 'logical' on primary even after setting wal_level to 'replica'"
+);
+test_wal_level($standby3, "replica|logical",
+   "effective_wal_level remains 'logical' on standby even after setting wal_level to 'replica' on primary"
+);
+
+# Promote the standby3 and verify that effective_wal_level got decreased to
+# 'replica' after the promotion since there is no valid logical slot.
+$standby3->promote;
+test_wal_level($standby3, "replica|replica",
+   "effective_wal_level got decreased to 'replica' as there is no valid logical slot"
+);
+
+# Cleanup the invalidated slot.
+$standby3->safe_psql('postgres',
+   qq[select pg_drop_replication_slot('standby3_slot')]);
+
+$standby3->stop;
+
+# Test the race condition at end of the recovery between the startup and logical
+# decoding status change. This test requires injection points enabled.
+if (   $ENV{enable_injection_points} eq 'yes'
+   && $primary->check_extension('injection_points'))
+{
+   # Initialize standby4 and start it.
+   my $standby4 = PostgreSQL::Test::Cluster->new('standby4');
+   $standby4->init_from_backup($primary, 'my_backup', has_streaming => 1);
+   $standby4->start;
+
+   # Both servers have one logical slot.
+   $primary->wait_for_replay_catchup($standby4);
+   $standby4->create_logical_slot_on_standby($primary, 'standby4_slot',
+       'postgres');
+
+   # Enable and attach the injection point on the standby4.
+   $primary->safe_psql('postgres', 'create extension injection_points');
+   $primary->wait_for_replay_catchup($standby4);
+   $standby4->safe_psql('postgres',
+       qq[select injection_points_attach('startup-logical-decoding-status-change-end-of-recovery', 'wait');]
+   );
+
+   # Trigger promotion with no wait, and wait for the startup process to reach
+   # the injection point.
+   $standby4->safe_psql('postgres', qq[select pg_promote(false)]);
+   note('promote the standby and waiting for injection_point');
+   $standby4->wait_for_event('startup',
+       'startup-logical-decoding-status-change-end-of-recovery');
+   note(
+       "injection_point 'startup-logical-decoding-status-change-end-of-recovery' is reached"
+   );
+
+   # Drop the logical slot, requesting to disable logical decoding to the checkpointer.
+   $standby4->safe_psql('postgres',
+       qq[select pg_drop_replication_slot('standby4_slot');]);
+
+   # Resume the startup process to complete the recovery.
+   $standby4->safe_psql('postgres',
+       qq[select injection_points_wakeup('startup-logical-decoding-status-change-end-of-recovery')]
+   );
+
+   # Verify that logical decoding got disabled after the recovery.
+   wait_for_logical_decoding_disabled($standby4);
+   test_wal_level($standby4, "replica|replica",
+       "effective_wal_level properly got decreased to 'replica'");
+   $standby4->stop;
+
+   # Test the abort process of logical decoding activation. We drop the primary's
+   # slot to decrease its effective_wal_level to 'replica'.
+   $primary->safe_psql('postgres',
+       qq[select pg_drop_replication_slot('test_slot')]);
+   wait_for_logical_decoding_disabled($primary);
+   test_wal_level($primary, "replica|replica",
+       "effective_wal_level got decreased to 'replica' on primary");
+
+   # Start a psql session to test the case where the activation process is
+   # interrupted.
+   my $psql_create_slot = $primary->background_psql('postgres');
+
+   # Start the logical decoding activation process upon creating the logical
+   # slot, but it will wait due to the injection point.
+   $psql_create_slot->query_until(
+       qr/create_slot_canceled/,
+       q(\echo create_slot_canceled
+select injection_points_set_local();
+select injection_points_attach('logical-decoding-activation', 'wait');
+select pg_create_logical_replication_slot('slot_canceled', 'pgoutput');
+\q
+));
+
+   $primary->wait_for_event('client backend', 'logical-decoding-activation');
+   note("injection_point 'logical-decoding-activation' is reached");
+
+   # Cancel the backend initiated by $psql_create_slot, aborting its activation
+   # process.
+   $primary->safe_psql(
+       'postgres',
+       qq[
+select pg_cancel_backend(pid) from pg_stat_activity where query ~ 'slot_canceled' and pid <> pg_backend_pid()
+]);
+
+   # Verify that the backend aborted the activation process.
+   $primary->wait_for_log("aborting logical decoding activation process");
+   test_wal_level($primary, "replica|replica",
+       "the activation process aborted");
+}
+
+$primary->stop;
+
+done_testing();
index e72d1308967e807132a176c0edd0662b48642719..7f81e61d7a75e04ab32f8e491a21f4fa7d63fa0a 100644 (file)
@@ -288,12 +288,8 @@ CREATE PUBLICATION regress_pub_for_allsequences_alltables FOR ALL SEQUENCES, ALL
 SET client_min_messages = 'NOTICE';
 CREATE PUBLICATION regress_pub_for_allsequences_alltables_withclause FOR ALL SEQUENCES, ALL TABLES WITH (publish = 'insert');
 NOTICE:  publication parameters are not applicable to sequence synchronization and will be ignored for sequences
-WARNING:  "wal_level" is insufficient to publish logical changes
-HINT:  Set "wal_level" to "logical" before creating subscriptions.
 CREATE PUBLICATION regress_pub_for_allsequences_withclause FOR ALL SEQUENCES WITH (publish_generated_columns = 'stored');
 NOTICE:  publication parameters are not applicable to sequence synchronization and will be ignored for sequences
-WARNING:  "wal_level" is insufficient to publish logical changes
-HINT:  Set "wal_level" to "logical" before creating subscriptions.
 RESET client_min_messages;
 SELECT pubname, puballtables, puballsequences FROM pg_publication WHERE pubname = 'regress_pub_for_allsequences_alltables';
                 pubname                 | puballtables | puballsequences 
index 430c1246d14c4526cc4d01b7db5b75ed06ba731f..ecb79e794740939514d51662382aa3db02526981 100644 (file)
@@ -595,7 +595,7 @@ ROLLBACK;
 });
 like(
    $reterr,
-   qr/WARNING:  "wal_level" is insufficient to publish logical changes/,
+   qr/WARNING:  logical decoding must be enabled to publish logical changes/,
    'CREATE PUBLICATION while "wal_level=minimal"');
 
 done_testing();
index 04845d5e6809eb61f6276f42af06e909624f5f13..5c88fa92f4e57fb009b0ffe783b3410466d5bd97 100644 (file)
@@ -1627,6 +1627,7 @@ LogicalDecodeStreamStopCB
 LogicalDecodeStreamTruncateCB
 LogicalDecodeTruncateCB
 LogicalDecodingContext
+LogicalDecodingCtlData
 LogicalErrorCallbackState
 LogicalOutputPluginInit
 LogicalOutputPluginWriterPrepareWrite