Skip to content

Commit b587638

Browse files
authored
fix: a possible race condition that we used table schema out of the lock. (#1575)
Also clean up the code to reconnect after 10MB.
1 parent 6412fb2 commit b587638

File tree

1 file changed

+13
-33
lines changed

1 file changed

+13
-33
lines changed

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java

Lines changed: 13 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -134,41 +134,21 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset)
134134
// Create a new underlying StreamWriter with the updated TableSchema and Descriptor
135135
this.streamWriter = streamWriterBuilder.setWriterSchema(this.protoSchema).build();
136136
}
137-
}
138137

139-
ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
140-
// Any error in convertJsonToProtoMessage will throw an
141-
// IllegalArgumentException/IllegalStateException/NullPointerException and will halt processing
142-
// of JSON data.
143-
long currentRequestSize = 0;
144-
for (int i = 0; i < jsonArr.length(); i++) {
145-
JSONObject json = jsonArr.getJSONObject(i);
146-
Message protoMessage =
147-
JsonToProtoMessage.convertJsonToProtoMessage(
148-
this.descriptor, this.tableSchema, json, ignoreUnknownFields);
149-
rowsBuilder.addSerializedRows(protoMessage.toByteString());
150-
currentRequestSize += protoMessage.getSerializedSize();
151-
}
152-
// Need to make sure refreshAppendAndSetDescriptor finish first before this can run
153-
synchronized (this) {
154-
this.totalMessageSize += currentRequestSize;
155-
this.absTotal += currentRequestSize;
156-
// Reconnect on every 9.5MB.
157-
if (this.totalMessageSize > 9500000 && this.reconnectAfter10M) {
158-
streamWriter.close();
159-
// Create a new underlying StreamWriter aka establish a new connection.
160-
this.streamWriter = streamWriterBuilder.setWriterSchema(protoSchema).build();
161-
this.totalMessageSize = this.protoSchema.getSerializedSize() + currentRequestSize;
162-
this.absTotal += currentRequestSize;
163-
// Allow first request to pass.
138+
ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
139+
// Any error in convertJsonToProtoMessage will throw an
140+
// IllegalArgumentException/IllegalStateException/NullPointerException and will halt
141+
// processing
142+
// of JSON data.
143+
long currentRequestSize = 0;
144+
for (int i = 0; i < jsonArr.length(); i++) {
145+
JSONObject json = jsonArr.getJSONObject(i);
146+
Message protoMessage =
147+
JsonToProtoMessage.convertJsonToProtoMessage(
148+
this.descriptor, this.tableSchema, json, ignoreUnknownFields);
149+
rowsBuilder.addSerializedRows(protoMessage.toByteString());
150+
currentRequestSize += protoMessage.getSerializedSize();
164151
}
165-
LOG.fine(
166-
"Sending a total of:"
167-
+ this.totalMessageSize
168-
+ " "
169-
+ currentRequestSize
170-
+ " "
171-
+ this.absTotal);
172152
final ApiFuture<AppendRowsResponse> appendResponseFuture =
173153
this.streamWriter.append(rowsBuilder.build(), offset);
174154
return appendResponseFuture;

0 commit comments

Comments
 (0)