Я подумываю о замене собственной библиотеки обработки журналов, которая очень похожа на ReactiveStreams, на io.projectreactor
. Цель состоит в том, чтобы сократить количество поддерживаемого нами кода и воспользоваться преимуществами любых новых функций, добавленных сообществом (отслеживание слияния операторов).
Для начала мне нужно использовать stdio и объединить многострочные записи журнала в текстовые блоки, которые будут течь по конвейеру. Вариант использования подробно объясняется в многострочных записях журнала главу документации Filebeat (за исключением того, что мы хотим, чтобы она находилась в процессе).
Пока что у меня есть код:
BufferedReader input = new BufferedReader(new InputStreamReader(System.in));
Flux<String> lines = Flux.generate(sink -> rethrow(() -> { while (true) sink.next(input.readLine()); }));
Flux<String> logRecordsStr = lines.concatMap(new LogRecordJoiner());
Flux<LogRecord> logRecords = logRecordsStr.map(new LogRecordMapper());
logRecords.doOnEach(r -> System.out.printf("%s payload: %d chars\n", r.timestamp, r.payload.length()))
.subscribe();
Это заботится о многострочном слиянии при обнаружении нового заголовка журнала, но в существующей библиотеке мы также сбрасываем накопленные строки после тайм-аута (т.е. если текст не получен в течение 5 секунд, сбрасываем запись).
Как правильно смоделировать это в Reactor? Нужно ли мне написать свой оператор, или я могу настроить любой из существующих?
Мы будем очень благодарны за любые указатели на соответствующие примеры и документы для достижения этого варианта использования в Project Reactor или RxJava.
buffer(long timespan, TimeUnit unit)
оператора (rxjava)? - person zella   schedule 19.07.2017