
Commit f484754

feat: Compute head offset for Spark connector micro batch mode. (#439)
1 parent 624b123 commit f484754

10 files changed, +301 −98 lines changed

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/TopicStatsClient.java

+14
@@ -22,6 +22,7 @@
 import com.google.cloud.pubsublite.Partition;
 import com.google.cloud.pubsublite.TopicPath;
 import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
+import com.google.cloud.pubsublite.proto.Cursor;

 public interface TopicStatsClient extends ApiBackgroundResource {
   static TopicStatsClient create(TopicStatsClientSettings settings) throws ApiException {
@@ -43,4 +44,17 @@ static TopicStatsClient create(TopicStatsClientSettings settings) throws ApiExce
    */
   ApiFuture<ComputeMessageStatsResponse> computeMessageStats(
       TopicPath path, Partition partition, Offset start, Offset end);
+
+  /**
+   * Compute the head cursor for the partition. The head cursor's offset is guaranteed to be before
+   * or equal to all messages which have not yet been acknowledged to be published, and greater than
+   * the offset of any message whose publish has already been acknowledged. It is 0 if there have
+   * never been messages on the partition.
+   *
+   * @param path The topic to compute head cursor on
+   * @param partition The partition to compute head cursor for
+   * @return A future that will have either an error {@link ApiException} or the head cursor on
+   *     success.
+   */
+  ApiFuture<Cursor> computeHeadCursor(TopicPath path, Partition partition);
 }
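For orientation, a minimal caller-side sketch of the new API. This is hypothetical usage, not part of the commit: topicStatsClient is assumed to be an already-constructed TopicStatsClient, and the topic path is illustrative.

    // Hypothetical usage of the new computeHeadCursor API.
    TopicPath topic =
        TopicPath.parse("projects/my-project/locations/us-central1-a/topics/my-topic");
    ApiFuture<Cursor> head = topicStatsClient.computeHeadCursor(topic, Partition.of(0));
    // Blocking get() for brevity; production code would attach a callback or transform.
    System.out.println("head offset = " + head.get().getOffset());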

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/TopicStatsClientImpl.java

+18
@@ -16,14 +16,18 @@
 package com.google.cloud.pubsublite.internal;

 import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutures;
 import com.google.cloud.pubsublite.CloudRegion;
 import com.google.cloud.pubsublite.Offset;
 import com.google.cloud.pubsublite.Partition;
 import com.google.cloud.pubsublite.TopicPath;
+import com.google.cloud.pubsublite.proto.ComputeHeadCursorRequest;
+import com.google.cloud.pubsublite.proto.ComputeHeadCursorResponse;
 import com.google.cloud.pubsublite.proto.ComputeMessageStatsRequest;
 import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
 import com.google.cloud.pubsublite.proto.Cursor;
 import com.google.cloud.pubsublite.v1.TopicStatsServiceClient;
+import com.google.common.util.concurrent.MoreExecutors;

 public class TopicStatsClientImpl extends ApiResourceAggregation implements TopicStatsClient {
   private final CloudRegion region;
@@ -53,4 +57,18 @@ public ApiFuture<ComputeMessageStatsResponse> computeMessageStats(
             .setEndCursor(Cursor.newBuilder().setOffset(end.value()).build())
             .build());
   }
+
+  @Override
+  public ApiFuture<Cursor> computeHeadCursor(TopicPath path, Partition partition) {
+    return ApiFutures.transform(
+        serviceClient
+            .computeHeadCursorCallable()
+            .futureCall(
+                ComputeHeadCursorRequest.newBuilder()
+                    .setTopic(path.toString())
+                    .setPartition(partition.value())
+                    .build()),
+        ComputeHeadCursorResponse::getHeadCursor,
+        MoreExecutors.directExecutor());
+  }
 }
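A note on the executor choice above: the response-to-cursor mapping (ComputeHeadCursorResponse::getHeadCursor) is trivial, so MoreExecutors.directExecutor() runs it on whichever thread completes the RPC future, avoiding an unnecessary thread hop; a heavier transform would warrant a real executor.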

pubsublite-spark-sql-streaming/pom.xml

+10
@@ -104,6 +104,11 @@
       <artifactId>jackson-core</artifactId>
       <version>2.12.0</version>
     </dependency>
+    <dependency>
+      <groupId>com.github.ben-manes.caffeine</groupId>
+      <artifactId>caffeine</artifactId>
+      <version>2.8.8</version>
+    </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
@@ -135,6 +140,11 @@
       <scope>test</scope>
       <version>3.6.0</version>
     </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava-testlib</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>

   <build>
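The caffeine dependency backs the AsyncLoadingCache used by the new LimitingHeadOffsetReader below, and guava-testlib supplies FakeTicker for deterministic cache-expiry tests. A minimal sketch of the caching primitive itself, illustrative only (expensiveLookup is a hypothetical loader, not part of this commit):

    // Values are recomputed at most once per minute per key;
    // get() returns a future immediately rather than blocking the caller.
    AsyncLoadingCache<String, Long> cache =
        Caffeine.newBuilder()
            .expireAfterWrite(1, TimeUnit.MINUTES)
            .buildAsync(key -> expensiveLookup(key)); // hypothetical loader
    CompletableFuture<Long> value = cache.get("some-key");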
pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/LimitingHeadOffsetReader.java

+100

@@ -0,0 +1,100 @@
+/*
+ * Copyright 2020 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.google.cloud.pubsublite.spark;
+
+import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.Ticker;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
+import com.google.cloud.pubsublite.Offset;
+import com.google.cloud.pubsublite.Partition;
+import com.google.cloud.pubsublite.TopicPath;
+import com.google.cloud.pubsublite.internal.TopicStatsClient;
+import com.google.cloud.pubsublite.proto.Cursor;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.MoreExecutors;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Rate limited HeadOffsetReader, utilizing a LoadingCache that refreshes all partitions' head
+ * offsets for the topic at most once per minute.
+ */
+public class LimitingHeadOffsetReader implements PerTopicHeadOffsetReader {
+
+  private final TopicStatsClient topicStatsClient;
+  private final TopicPath topic;
+  private final long topicPartitionCount;
+  private final AsyncLoadingCache<Partition, Offset> cachedHeadOffsets;
+
+  @VisibleForTesting
+  public LimitingHeadOffsetReader(
+      TopicStatsClient topicStatsClient, TopicPath topic, long topicPartitionCount, Ticker ticker) {
+    this.topicStatsClient = topicStatsClient;
+    this.topic = topic;
+    this.topicPartitionCount = topicPartitionCount;
+    this.cachedHeadOffsets =
+        Caffeine.newBuilder()
+            .ticker(ticker)
+            .expireAfterWrite(1, TimeUnit.MINUTES)
+            .buildAsync(this::loadHeadOffset);
+  }
+
+  private CompletableFuture<Offset> loadHeadOffset(Partition partition, Executor executor) {
+
+    CompletableFuture<Offset> result = new CompletableFuture<>();
+    ApiFutures.addCallback(
+        topicStatsClient.computeHeadCursor(topic, partition),
+        new ApiFutureCallback<Cursor>() {
+          @Override
+          public void onFailure(Throwable t) {
+            result.completeExceptionally(t);
+          }
+
+          @Override
+          public void onSuccess(Cursor c) {
+            result.complete(Offset.of(c.getOffset()));
+          }
+        },
+        MoreExecutors.directExecutor());
+    return result;
+  }
+
+  @Override
+  public PslSourceOffset getHeadOffset() {
+    Set<Partition> keySet = new HashSet<>();
+    for (int i = 0; i < topicPartitionCount; i++) {
+      keySet.add(Partition.of(i));
+    }
+    CompletableFuture<Map<Partition, Offset>> future = cachedHeadOffsets.getAll(keySet);
+    try {
+      return PslSourceOffset.builder().partitionOffsetMap(future.get()).build();
+    } catch (Throwable t) {
+      throw new IllegalStateException("Unable to compute head offset for topic: " + topic, t);
+    }
+  }
+
+  @Override
+  public void close() {
+    topicStatsClient.close();
+  }
+}
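The injected Ticker is what makes the one-minute expiry testable, and is presumably why guava-testlib (FakeTicker) was added to the pom. A test sketch under stated assumptions — mockTopicStatsClient and topic are hypothetical fixtures, and Caffeine's Ticker is a functional interface, so a method reference adapts Guava's FakeTicker:

    // com.google.common.testing.FakeTicker drives the cache clock deterministically.
    FakeTicker fakeTicker = new FakeTicker();
    LimitingHeadOffsetReader reader =
        new LimitingHeadOffsetReader(mockTopicStatsClient, topic, 2, fakeTicker::read);
    reader.getHeadOffset(); // issues one computeHeadCursor RPC per partition
    reader.getHeadOffset(); // within the minute: served from cache, no new RPCs
    fakeTicker.advance(1, TimeUnit.MINUTES);
    reader.getHeadOffset(); // entries expired, so RPCs are issued again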

pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/HeadOffsetReader.java renamed to pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PerTopicHeadOffsetReader.java

+3 −5
@@ -16,14 +16,12 @@

 package com.google.cloud.pubsublite.spark;

-import com.google.cloud.pubsublite.TopicPath;
-import com.google.cloud.pubsublite.internal.CheckedApiException;
 import java.io.Closeable;

-public interface HeadOffsetReader extends Closeable {
+public interface PerTopicHeadOffsetReader extends Closeable {

-  // Gets the head offsets for all partitions in a topic. Blocks.
-  PslSourceOffset getHeadOffset(TopicPath topic) throws CheckedApiException;
+  // Gets the head offsets for all partitions in the topic. Blocks.
+  PslSourceOffset getHeadOffset();

   @Override
   void close();
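The signature change is the point of the rename: the TopicPath parameter and the checked CheckedApiException are gone because the reader is now bound to a single topic at construction (hence Per-Topic), and implementations such as LimitingHeadOffsetReader surface failures as unchecked exceptions instead.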

pubsublite-spark-sql-streaming/src/main/java/com/google/cloud/pubsublite/spark/PslDataSource.java

+10 −88
@@ -16,25 +16,13 @@

 package com.google.cloud.pubsublite.spark;

-import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings;
-
+import com.github.benmanes.caffeine.cache.Ticker;
 import com.google.auto.service.AutoService;
 import com.google.cloud.pubsublite.AdminClient;
-import com.google.cloud.pubsublite.Partition;
 import com.google.cloud.pubsublite.PartitionLookupUtils;
 import com.google.cloud.pubsublite.SubscriptionPath;
 import com.google.cloud.pubsublite.TopicPath;
-import com.google.cloud.pubsublite.internal.CheckedApiException;
 import com.google.cloud.pubsublite.internal.CursorClient;
-import com.google.cloud.pubsublite.internal.wire.CommitterBuilder;
-import com.google.cloud.pubsublite.internal.wire.PubsubContext;
-import com.google.cloud.pubsublite.internal.wire.RoutingMetadata;
-import com.google.cloud.pubsublite.internal.wire.ServiceClients;
-import com.google.cloud.pubsublite.internal.wire.SubscriberBuilder;
-import com.google.cloud.pubsublite.v1.SubscriberServiceClient;
-import com.google.cloud.pubsublite.v1.SubscriberServiceSettings;
-import com.google.common.collect.ImmutableMap;
-import java.io.IOException;
 import java.util.Objects;
 import java.util.Optional;
 import org.apache.spark.sql.sources.DataSourceRegister;
@@ -69,20 +57,10 @@ public ContinuousReader createContinuousReader(
     AdminClient adminClient = pslDataSourceOptions.newAdminClient();
     SubscriptionPath subscriptionPath = pslDataSourceOptions.subscriptionPath();
     long topicPartitionCount = PartitionLookupUtils.numPartitions(subscriptionPath, adminClient);
-    MultiPartitionCommitter committer =
-        new MultiPartitionCommitterImpl(
-            topicPartitionCount,
-            (partition) ->
-                CommitterBuilder.newBuilder()
-                    .setSubscriptionPath(subscriptionPath)
-                    .setPartition(partition)
-                    .setServiceClient(pslDataSourceOptions.newCursorServiceClient())
-                    .build());
-
     return new PslContinuousReader(
         cursorClient,
-        committer,
-        getSubscriberFactory(new PslCredentialsProvider(pslDataSourceOptions), subscriptionPath),
+        pslDataSourceOptions.newMultiPartitionCommitter(topicPartitionCount),
+        pslDataSourceOptions.getSubscriberFactory(),
         subscriptionPath,
         Objects.requireNonNull(pslDataSourceOptions.flowControlSettings()),
         topicPartitionCount);
@@ -98,7 +76,6 @@ public MicroBatchReader createMicroBatchReader(

     PslDataSourceOptions pslDataSourceOptions =
         PslDataSourceOptions.fromSparkDataSourceOptions(options);
-    PslCredentialsProvider credentialsProvider = new PslCredentialsProvider(pslDataSourceOptions);
     CursorClient cursorClient = pslDataSourceOptions.newCursorClient();
     AdminClient adminClient = pslDataSourceOptions.newAdminClient();
     SubscriptionPath subscriptionPath = pslDataSourceOptions.subscriptionPath();
@@ -110,72 +87,17 @@ public MicroBatchReader createMicroBatchReader(
           "Unable to get topic for subscription " + subscriptionPath, t);
     }
     long topicPartitionCount = PartitionLookupUtils.numPartitions(topicPath, adminClient);
-    MultiPartitionCommitter committer =
-        new MultiPartitionCommitterImpl(
-            topicPartitionCount,
-            (partition) ->
-                CommitterBuilder.newBuilder()
-                    .setSubscriptionPath(subscriptionPath)
-                    .setPartition(partition)
-                    .setServiceClient(pslDataSourceOptions.newCursorServiceClient())
-                    .build());
-
     return new PslMicroBatchReader(
         cursorClient,
-        committer,
-        getSubscriberFactory(new PslCredentialsProvider(pslDataSourceOptions), subscriptionPath),
+        pslDataSourceOptions.newMultiPartitionCommitter(topicPartitionCount),
+        pslDataSourceOptions.getSubscriberFactory(),
+        new LimitingHeadOffsetReader(
+            pslDataSourceOptions.newTopicStatsClient(),
+            topicPath,
+            topicPartitionCount,
+            Ticker.systemTicker()),
         subscriptionPath,
-        PslSparkUtils.toSparkSourceOffset(getHeadOffset(topicPath)),
         Objects.requireNonNull(pslDataSourceOptions.flowControlSettings()),
         topicPartitionCount);
   }
-
-  private static PslSourceOffset getHeadOffset(TopicPath topicPath) {
-    // TODO(jiangmichael): Replace it with real implementation.
-    HeadOffsetReader headOffsetReader =
-        new HeadOffsetReader() {
-          @Override
-          public PslSourceOffset getHeadOffset(TopicPath topic) {
-            return PslSourceOffset.builder()
-                .partitionOffsetMap(
-                    ImmutableMap.of(
-                        Partition.of(0), com.google.cloud.pubsublite.Offset.of(50),
-                        Partition.of(1), com.google.cloud.pubsublite.Offset.of(50)))
-                .build();
-          }
-
-          @Override
-          public void close() {}
-        };
-    try {
-      return headOffsetReader.getHeadOffset(topicPath);
-    } catch (CheckedApiException e) {
-      throw new IllegalStateException("Unable to get head offset for topic " + topicPath, e);
-    }
-  }
-
-  private static PartitionSubscriberFactory getSubscriberFactory(
-      PslCredentialsProvider credentialsProvider, SubscriptionPath subscriptionPath) {
-    return (partition, consumer) -> {
-      PubsubContext context = PubsubContext.of(Constants.FRAMEWORK);
-      SubscriberServiceSettings.Builder settingsBuilder =
-          SubscriberServiceSettings.newBuilder().setCredentialsProvider(credentialsProvider);
-      ServiceClients.addDefaultMetadata(
-          context, RoutingMetadata.of(subscriptionPath, partition), settingsBuilder);
-      try {
-        SubscriberServiceClient serviceClient =
-            SubscriberServiceClient.create(
-                addDefaultSettings(subscriptionPath.location().region(), settingsBuilder));
-        return SubscriberBuilder.newBuilder()
-            .setSubscriptionPath(subscriptionPath)
-            .setPartition(partition)
-            .setContext(context)
-            .setServiceClient(serviceClient)
-            .setMessageConsumer(consumer)
-            .build();
-      } catch (IOException e) {
-        throw new IllegalStateException("Failed to create subscriber service.", e);
-      }
-    };
-  }
 }
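Net effect for micro-batch mode: rather than a hardcoded placeholder head offset computed once when the reader is created, PslMicroBatchReader now owns a LimitingHeadOffsetReader and can fetch fresh (at most a minute stale) head offsets for each batch, while the committer and subscriber-factory wiring moves behind PslDataSourceOptions helpers.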
