Назначение GenericRecord метки времени из внутреннего объекта

Обработка потоковых событий и запись файлов в почасовые сегменты представляет собой проблему из-за окон, поскольку некоторые события из входящего часа могут переходить в предыдущие и тому подобное.

Я копался в Apache Beam и его триггерах, но я изо всех сил пытаюсь управлять запуском с меткой времени следующим образом ...

Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(1)))
                    .triggering(AfterProcessingTime
                     .pastFirstElementInPane()
                     .plusDelayOf(Duration.standardSeconds(1)))
                    .withAllowedLateness(Duration.ZERO)
                    .discardingFiredPanes())

Это то, что я делал до сих пор, запуская 1-минутные окна независимо от того, какая временная метка. Однако я хотел бы включить метку времени в объект, чтобы он запускался только для тех, кто находится внутри.

Window.<GenericRecord>into(FixedWindows.of(Duration.standardMinutes(1)))
                .triggering(AfterWatermark
                    .pastEndOfWindow())
                .withAllowedLateness(Duration.ZERO)
                .discardingFiredPanes())

Объекты, с которыми я имею дело, имеют объект отметки времени, однако это длинное поле, а не поле Instant вообще.

"{ \"name\": \"timestamp\", \"type\": \"long\", \"logicalType\": \"timestamp-micros\" },"

Наличие моего класса POJO с этим полем long ничего не вызывает, но если я поменяю его на класс Instant и правильно воссоздаю объект, при чтении сообщения PubSub выдается следующая ошибка.

Caused by: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.Long

Я также думал создать своего рода класс-оболочку вокруг GenericRecord, который содержит метку времени, но мне нужно будет просто использовать часть GenericRecord внутри, когда она будет готова к записи с FileIO в .parquet.

Какие еще способы использовать триггеры водяных знаков?

ИЗМЕНИТЬ: после комментариев @Anton я попробовал следующее.

.apply("Apply timestamps", WithTimestamps.of(
            (SerializableFunction<GenericRecord, Instant>) item -> new Instant(Long.valueOf(item.get("timestamp").toString())))
        .withAllowedTimestampSkew(Duration.standardSeconds(30)))

Даже если он устарел, похоже, что он проходит через конвейер, но все еще не записан (все еще отбрасывается до записи по какой-то причине ранее показанным триггером?).

А также попробовал другой упомянутый подход с использованием outputWithTimestamp, но из-за задержки он печатает следующую ошибку ...

Caused by: java.lang.IllegalArgumentException: Cannot output with timestamp 2019-06-12T18:59:58.609Z. Output timestamps must be no earlier than the timestamp of the current input (2019-06-12T18:59:59.848Z) minus the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.

person Lenny D.    schedule 12.06.2019    source источник
comment
Если я правильно понимаю, вы хотите запускать окно и запускать на основе метки времени, которая является частью вашего объекта. Вы можете заглянуть в context.outputWithTimestamp(), чтобы предварительно обработать ввод и назначить пользовательские временные метки перед окном: github.com/apache/beam/blob/   -  person Anton    schedule 12.06.2019
comment
Или подобное можно сделать с помощью WithTimestamps PTransform: beam.apache.org/releases/javadoc/2.13.0/org/apache/beam/sdk/   -  person Anton    schedule 12.06.2019
comment
@Anton спасибо за ответ! Пробовал оба, но ни один из них не работал, каким-то образом триггер может все еще отбрасывать события?   -  person Lenny D.    schedule 12.06.2019
comment
Для ошибки с outputWithTimestamp вы можете изменить getAllowedTimestampSkew, как я сделал в этом примере   -  person Guillem Xercavins    schedule 12.06.2019