Skip to content

Commit c1cf1d1

Browse files
fix: Add admin client in producer settings (#82)
1 parent c6efa5e commit c1cf1d1

File tree

1 file changed

+7
-6
lines changed

1 file changed

+7
-6
lines changed

src/main/java/com/google/cloud/pubsublite/kafka/ProducerSettings.java

+7-6
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,16 @@ public abstract static class Builder {
4848
public abstract ProducerSettings build();
4949
}
5050

51+
private AdminClient newAdminClient() {
52+
return AdminClient.create(
53+
AdminClientSettings.newBuilder().setRegion(topicPath().location().region()).build());
54+
}
55+
5156
public Producer<byte[], byte[]> instantiate() throws ApiException {
5257
PartitionCountWatchingPublisherSettings publisherSettings =
5358
PartitionCountWatchingPublisherSettings.newBuilder()
5459
.setTopic(topicPath())
60+
.setAdminClient(newAdminClient())
5561
.setPublisherFactory(
5662
partition -> {
5763
try {
@@ -72,12 +78,7 @@ public Producer<byte[], byte[]> instantiate() throws ApiException {
7278
}
7379
})
7480
.build();
75-
SharedBehavior shared =
76-
new SharedBehavior(
77-
AdminClient.create(
78-
AdminClientSettings.newBuilder()
79-
.setRegion(topicPath().location().region())
80-
.build()));
81+
SharedBehavior shared = new SharedBehavior(newAdminClient());
8182
return new PubsubLiteProducer(publisherSettings.instantiate(), shared, topicPath());
8283
}
8384
}

0 commit comments

Comments
 (0)