Обработка потоковых событий и запись файлов в почасовые сегменты представляет собой проблему из-за окон, поскольку некоторые события из входящего часа могут переходить в предыдущие и тому подобное.
Я копался в Apache Beam и его триггерах, но я изо всех сил пытаюсь управлять запуском с меткой времени следующим образом ...
Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(1)))
.triggering(AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(1)))
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes())
Это то, что я делал до сих пор, запуская 1-минутные окна независимо от того, какая временная метка. Однако я хотел бы включить метку времени в объект, чтобы он запускался только для тех, кто находится внутри.
Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(1)))
.triggering(AfterWatermark
.pastEndOfWindow())
.withAllowedLateness(Duration.ZERO)
.discardingFiredPanes())
Объекты, с которыми я имею дело, имеют объект отметки времени, однако это длинное поле, а не поле Instant
вообще.
"{ \"name\": \"timestamp\", \"type\": \"long\", \"logicalType\": \"timestamp-micros\" },"
Наличие моего класса POJO с этим полем long
ничего не вызывает, но если я поменяю его на класс Instant
и правильно воссоздаю объект, при чтении сообщения PubSub выдается следующая ошибка.
Caused by: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.Long
Я также думал создать своего рода класс-оболочку вокруг GenericRecord, который содержит метку времени, но мне нужно будет просто использовать часть GenericRecord внутри, когда она будет готова к записи с FileIO в .parquet.
Какие еще способы использовать триггеры водяных знаков?
ИЗМЕНИТЬ: после комментариев @Anton я попробовал следующее.
.apply("Apply timestamps", WithTimestamps.of(
(SerializableFunction<GenericRecord, Instant>) item -> new Instant(Long.valueOf(item.get("timestamp").toString())))
.withAllowedTimestampSkew(Duration.standardSeconds(30)))
Даже если он устарел, похоже, что он проходит через конвейер, но все еще не записан (все еще отбрасывается до записи по какой-то причине ранее показанным триггером?).
А также попробовал другой упомянутый подход с использованием outputWithTimestamp
, но из-за задержки он печатает следующую ошибку ...
Caused by: java.lang.IllegalArgumentException: Cannot output with timestamp 2019-06-12T18:59:58.609Z. Output timestamps must be no earlier than the timestamp of the current input (2019-06-12T18:59:59.848Z) minus the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.
context.outputWithTimestamp()
, чтобы предварительно обработать ввод и назначить пользовательские временные метки перед окном: github.com/apache/beam/blob/ - person Anton   schedule 12.06.2019WithTimestamps
PTransform
: beam.apache.org/releases/javadoc/2.13.0/org/apache/beam/sdk/ - person Anton   schedule 12.06.2019outputWithTimestamp
вы можете изменитьgetAllowedTimestampSkew
, как я сделал в этом примере - person Guillem Xercavins   schedule 12.06.2019