Skip to content

Commit 4616adb

Browse files
authored
feat: Add ignoreUnknownField support in JsonWriter (#1455)
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly: - [ ] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/java-bigquerystorage/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea - [ ] Ensure the tests and linter pass - [ ] Code coverage does not decrease (if any source code was changed) - [ ] Appropriate docs were updated (if necessary) Fixes #<issue_number_goes_here> ☕️
1 parent 973afcc commit 4616adb

File tree

5 files changed

+117
-17
lines changed

5 files changed

+117
-17
lines changed

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public class JsonStreamWriter implements AutoCloseable {
5252
private StreamWriter.Builder streamWriterBuilder;
5353
private Descriptor descriptor;
5454
private TableSchema tableSchema;
55+
private boolean ignoreUnknownFields = false;
5556

5657
/**
5758
* Constructs the JsonStreamWriter
@@ -80,6 +81,7 @@ private JsonStreamWriter(Builder builder)
8081
this.streamWriter = streamWriterBuilder.build();
8182
this.streamName = builder.streamName;
8283
this.tableSchema = builder.tableSchema;
84+
this.ignoreUnknownFields = builder.ignoreUnknownFields;
8385
}
8486

8587
/**
@@ -135,7 +137,8 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset)
135137
for (int i = 0; i < jsonArr.length(); i++) {
136138
JSONObject json = jsonArr.getJSONObject(i);
137139
Message protoMessage =
138-
JsonToProtoMessage.convertJsonToProtoMessage(this.descriptor, this.tableSchema, json);
140+
JsonToProtoMessage.convertJsonToProtoMessage(
141+
this.descriptor, this.tableSchema, json, ignoreUnknownFields);
139142
rowsBuilder.addSerializedRows(protoMessage.toByteString());
140143
}
141144
// Need to make sure refreshAppendAndSetDescriptor finishes first before this can run
@@ -249,6 +252,7 @@ public static final class Builder {
249252
private String endpoint;
250253
private boolean createDefaultStream = false;
251254
private String traceId;
255+
private boolean ignoreUnknownFields = false;
252256

253257
private static String streamPatternString =
254258
"(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/[^/]+";
@@ -350,6 +354,18 @@ public Builder setTraceId(String traceId) {
350354
return this;
351355
}
352356

357+
/**
358+
* Sets whether JSON fields that have no matching BigQuery field are ignored. If true, such
359+
* unknown fields are skipped during conversion instead of causing an error.
360+
*
361+
* @param ignoreUnknownFields true to ignore unknown JSON fields; false to reject them
362+
* @return this Builder, to allow call chaining
363+
*/
364+
public Builder setIgnoreUnknownFields(boolean ignoreUnknownFields) {
365+
this.ignoreUnknownFields = ignoreUnknownFields;
366+
return this;
367+
}
368+
353369
/**
354370
* Builds JsonStreamWriter
355371
*

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessage.java

Lines changed: 51 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,8 @@ public static DynamicMessage convertJsonToProtoMessage(Descriptor protoSchema, J
6464
Preconditions.checkNotNull(protoSchema, "Protobuf descriptor is null.");
6565
Preconditions.checkState(json.length() != 0, "JSONObject is empty.");
6666

67-
return convertJsonToProtoMessageImpl(protoSchema, null, json, "root", /*topLevel=*/ true);
67+
return convertJsonToProtoMessageImpl(
68+
protoSchema, null, json, "root", /*topLevel=*/ true, false);
6869
}
6970

7071
/**
@@ -85,7 +86,39 @@ public static DynamicMessage convertJsonToProtoMessage(
8586
Preconditions.checkState(json.length() != 0, "JSONObject is empty.");
8687

8788
return convertJsonToProtoMessageImpl(
88-
protoSchema, tableSchema.getFieldsList(), json, "root", /*topLevel=*/ true);
89+
protoSchema,
90+
tableSchema.getFieldsList(),
91+
json,
92+
"root",
93+
/*topLevel=*/ true,
94+
/*ignoreUnknownFields*/ false);
95+
}
96+
97+
/**
98+
* Converts Json data to protocol buffer messages given the protocol buffer descriptor.
99+
*
100+
* @param protoSchema
101+
* @param tableSchema bigquery table schema is needed for type conversion of DATETIME, TIME,
102+
* NUMERIC, BIGNUMERIC
103+
* @param json
104+
* @param ignoreUnknownFields allows unknown fields in JSON input to be ignored.
105+
* @throws IllegalArgumentException when JSON data is not compatible with proto descriptor.
106+
*/
107+
public static DynamicMessage convertJsonToProtoMessage(
108+
Descriptor protoSchema, TableSchema tableSchema, JSONObject json, boolean ignoreUnknownFields)
109+
throws IllegalArgumentException {
110+
Preconditions.checkNotNull(json, "JSONObject is null.");
111+
Preconditions.checkNotNull(protoSchema, "Protobuf descriptor is null.");
112+
Preconditions.checkNotNull(tableSchema, "TableSchema is null.");
113+
Preconditions.checkState(json.length() != 0, "JSONObject is empty.");
114+
115+
return convertJsonToProtoMessageImpl(
116+
protoSchema,
117+
tableSchema.getFieldsList(),
118+
json,
119+
"root",
120+
/*topLevel=*/ true,
121+
ignoreUnknownFields);
89122
}
90123

91124
/**
@@ -102,7 +135,8 @@ private static DynamicMessage convertJsonToProtoMessageImpl(
102135
List<TableFieldSchema> tableSchema,
103136
JSONObject json,
104137
String jsonScope,
105-
boolean topLevel)
138+
boolean topLevel,
139+
boolean ignoreUnknownFields)
106140
throws IllegalArgumentException {
107141

108142
DynamicMessage.Builder protoMsg = DynamicMessage.newBuilder(protoSchema);
@@ -117,9 +151,11 @@ private static DynamicMessage convertJsonToProtoMessageImpl(
117151
String jsonLowercaseName = jsonName.toLowerCase();
118152
String currentScope = jsonScope + "." + jsonName;
119153
FieldDescriptor field = protoSchema.findFieldByName(jsonLowercaseName);
120-
if (field == null) {
154+
if (field == null && !ignoreUnknownFields) {
121155
throw new IllegalArgumentException(
122156
String.format("JSONObject has fields unknown to BigQuery: %s.", currentScope));
157+
} else if (field == null) {
158+
continue;
123159
}
124160
TableFieldSchema fieldSchema = null;
125161
if (tableSchema != null) {
@@ -137,9 +173,10 @@ private static DynamicMessage convertJsonToProtoMessageImpl(
137173
}
138174
}
139175
if (!field.isRepeated()) {
140-
fillField(protoMsg, field, fieldSchema, json, jsonName, currentScope);
176+
fillField(protoMsg, field, fieldSchema, json, jsonName, currentScope, ignoreUnknownFields);
141177
} else {
142-
fillRepeatedField(protoMsg, field, fieldSchema, json, jsonName, currentScope);
178+
fillRepeatedField(
179+
protoMsg, field, fieldSchema, json, jsonName, currentScope, ignoreUnknownFields);
143180
}
144181
}
145182

@@ -174,7 +211,8 @@ private static void fillField(
174211
TableFieldSchema fieldSchema,
175212
JSONObject json,
176213
String exactJsonKeyName,
177-
String currentScope)
214+
String currentScope,
215+
boolean ignoreUnknownFields)
178216
throws IllegalArgumentException {
179217

180218
java.lang.Object val = json.get(exactJsonKeyName);
@@ -303,7 +341,8 @@ private static void fillField(
303341
fieldSchema == null ? null : fieldSchema.getFieldsList(),
304342
json.getJSONObject(exactJsonKeyName),
305343
currentScope,
306-
/*topLevel =*/ false));
344+
/*topLevel =*/ false,
345+
ignoreUnknownFields));
307346
return;
308347
}
309348
break;
@@ -331,7 +370,8 @@ private static void fillRepeatedField(
331370
TableFieldSchema fieldSchema,
332371
JSONObject json,
333372
String exactJsonKeyName,
334-
String currentScope)
373+
String currentScope,
374+
boolean ignoreUnknownFields)
335375
throws IllegalArgumentException {
336376

337377
JSONArray jsonArray;
@@ -478,7 +518,8 @@ private static void fillRepeatedField(
478518
fieldSchema == null ? null : fieldSchema.getFieldsList(),
479519
jsonArray.getJSONObject(i),
480520
currentScope,
481-
/*topLevel =*/ false));
521+
/*topLevel =*/ false,
522+
ignoreUnknownFields));
482523
} else {
483524
fail = true;
484525
}

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -519,11 +519,7 @@ private void cleanupInflightRequests() {
519519
} finally {
520520
this.lock.unlock();
521521
}
522-
log.fine(
523-
"Cleaning "
524-
+ localQueue.size()
525-
+ " inflight requests with error: "
526-
+ finalStatus.toString());
522+
log.fine("Cleaning " + localQueue.size() + " inflight requests with error: " + finalStatus);
527523
while (!localQueue.isEmpty()) {
528524
localQueue.pollFirst().appendResult.setException(finalStatus);
529525
}

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -496,4 +496,48 @@ public void testSimpleSchemaUpdate() throws Exception {
496496
|| testBigQueryWrite.getAppendRequests().get(3).getProtoRows().hasWriterSchema());
497497
}
498498
}
499+
500+
@Test
501+
public void testWithoutIgnoreUnknownFields() throws Exception {
502+
TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_INT).build();
503+
try (JsonStreamWriter writer =
504+
getTestJsonStreamWriterBuilder(TEST_STREAM, tableSchema).build()) {
505+
JSONObject foo = new JSONObject();
506+
foo.put("test_int", 10);
507+
JSONObject bar = new JSONObject();
508+
bar.put("test_unknown", 10);
509+
JSONArray jsonArr = new JSONArray();
510+
jsonArr.put(foo);
511+
jsonArr.put(bar);
512+
try {
513+
ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr);
514+
Assert.fail("expected ExecutionException");
515+
} catch (Exception ex) {
516+
assertEquals(
517+
ex.getMessage(), "JSONObject has fields unknown to BigQuery: root.test_unknown.");
518+
}
519+
}
520+
}
521+
522+
@Test
523+
public void testWithIgnoreUnknownFields() throws Exception {
524+
TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_INT).build();
525+
try (JsonStreamWriter writer =
526+
JsonStreamWriter.newBuilder(TEST_STREAM, tableSchema)
527+
.setChannelProvider(channelProvider)
528+
.setIgnoreUnknownFields(true)
529+
.setCredentialsProvider(NoCredentialsProvider.create())
530+
.build()) {
531+
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());
532+
JSONObject foo = new JSONObject();
533+
foo.put("test_int", 10);
534+
JSONObject bar = new JSONObject();
535+
bar.put("test_unknown", 10);
536+
JSONArray jsonArr = new JSONArray();
537+
jsonArr.put(foo);
538+
jsonArr.put(bar);
539+
ApiFuture<AppendRowsResponse> appendFuture = writer.append(jsonArr);
540+
appendFuture.get();
541+
}
542+
}
499543
}

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -352,7 +352,9 @@ public void testJsonStreamWriterWithDefaultStream()
352352
bigquery.create(tableInfo);
353353
TableName parent = TableName.of(ServiceOptions.getDefaultProjectId(), DATASET, tableName);
354354
try (JsonStreamWriter jsonStreamWriter =
355-
JsonStreamWriter.newBuilder(parent.toString(), tableSchema).build()) {
355+
JsonStreamWriter.newBuilder(parent.toString(), tableSchema)
356+
.setIgnoreUnknownFields(true)
357+
.build()) {
356358
LOG.info("Sending one message");
357359
JSONObject row1 = new JSONObject();
358360
row1.put("test_str", "aaa");
@@ -365,6 +367,7 @@ public void testJsonStreamWriterWithDefaultStream()
365367
BigDecimalByteStringEncoder.encodeToNumericByteString(new BigDecimal("-9000000"))
366368
.toByteArray()
367369
}));
370+
row1.put("unknown_field", "a");
368371
row1.put(
369372
"test_datetime",
370373
CivilTimeEncoder.encodePacked64DatetimeMicros(LocalDateTime.of(2020, 10, 1, 12, 0)));

0 commit comments

Comments
 (0)