Подтвердить сообщение Google Pub / Sub на Apache Beam

Я пытаюсь читать из pub / sub с помощью следующего кода

Read<String> pubsub = PubsubIO.<String>read().topic("projects/<projectId>/topics/<topic>").subscription("projects/<projectId>/subscriptions/<subscription>").withCoder(StringUtf8Coder.of()).withAttributes(new SimpleFunction<PubsubMessage,String>() {
    @Override
    public String apply(PubsubMessage input) {
        LOG.info("hola " + input.getAttributeMap());
        return new String(input.getMessage());
    }
});
PCollection<String> pps = p.apply(pubsub)
        .apply(
                Window.<String>into(
                    FixedWindows.of(Duration.standardSeconds(15))));
pps.apply("printdata",ParDo.of(new DoFn<String, String>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
        LOG.info("hola amigo "+c.element());
        c.output(c.element());
    }
  }));

По сравнению с тем, что я получаю на NodeJS, я получаю сообщение, которое будет содержаться в поле data. Как я могу получить поле ackId (которое позже я могу использовать для подтверждения сообщения)? Карта атрибутов, которую я распечатываю, - null. Есть ли другой способ подтвердить все сообщения, не вычисляя ackId?


person njLT    schedule 16.05.2017    source источник
comment
Я использую v0.6.0   -  person njLT    schedule 16.05.2017
comment
Какой раннер вы используете? Я считаю, что PubsubIO.read() должен подтверждать сообщения для вас после их успешной обработки - вы уверены, что вам необходимо подтвердить их самостоятельно?   -  person Ben Chambers    schedule 16.05.2017
comment
Я использую флинк-бегун. Не было похоже, что сообщения подтверждаются, но я проверю еще раз.   -  person njLT    schedule 17.05.2017
comment
Проверил еще раз, сообщения точно не подтверждаются. Но я ошибался, предполагая, что ackId будет в атрибутах - значение карты атрибутов правильное. Так что мне просто нужно знать, как получить подтверждение моего сообщения.   -  person njLT    schedule 17.05.2017
comment
Читатель PubsubIO отвечает за подтверждение сообщений. Я считаю, что это связано с контрольным поведением бегуна. В частности, источник будет подтверждать только тогда, когда считанные элементы были отмечены контрольной точкой. Как вы настроили контрольную точку флинк-бегуна?   -  person Ben Chambers    schedule 17.05.2017
comment
Спасибо, я не знал, что мне нужно будет указать интервал контрольной точки. Мои аргументы теперь выглядят как --runner=FlinkRunner --checkpointingInterval=15000 --streaming=true, но я получаю Checkpoint triggering task Source: ... is not being executed at the moment. Aborting checkpoint.. Связано ли это с issues.apache.org/jira/browse/FLINK-2491 или я все еще что-то упускаю? Я пробовал разные интервалы контрольных точек.   -  person njLT    schedule 18.05.2017
comment
@BenChambers Спасибо. Вариант контрольной точки работал у меня в аналогичной проблеме. Сообщения начинают подтверждаться после того, как я его установил. Я зарегистрировал проблему в JIRA Beam, обновлю ее этой информацией, чтобы можно было обновить соответствующие документы Beam.   -  person talonx    schedule 08.06.2018


Ответы (1)


Читатель PubsubIO отвечает за подтверждение сообщений. Это связано с контрольным поведением бегуна. В частности, источник будет подтверждать сообщения только после того, как результирующие элементы будут отмечены контрольной точкой.

В этом случае вы должны посмотреть, когда контрольные точки бегуна Flink информируют о состоянии этого источника. Я считаю, что это связано с настройкой Flink для частоты контрольных точек.

person Ben Chambers    schedule 18.05.2017