Я новичок в Spring-Kafka и пытаюсь реализовать повторную попытку в случае сбоя или любого исключения во время обработки сообщений kafka с использованием Spring Kafka RetryTemplate.
Я использовал следующий код:
// Это KafkaListenerContainerFactory:
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryRetry() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRetryTemplate(retryTemplate());
factory.setRecoveryCallback(retryContext -> {
ConsumerRecord consumerRecord = (ConsumerRecord) retryContext.getAttribute("record");
logger.info("Recovery is called for message {} ", consumerRecord.value());
return Optional.empty();
});
return factory;
}
// Повторная попытка шаблона
public RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
// Todo: take from config
fixedBackOffPolicy.setBackOffPeriod(240000);// 240seconds
retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
// Todo: take from config
simpleRetryPolicy.setMaxAttempts(3);
retryTemplate.setRetryPolicy(simpleRetryPolicy);
return retryTemplate;
}
//
Это потребительская фабрика
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
Когда возникает какое-либо исключение, оно повторяется, как ожидалось, в соответствии с политикой повтора. После того, как максимальное количество повторных попыток исчерпания, он вызывает метод обратного вызова восстановления. Но вскоре после этого он выдает java.lang.IllegalStateException: этот обработчик ошибок не может обработать 'org.apache.kafka.clients.consumer.CommitFailedException' ; информация о записях недоступна.
Похоже, что он не может зафиксировать смещение, поскольку потребитель теперь отключен от группы, потому что он простаивал в течение длительного времени (backoffperiod * (maxretry-1)) перед следующим опросом.
Нужно ли мне добавлять max.poll.interval.ms с большим значением?
Есть ли какой-либо другой способ добиться этого, чтобы эта ошибка сбоя фиксации не возникала, даже если у потребителя так много времени на обработку и запланировано повторение попытки с большим интервалом.
Пожалуйста помоги мне с этим.