Реактивные потоки - пакетирование с таймаутом

Я подумываю о замене собственной библиотеки обработки журналов, которая очень похожа на ReactiveStreams, на io.projectreactor. Цель состоит в том, чтобы сократить количество поддерживаемого нами кода и воспользоваться преимуществами любых новых функций, добавленных сообществом (отслеживание слияния операторов).

Для начала мне нужно использовать stdio и объединить многострочные записи журнала в текстовые блоки, которые будут течь по конвейеру. Вариант использования подробно объясняется в многострочных записях журнала главу документации Filebeat (за исключением того, что мы хотим, чтобы она находилась в процессе).

Пока что у меня есть код:

BufferedReader input = new BufferedReader(new InputStreamReader(System.in));
Flux<String> lines = Flux.generate(sink -> rethrow(() -> { while (true) sink.next(input.readLine()); }));
Flux<String> logRecordsStr = lines.concatMap(new LogRecordJoiner());
Flux<LogRecord> logRecords = logRecordsStr.map(new LogRecordMapper());
logRecords.doOnEach(r -> System.out.printf("%s payload: %d chars\n", r.timestamp, r.payload.length()))
          .subscribe();          

Это заботится о многострочном слиянии при обнаружении нового заголовка журнала, но в существующей библиотеке мы также сбрасываем накопленные строки после тайм-аута (т.е. если текст не получен в течение 5 секунд, сбрасываем запись).

Как правильно смоделировать это в Reactor? Нужно ли мне написать свой оператор, или я могу настроить любой из существующих?

Мы будем очень благодарны за любые указатели на соответствующие примеры и документы для достижения этого варианта использования в Project Reactor или RxJava.


person ddimitrov    schedule 12.07.2017    source источник
comment
Вы видели buffer(long timespan, TimeUnit unit) оператора (rxjava)?   -  person zella    schedule 19.07.2017
comment
Buffer выглядит очень близко, но ни одна из перегрузок не соответствует тому, что мне нужно - мне нужна комбинация bufferClosingSelector и стратегий закрытия временного интервала - в зависимости от того, что произойдет раньше.   -  person ddimitrov    schedule 19.07.2017


Ответы (2)


Это зависит от того, как вы определяете начало и конец каждого буфера, поэтому следующий код RxJava 2 предназначен как подсказка об использовании значения основного источника для открытия и закрытия ворот буфера:

TestScheduler scheduler = new TestScheduler();
PublishProcessor<String> pp = PublishProcessor.create();

Function<Flowable<String>, Flowable<List<String>>> f = o -> 
        o.buffer(o.filter(v -> v.contains("Start")), 
                 v -> Flowable.merge(o.filter(w -> w.contains("End")), 
                                     Flowable.timer(5, TimeUnit.MINUTES, scheduler))); 

pp.publish(f)
.subscribe(System.out::println);

pp.onNext("Start");
pp.onNext("A");
pp.onNext("B");
pp.onNext("End");

pp.onNext("Start");
pp.onNext("C");

scheduler.advanceTimeBy(5, TimeUnit.MINUTES);

pp.onNext("Start");
pp.onNext("D");
pp.onNext("End");
pp.onComplete();

Печать:

[Start, A, B, End]
[Start, C]
[Start, D, End]

Он работает путем совместного использования источника через publish, что позволяет повторно использовать одно и то же значение из восходящего потока без одновременного запуска нескольких копий источника. Открытие регулируется обнаружением на линии строки «Старт». Закрытие регулируется либо обнаружением строки «Конец», либо срабатыванием таймера после периода отсрочки.

Изменить:

Если «Начало» также является индикатором для следующего пакета, вы можете заменить проверку «Конец» на «Начало» и изменить содержимое буфера, поскольку в противном случае он будет включать новый заголовок в предыдущий буфер:

pp.publish(f)
.doOnNext(v -> {
    int s = v.size();
    if (s > 1 && v.get(s - 1).contains("Start")) {
        v.remove(s - 1);
    }
})
.subscribe(System.out::println);
person akarnokd    schedule 19.07.2017
comment
Как насчет случая, когда нет END, но буфер закрывается, когда мы видим следующий START, или истекает tumeout? Я начинаю сомневаться в своем общении - есть ли что-то неясное в вопросе? - person ddimitrov; 20.07.2017

Оператор buffer мне кажется наиболее подходящим и простым решением.

Он имеет стратегии, основанные на размере и времени. У вас есть журнал, поэтому я думаю, вы можете интерпретировать количество строк как размер буфера.

Вот пример - как генерировать элементы, сгруппированные по временному интервалу 4 или 5 секунд:

    Observable<String> lineReader = Observable.<String>create(subscriber -> {
        try {
            BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
            for (String line = br.readLine(); line != null; line = br.readLine()) {
                subscriber.onNext(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }).subscribeOn(Schedulers.newThread());

    lineReader
      .buffer(5, TimeUnit.SECONDS,4)
      .filter(lines -> !lines.isEmpty())
      .subscribe(System.out::println);
person zella    schedule 19.07.2017
comment
Мне нужен сгруппированный по заголовку журнала с tumeout. Т.е. если я записал в журнал 2-строчное сообщение, за которым следовало бы 1-строчное сообщение, за которым следовала трассировка стека, тогда еще одна строка беспорядка, а затем ничего за период tumeout. Я ожидал, что сразу получу 3 сообщения, вплоть до трассировки стека, и 4-е сообщение после tumeout. - person ddimitrov; 19.07.2017