Агрегировать непрерывный поток чисел из файла с помощью hazelcast Jet

Я пытаюсь суммировать непрерывный поток чисел из файла с помощью hazelcast jet

pipe
    .drawFrom(Sources.fileWatcher)<dir>))
    .map(s->Integer.parseInt(s))
    .addTimestamps()
    .window(WindowDefinition.sliding(10000,1000))
    .aggregate(AggregateOperations.summingDouble(x->x))
    .drainTo(Sinks.logger());

Несколько вопросов

  1. Он не дает ожидаемого результата, я ожидаю, что как только в файле появится новое число, он должен просто добавить его к существующей сумме.
  2. Для этого, почему мне нужно указать окно и метод addTimestamp, мне просто нужно сделать сумму бесконечного потока
  3. Как добиться отказоустойчивости, т.е. е. если сервер перезапустится, сохранит ли он агрегированный результат, а когда он появится, он будет агрегироваться из последней вычисленной суммы?
  4. если сервер не работает и теперь в файл поступает мало номеров, когда сервер запускается, будет ли он читать с последней точки, когда сервер вышел из строя, или он пропустит числа, когда он был отключен, и будет читать только номер, который он получил после сервер был запущен.

person Abhishek    schedule 22.01.2019    source источник


Ответы (1)


Ответ на вопросы 1 и 2: вы ищете rollingAggregate, вам не нужны временные метки или окна.

pipe
    .drawFrom(Sources.fileWatcher(<dir>))
    .rollingAggregate(AggregateOperations.summingDouble(Double::parseDouble))
    .drainTo(Sinks.logger());

Ответ на вопросы 3 и 4: fileWatcher источник не является отказоустойчивым. Причина в том, что он читает локальные файлы, и когда член умирает, локальные файлы все равно не будут доступны. Когда задание перезапустится, оно начнет чтение с текущей позиции и пропустит числа, добавленные во время выполнения задания.

Кроме того, поскольку вы используете глобальную агрегацию, данные из всех файлов будут перенаправлены на один член кластера, а другие участники будут бездействовать.

person Oliv    schedule 22.01.2019
comment
Отказоустойчивый RollingAggregator, даже если сам кластер отключает все узлы, такие как kafka, где он сохраняет раздел на локальном диске, и если все узлы kafka отключаются, он восстанавливается с диска - person Abhishek; 22.01.2019
comment
Да, rollingAggregate отказоустойчив. В приведенном выше примере источник - нет. - person Oliv; 23.01.2019
comment
Но в соответствии с документацией он сохраняет данные в самом IMap, и когда реактивный кластер перезапускается, IMap исчез, как он будет восстанавливаться после этого, я понимаю, что filewatcher может быть отказоустойчивым, но если у нас есть какой-то другой источник, такой как тема kafka, все еще после перезапуска Карта исчезла. - person Abhishek; 23.01.2019
comment
Он выдержит перезапуск задания или перезапуск участника (если у вас настроено резервное копирование, что по умолчанию), но не перезапуск кластера. Если вам нужен IMap, чтобы пережить перезапуск кластера и сбой кластера, вам понадобится коммерческая функция HotRestart или сделайте это самостоятельно, используя MapStore, или сбросьте карты на диск перед выключением кластера. - person Oliv; 23.01.2019