Новая потребительская группа выбирает сообщения с самого начала

У меня был потребитель Kafka, аннотированный StreamListener без идентификатора группы. Теперь, когда я добавил идентификатор группы через spring.cloud.stream.bindings.input.group, он начал обрабатывать все старые сообщения с самого начала, которые уже использовались этим потребителем, когда у него не было идентификатора группы.

Как я могу настроить его для выбора только необработанных сообщений, которые не были выбраны ранее?

Я попытался добавить spring.cloud.stream.bindings.input.startOffset к последним и resetOffsets к истине, но это не сработало.

Заранее спасибо. :)




Ответы (1)


Если вы уже указали идентификатор группы (это обязательное свойство, и я предполагаю, что spring сгенерировал его для вас), то установка начального смещения или auto.offset.reset не повлияет.

Вы можете снова изменить идентификатор, чтобы они работали, или вы можете использовать инструмент Kafka cli, чтобы настроить группу на нужные вам смещения.

person OneCricketeer    schedule 21.01.2021
comment
Я создал новый идентификатор группы, но он все еще выбирает с самого начала. github.com/spring-cloud/spring-cloud- поток-связыватель-кафка/проблемы/ - person Search Again; 22.01.2021
comment
Ответы, которые вы получили, такие же, как я сказал: группа создается, если вы ее не установите, если вы установите совершенно новую, тогда она будет использовать только функции сброса. Если вы хотите продолжить с того места, где вы ранее остановились в сгенерированной группе, вам нужно найти ее имя и вручную зафиксировать их обратно в новое имя группы. В противном случае вы читаете с самого начала и устанавливаете способ обработки дубликатов в ваших нижестоящих потребителях, и перезапуск этого приложения будет продолжать собирать самое последнее зафиксированное смещение (я полагаю, вы используете автоматические фиксации?) - person OneCricketeer; 22.01.2021