как сохранить вывод схемы apache spark в базе данных mysql

Может ли кто-нибудь сказать мне, есть ли способ в apache Spark хранить JavaRDD в базе данных mysql? Я беру входные данные из 2 файлов CSV, а затем после выполнения операций соединения с их содержимым мне нужно сохранить выходные данные (выходные данные JavaRDD) в базе данных mysql. Я уже могу успешно сохранить вывод на hdfs, но я не нахожу никакой информации, связанной с соединением apache Spark-MYSQL. Ниже я публикую код для Spark sql. Это может служить ссылкой для тех, кто ищет пример для spark-sql.

package attempt1;

import java.io.Serializable;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.spark.sql.api.java.Row;


public class Spark_Mysql {
    @SuppressWarnings("serial")
    public static class CompleteSample implements Serializable {
        private String ASSETNUM;
        private String ASSETTAG;
        private String CALNUM;



        public String getASSETNUM() {
            return ASSETNUM;
        }
        public void setASSETNUM(String aSSETNUM) {
            ASSETNUM = aSSETNUM;
        }
        public String getASSETTAG() {
            return ASSETTAG;
        }
        public void setASSETTAG(String aSSETTAG) {
            ASSETTAG = aSSETTAG;
        }
        public String getCALNUM() {
            return CALNUM;
        }
        public void setCALNUM(String cALNUM) {
            CALNUM = cALNUM;
        }


      }

    @SuppressWarnings("serial")
    public static class ExtendedSample implements Serializable {

        private String ASSETNUM;
        private String CHANGEBY;
        private String CHANGEDATE;


        public String getASSETNUM() {
            return ASSETNUM;
        }
        public void setASSETNUM(String aSSETNUM) {
            ASSETNUM = aSSETNUM;
        }
        public String getCHANGEBY() {
            return CHANGEBY;
        }
        public void setCHANGEBY(String cHANGEBY) {
            CHANGEBY = cHANGEBY;
        }
        public String getCHANGEDATE() {
            return CHANGEDATE;
        }
        public void setCHANGEDATE(String cHANGEDATE) {
            CHANGEDATE = cHANGEDATE;
        }
    }

    @SuppressWarnings("serial")
    public static void main(String[] args) throws Exception {

          JavaSparkContext ctx = new JavaSparkContext("local[2]", "JavaSparkSQL");
          JavaSQLContext sqlCtx = new JavaSQLContext(ctx);

          JavaRDD<CompleteSample> cs = ctx.textFile("C:/Users/cyg_server/Documents/bigDataExample/AssetsImportCompleteSample.csv").map(
                  new Function<String, CompleteSample>() {
                    public CompleteSample call(String line) throws Exception {
                      String[] parts = line.split(",");

                      CompleteSample cs = new CompleteSample();
                      cs.setASSETNUM(parts[0]);
                      cs.setASSETTAG(parts[1]);
                      cs.setCALNUM(parts[2]);

                      return cs;
                    }
                  });

          JavaRDD<ExtendedSample> es = ctx.textFile("C:/Users/cyg_server/Documents/bigDataExample/AssetsImportExtendedSample.csv").map(
                  new Function<String, ExtendedSample>() {
                    public ExtendedSample call(String line) throws Exception {
                      String[] parts = line.split(",");

                      ExtendedSample es = new ExtendedSample();
                      es.setASSETNUM(parts[0]);
                      es.setCHANGEBY(parts[1]);
                      es.setCHANGEDATE(parts[2]);

                      return es;
                    }
                  });

          JavaSchemaRDD complete = sqlCtx.applySchema(cs, CompleteSample.class);
            complete.registerAsTable("cs");

          JavaSchemaRDD extended = sqlCtx.applySchema(es, ExtendedSample.class);
          extended.registerAsTable("es");

          JavaSchemaRDD fs= sqlCtx.sql("SELECT cs.ASSETTAG, cs.CALNUM, es.CHANGEBY, es.CHANGEDATE FROM cs INNER JOIN es ON cs.ASSETNUM=es.ASSETNUM;");


          JavaRDD<String> result = fs.map(new Function<Row, String>() {
              public String call(Row row) {
                return row.getString(0);
              }
            });

              result.saveAsTextFile("hdfs://path/to/hdfs/dir-name");          //instead of hdfs I need to save it on mysql database, but I am not able to find any Spark-MYSQL connection

    }



}

Здесь, в конце, я успешно сохраняю результат в HDFS. Но теперь я хочу сохранить в базе данных MYSQL. Пожалуйста, помогите мне. Спасибо


person Amitabh Ranjan    schedule 22.07.2014    source источник


Ответы (2)


Есть два подхода, которые вы можете использовать для записи результатов обратно в базу данных. Один из них - использовать что-то вроде DBOutputFormat и настроить его, а другой - использовать foreachPartition в RDD, который вы хотите сохранить, и передать функцию, которая создает соединение с MySQL и записывает результат обратно.

person Holden    schedule 23.07.2014
comment
Привет .. Спасибо за ответ. Я попробую эти варианты и дам вам знать. Тем временем я узнал о JavaRDD (ссылка: github.com/apache/spark/blob/master/core/src/test/scala/org/) .. Это может сработать. Но это в scala, а я совершенно не знаю о scala. Пытаюсь разобраться в этом примере. Спасибо - person Amitabh Ranjan; 24.07.2014
comment
Здесь DBOutputFormat не работает, но работает foreachPartition. Я полагал, что есть еще один способ. Используйте ForeachRDD, а затем разбейте rdd на массивы, и с помощью jdbc мы можем передать ввод или получить вывод. Я отправляю образец кода в качестве ответа на случай, если он кому-то еще понадобится. - person Amitabh Ranjan; 26.07.2014
comment
Привет, Амит. Вы сказали, что разместите образец кода. Пожалуйста, опубликуйте это, если сможете. Мы будем очень признательны. Спасибо - person Akshay Hazari; 03.11.2014
comment
@RED Я нашел здесь еще один вопрос с образцом foreachPartition: stackoverflow.com/questions/24916852/ - person G-Mac; 09.11.2014

Вот пример использования DBOutputFormat.

Создайте класс, который представляет строку вашей таблицы -

public class TableRow implements DBWritable
{
    public String column1;
    public String column2;

    @Override
    public void write(PreparedStatement statement) throws SQLException
    {
        statement.setString(1, column1);
        statement.setString(2, column2);
    }

    @Override
    public void readFields(ResultSet resultSet) throws SQLException
    {
        throw new RuntimeException("readFields not implemented");
    }
}

Затем настройте свою работу и напишите функцию mapToPair. Значение не используется. Если кто знает, оставьте, пожалуйста, комментарий.

String tableName = "YourTableName";
String[] fields = new String[] { "column1", "column2" };

JobConf job = new JobConf();
DBConfiguration.configureDB(job, "com.mysql.jdbc.Driver", "jdbc:mysql://localhost/DatabaseNameHere", "username", "password");
DBOutputFormat.setOutput(job, tableName, fields);

// map your rdd into a table row
JavaPairRDD<TableRow, Object> rows = rdd.mapToPair(...);

rows.saveAsHadoopDataset(job);
person Josh Unger    schedule 20.01.2015