From bb5384c4b0696dc7ceecaec828064b4e39d9af9f Mon Sep 17 00:00:00 2001 From: Robert Haas Date: Mon, 15 Aug 2011 22:43:49 -0400 Subject: [PATCH] Initial work on SnapArray implementation. --- src/backend/access/transam/xact.c | 100 +- src/backend/access/transam/xlog.c | 4 + src/backend/storage/ipc/Makefile | 2 +- src/backend/storage/ipc/README.snaparray | 153 +++ src/backend/storage/ipc/ipci.c | 3 + src/backend/storage/ipc/procarray.c | 37 + src/backend/storage/ipc/snaparray.c | 963 ++++++++++++++++++ src/backend/utils/misc/guc.c | 18 + src/backend/utils/misc/postgresql.conf.sample | 1 + src/include/storage/lwlock.h | 1 + src/include/storage/procarray.h | 2 + src/include/storage/snaparray.h | 25 + src/include/utils/guc.h | 3 + 13 files changed, 1266 insertions(+), 46 deletions(-) create mode 100644 src/backend/storage/ipc/README.snaparray create mode 100644 src/backend/storage/ipc/snaparray.c create mode 100644 src/include/storage/snaparray.h diff --git a/src/backend/access/transam/xact.c b/src/backend/access/transam/xact.c index 3dab45c2da..9c1e072d15 100644 --- a/src/backend/access/transam/xact.c +++ b/src/backend/access/transam/xact.c @@ -43,6 +43,7 @@ #include "storage/procarray.h" #include "storage/sinvaladt.h" #include "storage/smgr.h" +#include "storage/snaparray.h" #include "utils/combocid.h" #include "utils/guc.h" #include "utils/inval.h" @@ -266,7 +267,8 @@ static void CallSubXactCallbacks(SubXactEvent event, SubTransactionId parentSubid); static void CleanupTransaction(void); static void CommitTransaction(void); -static TransactionId RecordTransactionAbort(bool isSubXact); +static void RecordTransactionAbort(bool isSubXact, TransactionId xid, + int nchildren, TransactionId *children); static void StartTransaction(void); static void StartSubTransaction(void); @@ -900,20 +902,14 @@ AtSubStart_ResourceOwner(void) /* * RecordTransactionCommit - * - * Returns latest XID among xact and its children, or InvalidTransactionId - * if the xact has no XID. (We compute that here just because it's easier.) */ -static TransactionId -RecordTransactionCommit(void) +static void +RecordTransactionCommit(TransactionId xid, int nchildren, + TransactionId *children) { - TransactionId xid = GetTopTransactionIdIfAny(); bool markXidCommitted = TransactionIdIsValid(xid); - TransactionId latestXid = InvalidTransactionId; int nrels; RelFileNode *rels; - int nchildren; - TransactionId *children; int nmsgs = 0; SharedInvalidationMessage *invalMessages = NULL; bool RelcacheInitFileInval = false; @@ -921,7 +917,6 @@ RecordTransactionCommit(void) /* Get data needed for commit record */ nrels = smgrGetPendingDeletes(true, &rels); - nchildren = xactGetCommittedChildren(&children); if (XLogStandbyInfoActive()) nmsgs = xactGetCommittedInvalidationMessages(&invalMessages, &RelcacheInitFileInval); @@ -1159,9 +1154,6 @@ RecordTransactionCommit(void) END_CRIT_SECTION(); } - /* Compute latestXid while we have the child XIDs handy */ - latestXid = TransactionIdLatest(xid, nchildren, children); - /* * Wait for synchronous replication, if required. * @@ -1177,8 +1169,6 @@ cleanup: /* Clean up local data */ if (rels) pfree(rels); - - return latestXid; } @@ -1346,19 +1336,13 @@ AtSubCommit_childXids(void) /* * RecordTransactionAbort - * - * Returns latest XID among xact and its children, or InvalidTransactionId - * if the xact has no XID. (We compute that here just because it's easier.) 
*/ -static TransactionId -RecordTransactionAbort(bool isSubXact) +static void +RecordTransactionAbort(bool isSubXact, TransactionId xid, int nchildren, + TransactionId *children) { - TransactionId xid = GetCurrentTransactionIdIfAny(); - TransactionId latestXid; int nrels; RelFileNode *rels; - int nchildren; - TransactionId *children; XLogRecData rdata[3]; int lastrdata = 0; xl_xact_abort xlrec; @@ -1367,14 +1351,14 @@ RecordTransactionAbort(bool isSubXact) * If we haven't been assigned an XID, nobody will care whether we aborted * or not. Hence, we're done in that case. It does not matter if we have * rels to delete (note that this routine is not responsible for actually - * deleting 'em). We cannot have any child XIDs, either. + * deleting 'em). */ if (!TransactionIdIsValid(xid)) { /* Reset XactLastRecEnd until the next transaction writes something */ if (!isSubXact) XactLastRecEnd.xrecoff = 0; - return InvalidTransactionId; + return; } /* @@ -1394,7 +1378,6 @@ RecordTransactionAbort(bool isSubXact) /* Fetch the data we need for the abort record */ nrels = smgrGetPendingDeletes(false, &rels); - nchildren = xactGetCommittedChildren(&children); /* XXX do we really need a critical section here? */ START_CRIT_SECTION(); @@ -1458,18 +1441,6 @@ RecordTransactionAbort(bool isSubXact) END_CRIT_SECTION(); - /* Compute latestXid while we have the child XIDs handy */ - latestXid = TransactionIdLatest(xid, nchildren, children); - - /* - * If we're aborting a subtransaction, we can immediately remove failed - * XIDs from PGPROC's cache of running child XIDs. We do that here for - * subxacts, because we already have the child XID array at hand. For - * main xacts, the equivalent happens just after this function returns. - */ - if (isSubXact) - XidCacheRemoveRunningXids(xid, nchildren, children, latestXid); - /* Reset XactLastRecEnd until the next transaction writes something */ if (!isSubXact) XactLastRecEnd.xrecoff = 0; @@ -1477,8 +1448,6 @@ RecordTransactionAbort(bool isSubXact) /* And clean up local data */ if (rels) pfree(rels); - - return latestXid; } /* @@ -1786,6 +1755,9 @@ CommitTransaction(void) { TransactionState s = CurrentTransactionState; TransactionId latestXid; + TransactionId xid; + int nchildren; + TransactionId *children; ShowTransactionState("CommitTransaction"); @@ -1852,6 +1824,11 @@ CommitTransaction(void) */ PreCommit_Notify(); + /* Get XID information. */ + xid = GetTopTransactionIdIfAny(); + nchildren = xactGetCommittedChildren(&children); + latestXid = TransactionIdLatest(xid, nchildren, children); + /* Prevent cancel/die interrupt while cleaning up */ HOLD_INTERRUPTS(); @@ -1867,7 +1844,7 @@ CommitTransaction(void) /* * Here is where we really truly commit. */ - latestXid = RecordTransactionCommit(); + RecordTransactionCommit(xid, nchildren, children); TRACE_POSTGRESQL_TRANSACTION_COMMIT(MyProc->lxid); @@ -1877,6 +1854,8 @@ CommitTransaction(void) * RecordTransactionCommit. */ ProcArrayEndTransaction(MyProc, latestXid); + if (TransactionIdIsNormal(xid)) + SnapArrayRemoveRunningXids(xid, nchildren, children, latestXid); /* * This is all post-commit cleanup. Note that if an error is raised here, @@ -2224,6 +2203,9 @@ AbortTransaction(void) { TransactionState s = CurrentTransactionState; TransactionId latestXid; + TransactionId xid; + TransactionId *children; + int nchildren; /* Prevent cancel/die interrupt while cleaning up */ HOLD_INTERRUPTS(); @@ -2285,11 +2267,16 @@ AbortTransaction(void) AtAbort_Notify(); AtEOXact_RelationMap(false); + /* Get XID information. 
*/ + xid = GetCurrentTransactionIdIfAny(); + nchildren = xactGetCommittedChildren(&children); + latestXid = TransactionIdLatest(xid, nchildren, children); + /* * Advertise the fact that we aborted in pg_clog (assuming that we got as * far as assigning an XID to advertise). */ - latestXid = RecordTransactionAbort(false); + RecordTransactionAbort(false, xid, nchildren, children); TRACE_POSTGRESQL_TRANSACTION_ABORT(MyProc->lxid); @@ -2299,6 +2286,8 @@ AbortTransaction(void) * RecordTransactionAbort. */ ProcArrayEndTransaction(MyProc, latestXid); + if (TransactionIdIsNormal(xid)) + SnapArrayRemoveRunningXids(xid, nchildren, children, latestXid); /* * Post-abort cleanup. See notes in CommitTransaction() concerning @@ -4158,6 +4147,11 @@ AbortSubTransaction(void) */ if (s->curTransactionOwner) { + TransactionId latestXid; + TransactionId xid; + int nchildren; + TransactionId *children; + AfterTriggerEndSubXact(false); AtSubAbort_Portals(s->subTransactionId, s->parent->subTransactionId, @@ -4166,8 +4160,24 @@ AbortSubTransaction(void) s->parent->subTransactionId); AtSubAbort_Notify(); + /* Get XID information. */ + xid = GetCurrentTransactionIdIfAny(); + nchildren = xactGetCommittedChildren(&children); + latestXid = TransactionIdLatest(xid, nchildren, children); + /* Advertise the fact that we aborted in pg_clog. */ - (void) RecordTransactionAbort(true); + (void) RecordTransactionAbort(true, xid, nchildren, children); + + /* + * If we're aborting a subtransaction, we can immediately remove failed + * XIDs from PGPROC's cache of running child XIDs. + */ + if (TransactionIdIsValid(xid)) + { + XidCacheRemoveRunningXids(xid, nchildren, children, latestXid); + if (TransactionIdIsNormal(xid)) + SnapArrayRemoveRunningXids(xid, nchildren, children, latestXid); + } /* Post-abort cleanup */ if (TransactionIdIsValid(s->transactionId)) diff --git a/src/backend/access/transam/xlog.c b/src/backend/access/transam/xlog.c index befb507047..08618d6440 100644 --- a/src/backend/access/transam/xlog.c +++ b/src/backend/access/transam/xlog.c @@ -51,6 +51,7 @@ #include "storage/predicate.h" #include "storage/proc.h" #include "storage/procarray.h" +#include "storage/snaparray.h" #include "storage/reinit.h" #include "storage/smgr.h" #include "storage/spin.h" @@ -6927,6 +6928,9 @@ StartupXLOG(void) if (standbyState != STANDBY_DISABLED) ShutdownRecoveryTransactionEnvironment(); + /* Set initial snapshot for normal running. */ + SnapArrayInitNormalRunning(ShmemVariableCache->nextXid); + /* Shut down readFile facility, free space */ if (readFile >= 0) { diff --git a/src/backend/storage/ipc/Makefile b/src/backend/storage/ipc/Makefile index 743f30e1c7..9150eb74c9 100644 --- a/src/backend/storage/ipc/Makefile +++ b/src/backend/storage/ipc/Makefile @@ -16,6 +16,6 @@ endif endif OBJS = ipc.o ipci.o pmsignal.o procarray.o procsignal.o shmem.o shmqueue.o \ - sinval.o sinvaladt.o standby.o + sinval.o sinvaladt.o snaparray.o standby.o include $(top_srcdir)/src/backend/common.mk diff --git a/src/backend/storage/ipc/README.snaparray b/src/backend/storage/ipc/README.snaparray new file mode 100644 index 0000000000..e1ee63cfe4 --- /dev/null +++ b/src/backend/storage/ipc/README.snaparray @@ -0,0 +1,153 @@ +Snapshot Derivation +=================== + +An MVCC snapshot memorializes a point in the commit sequence. That is, the +effects of any transaction which committed before the snapshot was taken will +be visible to the snapshot. 
Transactions which abort, or which have not yet
+committed at the time the snapshot is taken, will not be visible to the
+snapshot. We need only be concerned about transactions to which an XID (or
+XIDs, if there are subtransactions) has been assigned; any changes made to
+the database which are not related to a particular XID are non-transactional
+and do not obey MVCC semantics anyway. Furthermore, we can always find out
+(from CLOG) whether or not an XID that is no longer running committed or
+aborted. Thus, an MVCC snapshot need only be capable of answering the
+following question: given an XID, had that XID ended (either by committing
+or aborting) at the time the snapshot was taken?
+
+The snaparray is responsible for providing backends with sufficient data
+to construct an MVCC snapshot that can answer this question. We
+periodically record in shared memory (1) the highest-numbered XID which had
+already ended at the time the snapshot was taken ("xmax"); and (2) the XIDs
+of all lower-numbered transactions which had not yet ended at the time the
+snapshot was taken ("running XIDs"). As a special case, transactions which
+are performing a lazy VACUUM operation can be excluded from the set of running
+XIDs, since they need not be seen as in-progress even when they are.
+
+It is important for performance to minimize the amount of work that must be
+done at transaction end. Since most transactions change the set
+of running XIDs only slightly (such as by removing themselves from that list),
+we provide a method for the xmax and set of running XIDs to be updated
+incrementally. We periodically record the current xmax and set of running
+XIDs in shared memory, and then track all transaction commits which happen
+subsequently. This provides enough information for backends to reconstruct
+a complete snapshot, as follows:
+
+1. If the largest newly completed XID is greater than the original xmax,
+then increase the xmax of the snapshot to that value, and add all XIDs
+between the old and new values to the set of running XIDs.
+
+2. Remove all of the newly completed XIDs from the set of running XIDs
+(except for the one which became the new xmax value, which is the only
+one that won't be listed).
+
+Each backend must process the list of completed XIDs every time it wishes
+to derive a snapshot, so we must not allow that list to get too long.
+Periodically, we (A) read the most recently written xmax, running XIDs, and
+newly completed XIDs; (B) perform the calculations described above to
+incrementally update these values; and then (C) write the updated xmax and
+list of running XIDs back to shared memory. This effectively truncates the
+list of newly completed XIDs.
+
+Note that this machinery doesn't require any action whatsoever at the time
+an XID is assigned. Newly assigned XIDs will always be greater than
+any already-completed XID, and will be added to the set of running XIDs
+automatically when it is necessary to advance xmax.
+
+Lazy Vacuum
+===========
+
+XIDs allocated while performing a lazy vacuum don't need to be included in
+the set of running XIDs. Instead of marking them complete when the
+transaction ends, they are marked as complete when the transaction starts.
+Although this advances xmax sooner than technically necessary, it has a
+major advantage that makes it worth doing anyway: it prevents VACUUM from
+holding back the xmin horizon.
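+
+To make the incremental derivation described under "Snapshot Derivation"
+above concrete, here is a simplified sketch of how a backend might combine
+a recorded summary (xmax plus running XIDs) with the list of newly completed
+XIDs. It is only an illustration: the names derive_running_xids and
+xid_is_completed are hypothetical, plain integer comparisons stand in for
+the wraparound-aware TransactionIdPrecedes/TransactionIdFollows tests, and
+no locking is shown.
+
+    #include <stdbool.h>
+    #include <stdint.h>
+
+    typedef uint32_t TransactionId;
+
+    /* Linear scan; the real code keeps the completed list sorted. */
+    static bool
+    xid_is_completed(TransactionId xid, uint32_t ncompleted,
+                     const TransactionId *completed)
+    {
+        for (uint32_t i = 0; i < ncompleted; i++)
+            if (completed[i] == xid)
+                return true;
+        return false;
+    }
+
+    /*
+     * Apply steps (1) and (2): advance *xmax to the largest newly completed
+     * XID, drop completed XIDs from the running set, and add any XIDs
+     * between the old and new xmax that are not listed as completed.
+     * Returns the new number of running XIDs, written into result[], which
+     * must be large enough to hold them all.
+     */
+    static uint32_t
+    derive_running_xids(TransactionId *xmax,
+                        uint32_t nrunning, const TransactionId *running,
+                        uint32_t ncompleted, const TransactionId *completed,
+                        TransactionId *result)
+    {
+        TransactionId old_xmax = *xmax;
+        uint32_t n = 0;
+
+        /* Step 1: the largest newly completed XID may advance xmax. */
+        for (uint32_t i = 0; i < ncompleted; i++)
+            if (completed[i] > *xmax)
+                *xmax = completed[i];
+
+        /* Step 2: remove newly completed XIDs from the running set. */
+        for (uint32_t i = 0; i < nrunning; i++)
+            if (!xid_is_completed(running[i], ncompleted, completed))
+                result[n++] = running[i];
+
+        /* Step 1, continued: XIDs between the old and new xmax are still
+         * running unless they appear in the completed list. */
+        for (TransactionId xid = old_xmax + 1; xid < *xmax; xid++)
+            if (!xid_is_completed(xid, ncompleted, completed))
+                result[n++] = xid;
+
+        return n;
+    }
+
+The function that actually performs this calculation in snaparray.c is
+SnapArrayComputeRunningXids(), which uses the wraparound-aware TransactionId
+comparison macros rather than plain integer comparisons.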
+
+Subtransactions and Overflow
+============================
+
+The algorithm described in the previous section could be problematic in the
+presence of large numbers of subtransactions, because the amount of space
+required to store the list of in-progress XIDs could grow very large. It's
+not really unbounded, because the XID space is only 32 bits wide, and at any
+given time half of those XIDs are in the past and half are in the future,
+which somewhat limits the effective range of XIDs that could be simultaneously
+in use. Moreover, since dead rows can't be reclaimed until they're no longer
+visible to any running transaction, performance would probably degenerate
+severely well before the system reached these theoretical limits.
+Nonetheless, we can't assume that it will always be practical for a snapshot
+to include every still-running XID.
+
+To address this problem, we allow XIDs assigned to subtransactions to be
+removed from the list of running XIDs. Whenever we do this, we remember the
+highest XID so removed ("highest removed subxid"). If the snapshot receives
+an inquiry about an XID that might have been removed, it must consult
+pg_subtrans and look up the corresponding parent XID, if any. The visibility
+calculation can then be performed as normal using the parent XID, which is
+guaranteed to be present in the array if indeed it's still running. This
+check can be skipped if the XID is less than the oldest running XID ("xmin"),
+due to the rule that a subtransaction XID must always be greater than the XID
+of its parent transaction.
+
+Removing subtransaction XIDs from the set of running XIDs is a somewhat
+laborious process, so we hope it won't happen too often. We iterate over
+the ProcArray to get a list of all still-running toplevel XIDs, and then
+remove any XIDs not found there.
+
+Shared Memory Organization
+==========================
+
+The main shared memory structure used by the snaparray code is a ring buffer,
+which acts as a circular message buffer. We maintain three pointers into this
+ring buffer: (1) the location at which backends must begin reading to derive
+a valid snapshot ("start pointer"), (2) the location at which they may stop
+reading ("stop pointer"), and (3) the location beyond which no data has been
+written ("write pointer"). To prevent overflow, these pointers are stored as
+64-bit integers and interpreted modulo the buffer size. There are two types
+of messages which can be stored in this buffer: snapshot summaries, and newly
+completed XIDs.
+
+Because the most common operation on this ring buffer is to record newly
+completed XIDs, and because such messages are typically small, often just
+a single XID, it is important that the representation of such messages be
+compact. Therefore, XID completion messages consist of just the XIDs
+themselves.
+
+To distinguish itself from an XID completion message, a snapshot summary
+begins with InvalidTransactionId, followed by the remaining data items,
+all as 4-byte quantities. The format in full is as follows:
+
+    InvalidTransactionId
+    xmax
+    highest removed subxid
+    number of running xids
+    running xid 1
+    ...
+    running xid n
+
+Write access to the ring buffer is serialized by SnapArrayLock; only one
+transaction (that has an XID) can end at a time. When ending, a transaction
+may either write an XID completion message or, if the distance between the
+start pointer and the new stop pointer seems to be getting too large, write
+a new snapshot summary instead.
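+
+To illustrate how the 64-bit pointers map onto the fixed-size array, here is
+a simplified sketch of a ring-buffer write with wraparound. It is a
+stand-in for SnapArrayWriteData() in snaparray.c: the names ring_write and
+ring_size are illustrative, and the synchronization that the real code
+relies on (holding SnapArrayLock, and advancing the write pointer under its
+spinlock before copying) is omitted.
+
+    #include <stdint.h>
+    #include <string.h>
+
+    typedef uint32_t TransactionId;
+
+    /*
+     * Copy nitems XIDs into the ring buffer at logical position
+     * write_pointer. Positions are interpreted modulo ring_size, so a
+     * write that runs off the end of the array continues at the start.
+     */
+    static void
+    ring_write(TransactionId *ring, uint64_t ring_size,
+               uint64_t write_pointer, uint32_t nitems,
+               const TransactionId *items)
+    {
+        uint64_t pos = write_pointer % ring_size;
+
+        if (pos + nitems <= ring_size)
+            memcpy(&ring[pos], items, nitems * sizeof(TransactionId));
+        else
+        {
+            uint64_t before_wrap = ring_size - pos;
+
+            memcpy(&ring[pos], items, before_wrap * sizeof(TransactionId));
+            memcpy(ring, items + before_wrap,
+                   (nitems - before_wrap) * sizeof(TransactionId));
+        }
+    }
+
+Readers perform the mirror-image copy and must afterwards check the write
+pointer: if writers have advanced it past the point the read started from by
+more than the buffer size, the copied data may have been overwritten and the
+read must be retried, as described below.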
+ +Reads from the ring buffer can proceed in parallel with other reads, and +generally with writes. However, if the ring buffer wraps around before a +read is complete, then the data is no good and the operation must be retried. +This is obviously undesirable from a performance perspective, but should be +rare with a ring buffer of adequate size. Under typical conditions, a +wraparound would require dozens if not hundreds of commits to complete +before the backend taking its snapshot can complete its memcpy(). However, +if it does happen, we recover by taking SnapArrayLock in shared mode and +repeating the operation. To minimize the chances of further wraparounds, +whenever we do this, we also set a flag requesting that the next backend +to commit compress subtransaction IDs out of the snapshot. + +There is one further fly in the ointment. Since the start pointer, stop +pointer, and write pointer are 64-bit integers, we can't assume that they +can be read and written atomically on all platforms. Thus, in general, +access to these variables must be protected by spinlocks. (On platforms +where 8 byte loads and stores are atomic, we could use these facilities, +together with memory barriers, in lieu of the spinlocks, which would be +advantageous on machines with many CPU cores. But we don't support that +yet.) diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 56c0bd8d49..8be87b033b 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -34,6 +34,7 @@ #include "storage/pmsignal.h" #include "storage/predicate.h" #include "storage/procarray.h" +#include "storage/snaparray.h" #include "storage/procsignal.h" #include "storage/sinvaladt.h" #include "storage/spin.h" @@ -115,6 +116,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) size = add_size(size, MultiXactShmemSize()); size = add_size(size, LWLockShmemSize()); size = add_size(size, ProcArrayShmemSize()); + size = add_size(size, SnapArrayShmemSize()); size = add_size(size, BackendStatusShmemSize()); size = add_size(size, SInvalShmemSize()); size = add_size(size, PMSignalShmemSize()); @@ -213,6 +215,7 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port) InitProcGlobal(); CreateSharedProcArray(); CreateSharedBackendStatus(); + SnapArrayShmemInit(); /* * Set up shared-inval messaging diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index 9489012a18..3bc0e2e923 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -1364,6 +1364,43 @@ GetSnapshotData(Snapshot snapshot) return snapshot; } +/* + * GetTopXids -- write top-level in-progress XIDs to the provided array + * + * Only XIDs preceding xmax are captured. On a standby, master transactions + * are ignored. At most nxids results are written to xids, and the total + * number of matches that would have been written had enough space existed + * is returned. 
+ */ +uint32 +GetTopXids(TransactionId xmax, uint32 nxids, TransactionId *xids) +{ + uint32 count = 0; + int index; + + ProcArrayStruct *arrayP = procArray; + + LWLockAcquire(ProcArrayLock, LW_SHARED); + + for (index = 0; index < arrayP->numProcs; index++) + { + volatile PGPROC *proc = arrayP->procs[index]; + + /* Fetch xid just once - see GetNewTransactionId */ + TransactionId pxid = proc->xid; + + if (!TransactionIdIsValid(pxid) || !TransactionIdPrecedes(pxid, xmax)) + continue; + + if (count < nxids) + xids[count] = pxid; + ++count; + } + + LWLockRelease(ProcArrayLock); + return count; +} + /* * GetRunningTransactionData -- returns information about running transactions. * diff --git a/src/backend/storage/ipc/snaparray.c b/src/backend/storage/ipc/snaparray.c new file mode 100644 index 0000000000..a0f3538152 --- /dev/null +++ b/src/backend/storage/ipc/snaparray.c @@ -0,0 +1,963 @@ +/*------------------------------------------------------------------------- + * + * snaparray.c + * IPC infrastructure for MVCC snapshots. + * + * Backends frequently (often many times per transaction) require a list + * of all currently-running XIDs in order to construct an MVCC snapshot. + * Due to the frequency of this operation, it must be performed with + * minimal locking. Even shared lwlocks can be problematic, as the + * spinlock protecting the state of the LWLock will become a contention + * bottleneck on machines with 32+ CPU cores. + * + * We store snapshot-related information in a shared array called the + * snaparray, which acts as a ring buffer. The management of this ring + * buffer is documented in src/backend/storage/ipc/README.snaparray. + * + * During hot standby, we instead need a list of XIDs that were running + * on the master as of the current point in the WAL stream. We manage + * this list using the same machinery that's used on the primary, except + * that the array is updated through WAL replay rather than directly by + * individual backends. + * + * It is perhaps possible for a backend on the master to terminate without + * writing an abort record for its transaction (e.g. if the master crashed + * unexpectedly). That would tie up snaparray slots indefinitely, so the + * master periodically transmits snapshot information to the standby to + * make the necessary pruning possible. + * + * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * IDENTIFICATION + * src/backend/storage/ipc/snaparray.c + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/transam.h" +#include "miscadmin.h" +#include "storage/lwlock.h" +#include "storage/proc.h" +#include "storage/procarray.h" +#include "storage/shmem.h" +#include "storage/spin.h" +#include "storage/snaparray.h" +#include "utils/catcache.h" +#include "utils/guc.h" + +typedef struct SnapArrayStruct +{ + /* this first group of fields never changes */ + uint32 ring_buffer_size; /* size of array */ + uint32 compaction_threshold; /* when should we remove subxids? 
*/ + + slock_t start_stop_mutex; /* protects next group of fields */ + uint64 start_pointer; /* backends should start reading here */ + uint64 stop_pointer; /* backends should stop reading here */ + + slock_t write_mutex; /* protects next group of fields */ + uint64 write_pointer; /* last byte written + 1 */ + uint64 num_wraparounds; /* number of wraparounds */ + uint64 num_writes; /* number of writes */ + + /* next group of fields is protected by SnapArrayLock */ + uint32 last_summary_size; /* # entries in last snapshot summary */ + TransactionId xmax_threshold; /* xmax at which to rewrite snapshot */ + bool compaction_requested; /* snapshot compaction requested? */ + + TransactionId buffer[FLEXIBLE_ARRAY_MEMBER]; +} SnapArrayStruct; + +int snapshot_buffers = -1; + +static SnapArrayStruct *SnapArray; +static TransactionId xid_cmp_base; + +/* Local cache of latest SnapArray data. */ +static TransactionId *SnapArrayCache; +static uint32 SnapArrayCacheSize; +static uint32 SnapArrayCacheEntries; +static uint64 SnapArrayCacheStartPointer; +static uint64 SnapArrayCacheStopPointer; + +static void SnapArrayUpdateCache(void); +static uint32 SnapArrayComputeRunningXids(TransactionId xmax, + TransactionId new_xmax, + uint32 num_running_xids, + TransactionId *running_xids, + uint32 num_removed_xids, + TransactionId *removed_xids, + uint32 num_new_running_xids, + TransactionId *new_running_xids); +static void SnapArrayWriteSnapshotSummary(TransactionId xmax, + TransactionId latest_removed_subxid, + uint32 nxids, + TransactionId *running_xids); +static void SnapArrayReadData(uint64 start_location, uint64 stop_location, + TransactionId *buffer); +static void SnapArrayAdvanceWritePointer(uint64 nitems); +static void SnapArrayWriteData(uint64 write_location, uint32 nitems, + TransactionId *item); +static int xid_cmp(const void *a, const void *b); + +#define SNAPSHOT_SUMMARY_ITEMS 4 +#define SizeOfOneCompressedSnapshot() \ + mul_size(sizeof(TransactionId), add_size(MaxBackends, \ + SNAPSHOT_SUMMARY_ITEMS)) +#define BytesToKb(x) \ + (((x) / 1024) + (((x) % 1024) ? 1 : 0)) + +/* + * By default, we allow enough space in the snapshot array for 64 entries + * per backend. The result is in kilobytes. + * + * This should not be called until MaxBackends has received its final value. + */ +static Size +SnapArraySuggestedSize() +{ + Size bytes = mul_size(SizeOfOneCompressedSnapshot(), 64); + + return BytesToKb(bytes); +} + +/* + * GUC check hook for snapshot_buffers. + */ +bool +check_snapshot_buffers(int *newval, void **extra, GucSource source) +{ + Size minimum; + + /* + * -1 indicates a request for auto-tune. + */ + if (*newval == -1) + { + /* + * If we haven't yet changed the boot_val default of -1, just leave + * it be. We'll fix it later when SnapArrayShmemSize is called. + */ + if (snapshot_buffers == -1) + return true; + /* Otherwise, substitute the auto-tune value. */ + *newval = SnapArraySuggestedSize(); + } + + /* + * We must have at least enough buffer entries for one compressed snapshot. + * Performance might be terrible due to frequent rewrites, compressions, + * and wraparounds, but anything less could potentially fail outright. + */ + minimum = SizeOfOneCompressedSnapshot(); + if (*newval < BytesToKb(minimum)) + *newval = BytesToKb(minimum); + return true; +} + +/* + * Initialization of shared memory for SnapArray. + */ +Size +SnapArrayShmemSize(void) +{ + Size size; + + /* + * If the value of snapshot_buffers is -1, use the preferred auto-tune + * value. 
+ */ + if (snapshot_buffers == -1) + { + char buf[32]; + snprintf(buf, sizeof(buf), UINT64_FORMAT, + (uint64) SnapArraySuggestedSize()); + SetConfigOption("snapshot_buffers", buf, PGC_POSTMASTER, + PGC_S_OVERRIDE); + } + Assert(snapshot_buffers > 0); + + /* Work out size of array. */ + size = offsetof(SnapArrayStruct, buffer); + size = add_size(size, mul_size(snapshot_buffers, 1024)); + + return size; +} + +/* + * Initialize the shared PGPROC array during postmaster startup. + */ +void +SnapArrayShmemInit(void) +{ + Size size; + bool found; + + /* Create or attach to the ProcArray shared structure */ + size = SnapArrayShmemSize(); + SnapArray = (SnapArrayStruct *) + ShmemInitStruct("Snap Array", size, &found); + + if (!found) + { + SnapArray->ring_buffer_size = + snapshot_buffers * (1024 / sizeof(TransactionId)); + SnapArray->compaction_threshold = + Min(64 * MaxBackends, SnapArray->ring_buffer_size / 4); + SnapArray->start_pointer = 0; + SnapArray->stop_pointer = 0; + SnapArray->write_pointer = 0; + SnapArray->compaction_requested = false; + SnapArray->xmax_threshold = InvalidTransactionId; + SpinLockInit(&SnapArray->start_stop_mutex); + SpinLockInit(&SnapArray->write_mutex); + } + + /* While we're at it, we initialize our backend-local cache. */ + if (!CacheMemoryContext) + CreateCacheMemoryContext(); + SnapArrayCacheEntries = 128; + SnapArrayCache = MemoryContextAlloc(CacheMemoryContext, + sizeof(TransactionId) * + SnapArrayCacheEntries); +} + +/* + * Recovery is over; prepare for normal running! + */ +void +SnapArrayInitNormalRunning(TransactionId xmax) +{ + /* + * This should be invoked after WAL replay is complete and before any + * write transactions are initiated, so in theory no one else can be + * holding the lock. We acquire it anyway on principal. + */ + LWLockAcquire(SnapArrayLock, LW_EXCLUSIVE); + + /* XXX. We need to carve out twophase XIDs here. */ + SnapArrayWriteSnapshotSummary(xmax, InvalidTransactionId, 0, NULL); + + LWLockRelease(SnapArrayLock); +} + +/* + * Remove running XIDs. + * + * Note that there is no external interface to *add* running XIDs; that happens + * implicitly. Removing an XID higher than the current xmax implicitly adds + * all the intermediate XIDs to the set. + */ +void +SnapArrayRemoveRunningXids(TransactionId xid, int nchildren, + TransactionId *children, TransactionId latest_xid) +{ + uint64 start_pointer; + uint64 stop_pointer; + uint64 write_pointer; + uint64 slots_used; + uint64 slots_remaining; + uint64 last_summary_size; + + Assert(TransactionIdIsValid(xid)); /* only call this if XID assigned */ + Assert(nchildren >= 0); /* number of children can't be < 0 */ + + /* Lock out concurrent writers. */ + LWLockAcquire(SnapArrayLock, LW_EXCLUSIVE); + + /* Compute used and remaining ring buffer slots. */ + start_pointer = SnapArray->start_pointer; + stop_pointer = SnapArray->stop_pointer; + write_pointer = SnapArray->write_pointer; + last_summary_size = (uint64) SnapArray->last_summary_size; + Assert(stop_pointer == write_pointer); + slots_used = stop_pointer - start_pointer; + Assert(slots_used > 0 && slots_used >= last_summary_size); + slots_remaining = write_pointer - start_pointer; + Assert(slots_remaining < SnapArray->ring_buffer_size); + + /* + * Decide whether to write a new snapshot summary in lieu of just + * inserting the newly-removed XIDs. We do this when any of the + * following conditions hold: + * + * (1) The number of recently completed XIDs would exceed MaxBackends. 
+ * (Reason: We don't want backends to need to process too many recently + * completed XIDs to derive a snapshot.) + * + * (2) The new xmax forced by the removal operation would exceed the + * threshold set by SnapArrayWriteSnapshotSummary. (Reason: If the + * xmax value advances too far, backends will need to add many intermediate + * xids to their snapshots, which could be expensive in terms of both + * processing time and memory. + * + * (3) The number of remaining slots before wraparound is less than + * nxids. (Reason: We can't overwrite the snapshot summary without + * writing a new one. If we do, we're screwed.) + * + * (4) The compaction_requested flag is set, indicating that at least + * one other backend had to retry due to a wraparound condition. + * (Reason: The snapshot is evidently too large to be copied quickly + * enough.) + * + * We expect (1) to occur most frequently. Condition (2) is likely to + * trigger only if xmax jumps forward abruptly (for example, because + * a transaction with many subtransactions suddenly allocates XIDs to + * all of them). Condition (3) is a safeguard against disaster, but + * should be unlikely given any reasonable buffer size. Condition (4) + * is not necessary for correctness, but seems prudent, and like (3) + * should only really be a risk with very small buffers. + */ + if (slots_used + 1 + nchildren > last_summary_size + MaxBackends + || TransactionIdFollowsOrEquals(latest_xid, SnapArray->xmax_threshold) + || slots_remaining < 1 + nchildren + || SnapArray->compaction_requested) + { + uint32 nentries; + TransactionId *entries; + TransactionId xmax; + TransactionId new_xmax; + TransactionId highest_removed_subxid; + uint32 num_running_xids; + uint32 num_removed_xids; + TransactionId *running_xids; + TransactionId *removed_xids; + TransactionId *new_running_xids; + bool compact; + uint32 xids_added; + uint32 num_new_running_xids; + uint32 compaction_threshold; + uint32 certainly_removed_xids = 0; + + /* Extract current data from array and append our new data. */ + Assert(stop_pointer >= start_pointer + SNAPSHOT_SUMMARY_ITEMS); + Assert(start_pointer + last_summary_size <= stop_pointer); + nentries = (stop_pointer - start_pointer) + 1 + nchildren; + entries = palloc(sizeof(TransactionId) * nentries); + SnapArrayReadData(start_pointer, stop_pointer, entries); + entries[stop_pointer - start_pointer] = xid; + if (nchildren > 0) + memcpy(&entries[(stop_pointer - start_pointer) + 1], children, + sizeof(TransactionId) * nchildren); + + /* Data must begin with a snapshot summary. */ + Assert(entries[0] == InvalidTransactionId); + xmax = entries[1]; + highest_removed_subxid = entries[2]; + num_running_xids = (uint32) entries[3]; + Assert(last_summary_size == num_running_xids + SNAPSHOT_SUMMARY_ITEMS); + num_removed_xids = nentries - last_summary_size; + running_xids = entries + SNAPSHOT_SUMMARY_ITEMS; + removed_xids = running_xids + num_running_xids; + + /* Sort the removed XIDs. */ + xid_cmp_base = xmax; + qsort(removed_xids, num_removed_xids, sizeof(TransactionId), xid_cmp); + + /* Work out new xmax value. */ + new_xmax = removed_xids[num_removed_xids - 1]; + if (TransactionIdPrecedes(new_xmax, xmax)) + new_xmax = xmax; + + /* Work out number of new XIDs being added. */ + if (new_xmax >= xmax) + xids_added = new_xmax - xmax; + else + xids_added = new_xmax - xmax - FirstNormalTransactionId; + + /* + * Decide whether we need to compact away subxids. This is a bit + * hairy. 
+ * + * If the compaction_requested flag has been set, then we always + * compact. Otherwise, the decision is based on the estimated size + * of the new snapshot summary relative to the compaction threshold. + * If no subxids have previously been compacted out the snapshot, + * then we know the exact numbers. Otherwise, to be conservative, + * we assume that the none of the XIDs which precede + * highest_removed_subxids will actually be found among the running + * XIDs. We could compute a more accurate answer, but it's not worth + * it. + */ + compaction_threshold = SnapArray->compaction_threshold; + if (SnapArray->compaction_requested) + compact = true; + else if (last_summary_size + xids_added + > compaction_threshold + num_removed_xids) + compact = true; + else if (!TransactionIdIsValid(highest_removed_subxid)) + compact = false; + else if (last_summary_size + xids_added < compaction_threshold) + compact = false; + else + { + uint32 low = 0; + uint32 high = num_removed_xids; + + /* Binary search for certain-to-be-removable XIDs. */ + while (low < high) + { + uint32 middle = (low + high) / 2; + if (TransactionIdFollows(removed_xids[middle], + highest_removed_subxid)) + high = middle; + else + low = middle + 1; + } + + /* Decide whether we think it'll fit. */ + certainly_removed_xids = num_removed_xids - high; + if (last_summary_size + xids_added + > compaction_threshold + certainly_removed_xids) + compact = true; + else + compact = false; + } + + /* Construct new list of running XIDs. */ + if (compact) + { + /* + * If we're compacting away subtransaction XIDs, then we obtain + * the new list of running transaction IDs from the ProcArray. + * There shouldn't be more than MaxBackends. + * + * XXX. What about prepared transactions??? + */ + new_running_xids = palloc(sizeof(TransactionId) * MaxBackends); + num_new_running_xids = + GetTopXids(new_xmax, MaxBackends, new_running_xids); + if (num_new_running_xids > MaxBackends) + elog(PANIC, "too many toplevel XIDs"); + + /* + * We could bound this more tightly, but for now we just punt. + */ + highest_removed_subxid = new_xmax; + TransactionIdRetreat(highest_removed_subxid); + } + else + { + uint32 result; + + /* + * Allocate space for new snapshot. + */ + Assert(xids_added > certainly_removed_xids); + num_new_running_xids = last_summary_size + xids_added + - certainly_removed_xids; + new_running_xids = palloc(sizeof(TransactionId) + * num_new_running_xids); + + /* + * Filter out the removed XIDs from the running XIDs, and add any + * XIDs between the old and new xmax that aren't listed as removed. + */ + result = + SnapArrayComputeRunningXids(xmax, new_xmax, + num_running_xids, running_xids, + num_removed_xids, removed_xids, + num_new_running_xids, + new_running_xids); + if (result > num_new_running_xids) + elog(PANIC, "snapshot size calculation is bogus"); + num_new_running_xids = result; + + /* + * If the highest removed subxid has aged out of the snapshot, + * we clear the value. Otherwise we might eventually have a + * problem when the XID space wraps around. + */ + if (num_new_running_xids == 0 + || TransactionIdPrecedes(highest_removed_subxid, + new_running_xids[0])) + highest_removed_subxid = InvalidTransactionId; + } + + /* Write the new snapshot. */ + SnapArrayWriteSnapshotSummary(new_xmax, highest_removed_subxid, + num_new_running_xids, new_running_xids); + + /* Free memory. 
*/ + pfree(new_running_xids); + pfree(entries); + } + else + { + /* + * We've decided not to insert a new snapshot summary, so just + * append our completed XIDs. + */ + + /* Advance write pointer. */ + SnapArrayAdvanceWritePointer(1 + nchildren); + + /* Write data. Better not fail here... */ + SnapArrayWriteData(write_pointer, 1, &xid); + if (nchildren > 0) + SnapArrayWriteData(write_pointer + 1, nchildren, children); + + /* Update stop pointer. */ + SpinLockAcquire(&SnapArray->start_stop_mutex); + SnapArray->stop_pointer = write_pointer + 1 + nchildren; + SpinLockRelease(&SnapArray->start_stop_mutex); + } + + /* We're done. */ + LWLockRelease(SnapArrayLock); +} + +/* + * Construct an MVCC snapshot. + */ +void +SnapArrayGetSnapshotData(Snapshot snapshot) +{ + TransactionId xmax; + TransactionId new_xmax; + TransactionId highest_removed_subxid; + uint32 num_running_xids; + uint32 num_removed_xids; + uint32 num_new_running_xids; + TransactionId *running_xids; + TransactionId *removed_xids; + TransactionId *new_running_xids; + uint32 n; + uint32 xids_added; + uint32 certainly_removed_xids = 0; + bool needsort = false; + + /* Get latest information from shared memory. */ + SnapArrayUpdateCache(); + + /* Data must begin with a snapshot summary. */ + Assert(SnapArrayCacheSize > SNAPSHOT_SUMMARY_ITEMS); + Assert(SnapArrayCache[0] == InvalidTransactionId); + xmax = SnapArrayCache[1]; + highest_removed_subxid = SnapArrayCache[2]; + num_running_xids = (uint32) SnapArrayCache[3]; + num_removed_xids + = SnapArrayCacheSize - (num_running_xids + SNAPSHOT_SUMMARY_ITEMS); + running_xids = SnapArrayCache + SNAPSHOT_SUMMARY_ITEMS; + removed_xids = running_xids + num_running_xids; + + /* + * Scan the removed XIDs. This is enables us to work out the new xmax + * value, the number of XIDs we're certain to be able to remove from the + * running list (because they're newer than highest_removed_subxid), and + * whether or not the list of removed XIDs needs to be sorted. + */ + new_xmax = xmax; + for (n = 0; n < num_removed_xids; ++n) + { + TransactionId xid = removed_xids[n]; + + if (TransactionIdFollows(xid, new_xmax)) + new_xmax = removed_xids[n]; + if (TransactionIdFollows(xid, highest_removed_subxid)) + ++certainly_removed_xids; + if (n > 0 && TransactionIdPrecedes(xid, removed_xids[n-1])) + needsort = true; + } + + /* + * Sort the removed XIDs (unless they are already in order). + * + * This is actually mutating the underlying cache, which is OK, because + * changing the order of the removed XIDs doesn't change the semantics. + * We skip this if the data is already in order, which could happen + * either because we've sorted the same data on a previous trip through + * this function, or because all removed XIDs added since our last visit + * were removed in ascending XID order. + * + * NB: Some quicksort implementations don't perform well on data that's + * already mostly or entirely sorted. Skipping the sort in the case where + * the data is completely in order should ameliorate any problems in this + * area quite a bit, but we might need to pick another sort algorithm if + * this probes problematic. + */ + if (needsort) + { + xid_cmp_base = xmax; + qsort(removed_xids, num_removed_xids, sizeof(TransactionId), xid_cmp); + } + + /* Work out number of new XIDs being added. */ + if (new_xmax >= xmax) + xids_added = new_xmax - xmax; + else + xids_added = new_xmax - xmax - FirstNormalTransactionId; + + /* + * XXX. The rest of this function is a fantasy pending reorganization + * of what goes into a snapshot. 
+ */ + num_new_running_xids = + num_running_xids + xids_added - certainly_removed_xids; + new_running_xids = palloc(sizeof(TransactionId) * num_new_running_xids); + num_new_running_xids = + SnapArrayComputeRunningXids(xmax, new_xmax, + num_running_xids, running_xids, + num_removed_xids, removed_xids, + num_new_running_xids, new_running_xids); +} + +/* + * Make certain that the latest data from the shared SnapArray has been copied + * into our backend-private cache. In general, this means that we must read + * the latest snapshot summary and any recently removed XIDs from shared + * memory, but we can optimize away duplicate reads of the same data. + */ +static void +SnapArrayUpdateCache(void) +{ + uint64 start_pointer; + uint64 stop_pointer; + uint64 write_pointer; + uint64 delta; + uint32 skip = 0; + + /* + * Read start and stop pointers. Once we do this, the clock is ticking. + * We must finish reading any data we care about before the buffer wraps + * around. That shouldn't be a big deal, since the buffer will normally + * be much larger than the amount of data we're copying, but we mustn't + * add any potentially slow operations after this point. + */ + SpinLockAcquire(&SnapArray->start_stop_mutex); + stop_pointer = SnapArray->stop_pointer; + start_pointer = SnapArray->start_pointer; + SpinLockRelease(&SnapArray->start_stop_mutex); + Assert(start_pointer < stop_pointer); + + /* If the stop pointer has not moved, we are done! */ + if (stop_pointer == SnapArrayCacheStopPointer) + return; + + /* If our local cache is not large enough to hold the data, grow it. */ + delta = stop_pointer - start_pointer; + if (delta < SnapArrayCacheSize) + { + Assert(delta < UINT_MAX); + SnapArrayCache = repalloc(SnapArrayCache, + sizeof(TransactionId) + * delta); + SnapArrayCacheEntries = delta; + } + + /* Copy the data. */ + if (start_pointer == SnapArrayCacheStartPointer) + { + /* We only need to copy the newly added data. */ + skip = stop_pointer - SnapArrayCacheStopPointer; + SnapArrayReadData(SnapArrayCacheStopPointer, stop_pointer, + SnapArrayCache + skip); + } + else + { + /* We need to recopy all of the data. */ + SnapArrayReadData(start_pointer, stop_pointer, SnapArrayCache); + } + + /* Did we suffer a wraparound? */ + SpinLockAcquire(&SnapArray->write_mutex); + write_pointer = SnapArray->write_pointer; + SpinLockRelease(&SnapArray->write_mutex); + if (write_pointer > start_pointer + skip + SnapArray->ring_buffer_size) + { + /* + * Oh, bummer. We'll have to redo. By acquiring the light-weight + * lock instead of the spinlock, we freeze out any concurrent updates, + * so there's no possibility of wraparound and no need to take + * the spinlock before reading the start and stop pointers. + * Unfortunately, this also reduces concurrency. And it's double + * work, so we hope it won't happen often. + * + * To reduce the chances that some other backend will immediately + * hit the same problem, we set the compaction_requested flag. This + * will cause the next backend that removes XIDs to remove subxids + * from the snapshot. 
+ */ + LWLockAcquire(SnapArrayLock, LW_SHARED); + stop_pointer = SnapArray->stop_pointer; + start_pointer = SnapArray->start_pointer; + delta = stop_pointer - start_pointer; + if (delta < SnapArrayCacheSize) + { + Assert(delta < UINT_MAX); + SnapArrayCache = + repalloc(SnapArrayCache, + sizeof(TransactionId) * delta); + SnapArrayCacheEntries = delta; + } + SnapArrayReadData(start_pointer, stop_pointer, SnapArrayCache); + SnapArray->compaction_requested = true; + LWLockRelease(SnapArrayLock); + + /* Update statistics. */ + SpinLockAcquire(&SnapArray->write_mutex); + SnapArray->num_wraparounds++; + SpinLockRelease(&SnapArray->write_mutex); + } + + /* Bookkeeping. */ + SnapArrayCacheSize = delta; + SnapArrayCacheStartPointer = start_pointer; + SnapArrayCacheStopPointer = stop_pointer; +} + +/* + * Work out a new set of running XIDs given the existing set of running XIDs + * and a set of XIDs to be removed. + */ +static uint32 +SnapArrayComputeRunningXids(TransactionId xmax, + TransactionId new_xmax, + uint32 num_running_xids, + TransactionId *running_xids, + uint32 num_removed_xids, + TransactionId *removed_xids, + uint32 num_new_running_xids, + TransactionId *new_running_xids) +{ + uint32 i; + uint32 n = 0; + uint32 r = 0; + + /* + * Check each running XID to see whether its been removed. This is + * basically a merge join: both XID lists are sorted. + */ + for (i = 0; i < num_running_xids; ++i) + { + bool match = false; + + while (1) + { + if (n >= num_removed_xids) + break; + if (TransactionIdEquals(running_xids[i], removed_xids[n])) + match = true; + if (TransactionIdPrecedesOrEquals(running_xids[i], removed_xids[n])) + break; + ++n; + } + + if (!match) + { + if (r < num_new_running_xids) + new_running_xids[r] = running_xids[i]; + ++r; + } + } + + /* + * Next, we have to add any XIDs between the old and new xmax that have + * not been removed. Since the list of removed XIDs is sorted, we can do + * this in O(n+m) time, where n is the amount by which xmax has advanced + * and m is the number of removed XIDs greater than the old xmax. + */ + TransactionIdAdvance(xmax); + while (TransactionIdPrecedes(xmax, new_xmax)) + { + bool match = false; + + while (1) + { + if (n >= num_removed_xids) + break; + if (TransactionIdEquals(xmax, removed_xids[n])) + match = true; + if (TransactionIdPrecedesOrEquals(xmax, removed_xids[n])) + break; + ++n; + } + + if (!match) + { + if (r < num_new_running_xids) + new_running_xids[r] = xmax; + ++r; + } + + TransactionIdAdvance(xmax); + } + + /* All done. */ + return r; +} + +/* + * Write a new snapshot summary record into the SnapArray buffer. + */ +static void +SnapArrayWriteSnapshotSummary(TransactionId xmax, + TransactionId latest_removed_subxid, + uint32 nxids, + TransactionId *running_xids) +{ + TransactionId summary_info[SNAPSHOT_SUMMARY_ITEMS]; + TransactionId xmax_threshold; + uint64 write_pointer; + + /* + * When this function is invoked from elsewhere within snaparray.c, + * the caller should arrange to compact the snapshot if required. We + * include this last-ditch check mostly to protect against external + * callers. + */ + if (nxids + SNAPSHOT_SUMMARY_ITEMS > SnapArray->ring_buffer_size) + ereport(ERROR, + (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), + errmsg("snapshot too large for snapshot_buffers"))); + + /* + * InvalidTransactionId is used as a sentinel value, to mark the beginning + * of a new summary record. + * + * We cheat a bit and store the number of transaction IDs in one of the + * transaction ID slots. 
We could go the other way and cast all of the + * other values to uint32, but that would require a lot more adjustment + * if the size of a TransactionId were ever to change. + */ + summary_info[0] = InvalidTransactionId; + summary_info[1] = xmax; + summary_info[2] = latest_removed_subxid; + summary_info[3] = (TransactionId) nxids; + + /* Advance write pointer. */ + write_pointer = SnapArray->write_pointer; + SnapArrayAdvanceWritePointer(SNAPSHOT_SUMMARY_ITEMS + nxids); + + /* Write data. Better not fail here... */ + SnapArrayWriteData(write_pointer, SNAPSHOT_SUMMARY_ITEMS, summary_info); + SnapArrayWriteData(write_pointer + SNAPSHOT_SUMMARY_ITEMS, + nxids, running_xids); + + /* Update start and stop pointers. */ + SpinLockAcquire(&SnapArray->start_stop_mutex); + SnapArray->stop_pointer = write_pointer + SNAPSHOT_SUMMARY_ITEMS + nxids; + SnapArray->start_pointer = write_pointer; + SpinLockRelease(&SnapArray->start_stop_mutex); + + /* + * SnapArrayRemoveRunningXids uses these to decide when to summarize. + * See comments in that function for details. + */ + SnapArray->last_summary_size = SNAPSHOT_SUMMARY_ITEMS + nxids; + xmax_threshold = xmax + (4 * MaxBackends); + if (!TransactionIdIsNormal(xmax_threshold)) + xmax_threshold = FirstNormalTransactionId; + SnapArray->xmax_threshold = xmax_threshold; +} + +/* + * Read data from SnapArray, handling wraparound as needed. + * + * This does no synchronization at all and very minimal sanity checking. + * It's just here so that higher-level functions needn't duplicate the + * code to handle wraparound. + */ +static void +SnapArrayReadData(uint64 start_location, uint64 stop_location, + TransactionId *buffer) +{ + uint64 nxids = stop_location - start_location; + + Assert(nxids < SnapArray->ring_buffer_size); + Assert(nxids > 0); + + start_location %= SnapArray->ring_buffer_size; + if (start_location + nxids <= (uint64) SnapArray->ring_buffer_size) + memcpy(buffer, &SnapArray->buffer[start_location], + nxids * sizeof(TransactionId)); + else + { + uint64 entries_before_wrap; + + entries_before_wrap = SnapArray->ring_buffer_size - start_location; + memcpy(buffer, &SnapArray->buffer[start_location], + entries_before_wrap * sizeof(TransactionId)); + memcpy(buffer + entries_before_wrap, SnapArray->buffer, + (nxids - entries_before_wrap) * sizeof(TransactionId)); + } +} + +/* + * Before beginning to write the data, writers must advance the write pointer, + * so that concurrent readers can detect whether the data they were busy + * reading may have been overwritten. + */ +static void +SnapArrayAdvanceWritePointer(uint64 nitems) +{ + SpinLockAcquire(&SnapArray->write_mutex); + SnapArray->write_pointer += nitems; + SnapArray->num_writes++; + SpinLockRelease(&SnapArray->write_mutex); +} + +/* + * Write data into SnapArray, handling wraparound as needed. + * + * This does no synchronization at all and very minimal sanity checking. + * It's just here so that higher-level functions needn't duplicate the + * code to handle wraparound. + */ +static void +SnapArrayWriteData(uint64 write_location, uint32 nitems, TransactionId *item) +{ + write_location %= SnapArray->ring_buffer_size; + if (write_location + nitems <= SnapArray->ring_buffer_size) + memcpy(&SnapArray->buffer[write_location], item, + nitems * sizeof(TransactionId)); + else + { + uint64 entries_before_wrap; + + /* + * The actual constraint is tighter than what this assertion checks, + * but it'll prevent us from scribbling all over shared memory if + * our caller screws up. 
+ */ + Assert(nitems <= SnapArray->ring_buffer_size); + + entries_before_wrap = SnapArray->ring_buffer_size - write_location; + memcpy(&SnapArray->buffer[write_location], item, + entries_before_wrap * sizeof(TransactionId)); + memcpy(SnapArray->buffer, item + entries_before_wrap, + (nitems - entries_before_wrap) * sizeof(TransactionId)); + } +} + +/* + * Helper routine for XID sorting. XIDs aren't totally ordered, so we require + * xid_cmp_base to be set to an appropriate value before using this function. + * xids will be sorted by their distance ahead of or behind this point. + */ +static int +xid_cmp(const void *a, const void *b) +{ + TransactionId *xa = (TransactionId *) a; + TransactionId *xb = (TransactionId *) b; + bool af; + bool bf; + + Assert(TransactionIdIsNormal(xid_cmp_base)); + + if (*xa == *xb) + return 0; + af = TransactionIdFollows(*xa, xid_cmp_base); + bf = TransactionIdFollows(*xb, xid_cmp_base); + if (af && !bf) + return 1; + else if (bf && !af) + return -1; + else if (TransactionIdFollows(*xa, *xb)) + return 1; + else + return -1; +} diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index f1d35a9a11..732263ccca 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -59,6 +59,7 @@ #include "replication/walreceiver.h" #include "replication/walsender.h" #include "storage/bufmgr.h" +#include "storage/snaparray.h" #include "storage/standby.h" #include "storage/fd.h" #include "storage/predicate.h" @@ -1618,6 +1619,23 @@ static struct config_int ConfigureNamesInt[] = check_temp_buffers, NULL, NULL }, + /* + * Each entry in a snapshot buffer is a TransactionId, and the total + * number of such entries must fit within a 32-bit unsigned integer. + * Meanwhile, this entry is in units of kilobytes. Hence, the + * odd-looking maximum value. + */ + { + {"snapshot_buffers", PGC_POSTMASTER, RESOURCES_MEM, + gettext_noop("Sets the amount of shared memory used for the snapshot buffer."), + NULL, + GUC_UNIT_KB + }, + &snapshot_buffers, + -1, -1, UINT_MAX / (1024 / sizeof(TransactionId)), + check_snapshot_buffers, NULL, NULL + }, + { {"port", PGC_POSTMASTER, CONN_AUTH_SETTINGS, gettext_noop("Sets the TCP port the server listens on."), diff --git a/src/backend/utils/misc/postgresql.conf.sample b/src/backend/utils/misc/postgresql.conf.sample index 67b098bd6d..39d7c24691 100644 --- a/src/backend/utils/misc/postgresql.conf.sample +++ b/src/backend/utils/misc/postgresql.conf.sample @@ -115,6 +115,7 @@ # per transaction slot, plus lock space (see max_locks_per_transaction). # It is not advisable to set max_prepared_transactions nonzero unless you # actively intend to use prepared transactions. 
+#snapshot_buffers = -1 # -1 = auto-tune based on max_connections #work_mem = 1MB # min 64kB #maintenance_work_mem = 16MB # min 1MB #max_stack_depth = 2MB # min 100kB diff --git a/src/include/storage/lwlock.h b/src/include/storage/lwlock.h index 438a48d8dc..4b60cd685c 100644 --- a/src/include/storage/lwlock.h +++ b/src/include/storage/lwlock.h @@ -79,6 +79,7 @@ typedef enum LWLockId SerializablePredicateLockListLock, OldSerXidLock, SyncRepLock, + SnapArrayLock, /* Individual lock IDs end here */ FirstBufMappingLock, FirstLockMgrLock = FirstBufMappingLock + NUM_BUFFER_PARTITIONS, diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h index a11d4385b7..6eb5eb6740 100644 --- a/src/include/storage/procarray.h +++ b/src/include/storage/procarray.h @@ -68,5 +68,7 @@ extern bool CountOtherDBBackends(Oid databaseId, extern void XidCacheRemoveRunningXids(TransactionId xid, int nxids, const TransactionId *xids, TransactionId latestXid); +extern uint32 GetTopXids(TransactionId xmax, uint32 nxids, + TransactionId *xids); #endif /* PROCARRAY_H */ diff --git a/src/include/storage/snaparray.h b/src/include/storage/snaparray.h new file mode 100644 index 0000000000..e677ba7b7c --- /dev/null +++ b/src/include/storage/snaparray.h @@ -0,0 +1,25 @@ +/*------------------------------------------------------------------------- + * + * snaparray.h + * IPC infrastructure for MVCC snapshots. + * + * Portions Copyright (c) 1996-2011, PostgreSQL Global Development Group + * Portions Copyright (c) 1994, Regents of the University of California + * + * src/include/storage/snaparray.h + * + *------------------------------------------------------------------------- + */ +#ifndef SNAPARRAY_H +#define SNAPARRAY_H + +extern Size SnapArrayShmemSize(void); +extern void SnapArrayShmemInit(void); +extern void SnapArrayInitNormalRunning(TransactionId xmax); +extern void SnapArrayRemoveRunningXids(uint32 xid, int nchildren, + TransactionId *children, TransactionId latest_xid); +extern void SnapArrayGetSnapshotData(Snapshot snapshot); + +extern int snapshot_buffers; + +#endif /* SNAPARRAY_H */ diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h index 8e3057a014..66cb7c0d77 100644 --- a/src/include/utils/guc.h +++ b/src/include/utils/guc.h @@ -378,4 +378,7 @@ extern void assign_search_path(const char *newval, void *extra); extern bool check_wal_buffers(int *newval, void **extra, GucSource source); extern void assign_xlog_sync_method(int new_sync_method, void *extra); +/* in storage/ipc/snaparray.c */ +extern bool check_snapshot_buffers(int *newval, void **extra, GucSource source); + #endif /* GUC_H */ -- 2.39.5