Я хочу сделать простой максимум для всего набора данных. Я начал с примера Kafka по адресу: https://github.com/hazelcast/hazelcast-jet-code-samples/blob/0.7-mainmaintenance/kafka/src/main/java/avro/KafkaAvroSource.java.
Я просто изменил конвейер на:
p.drawFrom(KafkaSources.<Integer, User>kafka(brokerProperties(), TOPIC))
.map(Map.Entry::getValue)
.rollingAggregate(minBy(comparingInt(user -> (Integer) user.get(2))))
.map(user -> (Integer) user.get(2))
.drainTo(Sinks.list("result"));
и перейти к:
IListJet<Integer> res = jet.getList("result");
SECONDS.sleep(10);
System.out.println(res.get(0));
SECONDS.sleep(15);
System.out.println(res.get(0));
cancel(job);
чтобы получить самый большой возраст людей в теме. Однако он не возвращает 20 и, похоже, возвращает разные значения при разных прогонах. Есть идеи, почему?