Skip to content

Commit c540229

Browse files
authored
core: allow per-service/method executor (#8266)
Add ServerCallExecutorSupplier interface in serverBuilder to allow defining which executor to to handle the server call. Split StreamCreated() contextRunnable into two to support this new feature: one for method lookup, the other for server call handling. methodLookup() runs on default executor, handleServerCall() may run on the executorSupplier executor. callbacks are queued after methodLookup() and handleServerCall() on serializing executor to ensure stream listener is set when callbacks starts running. Make executor settable in serializing executor to allow switching executor for the server call handling runnable as a result of the outcome of the method lookup runnable.
1 parent 9a8bc10 commit c540229

9 files changed

+378
-43
lines changed

api/src/main/java/io/grpc/ForwardingServerBuilder.java

+6
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,12 @@ public T executor(@Nullable Executor executor) {
6161
return thisT();
6262
}
6363

64+
@Override
65+
public T callExecutor(ServerCallExecutorSupplier executorSupplier) {
66+
delegate().callExecutor(executorSupplier);
67+
return thisT();
68+
}
69+
6470
@Override
6571
public T addService(ServerServiceDefinition service) {
6672
delegate().addService(service);

api/src/main/java/io/grpc/ServerBuilder.java

+24
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,30 @@ public static ServerBuilder<?> forPort(int port) {
7474
*/
7575
public abstract T executor(@Nullable Executor executor);
7676

77+
78+
/**
79+
* Allows for defining a way to provide a custom executor to handle the server call.
80+
* This executor is the result of calling
81+
* {@link ServerCallExecutorSupplier#getExecutor(ServerCall, Metadata)} per RPC.
82+
*
83+
* <p>It's an optional parameter. If it is provided, the {@link #executor(Executor)} would still
84+
* run necessary tasks before the {@link ServerCallExecutorSupplier} is ready to be called, then
85+
* it switches over.
86+
*
87+
* <p>If it is provided, {@link #directExecutor()} optimization is disabled. But if calling
88+
* {@link ServerCallExecutorSupplier} returns null, the server call is still handled by the
89+
* default {@link #executor(Executor)} as a fallback.
90+
*
91+
* @param executorSupplier the server call executor provider
92+
* @return this
93+
* @since 1.39.0
94+
*
95+
* */
96+
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/8274")
97+
public T callExecutor(ServerCallExecutorSupplier executorSupplier) {
98+
return thisT();
99+
}
100+
77101
/**
78102
* Adds a service implementation to the handler registry.
79103
*
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/*
2+
* Copyright 2021 The gRPC Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.grpc;
18+
19+
import java.util.concurrent.Executor;
20+
import javax.annotation.Nullable;
21+
22+
/**
23+
* Defines what executor handles the server call, based on each RPC call information at runtime.
24+
* */
25+
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/8274")
26+
public interface ServerCallExecutorSupplier {
27+
28+
/**
29+
* Returns an executor to handle the server call.
30+
* It should never throw. It should return null to fallback to the default executor.
31+
* */
32+
@Nullable
33+
<ReqT, RespT> Executor getExecutor(ServerCall<ReqT, RespT> call, Metadata metadata);
34+
}

core/src/main/java/io/grpc/internal/AbstractServerImplBuilder.java

+7
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.grpc.HandlerRegistry;
2525
import io.grpc.Server;
2626
import io.grpc.ServerBuilder;
27+
import io.grpc.ServerCallExecutorSupplier;
2728
import io.grpc.ServerInterceptor;
2829
import io.grpc.ServerServiceDefinition;
2930
import io.grpc.ServerStreamTracer;
@@ -67,6 +68,12 @@ public T directExecutor() {
6768
return thisT();
6869
}
6970

71+
@Override
72+
public T callExecutor(ServerCallExecutorSupplier executorSupplier) {
73+
delegate().callExecutor(executorSupplier);
74+
return thisT();
75+
}
76+
7077
@Override
7178
public T executor(@Nullable Executor executor) {
7279
delegate().executor(executor);

core/src/main/java/io/grpc/internal/SerializingExecutor.java

+12-2
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ private static AtomicHelper getAtomicHelper() {
5959
private static final int RUNNING = -1;
6060

6161
/** Underlying executor that all submitted Runnable objects are run on. */
62-
private final Executor executor;
62+
private Executor executor;
6363

6464
/** A list of Runnables to be run in order. */
6565
private final Queue<Runnable> runQueue = new ConcurrentLinkedQueue<>();
@@ -76,6 +76,15 @@ public SerializingExecutor(Executor executor) {
7676
this.executor = executor;
7777
}
7878

79+
/**
80+
* Only call this from this SerializingExecutor Runnable, so that the executor is immediately
81+
* visible to this SerializingExecutor executor.
82+
* */
83+
public void setExecutor(Executor executor) {
84+
Preconditions.checkNotNull(executor, "'executor' must not be null.");
85+
this.executor = executor;
86+
}
87+
7988
/**
8089
* Runs the given runnable strictly after all Runnables that were submitted
8190
* before it, and using the {@code executor} passed to the constructor. .
@@ -118,7 +127,8 @@ private void schedule(@Nullable Runnable removable) {
118127
public void run() {
119128
Runnable r;
120129
try {
121-
while ((r = runQueue.poll()) != null) {
130+
Executor oldExecutor = executor;
131+
while (oldExecutor == executor && (r = runQueue.poll()) != null ) {
122132
try {
123133
r.run();
124134
} catch (RuntimeException e) {

core/src/main/java/io/grpc/internal/ServerImpl.java

+114-41
Original file line numberDiff line numberDiff line change
@@ -46,12 +46,14 @@
4646
import io.grpc.InternalServerInterceptors;
4747
import io.grpc.Metadata;
4848
import io.grpc.ServerCall;
49+
import io.grpc.ServerCallExecutorSupplier;
4950
import io.grpc.ServerCallHandler;
5051
import io.grpc.ServerInterceptor;
5152
import io.grpc.ServerMethodDefinition;
5253
import io.grpc.ServerServiceDefinition;
5354
import io.grpc.ServerTransportFilter;
5455
import io.grpc.Status;
56+
import io.grpc.StatusException;
5557
import io.perfmark.Link;
5658
import io.perfmark.PerfMark;
5759
import io.perfmark.Tag;
@@ -125,6 +127,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
125127
private final InternalChannelz channelz;
126128
private final CallTracer serverCallTracer;
127129
private final Deadline.Ticker ticker;
130+
private final ServerCallExecutorSupplier executorSupplier;
128131

129132
/**
130133
* Construct a server.
@@ -159,6 +162,7 @@ public final class ServerImpl extends io.grpc.Server implements InternalInstrume
159162
this.serverCallTracer = builder.callTracerFactory.create();
160163
this.ticker = checkNotNull(builder.ticker, "ticker");
161164
channelz.addServer(this);
165+
this.executorSupplier = builder.executorSupplier;
162166
}
163167

164168
/**
@@ -469,11 +473,11 @@ private void streamCreatedInternal(
469473
final Executor wrappedExecutor;
470474
// This is a performance optimization that avoids the synchronization and queuing overhead
471475
// that comes with SerializingExecutor.
472-
if (executor == directExecutor()) {
476+
if (executorSupplier != null || executor != directExecutor()) {
477+
wrappedExecutor = new SerializingExecutor(executor);
478+
} else {
473479
wrappedExecutor = new SerializeReentrantCallsDirectExecutor();
474480
stream.optimizeForDirectExecutor();
475-
} else {
476-
wrappedExecutor = new SerializingExecutor(executor);
477481
}
478482

479483
if (headers.containsKey(MESSAGE_ENCODING_KEY)) {
@@ -499,52 +503,124 @@ private void streamCreatedInternal(
499503

500504
final JumpToApplicationThreadServerStreamListener jumpListener
501505
= new JumpToApplicationThreadServerStreamListener(
502-
wrappedExecutor, executor, stream, context, tag);
506+
wrappedExecutor, executor, stream, context, tag);
503507
stream.setListener(jumpListener);
504-
// Run in wrappedExecutor so jumpListener.setListener() is called before any callbacks
505-
// are delivered, including any errors. Callbacks can still be triggered, but they will be
506-
// queued.
507-
508-
final class StreamCreated extends ContextRunnable {
509-
StreamCreated() {
508+
final SettableFuture<ServerCallParameters<?,?>> future = SettableFuture.create();
509+
// Run in serializing executor so jumpListener.setListener() is called before any callbacks
510+
// are delivered, including any errors. MethodLookup() and HandleServerCall() are proactively
511+
// queued before any callbacks are queued at serializing executor.
512+
// MethodLookup() runs on the default executor.
513+
// When executorSupplier is enabled, MethodLookup() may set/change the executor in the
514+
// SerializingExecutor before it finishes running.
515+
// Then HandleServerCall() and callbacks would switch to the executorSupplier executor.
516+
// Otherwise, they all run on the default executor.
517+
518+
final class MethodLookup extends ContextRunnable {
519+
MethodLookup() {
510520
super(context);
511521
}
512522

513523
@Override
514524
public void runInContext() {
515-
PerfMark.startTask("ServerTransportListener$StreamCreated.startCall", tag);
525+
PerfMark.startTask("ServerTransportListener$MethodLookup.startCall", tag);
516526
PerfMark.linkIn(link);
517527
try {
518528
runInternal();
519529
} finally {
520-
PerfMark.stopTask("ServerTransportListener$StreamCreated.startCall", tag);
530+
PerfMark.stopTask("ServerTransportListener$MethodLookup.startCall", tag);
521531
}
522532
}
523533

524534
private void runInternal() {
525-
ServerStreamListener listener = NOOP_LISTENER;
535+
ServerMethodDefinition<?, ?> wrapMethod;
536+
ServerCallParameters<?, ?> callParams;
526537
try {
527538
ServerMethodDefinition<?, ?> method = registry.lookupMethod(methodName);
528539
if (method == null) {
529540
method = fallbackRegistry.lookupMethod(methodName, stream.getAuthority());
530541
}
531542
if (method == null) {
532543
Status status = Status.UNIMPLEMENTED.withDescription(
533-
"Method not found: " + methodName);
544+
"Method not found: " + methodName);
534545
// TODO(zhangkun83): this error may be recorded by the tracer, and if it's kept in
535546
// memory as a map whose key is the method name, this would allow a misbehaving
536547
// client to blow up the server in-memory stats storage by sending large number of
537548
// distinct unimplemented method
538549
// names. (https://github.com/grpc/grpc-java/issues/2285)
539550
stream.close(status, new Metadata());
540551
context.cancel(null);
552+
future.cancel(false);
541553
return;
542554
}
543-
listener = startCall(stream, methodName, method, headers, context, statsTraceCtx, tag);
555+
wrapMethod = wrapMethod(stream, method, statsTraceCtx);
556+
callParams = maySwitchExecutor(wrapMethod, stream, headers, context, tag);
557+
future.set(callParams);
544558
} catch (Throwable t) {
545559
stream.close(Status.fromThrowable(t), new Metadata());
546560
context.cancel(null);
561+
future.cancel(false);
547562
throw t;
563+
}
564+
}
565+
566+
private <ReqT, RespT> ServerCallParameters<ReqT, RespT> maySwitchExecutor(
567+
final ServerMethodDefinition<ReqT, RespT> methodDef,
568+
final ServerStream stream,
569+
final Metadata headers,
570+
final Context.CancellableContext context,
571+
final Tag tag) {
572+
final ServerCallImpl<ReqT, RespT> call = new ServerCallImpl<>(
573+
stream,
574+
methodDef.getMethodDescriptor(),
575+
headers,
576+
context,
577+
decompressorRegistry,
578+
compressorRegistry,
579+
serverCallTracer,
580+
tag);
581+
if (executorSupplier != null) {
582+
Executor switchingExecutor = executorSupplier.getExecutor(call, headers);
583+
if (switchingExecutor != null) {
584+
((SerializingExecutor)wrappedExecutor).setExecutor(switchingExecutor);
585+
}
586+
}
587+
return new ServerCallParameters<>(call, methodDef.getServerCallHandler());
588+
}
589+
}
590+
591+
final class HandleServerCall extends ContextRunnable {
592+
HandleServerCall() {
593+
super(context);
594+
}
595+
596+
@Override
597+
public void runInContext() {
598+
PerfMark.startTask("ServerTransportListener$HandleServerCall.startCall", tag);
599+
PerfMark.linkIn(link);
600+
try {
601+
runInternal();
602+
} finally {
603+
PerfMark.stopTask("ServerTransportListener$HandleServerCall.startCall", tag);
604+
}
605+
}
606+
607+
private void runInternal() {
608+
ServerStreamListener listener = NOOP_LISTENER;
609+
ServerCallParameters<?,?> callParameters;
610+
try {
611+
if (future.isCancelled()) {
612+
return;
613+
}
614+
if (!future.isDone() || (callParameters = future.get()) == null) {
615+
Status status = Status.INTERNAL.withDescription(
616+
"Unexpected failure retrieving server call parameters.");
617+
throw new StatusException(status);
618+
}
619+
listener = startWrappedCall(methodName, callParameters, headers);
620+
} catch (Throwable ex) {
621+
stream.close(Status.fromThrowable(ex), new Metadata());
622+
context.cancel(null);
623+
throw new IllegalStateException(ex);
548624
} finally {
549625
jumpListener.setListener(listener);
550626
}
@@ -568,7 +644,8 @@ public void cancelled(Context context) {
568644
}
569645
}
570646

571-
wrappedExecutor.execute(new StreamCreated());
647+
wrappedExecutor.execute(new MethodLookup());
648+
wrappedExecutor.execute(new HandleServerCall());
572649
}
573650

574651
private Context.CancellableContext createContext(
@@ -593,9 +670,8 @@ private Context.CancellableContext createContext(
593670
}
594671

595672
/** Never returns {@code null}. */
596-
private <ReqT, RespT> ServerStreamListener startCall(ServerStream stream, String fullMethodName,
597-
ServerMethodDefinition<ReqT, RespT> methodDef, Metadata headers,
598-
Context.CancellableContext context, StatsTraceContext statsTraceCtx, Tag tag) {
673+
private <ReqT, RespT> ServerMethodDefinition<?,?> wrapMethod(ServerStream stream,
674+
ServerMethodDefinition<ReqT, RespT> methodDef, StatsTraceContext statsTraceCtx) {
599675
// TODO(ejona86): should we update fullMethodName to have the canonical path of the method?
600676
statsTraceCtx.serverCallStarted(
601677
new ServerCallInfoImpl<>(
@@ -609,34 +685,31 @@ private <ReqT, RespT> ServerStreamListener startCall(ServerStream stream, String
609685
ServerMethodDefinition<ReqT, RespT> interceptedDef = methodDef.withServerCallHandler(handler);
610686
ServerMethodDefinition<?, ?> wMethodDef = binlog == null
611687
? interceptedDef : binlog.wrapMethodDefinition(interceptedDef);
612-
return startWrappedCall(fullMethodName, wMethodDef, stream, headers, context, tag);
688+
return wMethodDef;
689+
}
690+
691+
private final class ServerCallParameters<ReqT, RespT> {
692+
ServerCallImpl<ReqT, RespT> call;
693+
ServerCallHandler<ReqT, RespT> callHandler;
694+
695+
public ServerCallParameters(ServerCallImpl<ReqT, RespT> call,
696+
ServerCallHandler<ReqT, RespT> callHandler) {
697+
this.call = call;
698+
this.callHandler = callHandler;
699+
}
613700
}
614701

615702
private <WReqT, WRespT> ServerStreamListener startWrappedCall(
616703
String fullMethodName,
617-
ServerMethodDefinition<WReqT, WRespT> methodDef,
618-
ServerStream stream,
619-
Metadata headers,
620-
Context.CancellableContext context,
621-
Tag tag) {
622-
623-
ServerCallImpl<WReqT, WRespT> call = new ServerCallImpl<>(
624-
stream,
625-
methodDef.getMethodDescriptor(),
626-
headers,
627-
context,
628-
decompressorRegistry,
629-
compressorRegistry,
630-
serverCallTracer,
631-
tag);
632-
633-
ServerCall.Listener<WReqT> listener =
634-
methodDef.getServerCallHandler().startCall(call, headers);
635-
if (listener == null) {
704+
ServerCallParameters<WReqT, WRespT> params,
705+
Metadata headers) {
706+
ServerCall.Listener<WReqT> callListener =
707+
params.callHandler.startCall(params.call, headers);
708+
if (callListener == null) {
636709
throw new NullPointerException(
637-
"startCall() returned a null listener for method " + fullMethodName);
710+
"startCall() returned a null listener for method " + fullMethodName);
638711
}
639-
return call.newServerStreamListener(listener);
712+
return params.call.newServerStreamListener(callListener);
640713
}
641714
}
642715

0 commit comments

Comments
 (0)