Мы пытаемся запустить ежедневный конвейер потока данных, который считывает данные из Bigtable и выгружает данные в GCS (используя HBase Scan и BaseResultCoder в качестве кодировщика) следующим образом (просто чтобы выделить идею):
Pipeline pipeline = Pipeline.create(options);
Scan scan = new Scan();
scan.setCacheBlocks(false).setMaxVersions(1);
scan.addFamily(Bytes.toBytes("f"));
CloudBigtableScanConfiguration btConfig = BCloudBigtableScanConfiguration.Builder().withProjectId("aaa").withInstanceId("bbb").withTableId("ccc").withScan(scan).build();
pipeline.apply(Read.from(CloudBigtableIO.read(btConfig))).apply(TextIO.Write.to("gs://bucket/dir/file").withCoder(HBaseResultCoder.getInstance()));
pipeline.run();
Кажется, все работает идеально, как и ожидалось.
Теперь мы хотим иметь возможность использовать выгруженный файл в GCS для восстановления, если это необходимо. То есть мы хотим иметь конвейер потока данных, который считывает выгруженные данные (то есть PCollection) из GCS и создает мутации (в основном объекты «Put»). По какой-то причине следующий код не работает с кучей исключений NullPointerExceptions. Мы не уверены, почему это могло быть так - ниже были добавлены операторы if, которые проверяют наличие строк нулевой или нулевой длины, чтобы увидеть, будет ли это иметь значение, но это не так.
// Part of DoFn<Result,Mutation>
@Override
public void processElement(ProcessContext c) {
Result result = c.element();
byte[] row = result.getRow();
if (row == null || row.length == 0) { // NullPointerException at this line
return;
}
Put mutation = new Put(result.getRow());
// go through the column/value entries of this row, and create a corresponding put mutation.
for (Entry<byte[], byte[]> entry : result.getFamilyMap(Bytes.toBytes(cf)).entrySet()) {
byte[] qualifier = entry.getKey();
if (qualifier == null || qualifier.length == 0) {
continue;
}
byte[] val = entry.getValue();
if (val == null || val.length == 0) {
continue;
}
mutation.addImmutable(cf_bytes, qualifier, entry.getValue());
}
c.output(mutation);
}
Мы получаем следующую ошибку (строка 83 отмечена выше):
(2a6ad6372944050d): java.lang.NullPointerException at some.package.RecoveryFromGcs$CreateMutationFromResult.processElement(RecoveryFromGcs.java:83)
У меня два вопроса: 1. Сталкивался ли кто-нибудь с чем-то подобным, когда пытался выполнить ParDo на PCollection, чтобы получить PCollection, который нужно записать в bigtable? 2. Разумный ли это подход? Конечная цель - иметь возможность регулярно оставлять ежедневный снимок нашей большой таблицы (для определенного семейства столбцов) с помощью резервной копии на случай, если произойдет что-то плохое. Мы хотим иметь возможность читать резервные копии данных через поток данных и записывать их в bigtable, когда это необходимо.
Мы будем очень благодарны за любые предложения и помощь!
-------- Редактировать
Вот код, который сканирует Bigtable и выгружает данные в GCS: (Некоторые детали скрыты, если они не имеют отношения к делу.)
public static void execute(Options options) {
Pipeline pipeline = Pipeline.create(options);
final String cf = "f"; // some specific column family.
Scan scan = new Scan();
scan.setCacheBlocks(false).setMaxVersions(1); // Disable caching and read only the latest cell.
scan.addFamily(Bytes.toBytes(cf));
CloudBigtableScanConfiguration btConfig =
BigtableUtils.getCloudBigtableScanConfigurationBuilder(options.getProject(), "some-bigtable-name").withScan(scan).build();
PCollection<Result> result = pipeline.apply(Read.from(CloudBigtableIO.read(btConfig)));
PCollection<Mutation> mutation =
result.apply(ParDo.of(new CreateMutationFromResult(cf))).setCoder(new HBaseMutationCoder());
mutation.apply(TextIO.Write.to("gs://path-to-files").withCoder(new HBaseMutationCoder()));
pipeline.run();
}
}
Задание, которое считывает вывод вышеуказанного кода, имеет следующий код: (Это единственное исключение, вызывающее при чтении из GCS)
public static void execute(Options options) {
Pipeline pipeline = Pipeline.create(options);
PCollection<Mutation> mutations = pipeline.apply(TextIO.Read
.from("gs://path-to-files").withCoder(new HBaseMutationCoder()));
CloudBigtableScanConfiguration config =
BigtableUtils.getCloudBigtableScanConfigurationBuilder(options.getProject(), btTarget).build();
if (config != null) {
CloudBigtableIO.initializeForWrite(pipeline);
mutations.apply(CloudBigtableIO.writeToTable(config));
}
pipeline.run();
}
}
Ошибка, которую я получаю (https://jpst.it/Qr6M), немного сбивает с толку, поскольку мутации все объекты Put, но ошибка связана с объектом «Удалить».