From 85ddcc2f4cdef490276d151c80459e287bceb782 Mon Sep 17 00:00:00 2001 From: Amit Kapila Date: Wed, 17 Dec 2025 09:43:53 +0000 Subject: [PATCH] Support existing publications in pg_createsubscriber. Allow pg_createsubscriber to reuse existing publications instead of failing when they already exist on the publisher. Previously, pg_createsubscriber would fail if any specified publication already existed. Now, existing publications are reused as-is with their current configuration, and non-existing publications are created automatically with FOR ALL TABLES. This change provides flexibility when working with mixed scenarios of existing and new publications. Users should verify that existing publications have the desired configuration before reusing them, and can use --dry-run with verbose mode to see which publications will be reused and which will be created. Only publications created by pg_createsubscriber are cleaned up during error cleanup operations. Pre-existing publications are preserved unless '--clean=publications' is explicitly specified, which drops all publications. This feature would be helpful for pub-sub configurations where users want to subscribe to a subset of tables from the publisher. Author: Shubham Khanna Reviewed-by: Euler Taveira Reviewed-by: Peter Smith Reviewed-by: Zhijie Hou (Fujitsu) Reviewed-by: vignesh C Reviewed-by: tianbing Discussion: https://postgr.es/m/CAHv8Rj%2BsxWutv10WiDEAPZnygaCbuY2RqiLMj2aRMH-H3iZwyA%40mail.gmail.com --- doc/src/sgml/ref/pg_createsubscriber.sgml | 8 ++ src/bin/pg_basebackup/pg_createsubscriber.c | 95 +++++++++++++++---- .../t/040_pg_createsubscriber.pl | 54 ++++++++++- 3 files changed, 132 insertions(+), 25 deletions(-) diff --git a/doc/src/sgml/ref/pg_createsubscriber.sgml b/doc/src/sgml/ref/pg_createsubscriber.sgml index bb9cc72576c..5a62187b189 100644 --- a/doc/src/sgml/ref/pg_createsubscriber.sgml +++ b/doc/src/sgml/ref/pg_createsubscriber.sgml @@ -285,6 +285,14 @@ PostgreSQL documentation a generated name is assigned to the publication name. This option cannot be used together with . + + If a specified publication already exists on the publisher, it is reused. + It is useful to partially replicate the database if the specified + publication includes a list of tables. If the publication does not exist, + it is automatically created with FOR ALL TABLES. Use + option to preview which publications will be + reused and which will be created. + diff --git a/src/bin/pg_basebackup/pg_createsubscriber.c b/src/bin/pg_basebackup/pg_createsubscriber.c index ef6deec14af..41a649297c7 100644 --- a/src/bin/pg_basebackup/pg_createsubscriber.c +++ b/src/bin/pg_basebackup/pg_createsubscriber.c @@ -116,6 +116,7 @@ static void stop_standby_server(const char *datadir); static void wait_for_end_recovery(const char *conninfo, const struct CreateSubscriberOptions *opt); static void create_publication(PGconn *conn, struct LogicalRepInfo *dbinfo); +static bool find_publication(PGconn *conn, const char *pubname, const char *dbname); static void drop_publication(PGconn *conn, const char *pubname, const char *dbname, bool *made_publication); static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo); @@ -763,6 +764,39 @@ generate_object_name(PGconn *conn) return objname; } +/* + * Does the publication exist in the specified database? + */ +static bool +find_publication(PGconn *conn, const char *pubname, const char *dbname) +{ + PQExpBuffer str = createPQExpBuffer(); + PGresult *res; + bool found = false; + char *pubname_esc = PQescapeLiteral(conn, pubname, strlen(pubname)); + + appendPQExpBuffer(str, + "SELECT 1 FROM pg_catalog.pg_publication " + "WHERE pubname = %s", + pubname_esc); + res = PQexec(conn, str->data); + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + pg_log_error("could not find publication \"%s\" in database \"%s\": %s", + pubname, dbname, PQerrorMessage(conn)); + disconnect_database(conn, true); + } + + if (PQntuples(res) == 1) + found = true; + + PQclear(res); + PQfreemem(pubname_esc); + destroyPQExpBuffer(str); + + return found; +} + /* * Create the publications and replication slots in preparation for logical * replication. Returns the LSN from latest replication slot. It will be the @@ -799,13 +833,25 @@ setup_publisher(struct LogicalRepInfo *dbinfo) if (num_replslots == 0) dbinfo[i].replslotname = pg_strdup(dbinfo[i].subname); - /* - * Create publication on publisher. This step should be executed - * *before* promoting the subscriber to avoid any transactions between - * consistent LSN and the new publication rows (such transactions - * wouldn't see the new publication rows resulting in an error). - */ - create_publication(conn, &dbinfo[i]); + if (find_publication(conn, dbinfo[i].pubname, dbinfo[i].dbname)) + { + /* Reuse existing publication on publisher. */ + pg_log_info("use existing publication \"%s\" in database \"%s\"", + dbinfo[i].pubname, dbinfo[i].dbname); + /* Don't remove pre-existing publication if an error occurs. */ + dbinfo[i].made_publication = false; + } + else + { + /* + * Create publication on publisher. This step should be executed + * *before* promoting the subscriber to avoid any transactions + * between consistent LSN and the new publication rows (such + * transactions wouldn't see the new publication rows resulting in + * an error). + */ + create_publication(conn, &dbinfo[i]); + } /* Create replication slot on publisher */ if (lsn) @@ -1749,11 +1795,10 @@ drop_publication(PGconn *conn, const char *pubname, const char *dbname, /* * Retrieve and drop the publications. * - * Since the publications were created before the consistent LSN, they - * remain on the subscriber even after the physical replica is - * promoted. Remove these publications from the subscriber because - * they have no use. Additionally, if requested, drop all pre-existing - * publications. + * Publications copied during physical replication remain on the subscriber + * after promotion. If --clean=publications is specified, drop all existing + * publications in the subscriber database. Otherwise, only drop publications + * that were created by pg_createsubscriber during this operation. */ static void check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo) @@ -1785,14 +1830,24 @@ check_and_drop_publications(PGconn *conn, struct LogicalRepInfo *dbinfo) PQclear(res); } - - /* - * In dry-run mode, we don't create publications, but we still try to drop - * those to provide necessary information to the user. - */ - if (!drop_all_pubs || dry_run) - drop_publication(conn, dbinfo->pubname, dbinfo->dbname, - &dbinfo->made_publication); + else + { + /* Drop publication only if it was created by this tool */ + if (dbinfo->made_publication) + { + drop_publication(conn, dbinfo->pubname, dbinfo->dbname, + &dbinfo->made_publication); + } + else + { + if (dry_run) + pg_log_info("dry-run: would preserve existing publication \"%s\" in database \"%s\"", + dbinfo->pubname, dbinfo->dbname); + else + pg_log_info("preserve existing publication \"%s\" in database \"%s\"", + dbinfo->pubname, dbinfo->dbname); + } + } } /* diff --git a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl index 3d6086dc489..9e0db6cd099 100644 --- a/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl +++ b/src/bin/pg_basebackup/t/040_pg_createsubscriber.pl @@ -443,10 +443,17 @@ is(scalar(() = $stderr =~ /would create the replication slot/g), is(scalar(() = $stderr =~ /would create subscription/g), 3, "verify subscriptions are created for all databases"); +# Create a user-defined publication, and a table that is not a member of that +# publication. +$node_p->safe_psql($db1, qq( + CREATE PUBLICATION test_pub3 FOR TABLE tbl1; + CREATE TABLE not_replicated (a int); +)); + # Run pg_createsubscriber on node S. --verbose is used twice # to show more information. -# In passing, also test the --enable-two-phase option and -# --clean option +# +# Test two phase and clean options. Use pre-existing publication. command_ok( [ 'pg_createsubscriber', @@ -456,7 +463,7 @@ command_ok( '--publisher-server' => $node_p->connstr($db1), '--socketdir' => $node_s->host, '--subscriber-port' => $node_s->port, - '--publication' => 'pub1', + '--publication' => 'test_pub3', '--publication' => 'pub2', '--replication-slot' => 'replslot1', '--replication-slot' => 'replslot2', @@ -478,13 +485,16 @@ is($result, qq(0), # Insert rows on P $node_p->safe_psql($db1, "INSERT INTO tbl1 VALUES('third row')"); $node_p->safe_psql($db2, "INSERT INTO tbl2 VALUES('row 1')"); +$node_p->safe_psql($db1, "INSERT INTO not_replicated VALUES(0)"); # Start subscriber $node_s->start; # Confirm publications are removed from the subscriber node -is($node_s->safe_psql($db1, "SELECT COUNT(*) FROM pg_publication;"), - '0', 'all publications on subscriber have been removed'); +is($node_s->safe_psql($db1, 'SELECT COUNT(*) FROM pg_publication'), + '0', 'all publications were removed from db1'); +is($node_s->safe_psql($db2, 'SELECT COUNT(*) FROM pg_publication'), + '0', 'all publications were removed from db2'); # Verify that all subtwophase states are pending or enabled, # e.g. there are no subscriptions where subtwophase is disabled ('d') @@ -525,6 +535,9 @@ is( $result, qq(first row second row third row), "logical replication works in database $db1"); +$result = $node_s->safe_psql($db1, 'SELECT * FROM not_replicated'); +is($result, qq(), + "table is not replicated in database $db1"); # Check result in database $db2 $result = $node_s->safe_psql($db2, 'SELECT * FROM tbl2'); @@ -537,6 +550,37 @@ my $sysid_s = $node_s->safe_psql('postgres', 'SELECT system_identifier FROM pg_control_system()'); isnt($sysid_p, $sysid_s, 'system identifier was changed'); +# Verify that pub2 was created in $db2 +is($node_p->safe_psql($db2, "SELECT COUNT(*) FROM pg_publication WHERE pubname = 'pub2'"), + '1', "publication pub2 was created in $db2"); + +# Get subscription and publication names +$result = $node_s->safe_psql( + 'postgres', qq( + SELECT subname, subpublications FROM pg_subscription WHERE subname ~ '^pg_createsubscriber_' + ORDER BY subpublications; +)); +like( + $result, + qr/^pg_createsubscriber_\d+_[0-9a-f]+ \|\{pub2\}\n + pg_createsubscriber_\d+_[0-9a-f]+ \|\{test_pub3\}$/x, + 'subscription and publication names are ok'); + +# Verify that the correct publications are being used +$result = $node_s->safe_psql( + 'postgres', qq( + SELECT d.datname, s.subpublications + FROM pg_subscription s + JOIN pg_database d ON d.oid = s.subdbid + WHERE subname ~ '^pg_createsubscriber_' + ORDER BY s.subdbid + ) +); + +is($result, qq($db1|{test_pub3} +$db2|{pub2}), + "subscriptions use the correct publications"); + # clean up $node_p->teardown_node; $node_s->teardown_node; -- 2.39.5