Skip to content

Commit 7106862

Browse files
feat: Add setTopicPathOverride to consumer settings (#174)
* feat: Add setTopicPathOverride to consumer settings * fix: Clirr
1 parent 665890e commit 7106862

File tree

2 files changed

+42
-5
lines changed

2 files changed

+42
-5
lines changed

clirr-ignored-differences.xml

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!-- see http://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
3+
<differences>
4+
<!-- Added abstract method to AutoValue.Builder class (Always okay) -->
5+
<difference>
6+
<differenceType>7013</differenceType>
7+
<className>**/*$Builder</className>
8+
<method>*</method>
9+
</difference>
10+
</differences>

src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java

+32-5
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import com.google.cloud.pubsublite.v1.PartitionAssignmentServiceSettings;
4848
import com.google.cloud.pubsublite.v1.SubscriberServiceClient;
4949
import com.google.cloud.pubsublite.v1.SubscriberServiceSettings;
50+
import java.util.Optional;
5051
import org.apache.kafka.clients.consumer.Consumer;
5152

5253
@AutoValue
@@ -61,29 +62,55 @@ public abstract class ConsumerSettings {
6162
// Optional parameters.
6263
abstract boolean autocommit();
6364

65+
abstract Optional<TopicPath> topicPathOverride();
66+
6467
public static Builder newBuilder() {
6568
return (new AutoValue_ConsumerSettings.Builder()).setAutocommit(false);
6669
}
6770

6871
@AutoValue.Builder
6972
public abstract static class Builder {
7073
// Required parameters.
74+
/**
75+
* The subscription path to use. Only the topic corresponding to this subscription can be
76+
* subscribed to.
77+
*/
7178
public abstract Builder setSubscriptionPath(SubscriptionPath path);
7279

80+
/** The per-partition flow control settings. */
7381
public abstract Builder setPerPartitionFlowControlSettings(FlowControlSettings settings);
7482

7583
// Optional parameters.
84+
/** The autocommit mode. */
7685
public abstract Builder setAutocommit(boolean autocommit);
7786

87+
/**
88+
* An override for the TopicPath used by this consumer.
89+
*
90+
* <p>When this is set, the topic path of the subscription will not be fetched: instead, the
91+
* topic used in methods will be compared with the provided TopicPath object.
92+
*
93+
* <p>This is useful if you do not have the pubsublite.subscriptions.get permission for the
94+
* subscription.
95+
*/
96+
public abstract Builder setTopicPathOverride(TopicPath topicPath);
97+
7898
public abstract ConsumerSettings build();
7999
}
80100

81101
public Consumer<byte[], byte[]> instantiate() throws ApiException {
82-
CloudRegion region = subscriptionPath().location().extractRegion();
83-
try (AdminClient adminClient =
84-
AdminClient.create(AdminClientSettings.newBuilder().setRegion(region).build())) {
85-
Subscription subscription = adminClient.getSubscription(subscriptionPath()).get();
86-
TopicPath topic = TopicPath.parse(subscription.getTopic());
102+
try {
103+
CloudRegion region = subscriptionPath().location().extractRegion();
104+
TopicPath topic;
105+
if (topicPathOverride().isPresent()) {
106+
topic = topicPathOverride().get();
107+
} else {
108+
try (AdminClient adminClient =
109+
AdminClient.create(AdminClientSettings.newBuilder().setRegion(region).build())) {
110+
Subscription subscription = adminClient.getSubscription(subscriptionPath()).get();
111+
topic = TopicPath.parse(subscription.getTopic());
112+
}
113+
}
87114
AssignerFactory assignerFactory =
88115
receiver -> {
89116
try {

0 commit comments

Comments
 (0)