Skip to content

Commit 305f71e

Browse files
feat: allow java client to handle schema change during same stream name (#1964)
* feat: Split writer into connection worker and wrapper, this is a prerequisite for multiplexing client * feat: add connection worker pool skeleton, used for multiplexing client * feat: add Load api for connection worker for multiplexing client * feat: add multiplexing support to connection worker. We will treat every new stream name as a switch of destinationt * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: port the multiplexing client core algorithm and basic tests also fixed a tiny bug inside fake bigquery write impl for getting thre response from offset * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: wire multiplexing connection pool to stream writer * feat: some fixes for multiplexing client * feat: fix some todos, and reject the mixed behavior of passed in client or not * feat: fix the bug that we may peek into the write_stream field but it's possible the proto schema does not contain this field * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: fix the bug that we may peek into the write_stream field but it's possible the proto schema does not contain this field * feat: add getInflightWaitSeconds implementation * feat: Add schema comparision in connection loop to ensure schema update for the same stream name can be notified * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * feat: add schema update support to multiplexing * fix: fix windows build bug: windows Instant resolution is different with linux * fix: fix another failing tests for windows build * fix: fix another test failure for Windows build * feat: Change new thread for each retry to be a thread pool to avoid create/tear down too much threads if lots of retries happens * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * fix: add back the background executor provider that's accidentally removed * feat: throw error when use connection pool for explicit stream * fix: Add precision truncation to the passed in value from JSON float and double type. * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * modify the bom version * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * fix deadlockissue in ConnectionWorkerPool * fix: fix deadlock issue during close + append for multiplexing * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * fix: fix one potential root cause of deadlock issue for non-multiplexing case * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md * Add timeout to inflight queue waiting, and also add some extra log * feat: allow java client lib handle switch table schema for the same stream name * 🦉 Updates from OwlBot post-processor See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --------- Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent 3159b12 commit 305f71e

File tree

4 files changed

+38
-31
lines changed

4 files changed

+38
-31
lines changed

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,20 +49,20 @@ If you are using Maven without BOM, add this to your dependencies:
4949
If you are using Gradle 5.x or later, add this to your dependencies:
5050

5151
```Groovy
52-
implementation platform('com.google.cloud:libraries-bom:26.4.0')
52+
implementation platform('com.google.cloud:libraries-bom:26.5.0')
5353
5454
implementation 'com.google.cloud:google-cloud-bigquerystorage'
5555
```
5656
If you are using Gradle without BOM, add this to your dependencies:
5757

5858
```Groovy
59-
implementation 'com.google.cloud:google-cloud-bigquerystorage:2.28.3'
59+
implementation 'com.google.cloud:google-cloud-bigquerystorage:2.28.4'
6060
```
6161

6262
If you are using SBT, add this to your dependencies:
6363

6464
```Scala
65-
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.28.3"
65+
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.28.4"
6666
```
6767

6868
## Authentication

google-cloud-bigquerystorage/clirr-ignored-differences.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@
115115
<difference>
116116
<differenceType>7009</differenceType>
117117
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool</className>
118-
<method>ConnectionWorkerPool(long, long, java.time.Duration, com.google.api.gax.batching.FlowController$LimitExceededBehavior, java.lang.String, com.google.cloud.bigquery.storage.v1.BigQueryWriteClient, boolean)</method>
118+
<method>ConnectionWorkerPool(long, long, java.time.Duration, com.google.api.gax.batching.FlowController$LimitExceededBehavior, java.lang.String, com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings)</method>
119119
</difference>
120120
<difference>
121121
<differenceType>7009</differenceType>

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java

Lines changed: 23 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -222,7 +222,6 @@ public ConnectionWorker(
222222
Status.fromCode(Code.INVALID_ARGUMENT)
223223
.withDescription("Writer schema must be provided when building this writer."));
224224
}
225-
this.writerSchema = writerSchema;
226225
this.maxInflightRequests = maxInflightRequests;
227226
this.maxInflightBytes = maxInflightBytes;
228227
this.limitExceededBehavior = limitExceededBehavior;
@@ -432,7 +431,7 @@ private void appendLoop() {
432431

433432
// Indicate whether we are at the first request after switching destination.
434433
// True means the schema and other metadata are needed.
435-
boolean firstRequestForDestinationSwitch = true;
434+
boolean firstRequestForTableOrSchemaSwitch = true;
436435
// Represent whether we have entered multiplexing.
437436
boolean isMultiplexing = false;
438437

@@ -483,25 +482,35 @@ private void appendLoop() {
483482
resetConnection();
484483
// Set firstRequestInConnection to indicate the next request to be sent should include
485484
// metedata. Reset everytime after reconnection.
486-
firstRequestForDestinationSwitch = true;
485+
firstRequestForTableOrSchemaSwitch = true;
487486
}
488487
while (!localQueue.isEmpty()) {
489488
AppendRowsRequest originalRequest = localQueue.pollFirst().message;
490489
AppendRowsRequest.Builder originalRequestBuilder = originalRequest.toBuilder();
491-
492-
// Consider we enter multiplexing if we met a different non empty stream name.
493-
if (!originalRequest.getWriteStream().isEmpty()
494-
&& !streamName.isEmpty()
495-
&& !originalRequest.getWriteStream().equals(streamName)) {
490+
// Always respect the first writer schema seen by the loop.
491+
if (writerSchema == null) {
492+
writerSchema = originalRequest.getProtoRows().getWriterSchema();
493+
}
494+
// Consider we enter multiplexing if we met a different non empty stream name or we meet
495+
// a new schema for the same stream name.
496+
// For the schema comparision we don't use message differencer to speed up the comparing
497+
// process. `equals(...)` can bring us false positive, e.g. two repeated field can be
498+
// considered the same but is not considered equals(). However as long as it's never provide
499+
// false negative we will always correctly pass writer schema to backend.
500+
if ((!originalRequest.getWriteStream().isEmpty()
501+
&& !streamName.isEmpty()
502+
&& !originalRequest.getWriteStream().equals(streamName))
503+
|| (originalRequest.getProtoRows().hasWriterSchema()
504+
&& !originalRequest.getProtoRows().getWriterSchema().equals(writerSchema))) {
496505
streamName = originalRequest.getWriteStream();
506+
writerSchema = originalRequest.getProtoRows().getWriterSchema();
497507
isMultiplexing = true;
498-
firstRequestForDestinationSwitch = true;
508+
firstRequestForTableOrSchemaSwitch = true;
499509
}
500510

501-
if (firstRequestForDestinationSwitch) {
511+
if (firstRequestForTableOrSchemaSwitch) {
502512
// If we are at the first request for every table switch, including the first request in
503513
// the connection, we will attach both stream name and table schema to the request.
504-
// We don't support change of schema change during multiplexing for the saeme stream name.
505514
destinationSet.add(streamName);
506515
if (this.traceId != null) {
507516
originalRequestBuilder.setTraceId(this.traceId);
@@ -511,17 +520,11 @@ private void appendLoop() {
511520
originalRequestBuilder.clearWriteStream();
512521
}
513522

514-
// We don't use message differencer to speed up the comparing process.
515-
// `equals(...)` can bring us false positive, e.g. two repeated field can be considered the
516-
// same but is not considered equals(). However as long as it's never provide false negative
517-
// we will always correctly pass writer schema to backend.
518-
if (firstRequestForDestinationSwitch
519-
|| !originalRequest.getProtoRows().getWriterSchema().equals(writerSchema)) {
520-
writerSchema = originalRequest.getProtoRows().getWriterSchema();
521-
} else {
523+
// During non table/schema switch requests, clear writer schema.
524+
if (!firstRequestForTableOrSchemaSwitch) {
522525
originalRequestBuilder.getProtoRowsBuilder().clearWriterSchema();
523526
}
524-
firstRequestForDestinationSwitch = false;
527+
firstRequestForTableOrSchemaSwitch = false;
525528

526529
// Send should only throw an exception if there is a problem with the request. The catch
527530
// block will handle this case, and return the exception with the result.

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerTest.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -247,10 +247,10 @@ public void testAppendInSameStream_switchSchema() throws Exception {
247247
// We will get the request as the pattern of:
248248
// (writer_stream: t1, schema: schema1)
249249
// (writer_stream: _, schema: _)
250-
// (writer_stream: _, schema: schema3)
251-
// (writer_stream: _, schema: _)
252-
// (writer_stream: _, schema: schema1)
253-
// (writer_stream: _, schema: _)
250+
// (writer_stream: t1, schema: schema3)
251+
// (writer_stream: t1, schema: _)
252+
// (writer_stream: t1, schema: schema1)
253+
// (writer_stream: t1, schema: _)
254254
switch (i % 4) {
255255
case 0:
256256
if (i == 0) {
@@ -261,19 +261,23 @@ public void testAppendInSameStream_switchSchema() throws Exception {
261261
.isEqualTo("foo");
262262
break;
263263
case 1:
264-
assertThat(serverRequest.getWriteStream()).isEmpty();
264+
if (i == 1) {
265+
assertThat(serverRequest.getWriteStream()).isEmpty();
266+
} else {
267+
assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);
268+
}
265269
// Schema is empty if not at the first request after table switch.
266270
assertThat(serverRequest.getProtoRows().hasWriterSchema()).isFalse();
267271
break;
268272
case 2:
269-
assertThat(serverRequest.getWriteStream()).isEmpty();
273+
assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);
270274
// Schema is populated after table switch.
271275
assertThat(
272276
serverRequest.getProtoRows().getWriterSchema().getProtoDescriptor().getName())
273277
.isEqualTo("bar");
274278
break;
275279
case 3:
276-
assertThat(serverRequest.getWriteStream()).isEmpty();
280+
assertThat(serverRequest.getWriteStream()).isEqualTo(TEST_STREAM_1);
277281
// Schema is empty if not at the first request after table switch.
278282
assertThat(serverRequest.getProtoRows().hasWriterSchema()).isFalse();
279283
break;

0 commit comments

Comments
 (0)