Skip to content

Commit da3304e

Browse files
authored
KAFKA-16371; fix lingering pending commit when handling OFFSET_METADATA_TOO_LARGE (apache#16072)
This patch was initially created in apache#15536. When a single commit covers multiple topic partitions and some, but not all, of them exceed the offset metadata size limit, the pending commit is not properly cleaned up, leading to UNSTABLE_OFFSET_COMMIT errors when the offsets are later fetched with the read_committed isolation level. This change ensures that the invalid commits are never added to the pendingOffsetCommits set. Co-authored-by: Kyle Phelps <[email protected]> Reviewers: Chia-Ping Tsai <[email protected]>, Justine Olshan <[email protected]>
1 parent 524ad1e commit da3304e

File tree

2 files changed

+130
-2
lines changed

2 files changed

+130
-2
lines changed

core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -494,9 +494,9 @@ class GroupMetadataManager(brokerId: Int,
494494

495495
if (isTxnOffsetCommit) {
496496
addProducerGroup(producerId, group.groupId)
497-
group.prepareTxnOffsetCommit(producerId, offsetMetadata)
497+
group.prepareTxnOffsetCommit(producerId, filteredOffsetMetadata)
498498
} else {
499-
group.prepareOffsetCommit(offsetMetadata)
499+
group.prepareOffsetCommit(filteredOffsetMetadata)
500500
}
501501

502502
appendForGroup(group, records, requestLocal, putCacheCallback, verificationGuards)

core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1664,6 +1664,134 @@ class GroupMetadataManagerTest {
16641664
assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
16651665
}
16661666

1667+
// Regression test for KAFKA-16371 (non-transactional path).
//
// Commits offsets for two partitions of topic "foo" in a single storeOffsets call:
//   - partition 0 carries metadata one character longer than offsetConfig.maxMetadataSize,
//   - partition 1 carries empty metadata.
// The test then asserts that
//   1. the commit callback reports OFFSET_METADATA_TOO_LARGE for partition 0 and NONE for partition 1,
//   2. a getOffsets call with requireStable = true returns INVALID_OFFSET with Errors.NONE for the
//      rejected partition and the committed offset for the valid partition,
//   3. the "offset-commit-count" metric goes from 0 to 1.
@Test
1668+
def testOffsetMetadataTooLargePartialFailure(): Unit = {
1669+
val memberId = ""
1670+
val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
// Same topic id as above, different partition index.
1671+
val validTopicIdPartition = new TopicIdPartition(topicIdPartition.topicId, 1, "foo")
1672+
val offset = 37
// NOTE(review): the trailing semicolon below is redundant in Scala.
// presumably requireStable = true exercises the fetch path that used to fail with
// UNSTABLE_OFFSET_COMMIT — confirm against getOffsets.
1673+
val requireStable = true;
1674+
1675+
groupMetadataManager.addOwnedPartition(groupPartitionId)
1676+
val group = new GroupMetadata(groupId, Empty, time)
1677+
groupMetadataManager.addGroup(group)
1678+
1679+
val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataManager.partitionFor(group.groupId))
// One oversized entry (maxMetadataSize + 1 characters) and one valid entry in the same commit.
1680+
val offsets = immutable.Map(
1681+
topicIdPartition -> OffsetAndMetadata(offset, "s" * (offsetConfig.maxMetadataSize + 1) , time.milliseconds()),
1682+
validTopicIdPartition -> OffsetAndMetadata(offset, "", time.milliseconds())
1683+
)
1684+
// presumably stubs the log append so that it completes with Errors.NONE — helper is defined
// outside this view; verify.
1685+
expectAppendMessage(Errors.NONE)
1686+
// Captures the per-partition errors handed to the commit callback.
1687+
var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
1688+
def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
1689+
commitErrors = Some(errors)
1690+
}
1691+
// Baseline for the metric assertion at the end of the test.
1692+
assertEquals(0, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
1693+
groupMetadataManager.storeOffsets(group, memberId, offsetTopicPartition, offsets, callback, verificationGuard = None)
1694+
assertTrue(group.hasOffsets)
1695+
// Only the oversized partition is rejected; the valid one succeeds.
1696+
assertEquals(Some(Map(
1697+
topicIdPartition -> Errors.OFFSET_METADATA_TOO_LARGE,
1698+
validTopicIdPartition -> Errors.NONE)
1699+
), commitErrors)
1700+
1701+
val cachedOffsets = groupMetadataManager.getOffsets(
1702+
groupId,
1703+
requireStable,
1704+
Some(Seq(topicIdPartition.topicPartition, validTopicIdPartition.topicPartition))
1705+
)
1706+
// The rejected partition has no committed offset ...
1707+
assertEquals(
1708+
Some(OffsetFetchResponse.INVALID_OFFSET),
1709+
cachedOffsets.get(topicIdPartition.topicPartition).map(_.offset)
1710+
)
// ... and, crucially, the fetch reports Errors.NONE for it rather than an error.
1711+
assertEquals(
1712+
Some(Errors.NONE),
1713+
cachedOffsets.get(topicIdPartition.topicPartition).map(_.error)
1714+
)
// The valid partition returns the offset that was committed.
1715+
assertEquals(
1716+
Some(offset),
1717+
cachedOffsets.get(validTopicIdPartition.topicPartition).map(_.offset)
1718+
)
1719+
// The metric total moved from 0 to 1 across the storeOffsets call.
1720+
assertEquals(1, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
1721+
}
1722+
1723+
// Regression test for KAFKA-16371 (transactional path).
//
// Issues a transactional storeOffsets call (producerId 232, epoch 0) for two partitions of
// topic "foo": foo0 with empty metadata and foo1 with metadata one character longer than
// offsetConfig.maxMetadataSize. The test captures the response callback passed to
// replicaManager.appendRecords, completes it with Errors.NONE, and asserts that
//   1. the commit callback reports NONE for foo0 and OFFSET_METADATA_TOO_LARGE for foo1,
//   2. group.allOffsets stays empty until the pending transactional commit is completed,
//   3. after completePendingTxnOffsetCommit(producerId, isCommit = true) the offset for foo0
//      is visible on the group.
@Test
1724+
def testTransactionalCommitOffsetWithOffsetMetadataTooLargePartialFailure(): Unit = {
1725+
val memberId = ""
// NOTE(review): foo0 and foo1 share the topic name "foo" but are given different random
// topic ids — confirm whether that is intentional.
1726+
val foo0 = new TopicIdPartition(Uuid.randomUuid(), 0, "foo")
1727+
val foo1 = new TopicIdPartition(Uuid.randomUuid(), 1, "foo")
1728+
val producerId = 232L
1729+
val producerEpoch = 0.toShort
1730+
1731+
groupMetadataManager.addOwnedPartition(groupPartitionId)
1732+
1733+
val group = new GroupMetadata(groupId, Empty, time)
1734+
groupMetadataManager.addGroup(group)
1735+
1736+
val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupMetadataManager.partitionFor(group.groupId))
// foo0 is a valid commit; foo1 exceeds the metadata size limit by one character.
1737+
val offsets = immutable.Map(
1738+
foo0 -> OffsetAndMetadata(37, "", time.milliseconds()),
1739+
foo1 -> OffsetAndMetadata(38, "s" * (offsetConfig.maxMetadataSize + 1), time.milliseconds())
1740+
)
1741+
// Captor for the response callback handed to replicaManager.appendRecords, so the test can
// complete the append manually further down.
1742+
val capturedResponseCallback: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] =
1743+
ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])
1744+
when(replicaManager.getMagic(any())).thenReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
// Captures the per-partition errors handed to the commit callback.
1745+
var commitErrors: Option[immutable.Map[TopicIdPartition, Errors]] = None
1746+
1747+
def callback(errors: immutable.Map[TopicIdPartition, Errors]): Unit = {
1748+
commitErrors = Some(errors)
1749+
}
1750+
1751+
val verificationGuard = new VerificationGuard()
1752+
1753+
groupMetadataManager.storeOffsets(
1754+
group,
1755+
memberId,
1756+
offsetTopicPartition,
1757+
offsets,
1758+
callback,
1759+
producerId,
1760+
producerEpoch,
1761+
verificationGuard = Some(verificationGuard)
1762+
)
// Before the append completes: the group reports offsets, but none are materialized yet.
1763+
assertTrue(group.hasOffsets)
1764+
assertTrue(group.allOffsets.isEmpty)
1765+
// Verify the append was issued with the expected verification guard, capturing its callback.
1766+
verify(replicaManager).appendRecords(anyLong(),
1767+
anyShort(),
1768+
any(),
1769+
any(),
1770+
any[Map[TopicPartition, MemoryRecords]],
1771+
capturedResponseCallback.capture(),
1772+
any[Option[ReentrantLock]],
1773+
any(),
1774+
any(),
1775+
any(),
1776+
ArgumentMatchers.eq(Map(offsetTopicPartition -> verificationGuard)))
1777+
verify(replicaManager).getMagic(any())
// Complete the append successfully.
// NOTE(review): groupTopicPartition is defined outside this view; presumably it equals the
// local offsetTopicPartition — confirm.
1778+
capturedResponseCallback.getValue.apply(Map(groupTopicPartition ->
1779+
new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)))
1780+
// Only the oversized partition is rejected; the valid one succeeds.
1781+
assertEquals(Some(Map(
1782+
foo0 -> Errors.NONE,
1783+
foo1 -> Errors.OFFSET_METADATA_TOO_LARGE
1784+
)), commitErrors)
1785+
// The append has completed but the transaction has not: still no materialized offsets.
1786+
assertTrue(group.hasOffsets)
1787+
assertTrue(group.allOffsets.isEmpty)
1788+
// Completing the transactional commit makes the valid offset for foo0 visible.
1789+
group.completePendingTxnOffsetCommit(producerId, isCommit = true)
1790+
assertTrue(group.hasOffsets)
1791+
assertFalse(group.allOffsets.isEmpty)
1792+
assertEquals(offsets.get(foo0), group.offset(foo0.topicPartition))
1793+
}
1794+
16671795
@Test
16681796
def testExpireOffset(): Unit = {
16691797
val memberId = ""

0 commit comments

Comments
 (0)