Как сделать так, чтобы шторм не писал сообщение дважды в локальный файл?

Я создаю топо для получения сообщений от kafka, а затем grep какое-то ключевое слово, если оно подходит, записываю в локальный файл.

Я использую OpaqueTridentKafkaSpout storm-kafka, чтобы кортеж не пропустил и не повторился, но рассмотрим одну ситуацию: при записи сообщения в локальный файл возникает какая-то ошибка (например, не хватает места). В этот момент некоторые сообщения были записаны в локальный файл, а другие нет, если носик повторно отправит сообщение, сообщение будет записано дважды.

Как с этим справиться?


person jinhong_lu    schedule 03.07.2015    source источник
comment
если сообщение отправлено снова, оно будет обработано снова. Ваша забота должна быть о том, что происходит с кортежами в случае сбоя. Вы должны смотреть на то, что пишет в файлы, чтобы понять, как обрабатывается сбой (если кортежи не выполнены или подтверждены). Если кортежи не удались, носик снова прочитает их из kafka, если они будут подтверждены, они будут потреблены, и носик не будет пытаться их прочитать.   -  person SQL.injection    schedule 03.07.2015


Ответы (2)


Это просто. Код, который записывает в файл, должен сделать следующее:

1) Подтвердите кортеж — только если запись в файл прошла успешно. 2) Fail the tuple - Если запись в файл НЕ была успешной.

Для всех кортежей, которые были подтверждены, Kafka spout НЕ будет повторно отправлять их. Неудачные кортежи будут сброшены носителем.

person redTiger    schedule 18.08.2015

Для этой цели вы должны разработать свою стратегию якорения. Я предлагаю вам уменьшить размер пакета из kafkaspoutconfig и сохранить выбранные вами сообщения в списке. Когда все сообщения в пакете обработаны, вы можете записать содержимое списка в локальный файл.

Как вы знаете, Trident обрабатывает поток в пакетном режиме, если ваша система выдает какую-либо ошибку при обработке любого из кортежей в потоке, весь пакет будет отброшен.

В вашем случае вы можете окружить блоком кода try catch, который отвечает за запись в локальный файл, а в блоке catch вы должны бросить backtype.storm.topology.ReportedFailedException. Таким образом, вы можете обеспечить ровно одну семантику.

Также вы должны использовать транзакционный носик кафки, чтобы обеспечить ровно одну семантику.

person serkan kucukbay    schedule 21.08.2015