Skip to content

Commit 587c91c

Browse files
fix: Remove explicit partitions which are not particularly useful and add Watch workaround. (#675)
* fix: Remove explicit partitions which are not particularly useful and add Watch workaround. Explicit partitions can always be added back, but this reduces the public API surface unless they're needed. * fix: lint
1 parent 65ced46 commit 587c91c

File tree

4 files changed

+6
-27
lines changed

4 files changed

+6
-27
lines changed

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisher.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -185,8 +185,7 @@ private void handleConfig(long partitionCount) {
185185
if (partitionCount < currentSize) {
186186
log.atWarning().log(
187187
"Received an unexpected decrease in partition count. Previous partition count %s, new count %s",
188-
currentSize,
189-
partitionCount);
188+
currentSize, partitionCount);
190189
return;
191190
}
192191
ImmutableMap.Builder<Partition, Publisher<MessageMetadata>> mapBuilder =

pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscribeTransform.java

+2-14
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import java.util.function.Consumer;
3535
import java.util.stream.Collectors;
3636
import org.apache.beam.sdk.io.range.OffsetRange;
37-
import org.apache.beam.sdk.transforms.Create;
3837
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
3938
import org.apache.beam.sdk.transforms.PTransform;
4039
import org.apache.beam.sdk.transforms.ParDo;
@@ -123,19 +122,8 @@ private TopicPath getTopicPath() {
123122
@Override
124123
public PCollection<SequencedMessage> expand(PBegin input) {
125124
PCollection<SubscriptionPartition> subscriptionPartitions;
126-
if (options.partitions().isEmpty()) {
127-
subscriptionPartitions =
128-
input.apply(new SubscriptionPartitionLoader(getTopicPath(), options.subscriptionPath()));
129-
} else {
130-
subscriptionPartitions =
131-
input.apply(
132-
Create.of(
133-
options.partitions().stream()
134-
.map(
135-
partition ->
136-
SubscriptionPartition.of(options.subscriptionPath(), partition))
137-
.collect(Collectors.toList())));
138-
}
125+
subscriptionPartitions =
126+
input.apply(new SubscriptionPartitionLoader(getTopicPath(), options.subscriptionPath()));
139127

140128
return subscriptionPartitions.apply(
141129
ParDo.of(

pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriberOptions.java

-10
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,7 @@
4141
import com.google.cloud.pubsublite.v1.CursorServiceSettings;
4242
import com.google.cloud.pubsublite.v1.SubscriberServiceClient;
4343
import com.google.cloud.pubsublite.v1.SubscriberServiceSettings;
44-
import com.google.common.collect.ImmutableSet;
4544
import java.io.Serializable;
46-
import java.util.Set;
4745
import org.checkerframework.checker.nullness.qual.Nullable;
4846
import org.joda.time.Duration;
4947

@@ -70,11 +68,6 @@ public abstract class SubscriberOptions implements Serializable {
7068
/** Per-partition flow control parameters for this subscription. */
7169
public abstract FlowControlSettings flowControlSettings();
7270

73-
/**
74-
* A set of partitions. If empty, continuously poll the set of partitions using an admin client.
75-
*/
76-
public abstract Set<Partition> partitions();
77-
7871
/**
7972
* The minimum wall time to pass before allowing bundle closure.
8073
*
@@ -110,7 +103,6 @@ public abstract class SubscriberOptions implements Serializable {
110103
public static Builder newBuilder() {
111104
Builder builder = new AutoValue_SubscriberOptions.Builder();
112105
return builder
113-
.setPartitions(ImmutableSet.of())
114106
.setFlowControlSettings(DEFAULT_FLOW_CONTROL)
115107
.setMinBundleTimeout(MIN_BUNDLE_TIMEOUT);
116108
}
@@ -205,8 +197,6 @@ public abstract static class Builder {
205197
public abstract Builder setSubscriptionPath(SubscriptionPath path);
206198

207199
// Optional parameters
208-
public abstract Builder setPartitions(Set<Partition> partitions);
209-
210200
public abstract Builder setFlowControlSettings(FlowControlSettings flowControlSettings);
211201

212202
public abstract Builder setMinBundleTimeout(Duration minBundleTimeout);

pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriptionPartitionLoader.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,9 @@ public PollResult<Partition> apply(TopicPath element, Context c) {
8686
IntStream.range(0, partitionCount)
8787
.mapToObj(Partition::of)
8888
.collect(Collectors.toList());
89-
return PollResult.incomplete(Instant.now(), partitions);
89+
return PollResult.incomplete(Instant.now(), partitions)
90+
// TODO(BEAM-12459): Remove when this is fixed upstream
91+
.withWatermark(Instant.now());
9092
}
9193
})
9294
.withPollInterval(pollDuration)

0 commit comments

Comments
 (0)