typedef struct ConsumerSync
{
- LWLockId cs_lwlock; /* Synchronize access to the consumer queue */
+ LWLock *cs_lwlock; /* Synchronize access to the consumer queue */
Latch cs_latch; /* The latch consumer is waiting on */
} ConsumerSync;
* is SharedQueue
*/
static HTAB *SharedQueues = NULL;
-
+static LWLockPadded *SQueueLocks = NULL;
/*
* Pool of synchronization items
&found);
if (!found)
{
- int i;
+ int i, l;
+ int nlocks = (NUM_SQUEUES * (MaxDataNodes-1));
+ LWLockTranche tranche;
+
+ /* Initialize LWLocks for queues */
+ SQueueLocks = (LWLockPadded *) ShmemAlloc(sizeof(LWLockPadded) * nlocks);
+
+ tranche.name = "Shared Queue Locks";
+ tranche.array_base = SQueueLocks;
+ tranche.array_stride = sizeof(LWLockPadded);
+
+ /* Register the trannche tranche in the main tranches array */
+ LWLockRegisterTranche(LWTRANCHE_SHARED_QUEUES, &tranche);
+
+ Assert(SQueueLocks == GetNamedLWLockTranche("Shared Queue Locks"));
+ l = 0;
for (i = 0; i < NUM_SQUEUES; i++)
{
SQueueSync *sqs = GET_SQUEUE_SYNC(i);
for (j = 0; j < MaxDataNodes-1; j++)
{
InitSharedLatch(&sqs->sqs_consumer_sync[j].cs_latch);
- sqs->sqs_consumer_sync[j].cs_lwlock = LWLockAssign();
+ sqs->sqs_consumer_sync[j].cs_lwlock = &(SQueueLocks[l++]).lock;
}
}
}
int numLocks = 0;
int i;
-#ifdef XCP
- /* squeue.c needs one per consumer node in each shared queue.
- * Max number of consumers is MaxDataNodes-1 */
- numLocks += NUM_SQUEUES * (MaxDataNodes-1);
-#endif
for (i = 0; i < NamedLWLockTrancheRequests; i++)
numLocks += NamedLWLockTrancheRequestArray[i].num_lwlocks;
LWTRANCHE_BUFFER_MAPPING,
LWTRANCHE_LOCK_MANAGER,
LWTRANCHE_PREDICATE_LOCK_MANAGER,
- LWTRANCHE_FIRST_USER_DEFINED
+ LWTRANCHE_FIRST_USER_DEFINED,
+ LWTRANCHE_SHARED_QUEUES
} BuiltinTrancheIds;
/*