Context CancellationListener служб gRPC не запускается, когда клиент отменяет вызов службы

У меня есть потоковая служба, которая бесконечно передает потоки с сервера на клиент, пока клиент не отменит.

На стороне сервера у меня есть поток, который заполняет ehcache данными, полученными из базы данных.

Ehcache предоставляет обратные вызовы для событий кеша, то есть, когда элемент добавлен, когда элемент удален и т. Д. Меня интересует только уведомление клиентов, когда элемент помещается в кеш, поэтому, когда клиент подключается к моей службе gRPC, я регистрируюсь обратный вызов notifyElementPut() с кешем, который ссылается на подключенных клиентов StreamObserver:

public class GrpcAwareCacheEventListener extends CacheEventListenerAdapter {


  private StreamObserver<FooUpdateResponse> responseObserver;

  public GrpcAwareCacheEventListener(
      StreamObserver<FooUpdateResponse> responseObserver) {
    this.responseObserver = responseObserver;
  }


  @Override
  public void notifyElementPut(Ehcache cache, Element element) throws CacheException {

    Foo foo = (Foo) element.getObjectValue();
    if (foo != null) {
      responseObserver.onNext(
          FooResponse.newBuilder().setFoo(foo).build());
    }
  }
}

Моя потоковая служба foo выглядит следующим образом:

    public void streamFooUpdates(Empty request,
              StreamObserver<FooResponse> responseObserver) {

            final CacheEventListener eventListener = new GrpcAwareCacheEventListener(responseObserver);
            fooCache.getCacheEventNotificationService().registerListener(eventListener);
            Context.current().withCancellation().addListener(new CancellationListener() {

              public void cancelled(Context context) {
    log.info("inside context cancelled callback");      
  fooCache.getCacheEventNotificationService().unregisterListener(eventListener);
              }

            }, ForkJoinPool.commonPool());



          }

Все это работает нормально, клиент уведомляется обо всех обновлениях foo, пока он подключен.

Однако после того, как клиент отключит или явно отменит вызов, я ожидаю, что серверный обработчик отмены Context сработает, отменив регистрацию обратного вызова с кешем.

Это не так, независимо от того, закрывает ли клиент канал или явно отменяет вызов. (Я ожидаю, что контекст отмены на стороне сервера сработает для обоих этих событий). Мне интересно, неверна ли моя семантика отмены на стороне клиента, вот мой клиентский код, взятый из тестового примера:

Channel channel = ManagedChannelBuilder.forAddress("localhost", 25001)
        .usePlaintext().build();

    FooServiceGrpc.FooService stub = FooServiceGrpc
        .newStub(channel);


    ClientCallStreamObserver<FooResponse> cancellableObserver = new ClientCallStreamObserver<FooResponse>(){
      public void onNext(FooResponse response) {
        log.info("received foo: {}", response.getFoo());
      }

      public void onError(Throwable throwable) {

      }

      public void onCompleted() {

      }

      public boolean isReady() {
        return false;
      }

      public void setOnReadyHandler(Runnable runnable) {

      }

      public void disableAutoInboundFlowControl() {

      }

      public void request(int i) {

      }

      public void setMessageCompression(boolean b) {

      }

      public void cancel(@Nullable String s, @Nullable Throwable throwable) {

      }
    };

    stub.streamFooUpdates(Empty.newBuilder().build(), cancellableObserver);
    Thread.sleep(10000); // sleep 10 seconds while messages are received.
    cancellableObserver.cancel("cancelling from test", null); //explicit cancel
    ((ManagedChannel) chan).shutdown().awaitTermination(5, TimeUnit.SECONDS); //shutdown as well, for good measure.

    Thread.sleep(7000); //channel should be shutdown by now.

  }

Мне интересно, почему сервер не запускает обратный вызов «Контекст отменен».

Спасибо!


person mancini0    schedule 12.09.2018    source источник


Ответы (1)


Вы неправильно отменяете вызов клиента. StreamObserver во втором аргументе stub.streamFooUpdates() - это ваш обратный вызов. Вы не должны ничего называть по этому поводу StreamObserver.

Есть два способа отменить звонок со стороны клиента.

Вариант 1. Передайте ClientResponseObserver в качестве второго аргумента, реализуйте beforeStart(), что даст вам ClientCallStreamObserver, по которому вы можете вызвать cancel().

Вариант 2. Запустите stub.streamFooUpdates() внутри CancellableContext и отмените Context, чтобы отменить вызов. Обратите внимание, что CancellableContext всегда должен быть отменен, для этого предназначен блок finally.

CancellableContext withCancellation = Context.current().withCancellation();
try {
  withCancellation.run(() -> {
      stub.streamFooUpdates(...);
      Thread.sleep(10000);
      withCancellation.cancel(null);
  });
} finally {
  withCancellation.cancel(null);
}
person Kun Zhang    schedule 12.09.2018