Как обработать в Reactor Netty исключение io.netty.channel.ConnectTimeoutException

Я пытаюсь использовать Reactor Netty TcpClient реактивным способом для взаимодействия с хостами, которые могут быть недоступны. Вот пример логики инициализации канала:

ConnectionProvider connectionProvider = ConnectionProvider.fixed("fixed", 50);
TcpClient.create(connectionProvider)
  .host(host).port(port)
  .wiretap(true)
  .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 50)
  .doOnConnect(x -> log.trace("Connect to {}:{}", host, port))
  .doOnConnected(conn -> log.trace("Connected {}", conn.channel()))
  .connect()
  .subscribe(this::utilizeConnection);

вывод, который я получаю:

2019-09-04 08:23:13.612 TRACE 71988 --- [ioEventLoop-4-3] c.c.pcb.poc.network.tcp.NettyTcpSender   : Connect to 192.168.88.210:2000
2019-09-04 08:23:13.684  WARN 71988 --- [actor-tcp-nio-4] io.netty.util.concurrent.DefaultPromise  : An exception was thrown by reactor.netty.resources.PooledConnectionProvider$DisposableAcquire.operationComplete()

reactor.core.Exceptions$ErrorCallbackNotImplemented: io.netty.channel.ConnectTimeoutException: connection timed out: /192.168.88.210:2000
Caused by: io.netty.channel.ConnectTimeoutException: connection timed out: /192.168.88.210:2000
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe$1.run(AbstractNioChannel.java:267) ~[netty-transport-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38) ~[netty-common-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:127) ~[netty-common-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163) [netty-common-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:405) [netty-common-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500) [netty-transport-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:906) [netty-common-4.1.36.Final.jar:4.1.36.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) [netty-common-4.1.36.Final.jar:4.1.36.Final]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_181]
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoCreate] :
    reactor.core.publisher.Mono.create(Mono.java:183)
    reactor.netty.resources.PooledConnectionProvider.acquire(PooledConnectionProvider.java:130)
Error has been observed by the following operator(s):
    |_  Mono.create ⇢ reactor.netty.resources.PooledConnectionProvider.acquire(PooledConnectionProvider.java:130)
    |_  Mono.doOnSubscribe ⇢ reactor.netty.tcp.TcpClientDoOn.connect(TcpClientDoOn.java:58)

«Входящий» и «исходящий» имеют специальный метод для обработки своих ошибок, но они работают поверх экземпляра Connection, который не будет создан, если у вас будет «тайм-аут соединения».

Я пытался:

  1. Исключение, которое я получаю, заключено в ErrorCallbackNotImplemented. Но я не смог найти способ реализовать какой-либо ErrorCallback

  2. Журнал содержит предупреждающее сообщение от io.netty.util.concurrent.DefaultPromise. но я не мог найти способ сделать собственное обещание, чтобы справиться с этим должным образом.

  3. Я не обнаружил никаких конфигураций, которые могут каким-либо образом перехватывать тайм-ауты соединения.

  4. обходной путь. Заблокированный подход к созданию соединения (.block () вместо .subscribe ()) позволит мне перехватить любое соединение, создающее исключения в простом блоке try-catch, но я потеряю преимущества реактивного подхода с таким обходным путем.

Может ли кто-нибудь предложить мне хоть что-нибудь, чтобы помочь мне найти правильный способ справиться с исключением io.netty.channel.ConnectTimeoutException?


person wdscr.driver    schedule 04.09.2019    source источник


Ответы (1)


Не забудьте реализовать свой обратный вызов ошибки

Обычно reactor.core.Exceptions$ErrorCallbackNotImplemented происходит, когда есть подписка на .subscribe метод, основанный на labmda (то же самое для Mono и Flux).

Если вы собираетесь посмотреть исходники, здесь и здесь, вы найдете место, куда бросается reactor.core.Exceptions$ErrorCallbackNotImplemented!

Очки действий

Чтобы обработать исходный io.netty.channel.ConnectTimeoutException, я бы рекомендовал посмотреть Обработка ошибок раздел исходной документации Project Reactor

person Oleh Dokuka    schedule 10.09.2019