Skip to content

Commit 13f2138

Browse files
feat: Add support for increasing partitions to the kafka shim (#37)
* feat: Add support for increasing partitions to the kafka shim * Updated to address comments
1 parent 5a5ff68 commit 13f2138

File tree

9 files changed

+140
-40
lines changed

9 files changed

+140
-40
lines changed

pom.xml

+4-4
Original file line numberDiff line numberDiff line change
@@ -3,26 +3,26 @@
33
<parent>
44
<groupId>com.google.cloud</groupId>
55
<artifactId>google-cloud-pubsublite-parent</artifactId>
6-
<version>0.6.5</version>
6+
<version>0.7.0</version>
77
</parent>
88
<modelVersion>4.0.0</modelVersion>
99
<groupId>com.google.cloud</groupId>
1010
<artifactId>pubsublite-kafka</artifactId>
1111
<version>0.1.2-SNAPSHOT</version><!-- {x-version-update:pubsublite-kafka:current} -->
1212
<packaging>jar</packaging>
1313
<name>Pub/Sub Lite Kafka Shim</name>
14-
<url>https://github.com/googleapis/java-pubsublite</url>
14+
<url>https://github.com/googleapis/java-pubsublite-kafka</url>
1515
<description>Kafka Producer and Consumer for Google Cloud Pub/Sub Lite</description>
1616
<dependencies>
1717
<dependency>
1818
<groupId>com.google.api.grpc</groupId>
1919
<artifactId>proto-google-cloud-pubsublite-v1</artifactId>
20-
<version>0.6.1</version>
20+
<version>0.7.0</version>
2121
</dependency>
2222
<dependency>
2323
<groupId>com.google.cloud</groupId>
2424
<artifactId>google-cloud-pubsublite</artifactId>
25-
<version>0.6.1</version>
25+
<version>0.7.0</version>
2626
</dependency>
2727
<dependency>
2828
<groupId>org.apache.kafka</groupId>

src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java

+5-9
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import com.google.cloud.pubsublite.AdminClient;
2222
import com.google.cloud.pubsublite.AdminClientSettings;
2323
import com.google.cloud.pubsublite.CloudZone;
24-
import com.google.cloud.pubsublite.PartitionLookupUtils;
2524
import com.google.cloud.pubsublite.SubscriptionPath;
2625
import com.google.cloud.pubsublite.TopicPath;
2726
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
@@ -73,7 +72,6 @@ public Consumer<byte[], byte[]> instantiate() throws ApiException {
7372
AdminClient.create(AdminClientSettings.newBuilder().setRegion(zone.region()).build())) {
7473
Subscription subscription = adminClient.getSubscription(subscriptionPath()).get();
7574
TopicPath topic = TopicPath.parse(subscription.getTopic());
76-
long partitionCount = PartitionLookupUtils.numPartitions(topic);
7775
AssignerFactory assignerFactory =
7876
receiver -> {
7977
AssignerBuilder.Builder builder = AssignerBuilder.newBuilder();
@@ -110,14 +108,12 @@ public Consumer<byte[], byte[]> instantiate() throws ApiException {
110108

111109
CursorClient cursorClient =
112110
CursorClient.create(CursorClientSettings.newBuilder().setRegion(zone.region()).build());
113-
111+
SharedBehavior shared =
112+
new SharedBehavior(
113+
AdminClient.create(
114+
AdminClientSettings.newBuilder().setRegion(topic.location().region()).build()));
114115
return new PubsubLiteConsumer(
115-
subscriptionPath(),
116-
topic,
117-
partitionCount,
118-
consumerFactory,
119-
assignerFactory,
120-
cursorClient);
116+
subscriptionPath(), topic, shared, consumerFactory, assignerFactory, cursorClient);
121117
} catch (Exception e) {
122118
throw ExtractStatus.toCanonical(e).underlying;
123119
}

src/main/java/com/google/cloud/pubsublite/kafka/ProducerSettings.java

+14-12
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,13 @@
1616

1717
package com.google.cloud.pubsublite.kafka;
1818

19-
import static com.google.cloud.pubsublite.ProjectLookupUtils.toCanonical;
20-
2119
import com.google.api.gax.rpc.ApiException;
2220
import com.google.auto.value.AutoValue;
23-
import com.google.cloud.pubsublite.PartitionLookupUtils;
21+
import com.google.cloud.pubsublite.AdminClient;
22+
import com.google.cloud.pubsublite.AdminClientSettings;
2423
import com.google.cloud.pubsublite.TopicPath;
25-
import com.google.cloud.pubsublite.internal.wire.PubsubContext;
24+
import com.google.cloud.pubsublite.internal.wire.*;
2625
import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework;
27-
import com.google.cloud.pubsublite.internal.wire.RoutingPublisherBuilder;
28-
import com.google.cloud.pubsublite.internal.wire.SinglePartitionPublisherBuilder;
2926
import org.apache.kafka.clients.producer.Producer;
3027

3128
@AutoValue
@@ -48,18 +45,23 @@ public abstract static class Builder {
4845
}
4946

5047
public Producer<byte[], byte[]> instantiate() throws ApiException {
51-
TopicPath canonicalTopic = toCanonical(topicPath());
52-
RoutingPublisherBuilder.Builder routingBuilder =
53-
RoutingPublisherBuilder.newBuilder()
54-
.setTopic(canonicalTopic)
48+
PartitionCountWatchingPublisherSettings.Builder publisherSettings =
49+
PartitionCountWatchingPublisherSettings.newBuilder()
50+
.setTopic(topicPath())
5551
.setPublisherFactory(
5652
partition ->
5753
SinglePartitionPublisherBuilder.newBuilder()
5854
.setContext(PubsubContext.of(FRAMEWORK))
59-
.setTopic(canonicalTopic)
55+
.setTopic(topicPath())
6056
.setPartition(partition)
6157
.build());
58+
SharedBehavior shared =
59+
new SharedBehavior(
60+
AdminClient.create(
61+
AdminClientSettings.newBuilder()
62+
.setRegion(topicPath().location().region())
63+
.build()));
6264
return new PubsubLiteProducer(
63-
routingBuilder.build(), PartitionLookupUtils.numPartitions(canonicalTopic), canonicalTopic);
65+
new PartitionCountWatchingPublisher(publisherSettings.build()), shared, topicPath());
6466
}
6567
}

src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java

+9-4
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ class PubsubLiteConsumer implements Consumer<byte[], byte[]> {
7070
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
7171
private final SubscriptionPath subscriptionPath;
7272
private final TopicPath topicPath;
73-
private final long partitionCount;
73+
private final SharedBehavior shared;
7474
private final ConsumerFactory consumerFactory;
7575
private final AssignerFactory assignerFactory;
7676
private final CursorClient cursorClient;
@@ -80,13 +80,13 @@ class PubsubLiteConsumer implements Consumer<byte[], byte[]> {
8080
PubsubLiteConsumer(
8181
SubscriptionPath subscriptionPath,
8282
TopicPath topicPath,
83-
long partitionCount,
83+
SharedBehavior shared,
8484
ConsumerFactory consumerFactory,
8585
AssignerFactory assignerFactory,
8686
CursorClient cursorClient) {
8787
this.subscriptionPath = subscriptionPath;
8888
this.topicPath = topicPath;
89-
this.partitionCount = partitionCount;
89+
this.shared = shared;
9090
this.consumerFactory = consumerFactory;
9191
this.assignerFactory = assignerFactory;
9292
this.cursorClient = cursorClient;
@@ -440,7 +440,7 @@ public List<PartitionInfo> partitionsFor(String s) {
440440
@Override
441441
public List<PartitionInfo> partitionsFor(String topic, Duration timeout) {
442442
checkTopic(topic);
443-
return SharedBehavior.partitionsFor(partitionCount, topicPath);
443+
return shared.partitionsFor(topicPath, timeout);
444444
}
445445

446446
@Override
@@ -511,6 +511,11 @@ public void close(Duration timeout) {
511511
} catch (Exception e) {
512512
logger.atSevere().withCause(e).log("Error closing cursor client during Consumer shutdown.");
513513
}
514+
try {
515+
shared.close();
516+
} catch (Exception e) {
517+
logger.atSevere().withCause(e).log("Error closing admin client during Consumer shutdown.");
518+
}
514519
unsubscribe();
515520
}
516521

src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducer.java

+10-4
Original file line numberDiff line numberDiff line change
@@ -51,20 +51,21 @@
5151
import org.apache.kafka.common.errors.UnsupportedVersionException;
5252

5353
class PubsubLiteProducer implements Producer<byte[], byte[]> {
54+
private static final Duration INFINITE_DURATION = Duration.ofMillis(Long.MAX_VALUE);
5455
private static final UnsupportedVersionException NO_TRANSACTIONS_EXCEPTION =
5556
new UnsupportedVersionException(
5657
"Pub/Sub Lite is a non-transactional system and does not support producer transactions.");
5758
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
5859

60+
private final SharedBehavior shared;
5961
private final Publisher<PublishMetadata> publisher;
6062
private final TopicPath topicPath;
61-
private final long partitionCount;
6263

6364
PubsubLiteProducer(
64-
Publisher<PublishMetadata> publisher, long partitionCount, TopicPath topicPath) {
65+
Publisher<PublishMetadata> publisher, SharedBehavior shared, TopicPath topicPath) {
6566
this.publisher = publisher;
67+
this.shared = shared;
6668
this.topicPath = topicPath;
67-
this.partitionCount = partitionCount;
6869
this.publisher.addListener(
6970
new Listener() {
7071
@Override
@@ -175,7 +176,7 @@ public void flush() {
175176
@Override
176177
public List<PartitionInfo> partitionsFor(String s) {
177178
checkTopic(s);
178-
return SharedBehavior.partitionsFor(partitionCount, topicPath);
179+
return shared.partitionsFor(topicPath, INFINITE_DURATION);
179180
}
180181

181182
@Override
@@ -190,6 +191,11 @@ public void close() {
190191

191192
@Override
192193
public void close(Duration duration) {
194+
try {
195+
shared.close();
196+
} catch (Exception e) {
197+
logger.atSevere().withCause(e).log("Error closing admin client during Producer shutdown.");
198+
}
193199
try {
194200
publisher.stopAsync().awaitTerminated(duration.toMillis(), MILLISECONDS);
195201
} catch (TimeoutException e) {

src/main/java/com/google/cloud/pubsublite/kafka/SharedBehavior.java

+18-4
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,24 @@
1818

1919
import static com.google.cloud.pubsublite.kafka.KafkaExceptionUtils.toKafka;
2020

21+
import com.google.cloud.pubsublite.AdminClient;
2122
import com.google.cloud.pubsublite.Partition;
2223
import com.google.cloud.pubsublite.TopicPath;
2324
import com.google.common.collect.ImmutableList;
25+
import java.time.Duration;
2426
import java.util.List;
27+
import java.util.concurrent.TimeUnit;
2528
import org.apache.kafka.common.PartitionInfo;
2629

2730
/** Shared behavior for producer and consumer. */
28-
final class SharedBehavior {
29-
private SharedBehavior() {}
31+
final class SharedBehavior implements AutoCloseable {
32+
private final AdminClient client;
3033

31-
static PartitionInfo toPartitionInfo(TopicPath topic, Partition partition) {
34+
SharedBehavior(AdminClient client) {
35+
this.client = client;
36+
}
37+
38+
private static PartitionInfo toPartitionInfo(TopicPath topic, Partition partition) {
3239
return new PartitionInfo(
3340
topic.toString(),
3441
(int) partition.value(),
@@ -37,8 +44,10 @@ static PartitionInfo toPartitionInfo(TopicPath topic, Partition partition) {
3744
PubsubLiteNode.NODES);
3845
}
3946

40-
static List<PartitionInfo> partitionsFor(long partitionCount, TopicPath topic) {
47+
List<PartitionInfo> partitionsFor(TopicPath topic, Duration timeout) {
4148
try {
49+
long partitionCount =
50+
client.getTopicPartitionCount(topic).get(timeout.toMillis(), TimeUnit.MILLISECONDS);
4251
ImmutableList.Builder<PartitionInfo> result = ImmutableList.builder();
4352
for (int i = 0; i < partitionCount; ++i) {
4453
result.add(toPartitionInfo(topic, Partition.of(i)));
@@ -48,4 +57,9 @@ static List<PartitionInfo> partitionsFor(long partitionCount, TopicPath topic) {
4857
throw toKafka(t);
4958
}
5059
}
60+
61+
@Override
62+
public void close() {
63+
client.close();
64+
}
5165
}

src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumerTest.java

+20-1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
import com.google.api.core.ApiFutures;
3232
import com.google.api.core.SettableApiFuture;
33+
import com.google.cloud.pubsublite.AdminClient;
3334
import com.google.cloud.pubsublite.CloudZone;
3435
import com.google.cloud.pubsublite.Offset;
3536
import com.google.cloud.pubsublite.Partition;
@@ -52,6 +53,7 @@
5253
import com.google.common.collect.Multimaps;
5354
import com.google.common.reflect.ImmutableTypeToInstanceMap;
5455
import java.time.Duration;
56+
import java.util.List;
5557
import java.util.concurrent.atomic.AtomicReference;
5658
import java.util.regex.Pattern;
5759
import org.apache.kafka.clients.consumer.Consumer;
@@ -61,6 +63,7 @@
6163
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
6264
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
6365
import org.apache.kafka.common.KafkaException;
66+
import org.apache.kafka.common.PartitionInfo;
6467
import org.apache.kafka.common.TopicPartition;
6568
import org.apache.kafka.common.errors.TimeoutException;
6669
import org.apache.kafka.common.errors.UnsupportedVersionException;
@@ -98,6 +101,7 @@ private static <T> T example(Class<T> klass) {
98101
@Mock ConsumerFactory consumerFactory;
99102
@Mock AssignerFactory assignerFactory;
100103
@Mock CursorClient cursorClient;
104+
@Mock AdminClient adminClient;
101105

102106
@Mock Assigner assigner;
103107
@Mock SingleSubscriptionConsumer underlying;
@@ -111,7 +115,7 @@ public void setUp() {
111115
new PubsubLiteConsumer(
112116
example(SubscriptionPath.class),
113117
example(TopicPath.class),
114-
3,
118+
new SharedBehavior(adminClient),
115119
consumerFactory,
116120
assignerFactory,
117121
cursorClient);
@@ -455,4 +459,19 @@ public void seek() {
455459
example(Partition.class),
456460
SeekRequest.newBuilder().setNamedTarget(NamedTarget.HEAD).build());
457461
}
462+
463+
@Test
464+
public void partitionsFor() {
465+
when(adminClient.getTopicPartitionCount(example(TopicPath.class)))
466+
.thenReturn(ApiFutures.immediateFuture(2L));
467+
List<PartitionInfo> info = consumer.partitionsFor(example(TopicPath.class).toString());
468+
assertThat(info.size()).isEqualTo(2L);
469+
}
470+
471+
@Test
472+
public void close() {
473+
consumer.close();
474+
verify(adminClient).close();
475+
verify(cursorClient).close();
476+
}
458477
}

src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducerTest.java

+18-1
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@
2323
import static org.mockito.Mockito.verify;
2424
import static org.mockito.Mockito.when;
2525

26+
import com.google.api.core.ApiFutures;
2627
import com.google.api.core.SettableApiFuture;
2728
import com.google.api.gax.rpc.StatusCode.Code;
29+
import com.google.cloud.pubsublite.AdminClient;
2830
import com.google.cloud.pubsublite.Message;
2931
import com.google.cloud.pubsublite.Offset;
3032
import com.google.cloud.pubsublite.Partition;
@@ -35,18 +37,21 @@
3537
import com.google.cloud.pubsublite.internal.Publisher;
3638
import com.google.cloud.pubsublite.internal.testing.FakeApiService;
3739
import com.google.common.collect.ImmutableMap;
40+
import java.util.List;
3841
import java.util.concurrent.Future;
3942
import java.util.concurrent.TimeUnit;
4043
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
4144
import org.apache.kafka.clients.producer.Producer;
4245
import org.apache.kafka.clients.producer.ProducerRecord;
4346
import org.apache.kafka.clients.producer.RecordMetadata;
47+
import org.apache.kafka.common.PartitionInfo;
4448
import org.apache.kafka.common.TopicPartition;
4549
import org.apache.kafka.common.errors.UnsupportedVersionException;
4650
import org.junit.Before;
4751
import org.junit.Test;
4852
import org.junit.runner.RunWith;
4953
import org.junit.runners.JUnit4;
54+
import org.mockito.Mock;
5055
import org.mockito.MockitoAnnotations;
5156
import org.mockito.Spy;
5257

@@ -64,13 +69,16 @@ abstract static class FakePublisher extends FakeApiService
6469
example(TopicPath.class).toString(), (int) example(Partition.class).value());
6570

6671
@Spy FakePublisher underlying;
72+
@Mock AdminClient adminClient;
6773

6874
Producer<byte[], byte[]> producer;
6975

7076
@Before
7177
public void setUp() {
7278
MockitoAnnotations.initMocks(this);
73-
producer = new PubsubLiteProducer(underlying, 3, example(TopicPath.class));
79+
producer =
80+
new PubsubLiteProducer(
81+
underlying, new SharedBehavior(adminClient), example(TopicPath.class));
7482
verify(underlying).startAsync();
7583
verify(underlying).awaitRunning();
7684
}
@@ -207,7 +215,16 @@ public void flush() throws Exception {
207215
@Test
208216
public void close() throws Exception {
209217
producer.close();
218+
verify(adminClient).close();
210219
verify(underlying).stopAsync();
211220
verify(underlying).awaitTerminated(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
212221
}
222+
223+
@Test
224+
public void partitionsFor() {
225+
when(adminClient.getTopicPartitionCount(example(TopicPath.class)))
226+
.thenReturn(ApiFutures.immediateFuture(2L));
227+
List<PartitionInfo> info = producer.partitionsFor(example(TopicPath.class).toString());
228+
assertThat(info.size()).isEqualTo(2L);
229+
}
213230
}

0 commit comments

Comments
 (0)