Ручное подтверждение сообщений: Spring Cloud Stream Kafka

Сценарий, который я хочу реализовать, - это принять сообщение от Kafka, обработать его, если какое-то условие не выполняется, я не хочу подтверждать сообщение. Для этого я нашел в справочной документации Spring Cloud Stream,

autoCommitOffset Следует ли автоматически фиксировать смещения после обработки сообщения. Если установлено значение false, заголовок подтверждения будет доступен в заголовках сообщений для позднего подтверждения.

По умолчанию: true.

У меня вопрос: после установки autoCommitOffset на false, как я могу подтвердить сообщение? Был бы очень признателен пример кода.


person Rahul Vanimisetty    schedule 14.06.2016    source источник
comment
Я не уверен, что такое Sprint Cloud Stream Kafka. Однако, если вы используете обычный клиент Kafka, он должен предоставлять метод под названием commit () или аналогичный для фиксации смещений вручную.   -  person Matthias J. Sax    schedule 15.06.2016


Ответы (1)


Я дал ответ на вопрос здесь https://github.com/spring-cloud/spring-cloud-stream/issues/575

По сути, все сводится к настройке spring.cloud.stream.kafka.bindings.input.consumer.autoCommitOffset=false

а затем обрабатываем заголовок подтверждения:

@SpringBootApplication
@EnableBinding(Sink.class)
   public class ManuallyAcknowdledgingConsumer {

      public static void main(String[] args) {
         SpringApplication.run(ManuallyAcknowdledgingConsumer.class, args);
      }

      @StreamListener(Sink.INPUT)
      public void process(Message<?> message) {
         System.out.println(message.getPayload());
         Acknowledgment acknowledgment = message.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
        if (acknowledgment != null) {
           System.out.println("Acknowledgment provided");
           acknowledgment.acknowledge();
        }
    }
}
person Marius Bogoevici    schedule 15.06.2016
comment
Спасибо, Мариус. Кроме того, поделитесь операторами импорта, чтобы узнать API класса подтверждения. Хотел узнать, есть ли способ не подтверждать сообщение. - person Rahul Vanimisetty; 21.06.2016
comment
Это ответ. Спасибо, Мариус! - person Peter Davis; 26.02.2017
comment
@ marius-bogoevici: доступна ли эта функция в функциональных интерфейсах Spring Cloud Stream? - person pradosh nair; 22.04.2020