Как указать insertId при отправке вставки в BigQuery с помощью Apache Beam

BigQuery поддерживает дедупликацию при потоковой вставке. Как я могу использовать эту функцию с помощью Apache Beam?

https://cloud.google.com/bigquery/streaming-data-into-bigquery#dataconsistency

Чтобы обеспечить согласованность данных, вы можете указать insertId для каждой вставленной строки. BigQuery запоминает этот идентификатор как минимум одну минуту. Если вы попытаетесь передать тот же набор строк в течение этого периода времени и задано свойство insertId, BigQuery использует свойство insertId, чтобы максимально эффективно исключить дублирование ваших данных. Возможно, вам придется повторить попытку вставки, потому что нет способа определить состояние потоковой вставки при определенных условиях ошибки, таких как сетевые ошибки между вашей системой и BigQuery или внутренние ошибки в BigQuery. Если вы повторите попытку вставки, используйте тот же insertId для того же набора строк, чтобы BigQuery мог попытаться исключить дублирование ваших данных. Дополнительные сведения см. В разделе «Устранение неполадок с потоковыми вставками».

Я не могу найти такую ​​функцию в Java doc. https://beam.apache.org/releases/javadoc/2.9.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.html

В этом вопросе он предлагает установить insertId в TableRow. Это верно?

https://developers.google.com/resources/api-libraries/documentation/bigquery/v2/java/latest/com/google/api/services/bigquery/model/TableRow.html?is-external=true

В клиентской библиотеке BigQuery есть эта функция.

https://googleapis.github.io/google-cloud-java/google-cloud-clients/apidocs/index.html?com/google/cloud/bigquery/package-summary.html https://github.com/googleapis/google-cloud-java/blob/master/google-cloud-clients/google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/InsertAllRequest.java#L134


person Yohei Onishi    schedule 09.01.2019    source источник
comment
Не могли бы вы подробнее рассказать о своем варианте использования? Dataflow / Beam должны выполняться только один раз в сочетании с BigQuery, без необходимости указывать insertId вручную.   -  person Felipe Hoffa    schedule 10.01.2019
comment
мой вариант использования упомянут выше. хотите исключить дублирование при вставке в BigQuery. так что просто укажите insertId как столбец в новой строке?   -  person Yohei Onishi    schedule 10.01.2019
comment
Насколько я понимаю, вы хотите исключить дубликаты. Но в зависимости от источника дублирования это может быть уже решенная проблема.   -  person Felipe Hoffa    schedule 10.01.2019
comment
нет дублирования на стороне источника данных. поскольку Kafka по умолчанию поддерживает доставку по крайней мере один раз, поэтому я думаю, что существует возможность дублирования между производителем и потребителем Kafka. а также я предполагаю, что поток данных может вставлять одну и ту же строку более одного раза при повторной попытке при некоторых ошибках (например, проблема временной сети). поэтому я просто хочу знать, как избежать дублирования на обоих. этот вопрос касается вставки потока из потока данных в bigquery.   -  person Yohei Onishi    schedule 10.01.2019
comment
В моем фактическом варианте использования требование дедупликации не так сильно. Поэтому я думаю, что самый простой способ - просто вставить в Big Query, а затем исключить дублирование по запросу. но я просто хочу знать, что BigQueryIO (Apache Beam) поддерживает функцию дедупликации.   -  person Yohei Onishi    schedule 10.01.2019


Ответы (2)


  • Pub / Sub + Beam / Dataflow + BigQuery: "Ровно один раз" должен быть гарантирован, и вам не нужно сильно об этом беспокоиться. Эта гарантия усиливается, когда вы запрашиваете Dataflow для вставки в BigQuery с помощью FILE_LOADS вместо STREAMING_INSERTS, пока.

  • Kafka + Beam / Dataflow + BigQuery: если сообщение может быть отправлено из Kafka более одного раза (например, если производитель повторил вставку), вам необходимо позаботиться о дедупликации. Либо в BigQuery (как сейчас реализовано, согласно вашему комментарию), либо в Dataflow с преобразованием .apply(Distinct.create()).

person Felipe Hoffa    schedule 10.01.2019
comment
Спасибо! но мой первоначальный вопрос заключается в том, как использовать функцию дедупликации BigQuery из Apache Beam. - person Yohei Onishi; 11.01.2019
comment
Вы не можете вручную, потому что Dataflow уже использует insertId для своей реализации ровно один раз, как описано. - person Felipe Hoffa; 11.01.2019
comment
Да я вижу. Спасибо за разъяснения. - person Yohei Onishi; 11.01.2019
comment
Спасибо за вопрос! Мне пришлось попросить некоторых экспертов, чтобы получить этот ответ :). Включая Пабло, который улучшил мой ответ выше - person Felipe Hoffa; 11.01.2019
comment
И я не могу найти про .apply(Distinct.create()) преобразование в документе Apache Beam. Так что было бы полезно, если бы вы могли упомянуть об этом в документе. - person Yohei Onishi; 11.01.2019
comment
Я имею в виду, что его нелегко найти в японской документации без каких-либо объяснений на веб-сайте Apache Beam - person Yohei Onishi; 11.01.2019
comment
Теперь я смущен - person Felipe Hoffa; 11.01.2019
comment
Поток данных уже использует insertId для себя, чтобы реализовать его ровно один раз, как описано. Могу я посмотреть, как это реализовано? Не могли бы вы дать ссылку на эту реализацию? Спасибо. - person Yohei Onishi; 15.01.2019

Как упомянул Фелипе в комментарии, похоже, что Dataflow уже использует insertId для себя, чтобы реализовать «ровно один раз». поэтому мы не можем вручную указать insertId.

person Yohei Onishi    schedule 15.01.2019