Настройка автоматического выключателя с исходящим адаптером для решения проблемы тайм-аута соединения

<int:service-activator input-channel="toKafka"  ref="conditionalProducerService" method="producerCircuitBreaker">

  <int:request-handler-advice-chain>
       <ref bean="circuitBreakerAdvice1" />
   </int:request-handler-advice-chain>
            </int:service-activator>

  <int:channel id="failedChannel2" />
  <int-kafka:outbound-channel-adapter
                            id="kafkaOutboundChannelAdapter" kafka-producer-context-ref="producerContext" auto-startup="false" channel="toKafka" message-key="kafka_messageKey">
                            <int:poller fixed-delay="1000" error-channel="failedChannel2" />
            </int-kafka:outbound-channel-adapter>


      <int:chain input-channel="failedChannel2">
        <int:transformer expression="'failed:' + payload.failedMessage.payload + ' with ' + payload.cause.message" />
                            <int-stream:stderr-channel-adapter append-newline="true"/>
            </int:chain>

            <bean id="circuitBreakerAdvice1" class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice">
                            <property name="threshold" value="2" />                                         
                            <property name="halfOpenAfter" value="12000" />                     
            </bean>

  public Message<?> producerCircuitBreaker(Message<?> payload) {
          throw  new RuntimeException("foo Pro");}

С приведенной выше конфигурацией мы пытаемся:

1. Ожидается, что сообщение о сбое будет распространено на error-channel = "failedChannel2", чего не происходит. Так как я не мог видеть преобразованный вывод в консоли.

2.CircuitBreaker работает для ServiceActivator (для исключения, связанного с приложением, здесь, как указано выше), но как мы можем настроить CB для сбойного случая для исходящего адаптера. Пример: время ожидания соединения истекло или сервер внезапно отключился / проблема с сетевым подключением / проблема с окружающей средой перед отправкой сообщения из канала SI на внешний (kafka) сервер. Можно ли настроить CB с исходящим адаптером для такой ситуации.

Согласно документу SI относительно рекомендаций по автоматическому выключателю, приведенному ниже.

«Обычно этот совет может использоваться для внешних служб, где может потребоваться некоторое время для отказа (например, тайм-аут при попытке установить сетевое соединение)».

Пожалуйста, подскажите, как этого добиться. Большое спасибо.

обновленная конфигурация:

        <int:gateway default-request-channel="toKafka" error-channel="errorChannel"
default-reply-timeout="0" />

 <int:service-activator input-channel="toKafka">
<bean class="com.XXX.ProducerMessageHandler" >
 <constructor-arg ref="producerContext"/>
</bean>
     <int:request-handler-advice-chain>
                                     <ref bean="circuitBreakerAdvice" />
                       </int:request-handler-advice-chain>

<bean id="transformerService1" class="com.XXX.KafkaTransformerTest" />

 <int:transformer input-channel="errorChannel"
                              order="1" ref="transformerService1" method="transformFailed">

                       </int:transformer>  

 public void transformFailed(Message<?> message) {
          APPLOGGER.log("transformer message test" + message);


 public class ProducerMessageHandler extends KafkaProducerMessageHandler{

            public ProducerMessageHandler(KafkaProducerContext kafkaProducerContext) {
                            super(kafkaProducerContext);
                            // TODO Auto-generated constructor stub
            }

            @Override
            public void handleMessageInternal(final Message<?> message) throws Exception {

                            //super.handleMessageInternal(message);
                            throw new RuntimeException("test foo");
            }

log :

Рекомендация применяется только к конечной точке, которой она назначена, а не к нисходящему потоку; к сожалению, схема kafka не позволяет применять ее к адаптеру исходящего канала. Я создал для этого проблему JIRA.


person sam    schedule 19.12.2015    source источник


Ответы (2)


Обходной путь - настроить KafkaProducerMessageHandler как <bean/> и ref его из <service-activator/>. Затем вы можете применить свой автоматический выключатель.

Другой обходной путь - использовать входящий шлюз ...

Я не уверен, почему вы не видите сообщение в канале ошибки; Обычно включение журнала DEBUG помогает отладить подобные вещи.

<int:service-activator ... ref="gw">
    <int:request-handler-advice-chain ...

</int:service-activator>

<int:gateway id="gw" default-request-channel="toKafka" 
         default-reply-timeout="0"
         error-channel="..." ... />

ИЗМЕНИТЬ

Я только что протестировал это, и он отлично работает ...

РЕДАКТИРОВАТЬ2

<int:gateway default-request-channel="toKafka" error-channel="errorChannel"
    default-reply-timeout="0" />

<int:service-activator input-channel="toKafka">
    <bean class="com.example.Foo" />
    <int:request-handler-advice-chain>
        <bean class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice">
            <property name="threshold" value="2"/>
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>

Если вы не используете шлюз, вы можете обработать его с помощью канала очереди и опросчика. Для меня это тоже отлично работает ...

Или вы можете добавить промежуточный шлюз.

<int:channel id="toKafka">
    <int:queue />
</int:channel>

<int:service-activator input-channel="toKafka">
    <bean class="com.example.Foo" />
    <int:poller error-channel="errorChannel" fixed-delay="1000" />
    <int:request-handler-advice-chain>
        <bean class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice">
            <property name="threshold" value="2"/>
            <property name="halfOpenAfter" value="12000"/>
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>

скопировано из чата для использования в будущем:

person Gary Russell    schedule 19.12.2015
comment
с 1-м обходным путем, предложенным выше (добавлен код выше в разделе update config), используя блок try / catch, мы можем настроить CB, но как распространить его на канал ошибки в этом случае. - person sam; 23.12.2015
comment
Я не вижу канала ошибки, настроенного в вашей обновленной конфигурации. - person sam; 05.01.2016
comment
да, вот в чем вопрос. как / где настроить канал ошибок в этом случае. - person Gary Russell; 05.01.2016
comment
Смотрите мою правку - это зависит от того, что запускает ваш поток (т.е. что находится выше по течению от адаптера kafka; в моем редактировании я использую шлюз обмена сообщениями). - person sam; 05.01.2016
comment
не уверен, в моем случае (пожалуйста, см. обновленную конфигурацию) он не переходит в канал ошибок. см. журнал, ожидается сообщение трансформатора. - person Gary Russell; 05.01.2016
comment
Вам нужно показать полный тестовый пример - как вы отправляете сообщение _1_? - person sam; 05.01.2016
comment
Позвольте нам продолжить это обсуждение в чате. - person Gary Russell; 05.01.2016
comment
Чтобы получить исключение TimeoutException (из-за того, что брокер не работает), вы должны установить _1_ на адаптере исходящего канала. Однако помните, что это значительно снизит производительность при работе брокера. - person sam; 06.01.2016
comment
01-05 @ 23: 44: 18,598 ОТЛАДКА org.springframework.integration.config.ServiceActivatorFactoryBean $ 1 - org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6 получено сообщение: payload = {id = 145} helloaders, id = 145, id = e0591162-3b93-9bb6-0699-89b15b20e904}] DEBUG: - com.XXX.ProducerMessageHandler # 0 получил сообщение: GenericMessage [payload = hello, headers = {timestamp = 1452017658598, id = e0591162-3b93-9bb62099- ] получено исключение: org.springframework.messaging.MessageHandlingException: произошла ошибка в обработчике сообщений [com.XXX.ProducerMessageHandler # 0]; вложенное исключение - java.lang.RuntimeException: test foo 01-05 @ 23: 44: 18 606 DEBUG org.springframework.integration.channel.PublishSubscribeChannel - preSend на канале 'toKafka', сообщение: GenericMessage [payload = hello, headers = {timestamp = 1452017658605, id = 61597941-b2f8-314d-141d-8f2c058dda4d}] 01-05 @ 23: 44: 18,606 ОТЛАДКА org.springframework.integration.config.ServiceActivatorFactoryBean $ 1 - org.spring.config.ServiceActivatorFactoryBean $ 1 - org.spring.spring.framework.spring.spring.framework.spring. message: GenericMessage [payload = hello, headers = {timestamp = 1452017658605, id = 61597941-b2f8-314d-141d-8f2c058dda4d}] DEBUG: - com.XXX.ProducerMessageHandler # 0 получено сообщение: GenericMessage = {helloload = { timestamp = 1452017658605, id = 61597941-b2f8-314d-141d-8f2c058dda4d}] получено исключение: org.springframework.messaging.MessageHandlingException: произошла ошибка в обработчике сообщений [com.XXX.ProducerMessageHandler # 0]; вложенное исключение - java.lang.RuntimeException: test foo 01-05 @ 23: 44: 18 606 DEBUG org.springframework.integration.channel.PublishSubscribeChannel - preSend на канале 'toKafka', сообщение: GenericMessage [payload = hello, headers = {timestamp = 1452017658606, id = 119afbf1-6104-feb1-eb44-f646aa932277}] 01-05 @ 23: 44: 18,606 DEBUG org.springframework.integration.config.ServiceActivatorFactoryBean.ServiceActivatorFactoryBean.ServiceActivatorFactoryBean.ServiceActivatorFactoryBean. message: GenericMessage [payload = hello, headers = {timestamp = 1452017658606, id = 119afbf1-6104-feb1-eb44-f646aa932277}] получено исключение: org.springframework.messaging.MessageHandlingException: произошла ошибка в обработчике сообщений. .config.ServiceActivatorFactoryBean $ 1 @ 6a0ef4b6]; вложенное исключение - org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice $ CircuitBreakerOpenException: прерыватель цепи открыт для org.springframework.integration.config. ServiceActivatorFactoryBean $ 1 @ 6a0ef4b6 01-05 @ 23: 44: 18,606 DEBUG org.springframework.integration.channel.PublishSubscribeChannel - предварительная отправка на канале 'toKafka', сообщение: GenericMessage [payload = hello-602 = 145 8efe-c827-e745-1387e6045e7d}] 01-05 @ 23: 44: 18,606 ОТЛАДКА org.springframework.integration.config.ServiceActivatorFactoryBean $ 1 - org.springframework.integration.config.ServiceBessageActivator4loadbase@mail.ru , headers = {timestamp = 1452017658606, id = 8dafe2e0-8efe-c827-e745-1387e6045e7d}] получено исключение: org.springframework.messaging.MessageHandlingException: произошла ошибка в обработчике сообщений [org.springframework.integration1 ]; вложенное исключение - org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice $ CircuitBreakerOpenException: прерыватель цепи открыт для org.springframework.integration.config.ServiceActivatorFactoryBean$1@6a0ef4b6 - person Gary Russell; 12.01.2016

Сэм: Привет, Гэри

вот как я отправляю сообщение

    for(int i=0;i<4;i++){ 
    try{ 
   toKafka.send(MessageBuilder 
   .withPayload("hello"). 
     build()); 
     }catch(Exception e){ 
     System.out.println("got exception : " + e); } }

Гэри: Итак, вы отправляете прямо на канал - вместо этого вам следует использовать MessagingGateway. Сэм: Привет, Гэри. спасибо. он работает со шлюзом.

настройка CB с помощью KafkaProducerMessageHandler - это нормально, но она покрывает любой сбой, описанный ниже.

public void handleMessageInternal (последнее сообщение сообщения) выдает исключение

но я хочу затронуть проблему с сетевыми ошибками, а также с недействительным списком брокеров / сервером, который он не покрывает, и я получаю исключение в консоли следующим образом:

журнал

хотите, чтобы CB также запускался в этом случае.

   12-24@16:46:46,250 DEBUGspringframework.integration.kafka.outbound.KafkaProducerMessageHandler - org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#0 received message: GenericMessage [payload=TestVo[data=sample message]], headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@44286963, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@44286963, kafka_topic=tried_in, kafka_partitionId=2, id=7b596368-0aee-ddaa-2168-dc403e22c38f, timestamp=1450955805294}] 
   12-24@16:55:12,630 WARN apache.kafka.common.network.Selector - Error in I/O with /1.2.0.3 
   java.net.ConnectException: Connection refused: no further information 
   at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) 
   at sun.nio.ch.SocketChannelImpl.finishConnect(Unknown Source) 
   at org.apache.kafka.common.network.Selector.poll(Selector.java:238) 
   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) 
   at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) 
   at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) 
   at java.lang.Thread.run(Unknown Source)

Гэри: исключение подключения должно возникать в handleMessageInternal (). Если исключение не генерируется, это ошибка. Я взгляну.

Будущее отбрасывается в handleMessageInternal - я открою проблему JIRA.

https://jira.spring.io/browse/INTEXT-218

Сэм: окей. Это покроет случай, когда сервер kafka по какой-то причине не работает?

Гэри: да; но вы можете уменьшить тайм-аут по умолчанию (60 с)

Привет @Gary, Спасибо за предложение. Но, похоже, у моей второй точки все еще есть проблема. CircuitBreaker не работает для самого ServiceActivator. В приведенном выше примере пробовал с помощью опросчика (кажется, работает один раз, но не работает в настоящее время). хочу иметь его для канала publishersubscriber.Поднял отдельный вопрос с подробностями о том же.

person sam    schedule 11.01.2016