Я пытаюсь читать из pub / sub с помощью следующего кода
Read<String> pubsub = PubsubIO.<String>read().topic("projects/<projectId>/topics/<topic>").subscription("projects/<projectId>/subscriptions/<subscription>").withCoder(StringUtf8Coder.of()).withAttributes(new SimpleFunction<PubsubMessage,String>() {
@Override
public String apply(PubsubMessage input) {
LOG.info("hola " + input.getAttributeMap());
return new String(input.getMessage());
}
});
PCollection<String> pps = p.apply(pubsub)
.apply(
Window.<String>into(
FixedWindows.of(Duration.standardSeconds(15))));
pps.apply("printdata",ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
LOG.info("hola amigo "+c.element());
c.output(c.element());
}
}));
По сравнению с тем, что я получаю на NodeJS, я получаю сообщение, которое будет содержаться в поле data
. Как я могу получить поле ackId
(которое позже я могу использовать для подтверждения сообщения)? Карта атрибутов, которую я распечатываю, - null
. Есть ли другой способ подтвердить все сообщения, не вычисляя ackId?
PubsubIO.read()
должен подтверждать сообщения для вас после их успешной обработки - вы уверены, что вам необходимо подтвердить их самостоятельно? - person Ben Chambers   schedule 16.05.2017PubsubIO
отвечает за подтверждение сообщений. Я считаю, что это связано с контрольным поведением бегуна. В частности, источник будет подтверждать только тогда, когда считанные элементы были отмечены контрольной точкой. Как вы настроили контрольную точку флинк-бегуна? - person Ben Chambers   schedule 17.05.2017--runner=FlinkRunner --checkpointingInterval=15000 --streaming=true
, но я получаюCheckpoint triggering task Source: ... is not being executed at the moment. Aborting checkpoint.
. Связано ли это с issues.apache.org/jira/browse/FLINK-2491 или я все еще что-то упускаю? Я пробовал разные интервалы контрольных точек. - person njLT   schedule 18.05.2017