Я пытаюсь суммировать непрерывный поток чисел из файла с помощью hazelcast jet
pipe
.drawFrom(Sources.fileWatcher)<dir>))
.map(s->Integer.parseInt(s))
.addTimestamps()
.window(WindowDefinition.sliding(10000,1000))
.aggregate(AggregateOperations.summingDouble(x->x))
.drainTo(Sinks.logger());
Несколько вопросов
- Он не дает ожидаемого результата, я ожидаю, что как только в файле появится новое число, он должен просто добавить его к существующей сумме.
- Для этого, почему мне нужно указать окно и метод
addTimestamp
, мне просто нужно сделать сумму бесконечного потока - Как добиться отказоустойчивости, т.е. е. если сервер перезапустится, сохранит ли он агрегированный результат, а когда он появится, он будет агрегироваться из последней вычисленной суммы?
- если сервер не работает и теперь в файл поступает мало номеров, когда сервер запускается, будет ли он читать с последней точки, когда сервер вышел из строя, или он пропустит числа, когда он был отключен, и будет читать только номер, который он получил после сервер был запущен.