Spring Kafka Consumer Retry с длительным интервалом отсрочки, дающим org.apache.kafka.clients.consumer.CommitFailedException

Я новичок в Spring-Kafka и пытаюсь реализовать повторную попытку в случае сбоя или любого исключения во время обработки сообщений kafka с использованием Spring Kafka RetryTemplate.

Я использовал следующий код:

// Это KafkaListenerContainerFactory:

public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryRetry() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRetryTemplate(retryTemplate());
    factory.setRecoveryCallback(retryContext -> {
        ConsumerRecord consumerRecord = (ConsumerRecord) retryContext.getAttribute("record");
        logger.info("Recovery is called for message {} ", consumerRecord.value());
        return Optional.empty();
    });
    return factory;
}

// Повторная попытка шаблона

public RetryTemplate retryTemplate() {
    RetryTemplate retryTemplate = new RetryTemplate();
    FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
    // Todo: take from config
    fixedBackOffPolicy.setBackOffPeriod(240000);// 240seconds
    retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
    SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
    // Todo: take from config
    simpleRetryPolicy.setMaxAttempts(3);
    retryTemplate.setRetryPolicy(simpleRetryPolicy);
    return retryTemplate;
}
    
//

Это потребительская фабрика

public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

Когда возникает какое-либо исключение, оно повторяется, как ожидалось, в соответствии с политикой повтора. После того, как максимальное количество повторных попыток исчерпания, он вызывает метод обратного вызова восстановления. Но вскоре после этого он выдает java.lang.IllegalStateException: этот обработчик ошибок не может обработать 'org.apache.kafka.clients.consumer.CommitFailedException' ; информация о записях недоступна.

Похоже, что он не может зафиксировать смещение, поскольку потребитель теперь отключен от группы, потому что он простаивал в течение длительного времени (backoffperiod * (maxretry-1)) перед следующим опросом.

Нужно ли мне добавлять max.poll.interval.ms с большим значением?

Есть ли какой-либо другой способ добиться этого, чтобы эта ошибка сбоя фиксации не возникала, даже если у потребителя так много времени на обработку и запланировано повторение попытки с большим интервалом.

Пожалуйста помоги мне с этим.


person meena    schedule 14.09.2020    source источник


Ответы (1)


Суммарная задержка backOff должна быть меньше max.poll.interval.ms, чтобы избежать перебалансировки.

Теперь предпочтительнее использовать SeekToCurrentErrorHandler вместо RetryTemplate, потому что тогда только каждая задержка (вместо совокупности) должна быть меньше max.poll.interval.ms

здесь.

person Gary Russell    schedule 14.09.2020
comment
Спасибо, Гэри, за пояснение, теперь я использую следующий код вместо RetryTemplate. ConcurrentKafkaListenerContainerFactory ‹String, String› factory = new ConcurrentKafkaListenerContainerFactory ‹› (); factory.setConsumerFactory (consumerFactory ()); FixedBackOff fixedbackOff = новый FixedBackOff (kafkaProperties.getBackoffPeriod (), kafkaProperties.getMaxAttempts ()); factory.setErrorHandler (новый SeekToCurrentErrorHandler (fixedbackOff)); У меня все работает нормально. Только один момент для подтверждения, здесь я должен использовать backoffperiod меньше max.poll.interval.ms. Верно? - person meena; 16.09.2020
comment
Это правильно; в дальнейшем не помещайте код в комментарии; читать сложно - лучше отредактировать вопрос и комментарий, что вы это сделали. - person Gary Russell; 16.09.2020