Я видел много вопросов по той же теме. Но у меня все еще проблема с записью в GCS. Я читаю тему из pubsub и пытаюсь перенести это в GCS. Я сослался на эту ссылку. Но не удалось найти IOChannelUtils в последних пакетах лучей.
PCollection<String> details = pipeline
.apply(PubsubIO.readStrings().fromTopic("/topics/<project>/sampleTopic"));
PCollection<KV<String, String>> keyedStream = details.apply(WithKeys.of(new SerializableFunction<String, String>() {
public String apply(String s) {
return "constant";
}
}));
PCollection<KV<String, Iterable<String>>> keyedWindows = keyedStream.apply(Window.<KV<String, String>>into(FixedWindows.of(ONE_MIN)).withAllowedLateness(ONE_DAY)
.triggering(AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(10))
.withLateFirings(AfterFirst.of(AfterPane.elementCountAtLeast(10),
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(TEN_SECONDS))))
.discardingFiredPanes()).apply(GroupByKey.create());
PCollection<Iterable<String>> windows = keyedWindows.apply(Values.create());
Это я взял из многих других похожих тем о переполнении стека. Теперь я понимаю, что TextIO поддерживает неограниченную возможность записи PCollection с withWindowedWrites и withNumShards.
ref: Запись в облачное хранилище Google из PubSub с использованием Cloud Dataflow с использованием DoFn
Но я не понимал, как мне это делать.
Пытаюсь написать в GCS следующим образом.
FilenamePolicy policy = DefaultFilenamePolicy.constructUsingStandardParameters(
StaticValueProvider.of(outputDirectory), DefaultFilenamePolicy.DEFAULT_SHARD_TEMPLATE, "");
details.apply(TextIO.write().to("gs://<bucket>/topicfile").withWindowedWrites()
.withFilenamePolicy(policy).withNumShards(4));
У меня недостаточно очков, чтобы добавлять комментарии к этим темам в Stack Overflow, поэтому я поднимаю это как другой вопрос.