Из Bigtable в GCS (и наоборот) через поток данных

Мы пытаемся запустить ежедневный конвейер потока данных, который считывает данные из Bigtable и выгружает данные в GCS (используя HBase Scan и BaseResultCoder в качестве кодировщика) следующим образом (просто чтобы выделить идею):

  Pipeline pipeline = Pipeline.create(options); 
  Scan scan = new Scan();
  scan.setCacheBlocks(false).setMaxVersions(1);
  scan.addFamily(Bytes.toBytes("f"));
  CloudBigtableScanConfiguration btConfig = BCloudBigtableScanConfiguration.Builder().withProjectId("aaa").withInstanceId("bbb").withTableId("ccc").withScan(scan).build();
  pipeline.apply(Read.from(CloudBigtableIO.read(btConfig))).apply(TextIO.Write.to("gs://bucket/dir/file").withCoder(HBaseResultCoder.getInstance()));
  pipeline.run();

Кажется, все работает идеально, как и ожидалось.

Теперь мы хотим иметь возможность использовать выгруженный файл в GCS для восстановления, если это необходимо. То есть мы хотим иметь конвейер потока данных, который считывает выгруженные данные (то есть PCollection) из GCS и создает мутации (в основном объекты «Put»). По какой-то причине следующий код не работает с кучей исключений NullPointerExceptions. Мы не уверены, почему это могло быть так - ниже были добавлены операторы if, которые проверяют наличие строк нулевой или нулевой длины, чтобы увидеть, будет ли это иметь значение, но это не так.

// Part of DoFn<Result,Mutation>
@Override
public void processElement(ProcessContext c) {
  Result result = c.element();
  byte[] row = result.getRow();
  if (row == null || row.length == 0) { // NullPointerException at this line
    return;
  }
  Put mutation = new Put(result.getRow());
  // go through the column/value entries of this row, and create a corresponding put mutation.
  for (Entry<byte[], byte[]> entry : result.getFamilyMap(Bytes.toBytes(cf)).entrySet()) {
    byte[] qualifier = entry.getKey();
    if (qualifier == null || qualifier.length == 0) {
      continue;
    }
    byte[] val = entry.getValue();
    if (val == null || val.length == 0) {
      continue;
    }
    mutation.addImmutable(cf_bytes, qualifier, entry.getValue());
  }
  c.output(mutation);
}

Мы получаем следующую ошибку (строка 83 отмечена выше):

(2a6ad6372944050d): java.lang.NullPointerException at some.package.RecoveryFromGcs$CreateMutationFromResult.processElement(RecoveryFromGcs.java:83)

У меня два вопроса: 1. Сталкивался ли кто-нибудь с чем-то подобным, когда пытался выполнить ParDo на PCollection, чтобы получить PCollection, который нужно записать в bigtable? 2. Разумный ли это подход? Конечная цель - иметь возможность регулярно оставлять ежедневный снимок нашей большой таблицы (для определенного семейства столбцов) с помощью резервной копии на случай, если произойдет что-то плохое. Мы хотим иметь возможность читать резервные копии данных через поток данных и записывать их в bigtable, когда это необходимо.

Мы будем очень благодарны за любые предложения и помощь!

-------- Редактировать

Вот код, который сканирует Bigtable и выгружает данные в GCS: (Некоторые детали скрыты, если они не имеют отношения к делу.)

public static void execute(Options options) {
  Pipeline pipeline = Pipeline.create(options);
  final String cf = "f"; // some specific column family.
  Scan scan = new Scan();
  scan.setCacheBlocks(false).setMaxVersions(1); // Disable caching and read only the latest cell.
  scan.addFamily(Bytes.toBytes(cf));

  CloudBigtableScanConfiguration btConfig =
      BigtableUtils.getCloudBigtableScanConfigurationBuilder(options.getProject(), "some-bigtable-name").withScan(scan).build();

  PCollection<Result> result = pipeline.apply(Read.from(CloudBigtableIO.read(btConfig)));

  PCollection<Mutation> mutation =
      result.apply(ParDo.of(new CreateMutationFromResult(cf))).setCoder(new HBaseMutationCoder());

  mutation.apply(TextIO.Write.to("gs://path-to-files").withCoder(new HBaseMutationCoder()));

  pipeline.run();
}

}

Задание, которое считывает вывод вышеуказанного кода, имеет следующий код: (Это единственное исключение, вызывающее при чтении из GCS)

public static void execute(Options options) {
  Pipeline pipeline = Pipeline.create(options);
  PCollection<Mutation> mutations = pipeline.apply(TextIO.Read
      .from("gs://path-to-files").withCoder(new HBaseMutationCoder()));

  CloudBigtableScanConfiguration config =
      BigtableUtils.getCloudBigtableScanConfigurationBuilder(options.getProject(), btTarget).build();
  if (config != null) {
    CloudBigtableIO.initializeForWrite(pipeline);
    mutations.apply(CloudBigtableIO.writeToTable(config));
  }
  pipeline.run();
}

}

Ошибка, которую я получаю (https://jpst.it/Qr6M), немного сбивает с толку, поскольку мутации все объекты Put, но ошибка связана с объектом «Удалить».


person Haden Hooyeon Lee    schedule 09.12.2016    source источник


Ответы (1)


Вероятно, лучше всего обсудить эту проблему на странице проблем github клиента Cloud Bigtable. В настоящее время мы работаем над такими функциями импорта / экспорта, что мы оперативно ответим. Мы также рассмотрим этот подход самостоятельно, даже если вы не добавите проблему с github. Проблема с github позволит нам лучше общаться.

FWIW, я не понимаю, как вы могли получить NPE в выделенной вами строке. Вы уверены, что у вас правильная линия?

РЕДАКТИРОВАТЬ (12/12):

Следующий метод processElement() должен работать для преобразования Result в Put:

@Override
public void processElement(DoFn<Result, Mutation>.ProcessContext c) throws Exception {
  Result result = c.element();
  byte[] row = result.getRow();
  if (row != null && row.length > 0) {
    Put put = new Put(row);
    for (Cell cell : result.rawCells()) {
      put.add(cell);
    }
    c.output(put);
  }
}
person Solomon Duskis    schedule 11.12.2016
comment
Это точно такой же DoFn, который я использовал, за исключением того, что я сначала написал PCollection ‹Result› в GCS (с помощью задания «сброса»), прочитал его (с помощью задания «восстановления») и применил этот DoFn для создания мутаций. При записи / чтении результата я использовал HBaseResultCoder.getInstance () в качестве кодировщика. - person Haden Hooyeon Lee; 16.12.2016
comment
Теперь я немного изменил код, применив DoFn после сканирования из BigTable, получив таким образом PCollection ‹Mutation›, и он записывает в GCS, используя новый HBaseMutationCoder (). Опять же, когда он читается другим заданием (для записи в bigtable), я получаю ошибки (разные ошибки, но, похоже, это как-то связано с кодером). - person Haden Hooyeon Lee; 16.12.2016
comment
Вот идентификатор задания для чтения данных: 2016-12-15_18_18_02-8862892074153368958 И вот ошибка, которую я получаю: - person Haden Hooyeon Lee; 16.12.2016
comment
jpst.it/Qr6M (ошибка слишком длинная для комментария). Я поделюсь кодом, отредактировав вопрос, так как он слишком длинный. - person Haden Hooyeon Lee; 16.12.2016
comment
Как вы предположили, я нашел страницу github более полезной (поскольку я просматривал другие проблемы там), и я подал здесь проблему: github.com/GoogleCloudPlatform/cloud-bigtable-client/issues/ - person Haden Hooyeon Lee; 16.12.2016