Как добавить глобальный перехватчик исключений на сервер gRPC?

Как в gRPC добавить глобальный перехватчик исключений, который перехватывает любые RuntimeException и передает значимую информацию клиенту?

например, метод divide может выдавать ArithmeticException с сообщением / by zero. На стороне сервера я могу написать:

@Override
public void divide(DivideRequest request, StreamObserver<DivideResponse> responseObserver) {
  int dom = request.getDenominator();
  int num = request.getNumerator();

  double result = num / dom;
  responseObserver.onNext(DivideResponse.newBuilder().setValue(result).build());
  responseObserver.onCompleted();
}

Если клиент передаст знаменатель = 0, он получит:

Exception in thread "main" io.grpc.StatusRuntimeException: UNKNOWN

И сервер выводит

Exception while executing runnable io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$2@62e95ade
java.lang.ArithmeticException: / by zero

Клиент не знает, что происходит.

Если я хочу передать сообщение / by zero клиенту, мне нужно изменить сервер на: (как описано в этом вопросе < / а>)

  try {
    double result = num / dom;
    responseObserver.onNext(DivideResponse.newBuilder().setValue(result).build());
    responseObserver.onCompleted();
  } catch (Exception e) {
    logger.error("onError : {}" , e.getMessage());
    responseObserver.onError(new StatusRuntimeException(Status.INTERNAL.withDescription(e.getMessage())));
  }

И если клиент отправит знаменатель = 0, он получит:

Exception in thread "main" io.grpc.StatusRuntimeException: INTERNAL: / by zero

Хорошо, / by zero передан клиенту.

Но проблема в том, что в действительно корпоративной среде будет много RuntimeException, и если я хочу передать эти сообщения об исключении клиенту, мне придется попробовать поймать каждый метод, что очень громоздко.

Есть ли какой-либо глобальный перехватчик, который перехватывает каждый метод, перехватывает RuntimeException и запускает onError и передает сообщение об ошибке клиенту? Так что мне не нужно иметь дело с RuntimeException в моем серверном коде.

Большое спасибо !

Примечание :

<grpc.version>1.0.1</grpc.version>
com.google.protobuf:proton:3.1.0
io.grpc:protoc-gen-grpc-java:1.0.1

person smallufo    schedule 30.09.2016    source источник


Ответы (9)


Код ниже будет улавливать все исключения времени выполнения. Также см. Ссылку https://github.com/grpc/grpc-java/issues/1552

public class GlobalGrpcExceptionHandler implements ServerInterceptor {

   @Override
   public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
         Metadata requestHeaders, ServerCallHandler<ReqT, RespT> next) {
      ServerCall.Listener<ReqT> delegate = next.startCall(call, requestHeaders);
      return new SimpleForwardingServerCallListener<ReqT>(delegate) {
         @Override
         public void onHalfClose() {
            try {
               super.onHalfClose();
            } catch (Exception e) {
               call.close(Status.INTERNAL
                .withCause (e)
                .withDescription("error message"), new Metadata());
            }
         }
      };
   }
}
person mahesh    schedule 30.05.2018


В Kotlin вам нужно иначе структурировать ServerInterceptor. Я использовал grpc-kotlin в Micronaut, и исключения никогда не появлялись в SimpleForwardingServerCallListener onHalfClose или других обработчиках.

person markfickett    schedule 13.11.2020

Если вы хотите перехватывать исключения в всех конечных точках gRPC (включая те, которые обрабатывают клиентские потоки) и перехватчиках, вам, вероятно, понадобится нечто подобное следующему :

import io.grpc.ForwardingServerCallListener;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;

public class GlobalGrpcExceptionHandler implements ServerInterceptor {

    @Override
    public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall,
                                                                 Metadata requestHeaders,
                                                                 ServerCallHandler<ReqT, RespT> serverCallHandler) {
        try {
            ServerCall.Listener<ReqT> delegate = serverCallHandler.startCall(serverCall, requestHeaders);
            return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(delegate) {
                @Override
                public void onMessage(ReqT message) {
                    try {
                        super.onMessage(message); // Here onNext is called (in case of client streams)
                    } catch (Throwable e) {
                        handleEndpointException(e, serverCall);
                    }
                }

                @Override
                public void onHalfClose() {
                    try {
                        super.onHalfClose(); // Here onCompleted is called (in case of client streams)
                    } catch (Throwable e) {
                        handleEndpointException(e, serverCall);
                    }
                }
            };
        } catch (Throwable t) {
            return handleInterceptorException(t, serverCall);
        }
    }

    private <ReqT, RespT> void handleEndpointException(Throwable t, ServerCall<ReqT, RespT> serverCall) {
        serverCall.close(Status.INTERNAL
                         .withCause(t)
                         .withDescription("An exception occurred in the endpoint implementation"), new Metadata());
    }

    private <ReqT, RespT> ServerCall.Listener<ReqT> handleInterceptorException(Throwable t, ServerCall<ReqT, RespT> serverCall) {
        serverCall.close(Status.INTERNAL
                         .withCause(t)
                         .withDescription("An exception occurred in a **subsequent** interceptor"), new Metadata());

        return new ServerCall.Listener<ReqT>() {
            // no-op
        };
    }
}

ОТКАЗ ОТ ОТВЕТСТВЕННОСТИ: я собрал это, изучив реализацию, я не читал его в документации и не уверен, что он изменится. Для справки я имею в виду io.grpc версию 1.30.

person burubum    schedule 16.02.2021

Вы читали grpc java примеры для перехватчика?

Итак, в моем случае мы используем код и сообщение в качестве стандарта для определения того, какую ошибку сервер отправил клиенту.

Примеры: сервер отправляет ответ, например

{
  code: 409,
  message: 'Id xxx aldready exist'
}

Таким образом, в клиенте вы можете настроить клиентский перехватчик для получения этого кода и ответа с помощью Reflection. Fyi, мы используем Lognet Spring Boot starter для grpc в качестве сервера и Spring boot для клиента.

person Reza Adha Hamonangan    schedule 18.09.2017

public class GrpcExceptionHandler implements ServerInterceptor {

private final Logger logger = LoggerFactory.getLogger (GrpcExceptionHandler.class);

@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall (ServerCall<ReqT, RespT> call,
                                                              Metadata headers,
                                                              ServerCallHandler<ReqT, RespT> next) {
    logger.info ("GRPC call at: {}", Instant.now ());
    ServerCall.Listener<ReqT> listener;

    try {
        listener = next.startCall (call, headers);
    } catch (Throwable ex) {
        logger.error ("Uncaught exception from grpc service");
        call.close (Status.INTERNAL
                .withCause (ex)
                .withDescription ("Uncaught exception from grpc service"), null);
        return new ServerCall.Listener<ReqT>() {};
    }

    return listener;
}

}

Образец перехватчика выше.

Вам, конечно же, нужно загрузить его, прежде чем ожидать от него чего-либо;

serverBuilder.addService (ServerInterceptors.intercept (bindableService, interceptor));

ОБНОВЛЕНИЕ

public interface ServerCallHandler<RequestT, ResponseT> {
  /**
   * Produce a non-{@code null} listener for the incoming call. Implementations are free to call
   * methods on {@code call} before this method has returned.
   *
   * <p>If the implementation throws an exception, {@code call} will be closed with an error.
   * Implementations must not throw an exception if they started processing that may use {@code
   * call} on another thread.
   *
   * @param call object for responding to the remote client.
   * @return listener for processing incoming request messages for {@code call}
   */
  ServerCall.Listener<RequestT> startCall(
      ServerCall<RequestT, ResponseT> call,
      Metadata headers);
}

К сожалению, другой контекст потока означает отсутствие области обработки исключений, поэтому мой ответ - это не то решение, которое вы ищете.

person Gladmir    schedule 04.10.2016
comment
Извините, это не работает. Эта строка logger.error ("Uncaught exception from grpc service"); не дошла! Я тоже чувствую себя странно. - person smallufo; 04.10.2016
comment
Что ж, перехват действительно происходит, но, как указано в документации, next.startCall(call, headers) немедленно возвращается и выполняется в другом потоке, и в конечном итоге мы теряем область стека для исключения. К сожалению, я не знаю, возможно ли какое-либо обходное решение на данный момент. - person Gladmir; 04.10.2016

Если вы можете преобразовать приложение сервера gRPC для весенней загрузки с помощью yidongnan / grpc-spring-boot- starter, тогда вы можете написать @GrpcAdvice, аналогично Spring Boot @ControllerAdvice как

    @GrpcAdvice
    public class ExceptionHandler {
    
      @GrpcExceptionHandler(ValidationErrorException.class)
      public StatusRuntimeException handleValidationError(ValidationErrorException cause) {
    
         Status.INVALID_ARGUMENT.withDescription("Invalid Argument")
            .asRuntimeException()
      }
    }
person Pankaj    schedule 30.05.2021

На Kotlin добавление try / catch для методов слушателя не работает, по какой-то причине исключения проглатываются.

По ссылке, опубликованной @markficket, я реализовал решение, создающее реализацию SimpleForwardingServerCall.

class ErrorHandlerServerInterceptor : ServerInterceptor {

    private inner class StatusExceptionHandlingServerCall<ReqT, RespT>(delegate: ServerCall<ReqT, RespT>) 
        : ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(delegate) {

        override fun close(status: Status, trailers: Metadata) {
            status.run {
                when {
                    isOk -> status
                    cause is MyException -> myExceptionHandler(cause as MyException)
                    else -> defaultExceptionHandler(cause)
                }
            }
                .let { super.close(it, trailers) }
        }

        private fun myExceptionHandler(cause: MyException): Status = cause.run { ... }

        private fun defaultExceptionHandler(cause: Throwable?): Status = cause?.run { ... }

    }

    override fun <ReqT : Any, RespT : Any> interceptCall(
        call: ServerCall<ReqT, RespT>,
        metadata: Metadata,
        next: ServerCallHandler<ReqT, RespT>
    ): ServerCall.Listener<ReqT> =
        next.startCall(StatusExceptionHandlingServerCall(call), metadata)

}

Тогда конечно вам нужно добавить перехватчик при создании сервера.

ServerBuilder
  .forPort(port)
  .executor(Dispatchers.IO.asExecutor())
  .addService(service)
  .intercept(ErrorHandlerServerInterceptor())
  .build()

А затем вы можете просто выбросить исключения в свои методы gRPC.

override suspend fun someGrpcCall(request: Request): Response {
  ... code ...
  throw NotFoundMyException("Cannot find entity")
}
person Maximiliano De Lorenzo    schedule 19.07.2021

Я использовал АОП для решения глобальных ошибок rpc, и мне это удобно. Я использую АОП в guice, и способ его использования весной должен быть аналогичным

  1. определить метод-перехватчик

```

public class ServerExceptionInterceptor implements MethodInterceptor {
    final static Logger logger = LoggerFactory.getLogger(ServerExceptionInterceptor.class);

    public Object invoke(MethodInvocation invocation) throws Throwable {
        try {
            return  invocation.proceed();
        } catch (Exception ex) {
            String stackTrace = Throwables.getStackTraceAsString(ex);
            logger.error("##grpc server side error, {}", stackTrace);
            Object[] args = invocation.getArguments();
            StreamObserver<?> responseObserver = (StreamObserver<?>)args[1];
            responseObserver.onError(Status.INTERNAL
                    .withDescription(stackTrace)
                    .withCause(ex)
                    .asRuntimeException());

            return null;
        }
    }

    @Target({ElementType.METHOD, ElementType.TYPE}) @Retention(RUNTIME)
    public @interface WrapError {
        String value() default "";
    }

}

```

  1. добавить @WrapError ко всем методам rpc @Override @WrapError public void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) { HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build(); logger.info("#rpc server, sayHello, planId: {}", req.getName()); if(true) throw new RuntimeException("testing-rpc-error"); //simulate an exception responseObserver.onNext(reply); responseObserver.onCompleted(); }

    1. bind the interceptor in guice module

ServerExceptionInterceptor interceptor = new ServerExceptionInterceptor(); requestInjection(interceptor); bindInterceptor(Matchers.any(), Matchers.annotatedWith(WrapError.class), interceptor);

4. тестирование

person Iceberg    schedule 07.05.2018