Запись неограниченной коллекции в GCS

Я видел много вопросов по той же теме. Но у меня все еще проблема с записью в GCS. Я читаю тему из pubsub и пытаюсь перенести это в GCS. Я сослался на эту ссылку. Но не удалось найти IOChannelUtils в последних пакетах лучей.

PCollection<String> details = pipeline
            .apply(PubsubIO.readStrings().fromTopic("/topics/<project>/sampleTopic"));

PCollection<KV<String, String>> keyedStream = details.apply(WithKeys.of(new SerializableFunction<String, String>() {
        public String apply(String s) {
            return "constant";
        }
    }));

    PCollection<KV<String, Iterable<String>>> keyedWindows = keyedStream.apply(Window.<KV<String, String>>into(FixedWindows.of(ONE_MIN)).withAllowedLateness(ONE_DAY)
            .triggering(AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(10))
                    .withLateFirings(AfterFirst.of(AfterPane.elementCountAtLeast(10),
                            AfterProcessingTime.pastFirstElementInPane().plusDelayOf(TEN_SECONDS))))
            .discardingFiredPanes()).apply(GroupByKey.create());

    PCollection<Iterable<String>> windows = keyedWindows.apply(Values.create());

Это я взял из многих других похожих тем о переполнении стека. Теперь я понимаю, что TextIO поддерживает неограниченную возможность записи PCollection с withWindowedWrites и withNumShards.

ref: Запись в облачное хранилище Google из PubSub с использованием Cloud Dataflow с использованием DoFn

Но я не понимал, как мне это делать.

Пытаюсь написать в GCS следующим образом.

FilenamePolicy policy = DefaultFilenamePolicy.constructUsingStandardParameters(
            StaticValueProvider.of(outputDirectory), DefaultFilenamePolicy.DEFAULT_SHARD_TEMPLATE, "");

    details.apply(TextIO.write().to("gs://<bucket>/topicfile").withWindowedWrites()
            .withFilenamePolicy(policy).withNumShards(4));

У меня недостаточно очков, чтобы добавлять комментарии к этим темам в Stack Overflow, поэтому я поднимаю это как другой вопрос.


person Balu    schedule 04.08.2017    source источник


Ответы (2)


Я мог бы решить эту проблему, изменив Windowing, как указано ниже.

PCollection<String> streamedDataWindows = streamedData.apply(Window.<String>into(new GlobalWindows())
            .triggering(Repeatedly
                    .forever(AfterProcessingTime
                            .pastFirstElementInPane()
                            .plusDelayOf(Duration.standardSeconds(30))
                        )).withAllowedLateness(Duration.standardDays(1)).discardingFiredPanes());

 streamedDataWindows.apply(TextIO.write().to(CLOUD_STORAGE).withWindowedWrites().withNumShards(1).withFilenamePolicy(new PerWindowFiles()));


public static class PerWindowFiles extends FileBasedSink.FilenamePolicy {

public ResourceId windowedFilename(ResourceId outputDirectory, WindowedContext context, String extension) {

// OVERRIDE THE FILE NAME CREATION
}

}

Хотя я мог бы решить эту проблему вот так, я все еще не уверен в концепции окон. Я добавлю больше деталей, когда найду. Если у кого-то есть больше понимания, пожалуйста, добавьте подробности. Спасибо

person Balu    schedule 08.08.2017

Ознакомьтесь с этим конвейером Pub / Sub to GCS, который предоставляет полный пример записи оконных файлов в GCS.

person Ryan McDowell    schedule 06.08.2017
comment
Привет .. спасибо за ответ. Я просто мог закончить это за несколько минут до этого. Я дополню этот ответ своим подходом. Еще раз большое спасибо! - person Balu; 06.08.2017