Обнаружено исключение NullPointerException при записи в BigTable с использованием sdk потока данных Apache Beam

Я использую Apache's Beam версию sdk 0.2.0-incubating-SNAPSHOT и пытаюсь перенести данные в bigtable с помощью Dataflow runner. К сожалению, я получаю NullPointerException при выполнении конвейера потока данных, в котором я использую BigTableIO.Write в качестве приемника. Уже проверил мой BigtableOptions и параметры в порядке, в соответствии с моими потребностями.

По сути, я создаю, и в какой-то момент моего конвейера у меня есть шаг, чтобы записать PCollection<KV<ByteString, Iterable<Mutation>>> в мою желаемую большую таблицу:

final BigtableOptions.Builder optionsBuilder =
    new BigtableOptions.Builder().setProjectId(System.getProperty("PROJECT_ID"))
        .setInstanceId(System.getProperty("BT_INSTANCE_ID"));

// do intermediary steps and create PCollection<KV<ByteString, Iterable<Mutation>>> 
// to write to bigtable

// modifiedHits is a PCollection<KV<ByteString, Iterable<Mutation>>>
modifiedHits.apply("writting to big table", BigtableIO.write()
    .withBigtableOptions(optionsBuilder).withTableId(System.getProperty("BT_TABLENAME")));

p.run();

При выполнении конвейера я получил NullPointerException, точно указывающий на класс BigtableIO в методе public void processElement(ProcessContext c):

(6e0ccd8407eed08b): java.lang.NullPointerException at org.apache.beam.sdk.io.gcp.bigtable.BigtableIO$Write$BigtableWriterFn.processElement(BigtableIO.java:532)

Я проверил, что этот метод обрабатывает все элементы, прежде чем писать в bigtable, но не уверен, почему я получаю такое исключение сверхурочно. Я выполняю этот конвейер. Согласно приведенному ниже коду, этот метод использует атрибут bigtableWriter для обработки каждого c.element(), но я даже не могу установить точку останова для отладки, где именно находится null. Какие-нибудь советы или предложения, как решить эту проблему?

@ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    checkForFailures();
    Futures.addCallback(
        bigtableWriter.writeRecord(c.element()), new WriteExceptionCallback(c.element()));
    ++recordsWritten;
  }

Спасибо.


comment
Не могли бы вы прояснить несколько вещей: 1) Какую версию SDK вы используете? 2) Какой раннер вы используете? (прямой запуск, Spark, Flink, Dataflow?) Если это Dataflow, не могли бы вы указать идентификатор задания?   -  person jkff    schedule 13.09.2016
comment
@jkff спасибо за комментарий. Да только что редактировал мой вопрос, включая версии. Итак, да, я использую Dataflow runner. Его идентификатор работы: 2016-09-13_08_29_14-14276852956124203982   -  person Saulo Ricci    schedule 13.09.2016
comment
Я просмотрел задание и его путь к классам, и, если не ошибаюсь, похоже, что вы используете версию 0.3.0-incubating-SNAPSHOT из beam-sdks-java- {core, io}, но версию 0.2.0- incubating-SNAPSHOT из google-cloud-dataflow-java. Я считаю, что проблема в этом - вы должны использовать ту же версию (подробнее: BigtableIO в версии 0.3.0 использует методы \ @Setup и \ @Teardown, но runner 0.2.0 пока не поддерживает их).   -  person jkff    schedule 14.09.2016
comment
@jkff именно в этом и была проблема, только что исправленная здесь. Спасибо.   -  person Saulo Ricci    schedule 14.09.2016
comment
@jkff - переместите свой комментарий в ответ, чтобы его можно было принять, а этот вопрос можно было пометить как закрытый. Спасибо за решение проблемы!   -  person Misha Brukman    schedule 15.09.2016
comment
@MishaBrukman Готово, спасибо.   -  person jkff    schedule 15.09.2016


Ответы (1)


Я просмотрел задание и его путь к классам, и, если я не ошибаюсь, похоже, что вы используете версию 0.3.0-incubating-SNAPSHOT из beam-sdks-java-{core,io}, но version 0.2.0-incubating-SNAPSHOT из google-cloud-dataflow-java.

Я считаю, что проблема в этом - вы должны использовать ту же версию (подробнее: BigtableIO в версии 0.3.0 использует методы @Setup и @Teardown, но runner 0.2.0 пока не поддерживает их).

person jkff    schedule 15.09.2016