Проблема с синтаксисом Hazelcast-Jet DrainTo

Я пытаюсь агрегировать с помощью Jet, источник и приемник - это тема Kafka, требование - принимать сообщения GPB (google proto buf) из источника и публиковать сообщения GPB. Проблема в том, что я могу опубликовать Double, но не сообщение GPB, и это вызывает ошибку компиляции.

Это отлично работает:

    Pipeline p = Pipeline.create();
    p.drawFrom(KafkaSources.<String, Balance> kafka(<properties>, <topic>)) 
    .map(s->s.getValue() ).groupingKey(x->x.account)
    .rollingAggregator(AggregateOperations.summingDouble(Balance::amount))
    .drainTo(KafkaSinks.kafka(<prop>,<sinktopic>));

Несмотря на то, что приведенный выше код работает нормально, он публикует double, чтобы поглотить тему, в то время как мое требование - опубликовать GPB, имеющий атрибут double, чтобы поглотить тему. Когда я пытаюсь сделать это, помещая map перед drainTo, у меня возникает синтаксическая ошибка. Вот что я пробовал:

    .rollingAggregator(AggregateOperation.summingDouble(Balance::amount))
    .map(k->Amount.newBuilder().setAmount(k.getValue()).build())
    .drainTo(KafkaSinks.kafka(<prop>,<sinktopic>));

Сумма - это сообщение GPB с атрибутом double. Это дает мне синтаксическую ошибку, которую я не понимаю. Не могли бы вы помочь мне пройти через это.

Не могли бы вы поделиться некоторыми документами или ссылками, где есть разные агрегаты для разных сценариев? Я просмотрел образцы, демонстрации Hazelcast, не все, но несколько, но не нашел там своего варианта использования. Большое спасибо.


person Abhishek    schedule 30.01.2019    source источник
comment
Кто-нибудь сможет мне помочь, пожалуйста. Есть ли что-то, что нужно улучшить в этом вопросе?   -  person Abhishek    schedule 04.02.2019


Ответы (1)


Я предполагаю, что синтаксическая ошибка была такой:

Несовместимые типы. Требуется раковина ‹? super Amount>, но 'kafka' было выведено как Sink ‹Entry‹ K, V >>: не существует экземпляров переменных типа K, V, так что String соответствует Entry ‹K, V>

(В следующий раз, пожалуйста, поделитесь исключением, ваш код зависит от не разделяемых классов, и я не могу его скомпилировать.)

Это означает, что приемник Kafka ожидает java.util.Map.Entry на входе, но вы дали его Amount. Вам нужно map это сделать так:

.map(entry-> Util.entry(entry.getKey(), Amount.newBuilder().setAmount(entry.getValue()).build()))
person Oliv    schedule 08.02.2019