Я пытаюсь агрегировать с помощью Jet, источник и приемник - это тема Kafka, требование - принимать сообщения GPB (google proto buf) из источника и публиковать сообщения GPB. Проблема в том, что я могу опубликовать Double
, но не сообщение GPB, и это вызывает ошибку компиляции.
Это отлично работает:
Pipeline p = Pipeline.create();
p.drawFrom(KafkaSources.<String, Balance> kafka(<properties>, <topic>))
.map(s->s.getValue() ).groupingKey(x->x.account)
.rollingAggregator(AggregateOperations.summingDouble(Balance::amount))
.drainTo(KafkaSinks.kafka(<prop>,<sinktopic>));
Несмотря на то, что приведенный выше код работает нормально, он публикует double
, чтобы поглотить тему, в то время как мое требование - опубликовать GPB, имеющий атрибут double
, чтобы поглотить тему. Когда я пытаюсь сделать это, помещая map
перед drainTo
, у меня возникает синтаксическая ошибка. Вот что я пробовал:
.rollingAggregator(AggregateOperation.summingDouble(Balance::amount))
.map(k->Amount.newBuilder().setAmount(k.getValue()).build())
.drainTo(KafkaSinks.kafka(<prop>,<sinktopic>));
Сумма - это сообщение GPB с атрибутом double
. Это дает мне синтаксическую ошибку, которую я не понимаю. Не могли бы вы помочь мне пройти через это.
Не могли бы вы поделиться некоторыми документами или ссылками, где есть разные агрегаты для разных сценариев? Я просмотрел образцы, демонстрации Hazelcast, не все, но несколько, но не нашел там своего варианта использования. Большое спасибо.