Support existing publications in pg_createsubscriber.
author Amit Kapila <[email protected]>
Wed, 17 Dec 2025 09:43:53 +0000 (09:43 +0000)
committer Amit Kapila <[email protected]>
Wed, 17 Dec 2025 09:43:53 +0000 (09:43 +0000)
Allow pg_createsubscriber to reuse existing publications instead of
failing when they already exist on the publisher.

Previously, pg_createsubscriber would fail if any specified publication
already existed. Now, existing publications are reused as-is with their
current configuration, and non-existing publications are created
automatically with FOR ALL TABLES.

This change provides flexibility when working with mixed scenarios of
existing and new publications. Users should verify that existing
publications have the desired configuration before reusing them, and can
use --dry-run with verbose mode to see which publications will be reused
and which will be created.

Only publications created by pg_createsubscriber are cleaned up during
error cleanup operations. Pre-existing publications are preserved unless
'--clean=publications' is explicitly specified, which drops all
publications.

This feature would be helpful for pub-sub configurations where users want
to subscribe to a subset of tables from the publisher.

Author: Shubham Khanna <[email protected]>
Reviewed-by: Euler Taveira <[email protected]>
Reviewed-by: Peter Smith <[email protected]>
Reviewed-by: Zhijie Hou (Fujitsu) <[email protected]>
Reviewed-by: Chao Li <[email protected]>
Reviewed-by: vignesh C <[email protected]>
Reviewed-by: tianbing <[email protected]>
Discussion: https://postgr.es/m/CAHv8Rj%2BsxWutv10WiDEAPZnygaCbuY2RqiLMj2aRMH-H3iZwyA%40mail.gmail.com

doc/src/sgml/ref/pg_createsubscriber.sgml
src/bin/pg_basebackup/pg_createsubscriber.c
src/bin/pg_basebackup/t/040_pg_createsubscriber.pl

index bb9cc72576c4aebd5d93af93c1101d62f7644777..5a62187b189e6d6d6bc719c70ec1c259423edb87 100644 (file)
@@ -285,6 +285,14 @@ PostgreSQL documentation
        a generated name is assigned to the publication name. This option cannot
        be used together with <option>--all</option>.
       </para>
+      <para>
+       If a specified publication already exists on the publisher, it is reused.
+       It is useful to partially replicate the database if the specified
+       publication includes a list of tables. If the publication does not exist,
+       it is automatically created with <literal>FOR ALL TABLES</literal>. Use
+       <option>--dry-run</option> option to preview which publications will be
+       reused and which will be created.
+      </para>
      </listitem>
     </varlistentry>
 
index ef6deec14af8cfc87a18c86443d0c9b164cfb342..41a649297c738357f2615f97e2d7755170c3a7c6 100644 (file)
@@ -116,6 +116,7 @@ static void stop_standby_server(const char *datadir);
 static void wait_for_end_recovery(const char *conninfo,
                                  const struct CreateSubscriberOptions *opt);
 static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo);
+static bool find_publication(PGconn *conn, const char *pubname, const char *dbname);
 static void drop_publication(PGconn *conn, const char *pubname,
                             const char *dbname, bool *made_publication);
 static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo);
@@ -763,6 +764,39 @@ generate_object_name(PGconn *conn)
    return objname;
 }
 
+/*
+ * Does the publication exist in the specified database (queried via conn)?
+ */
+static bool
+find_publication(PGconn *conn, const char *pubname, const char *dbname)
+{
+   PQExpBuffer str = createPQExpBuffer();
+   PGresult   *res;
+   bool        found = false;
+   char       *pubname_esc = PQescapeLiteral(conn, pubname, strlen(pubname));
+   /* Embed the escaped name in the query; dbname is only used for logging. */
+   appendPQExpBuffer(str,
+                     "SELECT 1 FROM pg_catalog.pg_publication "
+                     "WHERE pubname = %s",
+                     pubname_esc);
+   res = PQexec(conn, str->data);
+   if (PQresultStatus(res) != PGRES_TUPLES_OK)
+   {
+       pg_log_error("could not find publication \"%s\" in database \"%s\": %s",
+                    pubname, dbname, PQerrorMessage(conn));
+       disconnect_database(conn, true);
+   }
+   /* Exactly one returned row means the publication exists. */
+   if (PQntuples(res) == 1)
+       found = true;
+
+   PQclear(res);
+   PQfreemem(pubname_esc);
+   destroyPQExpBuffer(str);
+
+   return found;
+}
+
 /*
  * Create the publications and replication slots in preparation for logical
  * replication. Returns the LSN from latest replication slot. It will be the
@@ -799,13 +833,25 @@ setup_publisher(struct LogicalRepInfo *dbinfo)
        if (num_replslots == 0)
            dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname);
 
-       /*
-        * Create publication on publisher. This step should be executed
-        * *before* promoting the subscriber to avoid any transactions between
-        * consistent LSN and the new publication rows (such transactions
-        * wouldn't see the new publication rows resulting in an error).
-        */
-       create_publication(conn, &dbinfo[i]);
+       if (find_publication(conn, dbinfo[i].pubname, dbinfo[i].dbname))
+       {
+           /* Reuse existing publication on publisher. */
+           pg_log_info("use existing publication \"%s\" in database \"%s\"",
+                       dbinfo[i].pubname, dbinfo[i].dbname);
+           /* Don't remove pre-existing publication if an error occurs. */
+           dbinfo[i].made_publication = false;
+       }
+       else
+       {
+           /*
+            * Create publication on publisher. This step should be executed
+            * *before* promoting the subscriber to avoid any transactions
+            * between consistent LSN and the new publication rows (such
+            * transactions wouldn't see the new publication rows resulting in
+            * an error).
+            */
+           create_publication(conn, &dbinfo[i]);
+       }
 
        /* Create replication slot on publisher */
        if (lsn)
@@ -1749,11 +1795,10 @@ drop_publication(PGconn *conn, const char *pubname, const char *dbname,
 /*
  * Retrieve and drop the publications.
  *
- * Since the publications were created before the consistent LSN, they
- * remain on the subscriber even after the physical replica is
- * promoted. Remove these publications from the subscriber because
- * they have no use. Additionally, if requested, drop all pre-existing
- * publications.
+ * Publications copied during physical replication remain on the subscriber
+ * after promotion. If --clean=publications is specified, drop all existing
+ * publications in the subscriber database. Otherwise, only drop publications
+ * that were created by pg_createsubscriber during this operation.
  */
 static void
 check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
@@ -1785,14 +1830,24 @@ check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo)
 
        PQclear(res);
    }
-
-   /*
-    * In dry-run mode, we don't create publications, but we still try to drop
-    * those to provide necessary information to the user.
-    */
-   if (!drop_all_pubs || dry_run)
-       drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
-                        &dbinfo->made_publication);
+   else
+   {
+       /* Drop publication only if it was created by this tool */
+       if (dbinfo->made_publication)
+       {
+           drop_publication(conn, dbinfo->pubname, dbinfo->dbname,
+                            &dbinfo->made_publication);
+       }
+       else
+       {
+           if (dry_run)
+               pg_log_info("dry-run: would preserve existing publication \"%s\" in database \"%s\"",
+                           dbinfo->pubname, dbinfo->dbname);
+           else
+               pg_log_info("preserve existing publication \"%s\" in database \"%s\"",
+                           dbinfo->pubname, dbinfo->dbname);
+       }
+   }
 }
 
 /*
index 3d6086dc4893b94b5ffc7e01fb2c0e3fdf8b1633..9e0db6cd09914bcd74f5c9081fc8c26ea046c4fa 100644 (file)
@@ -443,10 +443,17 @@ is(scalar(() = $stderr =~ /would create the replication slot/g),
 is(scalar(() = $stderr =~ /would create subscription/g),
    3, "verify subscriptions are created for all databases");
 
+# Create a user-defined publication, and a table that is not a member of that
+# publication.
+$node_p->safe_psql($db1, qq(
+   CREATE PUBLICATION test_pub3 FOR TABLE tbl1;
+   CREATE TABLE not_replicated (a int);
+));
+
 # Run pg_createsubscriber on node S.  --verbose is used twice
 # to show more information.
-# In passing, also test the --enable-two-phase option and
-# --clean option
+#
+# Test two phase and clean options. Use pre-existing publication.
 command_ok(
    [
        'pg_createsubscriber',
@@ -456,7 +463,7 @@ command_ok(
        '--publisher-server' => $node_p->connstr($db1),
        '--socketdir' => $node_s->host,
        '--subscriber-port' => $node_s->port,
-       '--publication' => 'pub1',
+       '--publication' => 'test_pub3',
        '--publication' => 'pub2',
        '--replication-slot' => 'replslot1',
        '--replication-slot' => 'replslot2',
@@ -478,13 +485,16 @@ is($result, qq(0),
 # Insert rows on P
 $node_p->safe_psql($db1, "INSERT INTO tbl1 VALUES('third row')");
 $node_p->safe_psql($db2, "INSERT INTO tbl2 VALUES('row 1')");
+$node_p->safe_psql($db1, "INSERT INTO not_replicated VALUES(0)");
 
 # Start subscriber
 $node_s->start;
 
 # Confirm publications are removed from the subscriber node
-is($node_s->safe_psql($db1, "SELECT COUNT(*) FROM pg_publication;"),
-   '0', 'all publications on subscriber have been removed');
+is($node_s->safe_psql($db1, 'SELECT COUNT(*) FROM pg_publication'),
+   '0', 'all publications were removed from db1');
+is($node_s->safe_psql($db2, 'SELECT COUNT(*) FROM pg_publication'),
+   '0', 'all publications were removed from db2');
 
 # Verify that all subtwophase states are pending or enabled,
 # e.g. there are no subscriptions where subtwophase is disabled ('d')
@@ -525,6 +535,9 @@ is( $result, qq(first row
 second row
 third row),
    "logical replication works in database $db1");
+$result = $node_s->safe_psql($db1, 'SELECT * FROM not_replicated');
+is($result, qq(),
+   "table is not replicated in database $db1");
 
 # Check result in database $db2
 $result = $node_s->safe_psql($db2, 'SELECT * FROM tbl2');
@@ -537,6 +550,37 @@ my $sysid_s = $node_s->safe_psql('postgres',
    'SELECT system_identifier FROM pg_control_system()');
 isnt($sysid_p, $sysid_s, 'system identifier was changed');
 
+# Verify that pub2 was created in $db2
+is($node_p->safe_psql($db2, "SELECT COUNT(*) FROM pg_publication WHERE pubname = 'pub2'"),
+   '1', "publication pub2 was created in $db2");
+
+# Get subscription and publication names
+$result = $node_s->safe_psql(
+   'postgres', qq(
+    SELECT subname, subpublications FROM pg_subscription WHERE subname ~ '^pg_createsubscriber_'
+   ORDER BY subpublications;
+));
+like(
+   $result,
+   qr/^pg_createsubscriber_\d+_[0-9a-f]+ \|\{pub2\}\n
+        pg_createsubscriber_\d+_[0-9a-f]+ \|\{test_pub3\}$/x,
+   'subscription and publication names are ok');
+
+# Verify that the correct publications are being used
+$result = $node_s->safe_psql(
+   'postgres', qq(
+       SELECT d.datname, s.subpublications
+       FROM pg_subscription s
+       JOIN pg_database d ON d.oid = s.subdbid
+       WHERE subname ~ '^pg_createsubscriber_'
+       ORDER BY s.subdbid
+    )
+);
+
+is($result, qq($db1|{test_pub3}
+$db2|{pub2}),
+   "subscriptions use the correct publications");
+
 # clean up
 $node_p->teardown_node;
 $node_s->teardown_node;