Skip to content

Commit 9ead606

Browse files
authored
netty: Reduce race window size between GOAWAY and new streams
The race between new streams and transport shutdown is #2562, but it is still far from being generally solved. This reduces the race window of new streams from (transport selection → stream created on network thread) to (transport selection → stream enqueued on network thread). Since only a single thread now needs to do work in the stream creation race window, the window should be dramatically smaller. This only reduces GOAWAY races when the server performs a graceful shutdown (using two GOAWAYs), as that is the only non-racy way on-the-wire to shutdown a connection in HTTP/2.
1 parent 4974b51 commit 9ead606

File tree

5 files changed

+55
-3
lines changed

5 files changed

+55
-3
lines changed

netty/src/main/java/io/grpc/netty/ClientTransportLifecycleManager.java

+13-2
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,25 @@ public void notifyReady() {
4343
listener.transportReady();
4444
}
4545

46-
public void notifyShutdown(Status s) {
46+
/**
47+
* Marks transport as shutdown, but does not set the error status. This must eventually be
48+
* followed by a call to notifyShutdown.
49+
*/
50+
public void notifyGracefulShutdown(Status s) {
4751
if (transportShutdown) {
4852
return;
4953
}
5054
transportShutdown = true;
55+
listener.transportShutdown(s);
56+
}
57+
58+
public void notifyShutdown(Status s) {
59+
notifyGracefulShutdown(s);
60+
if (shutdownStatus != null) {
61+
return;
62+
}
5163
shutdownStatus = s;
5264
shutdownThrowable = s.asException();
53-
listener.transportShutdown(s);
5465
}
5566

5667
public void notifyInUse(boolean inUse) {

netty/src/main/java/io/grpc/netty/NettyClientHandler.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -755,10 +755,21 @@ public boolean visit(Http2Stream stream) throws Http2Exception {
755755

756756
/**
757757
* Handler for a GOAWAY being received. Fails any streams created after the
758-
* last known stream.
758+
* last known stream. May only be called during a read.
759759
*/
760760
private void goingAway(Status status) {
761+
lifecycleManager.notifyGracefulShutdown(status);
762+
// Try to allocate as many in-flight streams as possible, to reduce race window of
763+
// https://github.com/grpc/grpc-java/issues/2562 . To be of any help, the server has to
764+
// gracefully shut down the connection with two GOAWAYs. gRPC servers generally send a PING
765+
// after the first GOAWAY, so they can very precisely detect when the GOAWAY has been
766+
// processed and thus this processing must be in-line before processing additional reads.
767+
768+
// This can cause reentrancy, but should be minor since it is normal to handle writes in
769+
// response to a read. Also, the call stack is rather shallow at this point
770+
clientWriteQueue.drainNow();
761771
lifecycleManager.notifyShutdown(status);
772+
762773
final Status goAwayStatus = lifecycleManager.getShutdownStatus();
763774
final int lastKnownStream = connection().local().lastStreamKnownByPeer();
764775
try {

netty/src/main/java/io/grpc/netty/WriteQueue.java

+13
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,19 @@ void enqueue(Runnable runnable, boolean flush) {
101101
}
102102
}
103103

104+
/**
105+
* Executes enqueued work directly on the current thread. This can be used to trigger writes
106+
* before performing additional reads. Must be called from the event loop. This method makes no
107+
* guarantee that the work queue is empty when it returns.
108+
*/
109+
void drainNow() {
110+
Preconditions.checkState(channel.eventLoop().inEventLoop(), "must be on the event loop");
111+
if (queue.peek() == null) {
112+
return;
113+
}
114+
flush();
115+
}
116+
104117
/**
105118
* Process the queue of commands and dispatch them to the stream. This method is only
106119
* called in the event loop

netty/src/test/java/io/grpc/netty/NettyClientHandlerTest.java

+13
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,19 @@ public void receivedGoAwayShouldNotAffectEarlyStreamId() throws Exception {
361361
assertTrue(future.isDone());
362362
}
363363

364+
@Test
365+
public void receivedGoAwayShouldNotAffectRacingQueuedStreamId() throws Exception {
366+
// This command has not actually been executed yet
367+
ChannelFuture future = writeQueue().enqueue(
368+
newCreateStreamCommand(grpcHeaders, streamTransportState), true);
369+
channelRead(goAwayFrame(streamId));
370+
verify(streamListener, never())
371+
.closed(any(Status.class), any(Metadata.class));
372+
verify(streamListener, never())
373+
.closed(any(Status.class), any(RpcProgress.class), any(Metadata.class));
374+
assertTrue(future.isDone());
375+
}
376+
364377
@Test
365378
public void receivedResetWithRefuseCode() throws Exception {
366379
ChannelFuture future = enqueue(newCreateStreamCommand(grpcHeaders, streamTransportState));

netty/src/test/java/io/grpc/netty/NettyHandlerTestBase.java

+4
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,10 @@ public int compareTo(Delayed o) {
203203
}
204204
}
205205

206+
protected final WriteQueue writeQueue() {
207+
return writeQueue;
208+
}
209+
206210
protected final T handler() {
207211
return handler;
208212
}

0 commit comments

Comments
 (0)