From 8e0d32a4a1bfbae7f8340679c3586626b5fe3a08 Mon Sep 17 00:00:00 2001 From: Michael Paquier Date: Tue, 23 Dec 2025 14:32:14 +0900 Subject: [PATCH] Fix orphaned origin in shared memory after DROP SUBSCRIPTION Since ce0fdbfe9722, a replication slot and an origin are created by each tablesync worker, whose information is stored in both a catalog and shared memory (once the origin is set up in the latter case). The transaction where the origin is created is the same as the one that runs the initial COPY, with the catalog state of the origin becoming visible for other sessions only once the COPY transaction has committed. The catalog state is coupled with a state in shared memory, initialized at the same time as the origin created in the catalogs. Note that the transaction doing the initial data sync can take a long time, time that depends on the amount of data to transfer from a publication node to its subscriber node. Now, when a DROP SUBSCRIPTION is executed, all its workers are stopped with the origins removed. The removal of each origin relies on a catalog lookup. A worker still running the initial COPY would fail its transaction, with the catalog state of the origin rolled back while the shared memory state remains around. The session running the DROP SUBSCRIPTION should be in charge of cleaning up the catalog and the shared memory state, but as there is no data in the catalogs the shared memory state is not removed. This issue would leave orphaned origin data in shared memory, leading to a confusing state as it would still show up in pg_replication_origin_status. Note that this shared memory data is sticky, being flushed on disk in replorigin_checkpoint at checkpoint. This prevents other origins from reusing a slot position in the shared memory data. To address this problem, the commit moves the creation of the origin at the end of the transaction that precedes the one executing the initial COPY, making the origin immediately visible in the catalogs for other sessions, giving DROP SUBSCRIPTION a way to know about it. A different solution would have been to clean up the shared memory state using an abort callback within the tablesync worker. The solution of this commit is more consistent with the apply worker that creates an origin in a short transaction. A test is added in the subscription test 004_sync.pl, which was able to display the problem. The test fails when this commit is reverted. Reported-by: Tenglong Gu Reported-by: Daisuke Higuchi Analyzed-by: Michael Paquier Author: Hou Zhijie Reviewed-by: Amit Kapila Reviewed-by: Masahiko Sawada Discussion: https://postgr.es/m/aUTekQTg4OYnw-Co@paquier.xyz Backpatch-through: 14 --- src/backend/commands/subscriptioncmds.c | 10 ++-- src/backend/replication/logical/tablesync.c | 58 ++++++++++----------- src/test/subscription/t/004_sync.pl | 6 +++ 3 files changed, 39 insertions(+), 35 deletions(-) diff --git a/src/backend/commands/subscriptioncmds.c b/src/backend/commands/subscriptioncmds.c index abbcaff0838..921cd9674f1 100644 --- a/src/backend/commands/subscriptioncmds.c +++ b/src/backend/commands/subscriptioncmds.c @@ -1099,10 +1099,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data, * * It is possible that the origin is not yet created for * tablesync worker, this can happen for the states before - * SUBREL_STATE_FINISHEDCOPY. The tablesync worker or - * apply worker can also concurrently try to drop the - * origin and by this time the origin might be already - * removed. For these reasons, passing missing_ok = true. + * SUBREL_STATE_DATASYNC. The tablesync worker or apply + * worker can also concurrently try to drop the origin and + * by this time the origin might be already removed. For + * these reasons, passing missing_ok = true. */ ReplicationOriginNameForLogicalRep(sub->oid, relid, originname, sizeof(originname)); @@ -2174,7 +2174,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel) * * It is possible that the origin is not yet created for tablesync * worker so passing missing_ok = true. This can happen for the states - * before SUBREL_STATE_FINISHEDCOPY. + * before SUBREL_STATE_DATASYNC. */ ReplicationOriginNameForLogicalRep(subid, relid, originname, sizeof(originname)); diff --git a/src/backend/replication/logical/tablesync.c b/src/backend/replication/logical/tablesync.c index 6bb0cbeedad..2522e372036 100644 --- a/src/backend/replication/logical/tablesync.c +++ b/src/backend/replication/logical/tablesync.c @@ -1333,13 +1333,27 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) MyLogicalRepWorker->relstate_lsn = InvalidXLogRecPtr; SpinLockRelease(&MyLogicalRepWorker->relmutex); - /* Update the state and make it visible to others. */ + /* + * Update the state, create the replication origin, and make them visible + * to others. + */ StartTransactionCommand(); UpdateSubscriptionRelState(MyLogicalRepWorker->subid, MyLogicalRepWorker->relid, MyLogicalRepWorker->relstate, MyLogicalRepWorker->relstate_lsn, false); + + /* + * Create the replication origin in a separate transaction from the one + * that sets up the origin in shared memory. This prevents the risk that + * changes to the origin in shared memory cannot be rolled back if the + * transaction aborts. + */ + originid = replorigin_by_name(originname, true); + if (!OidIsValid(originid)) + originid = replorigin_create(originname); + CommitTransactionCommand(); pgstat_report_stat(true); @@ -1379,37 +1393,21 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos) CRS_USE_SNAPSHOT, origin_startpos); /* - * Setup replication origin tracking. The purpose of doing this before the - * copy is to avoid doing the copy again due to any error in setting up - * origin tracking. + * Advance the origin to the LSN got from walrcv_create_slot and then set + * up the origin. The advancement is WAL logged for the purpose of + * recovery. Locks are to prevent the replication origin from vanishing + * while advancing. + * + * The purpose of doing these before the copy is to avoid doing the copy + * again due to any error in advancing or setting up origin tracking. */ - originid = replorigin_by_name(originname, true); - if (!OidIsValid(originid)) - { - /* - * Origin tracking does not exist, so create it now. - * - * Then advance to the LSN got from walrcv_create_slot. This is WAL - * logged for the purpose of recovery. Locks are to prevent the - * replication origin from vanishing while advancing. - */ - originid = replorigin_create(originname); - - LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock); - replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr, - true /* go backward */ , true /* WAL log */ ); - UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock); + LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock); + replorigin_advance(originid, *origin_startpos, InvalidXLogRecPtr, + true /* go backward */ , true /* WAL log */ ); + UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock); - replorigin_session_setup(originid, 0); - replorigin_session_origin = originid; - } - else - { - ereport(ERROR, - (errcode(ERRCODE_DUPLICATE_OBJECT), - errmsg("replication origin \"%s\" already exists", - originname))); - } + replorigin_session_setup(originid, 0); + replorigin_session_origin = originid; /* * If the user did not opt to run as the owner of the subscription diff --git a/src/test/subscription/t/004_sync.pl b/src/test/subscription/t/004_sync.pl index d5eac05a3b3..2c6ae2d023a 100644 --- a/src/test/subscription/t/004_sync.pl +++ b/src/test/subscription/t/004_sync.pl @@ -172,6 +172,12 @@ ok( $node_publisher->poll_query_until( 'postgres', 'SELECT count(*) = 0 FROM pg_replication_slots'), 'DROP SUBSCRIPTION during error can clean up the slots on the publisher'); +# After dropping the subscription, all replication origins, whether created by +# an apply worker or table sync worker, should have been cleaned up. +$result = $node_subscriber->safe_psql('postgres', + "SELECT count(*) FROM pg_replication_origin_status"); +is($result, qq(0), 'all replication origins have been cleaned up'); + $node_subscriber->stop('fast'); $node_publisher->stop('fast'); -- 2.39.5