Я использую Apache's Beam
версию sdk 0.2.0-incubating-SNAPSHOT
и пытаюсь перенести данные в bigtable с помощью Dataflow
runner. К сожалению, я получаю NullPointerException
при выполнении конвейера потока данных, в котором я использую BigTableIO.Write
в качестве приемника. Уже проверил мой BigtableOptions
и параметры в порядке, в соответствии с моими потребностями.
По сути, я создаю, и в какой-то момент моего конвейера у меня есть шаг, чтобы записать PCollection<KV<ByteString, Iterable<Mutation>>>
в мою желаемую большую таблицу:
final BigtableOptions.Builder optionsBuilder =
new BigtableOptions.Builder().setProjectId(System.getProperty("PROJECT_ID"))
.setInstanceId(System.getProperty("BT_INSTANCE_ID"));
// do intermediary steps and create PCollection<KV<ByteString, Iterable<Mutation>>>
// to write to bigtable
// modifiedHits is a PCollection<KV<ByteString, Iterable<Mutation>>>
modifiedHits.apply("writting to big table", BigtableIO.write()
.withBigtableOptions(optionsBuilder).withTableId(System.getProperty("BT_TABLENAME")));
p.run();
При выполнении конвейера я получил NullPointerException
, точно указывающий на класс BigtableIO в методе public void processElement(ProcessContext c)
:
(6e0ccd8407eed08b): java.lang.NullPointerException at org.apache.beam.sdk.io.gcp.bigtable.BigtableIO$Write$BigtableWriterFn.processElement(BigtableIO.java:532)
Я проверил, что этот метод обрабатывает все элементы, прежде чем писать в bigtable, но не уверен, почему я получаю такое исключение сверхурочно. Я выполняю этот конвейер. Согласно приведенному ниже коду, этот метод использует атрибут bigtableWriter
для обработки каждого c.element()
, но я даже не могу установить точку останова для отладки, где именно находится null
. Какие-нибудь советы или предложения, как решить эту проблему?
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
checkForFailures();
Futures.addCallback(
bigtableWriter.writeRecord(c.element()), new WriteExceptionCallback(c.element()));
++recordsWritten;
}
Спасибо.