Использование MySQL в качестве источника ввода и запись в Google BigQuery

У меня есть задача Apache Beam, которая считывает данные из источника MySQL с помощью JDBC, и предполагается, что данные записываются в таблицу BigQuery в том виде, в каком они есть. На данный момент не выполняется преобразование, которое произойдет позже, на данный момент я просто хочу, чтобы вывод базы данных был напрямую записан в BigQuery.

Это основной метод, пытающийся выполнить эту операцию:

    public static void main(String[] args) {
        Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

        Pipeline p = Pipeline.create(options);

        // Build the table schema for the output table.
        List<TableFieldSchema> fields = new ArrayList<>();
        fields.add(new TableFieldSchema().setName("phone").setType("STRING"));
        fields.add(new TableFieldSchema().setName("url").setType("STRING"));
        TableSchema schema = new TableSchema().setFields(fields);

        p.apply(JdbcIO.<KV<String, String>>read()
         .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
             "com.mysql.jdbc.Driver", "jdbc:mysql://host:3306/db_name")
             .withUsername("user")
             .withPassword("pass"))
             .withQuery("SELECT phone_number, identity_profile_image FROM scraper_caller_identities LIMIT 100")
             .withRowMapper(new JdbcIO.RowMapper<KV<String, String>>() {
                public KV<String, String> mapRow(ResultSet resultSet) throws Exception {
                return KV.of(resultSet.getString(1), resultSet.getString(2));
             }
          })
         .apply(BigQueryIO.Write
            .to(options.getOutput())
            .withSchema(schema)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)));

        p.run();
    }

Но когда я запускаю шаблон с помощью maven, я получаю следующую ошибку:

Test.java:[184,6] не удается найти символ: метод apply(com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.Bound)
расположение: класс org.apache.beam.sdk.io .jdbc.JdbcIO.Read‹com.google.cloud.dataflow.sdk.values.KV‹java.lang.String,java.lang.String››

Кажется, я не передаю BigQueryIO. Напишите ожидаемый сбор данных, и это то, с чем я сейчас борюсь.

Как я могу сделать так, чтобы данные, поступающие из MySQL, соответствовали ожиданиям BigQuery в этом случае?


person MC.    schedule 21.02.2017    source источник


Ответы (1)


Я думаю, что вам нужно предоставить тип PCollection‹TableRow› для BigQueryIO.Write вместо типа PCollection‹KV‹String,String››, который выводит RowMapper.

Кроме того, используйте правильное имя столбца и пары значений при настройке TableRow. Примечание. Я думаю, что вашими KV являются значения телефона и URL-адреса (например, {"555-555-1234": "http://www.url.com"}), а не пары имя столбца и значение (например, {"phone": "555-555-1234", "url": "http://www.url.com"})

См. пример здесь: https://beam.apache.org/documentation/sdks/javadoc/0.5.0/

Не могли бы вы попробовать это и дайте мне знать, если это работает для вас? Надеюсь это поможет.

person Alex Amato    schedule 22.02.2017