Коннектор Hazelcast Jet для Apache Pulsar в качестве источника

Я не смог найти Hazlecast Jet коннектор источника для Apache Pulsar. Кто-нибудь пробовал это? Оцените любые направления, точки, источники, соображения, если мне нужно написать собственный stream коннектор для Pulsar в качестве источника для Jet?


person Vijay Veeraraghavan    schedule 30.12.2019    source источник
comment
Я устанавливаю соединение Pulsar и потребителя в SourceBuilder.timestampedStream, добавляю сообщения (consumer.receive (10, TimeUnit.MILLISECONDS)), прочитанные из PulsarConsumer, в буфер на fillBufferFn, наконец, закрывая потребителя и соединение с Pulsar на destroyFn. Это сработало. Однако как обрабатывать createSnapshotFn и restoreSnapshotFn? Поскольку я мало работал над Kafka, я не могу связать аналогичный подход с Pulsar. Есть ссылки?   -  person Vijay Veeraraghavan    schedule 06.01.2020
comment
createSnapshotFn должен возвращать текущий курсор сообщения, чтобы он был сохранен, а restoreSnapshotFn должен восстанавливать курсор и устанавливать его для клиента, чтобы получать сообщения с того места, где он остановился.   -  person eminn    schedule 08.01.2020


Ответы (2)


Первоначальная версия коннектора Jet для Apache Pulsar недавно реализована здесь . Он еще не был тщательно протестирован. Подробнее см. В проектном документе в котором указаны недостатки и недостатки коннектора, а также руководство. Если в этом есть что-то непонятное, вы можете спросить еще раз.

person Ufuk Yılmaz    schedule 15.04.2020
comment
Спасибо, Уфук, я сделаю вариант использования Pulsar в работе Jet. - person Vijay Veeraraghavan; 20.04.2020

На данный момент у Hazelcast Jet нет разъема для Apache Pulsar (версия 4.0). Если вы хотите внести свой вклад, вы можете взглянуть на Source Builder и его раздел справочного руководства в качестве отправной точки.

Также ознакомьтесь с существующими реализациями различных соединителей в репозитории модулей расширения Hazelcast Jet, который использует API конструктора исходного кода и вносит свой вклад в него.

person eminn    schedule 30.12.2019
comment
Я написал источник пульсара, сохранив Kafka-connect в качестве ссылки. Проблема, с которой я сталкиваюсь, заключается в том, что если я использую Pulsar reader для чтения сообщений Pulsar, в отсутствие этого ридера любое сообщение, написанное в Pulsar, не читается, но все новые входящие сообщения читаются. Кроме того, при перезапуске задания при таком подходе не перечитываются все сообщения с самого начала. В другом подходе, если я использую pulsar consumer, каждый раз, когда задание Jet перезапускается, оно всегда запускается с первого из-за того, что сообщения не подтверждены, поэтому сообщения не остаются необработанными, но возможна дублирование обработки. Любые комментарии? - person Vijay Veeraraghavan; 08.01.2020
comment
Вы должны использовать интерфейс Pulsar Reader с предоставленной пользователем начальной точкой в ​​начале. Если моментальные снимки Jet включены, он должен сохранять текущий идентификатор сообщения, а в случае сбоя необходимо восстановить идентификатор сообщения из снимка. Согласно документации Pulsar Reader, вы можно использовать только для неразмеченных тем. - person eminn; 09.01.2020
comment
Итак, в таком случае снимок необходимо сохранить? Рассмотрим ситуации, когда потоковое приложение может быть перезагружено в холодном режиме. - person Vijay Veeraraghavan; 11.01.2020
comment
Hazelcast Jet сохраняет данные моментального снимка в Hazelcast IMap, который сохраняет данные в памяти. С помощью функции перезапуска кластера без потерь данные могут быть сохранены на диске. - person eminn; 13.01.2020