У меня есть задача Apache Beam, которая считывает данные из источника MySQL с помощью JDBC, и предполагается, что данные записываются в таблицу BigQuery в том виде, в каком они есть. На данный момент не выполняется преобразование, которое произойдет позже, на данный момент я просто хочу, чтобы вывод базы данных был напрямую записан в BigQuery.
Это основной метод, пытающийся выполнить эту операцию:
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
Pipeline p = Pipeline.create(options);
// Build the table schema for the output table.
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("phone").setType("STRING"));
fields.add(new TableFieldSchema().setName("url").setType("STRING"));
TableSchema schema = new TableSchema().setFields(fields);
p.apply(JdbcIO.<KV<String, String>>read()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
"com.mysql.jdbc.Driver", "jdbc:mysql://host:3306/db_name")
.withUsername("user")
.withPassword("pass"))
.withQuery("SELECT phone_number, identity_profile_image FROM scraper_caller_identities LIMIT 100")
.withRowMapper(new JdbcIO.RowMapper<KV<String, String>>() {
public KV<String, String> mapRow(ResultSet resultSet) throws Exception {
return KV.of(resultSet.getString(1), resultSet.getString(2));
}
})
.apply(BigQueryIO.Write
.to(options.getOutput())
.withSchema(schema)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)));
p.run();
}
Но когда я запускаю шаблон с помощью maven, я получаю следующую ошибку:
Test.java:[184,6] не удается найти символ: метод apply(com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.Bound)
расположение: класс org.apache.beam.sdk.io .jdbc.JdbcIO.Read‹com.google.cloud.dataflow.sdk.values.KV‹java.lang.String,java.lang.String››
Кажется, я не передаю BigQueryIO. Напишите ожидаемый сбор данных, и это то, с чем я сейчас борюсь.
Как я могу сделать так, чтобы данные, поступающие из MySQL, соответствовали ожиданиям BigQuery в этом случае?