Максимальное агрегирование с Hazelcast-jet

Я хочу сделать простой максимум для всего набора данных. Я начал с примера Kafka по адресу: https://github.com/hazelcast/hazelcast-jet-code-samples/blob/0.7-mainmaintenance/kafka/src/main/java/avro/KafkaAvroSource.java.

Я просто изменил конвейер на:

p.drawFrom(KafkaSources.<Integer, User>kafka(brokerProperties(), TOPIC))
    .map(Map.Entry::getValue)
    .rollingAggregate(minBy(comparingInt(user -> (Integer) user.get(2))))
    .map(user -> (Integer) user.get(2))
    .drainTo(Sinks.list("result"));

и перейти к:

IListJet<Integer> res = jet.getList("result");
SECONDS.sleep(10);
System.out.println(res.get(0));
SECONDS.sleep(15);
System.out.println(res.get(0));
cancel(job);

чтобы получить самый большой возраст людей в теме. Однако он не возвращает 20 и, похоже, возвращает разные значения при разных прогонах. Есть идеи, почему?


person Marc Mason    schedule 19.03.2019    source источник
comment
Вы используете Kafka, который является источником потоковой передачи, но вам нужен результат для всего набора данных. Кажется, эти двое не в ладах друг с другом. источник Kafka никогда не завершается, поэтому вы никогда не получите окончательного результата, только результат пока что.   -  person Marko Topolnik    schedule 19.03.2019
comment
Вы используете приемник списка, поэтому каждое обновление будет добавляться в конец списка. Значение индекса 0 не изменится. Вместо этого вы должны использовать приемник карты с фиксированным ключом, чтобы новые результаты перезаписывали предыдущие.   -  person Can Gencer    schedule 19.03.2019


Ответы (1)


Кажется, вы используете rollingAggregate, который создает новый элемент вывода каждый раз, когда он получает какой-либо ввод, но все, что вы проверяете, - это первый элемент, который он генерирует. Вместо этого вы должны найти последний элемент, который он выпустил. Один из способов добиться этого - поместить результат в IMap приемник, каждый раз используя один и тот же ключ:

p.drawFrom(KafkaSources.<Integer, User>kafka(brokerProperties(), TOPIC))
 .withoutTimestamps()
 .map(Map.Entry::getValue)
 .rollingAggregate(minBy(comparingInt(user -> (Integer) user.get(2))))
 .map(user -> entry("user", (Integer) user.get(2)))
 .drainTo(Sinks.map("result"));

Вы можете получить последний результат с помощью

IMap<String, Integer> result = jet.getMap("result");
System.out.println(result.get("user");
person Marko Topolnik    schedule 19.03.2019