Skip to content

Commit 2c7f088

Browse files
authored
fix: update grpc based ReadObject rpcs to remove race condition between cancellation and message handling (#2708)
Update GapicUnbufferedReadableByteChannel to manage the grpc stream itself rather than using the stream iterator provided by gax. This allows us to ensure the cancellation is observed and our draining performs before returning from close(). As a side effect of not using the gax stream iterator, we now must handle stream restarts ourselves. GrpcStorageOptions.ReadObjectResumptionStrategy has been removed entirely, while RetryingDependencies and ResultRetryAlgorithm are now plumbed all the way down to the GapicUnbufferedReadableByteChannel.
1 parent 260e8ea commit 2c7f088

9 files changed

+327
-188
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/GapicDownloadSessionBuilder.java

+26-7
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@
2020

2121
import com.google.api.core.ApiFutures;
2222
import com.google.api.core.SettableApiFuture;
23+
import com.google.api.gax.retrying.ResultRetryAlgorithm;
2324
import com.google.api.gax.rpc.ServerStreamingCallable;
2425
import com.google.cloud.storage.BufferedReadableByteChannelSession.BufferedReadableByteChannel;
26+
import com.google.cloud.storage.Retrying.RetryingDependencies;
2527
import com.google.cloud.storage.UnbufferedReadableByteChannelSession.UnbufferedReadableByteChannel;
2628
import com.google.common.util.concurrent.MoreExecutors;
2729
import com.google.storage.v2.Object;
@@ -43,27 +45,32 @@ public static GapicDownloadSessionBuilder create() {
4345
return INSTANCE;
4446
}
4547

46-
/**
47-
* Any retry capability must be defined within the provided ServerStreamingCallable. The
48-
* ultimately produced channel will not do any retries of its own.
49-
*/
5048
public ReadableByteChannelSessionBuilder byteChannel(
5149
ServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read,
50+
RetryingDependencies retryingDependencies,
51+
ResultRetryAlgorithm<?> resultRetryAlgorithm,
5252
ResponseContentLifecycleManager responseContentLifecycleManager) {
53-
return new ReadableByteChannelSessionBuilder(read, responseContentLifecycleManager);
53+
return new ReadableByteChannelSessionBuilder(
54+
read, retryingDependencies, resultRetryAlgorithm, responseContentLifecycleManager);
5455
}
5556

5657
public static final class ReadableByteChannelSessionBuilder {
5758

5859
private final ServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read;
60+
private final RetryingDependencies retryingDependencies;
61+
private final ResultRetryAlgorithm<?> resultRetryAlgorithm;
5962
private final ResponseContentLifecycleManager responseContentLifecycleManager;
6063
private boolean autoGzipDecompression;
6164
private Hasher hasher;
6265

6366
private ReadableByteChannelSessionBuilder(
6467
ServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read,
68+
RetryingDependencies retryingDependencies,
69+
ResultRetryAlgorithm<?> resultRetryAlgorithm,
6570
ResponseContentLifecycleManager responseContentLifecycleManager) {
6671
this.read = read;
72+
this.retryingDependencies = retryingDependencies;
73+
this.resultRetryAlgorithm = resultRetryAlgorithm;
6774
this.responseContentLifecycleManager = responseContentLifecycleManager;
6875
this.hasher = Hasher.noop();
6976
this.autoGzipDecompression = false;
@@ -105,12 +112,24 @@ public UnbufferedReadableByteChannelSessionBuilder unbuffered() {
105112
if (autoGzipDecompression) {
106113
return new GzipReadableByteChannel(
107114
new GapicUnbufferedReadableByteChannel(
108-
resultFuture, read, object, hasher, responseContentLifecycleManager),
115+
resultFuture,
116+
read,
117+
object,
118+
hasher,
119+
retryingDependencies,
120+
resultRetryAlgorithm,
121+
responseContentLifecycleManager),
109122
ApiFutures.transform(
110123
resultFuture, Object::getContentEncoding, MoreExecutors.directExecutor()));
111124
} else {
112125
return new GapicUnbufferedReadableByteChannel(
113-
resultFuture, read, object, hasher, responseContentLifecycleManager);
126+
resultFuture,
127+
read,
128+
object,
129+
hasher,
130+
retryingDependencies,
131+
resultRetryAlgorithm,
132+
responseContentLifecycleManager);
114133
}
115134
};
116135
}

0 commit comments

Comments
 (0)