рабочая логика клиента kinesis

Я хочу понять, когда метод processRecords IRecordProcessor вызывается из worker. Если мой предыдущий вызов processRecords еще не завершен, будет ли рабочий вызывать следующий processRecords? Начнет ли worker извлекать новые записи из kinesis или будет ждать завершения выполнения текущих записей.

В основном я хочу долго ждать, если processRecords получит какое-то исключение при сохранении записей во внешней базе данных, так как база данных не работает или какая-то другая ошибка. Итак, хотите подтвердить, что не будет никаких проблем, если рабочий не начнет извлекать новые записи до тех пор, пока не будет завершена обработка?


person user1846749    schedule 22.01.2017    source источник


Ответы (1)


Выдержка из других вопросов:

Приложение (с помощью KCL) будет продолжать опрашивать «Shard Iterator» в фоновом режиме, таким образом, вы будете уведомлены о новых данных, когда они поступят.

Источник: https://stackoverflow.com/a/35582161/1622134

А также под «рабочим» вы подразумеваете «рабочий» поток в приложении; который является исполняемым.

Каждый сегмент обрабатывается ровно одним рабочим процессом KCL и имеет только один соответствующий обработчик записей, поэтому вам никогда не потребуется несколько экземпляров для обработки одного сегмента. См. Worker.java в исходном коде KCL.

Источник: https://stackoverflow.com/a/34509567/1622134

Чтобы ответить на ваш вопрос, вы можете это сделать в своей реализации processRecords. При обработке записей используйте блок try-catch и записывайте контрольную точку в DynamoDB тогда и только тогда, когда часть попытки завершается успешно. Туда; если будет ошибка при записи во внешнюю БД, вы не потеряете записи и при перезапуске. Вы также должны сохранить эти данные записи (которые не могут быть вставлены в базу данных) в другое место для последующей обработки.

Также см. этот ответ: https://stackoverflow.com/a/32517002/1622134

person az3    schedule 23.01.2017
comment
В worker.java он вызывает runProcessLoop и в том, что он вызывает shardConsumer.consumeShard(), там он вызывает checkAndSubmitNextTask() в том, что он проверяет readyForNextTask или нет. Если notReady, новые записи не потребляются. Так как же возможно, что worker извлекает новые записи без обработки предыдущих записей. - person user1846749; 23.01.2017
comment
Если на вашей стороне происходит временное отключение базы данных (что предотвращает использование записей); вам следует остановить приложение Kinesis Consumer Application, пока это не будет исправлено. Или есть второй подход: в последней ссылке моего ответа есть строка, объясняющая ваш вопрос: Но если это не удается, запишите это в другое место, чтобы выяснить причину, по которой это не удалось. - Таким образом, вы можете обрабатывать записи, использованные во время сбоя базы данных, позже вручную. - person az3; 26.01.2017