<int:service-activator input-channel="toKafka" ref="conditionalProducerService" method="producerCircuitBreaker">
<int:request-handler-advice-chain>
<ref bean="circuitBreakerAdvice1" />
</int:request-handler-advice-chain>
</int:service-activator>
<int:channel id="failedChannel2" />
<int-kafka:outbound-channel-adapter
id="kafkaOutboundChannelAdapter" kafka-producer-context-ref="producerContext" auto-startup="false" channel="toKafka" message-key="kafka_messageKey">
<int:poller fixed-delay="1000" error-channel="failedChannel2" />
</int-kafka:outbound-channel-adapter>
<int:chain input-channel="failedChannel2">
<int:transformer expression="'failed:' + payload.failedMessage.payload + ' with ' + payload.cause.message" />
<int-stream:stderr-channel-adapter append-newline="true"/>
</int:chain>
<bean id="circuitBreakerAdvice1" class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice">
<property name="threshold" value="2" />
<property name="halfOpenAfter" value="12000" />
</bean>
public Message<?> producerCircuitBreaker(Message<?> payload) {
throw new RuntimeException("foo Pro");}
С приведенной выше конфигурацией мы пытаемся:
1. Ожидается, что сообщение о сбое будет распространено на error-channel = "failedChannel2", чего не происходит. Так как я не мог видеть преобразованный вывод в консоли.
2.CircuitBreaker работает для ServiceActivator (для исключения, связанного с приложением, здесь, как указано выше), но как мы можем настроить CB для сбойного случая для исходящего адаптера. Пример: время ожидания соединения истекло или сервер внезапно отключился / проблема с сетевым подключением / проблема с окружающей средой перед отправкой сообщения из канала SI на внешний (kafka) сервер. Можно ли настроить CB с исходящим адаптером для такой ситуации.
Согласно документу SI относительно рекомендаций по автоматическому выключателю, приведенному ниже.
«Обычно этот совет может использоваться для внешних служб, где может потребоваться некоторое время для отказа (например, тайм-аут при попытке установить сетевое соединение)».
Пожалуйста, подскажите, как этого добиться. Большое спасибо.
обновленная конфигурация:
<int:gateway default-request-channel="toKafka" error-channel="errorChannel"
default-reply-timeout="0" />
<int:service-activator input-channel="toKafka">
<bean class="com.XXX.ProducerMessageHandler" >
<constructor-arg ref="producerContext"/>
</bean>
<int:request-handler-advice-chain>
<ref bean="circuitBreakerAdvice" />
</int:request-handler-advice-chain>
<bean id="transformerService1" class="com.XXX.KafkaTransformerTest" />
<int:transformer input-channel="errorChannel"
order="1" ref="transformerService1" method="transformFailed">
</int:transformer>
public void transformFailed(Message<?> message) {
APPLOGGER.log("transformer message test" + message);
public class ProducerMessageHandler extends KafkaProducerMessageHandler{
public ProducerMessageHandler(KafkaProducerContext kafkaProducerContext) {
super(kafkaProducerContext);
// TODO Auto-generated constructor stub
}
@Override
public void handleMessageInternal(final Message<?> message) throws Exception {
//super.handleMessageInternal(message);
throw new RuntimeException("test foo");
}
log :
Рекомендация применяется только к конечной точке, которой она назначена, а не к нисходящему потоку; к сожалению, схема kafka не позволяет применять ее к адаптеру исходящего канала. Я создал для этого проблему JIRA.