CSVRecordReader и поле с незавершенными кавычками в конце строки CSV

У меня проблема с наборами данных, которые я использую. Это CSV, содержащие фейковые новости. Моя проблема связана с классом CSVRecordReader, который мне предлагает DataVec (Deeplearning4j). Я пытаюсь сделать процесс преобразования искры. Моя проблема заключается в известной ошибке «Поле без кавычек в конце строки CSV».

При поиске в Интернете все рекомендуют искать строку, где это происходит, и исправлять проблему в csv, но это будет очень сложно, потому что набор данных содержит части статей (которые могут быть истинными или ложными). Эти статьи содержат множество цитат в кавычках, в том числе типичных для статей.

В поисках решения я реализовал свой собственный CSVRecordReader, используя библиотеку синтаксического анализатора Univocity csv, которая очень гибкая и решает все проблемы, которые есть у текущего CSVRecordReader, но теперь я нахожу другую дилемму, и она заключается в том, что парсер этой библиотеки не реализует интерфейс Serializable и выполнение преобразования в Apache Spark вызывает исключение

org.apache.spark.SparkException: задача не сериализуема Причина: java.io.NotSerializableException: com.univocity.parsers.csv.CsvParser

Как я могу решить свою проблему?

введите здесь описание изображения

Мой собственный код CSVRecordReader

package cu.desoft.cav.RecordReader;

import com.univocity.parsers.common.IterableResult;
import com.univocity.parsers.common.ParsingContext;
import com.univocity.parsers.common.ResultIterator;
import com.univocity.parsers.csv.CsvParser;
import com.univocity.parsers.csv.CsvParserSettings;
import org.datavec.api.records.Record;
import org.datavec.api.records.metadata.RecordMetaData;
import org.datavec.api.records.metadata.RecordMetaDataLine;
import org.datavec.api.records.reader.impl.LineRecordReader;
import org.datavec.api.split.FileSplit;
import org.datavec.api.split.InputSplit;
import org.datavec.api.writable.Text;
import org.datavec.api.writable.Writable;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/**
 * @author: Acosta email: [email protected]
 * created at: 11/25/2019
 */

public class UltraCSVRecordReader extends LineRecordReader {
    public static final char DEFAULT_DELIMITER = ',';
    public static final char DEFAULT_QUOTE = '"';
    public static final char DEFAULT_QUOTE_ESCAPE = '"';
    public static final char DEFAULT_CHAR_TO_ESCAPE_QUOTE_ESCAPING = '\0';
    private CsvParser csvParser;
    private CsvParserSettings settings;
    private ResultIterator<String[], ParsingContext> iterator;
    public UltraCSVRecordReader() {
        this(0, DEFAULT_DELIMITER, DEFAULT_QUOTE, DEFAULT_QUOTE_ESCAPE, DEFAULT_CHAR_TO_ESCAPE_QUOTE_ESCAPING);
    }

/**
 * @param unknownFormat if you can't know line endings, column delimiters and quotation characters set unknownFormat=true
 *                      for automatic detection
 */
public UltraCSVRecordReader(boolean unknownFormat) {
    this();
    if (unknownFormat) {
        settings = new CsvParserSettings();
        settings.detectFormatAutomatically();
        csvParser = new CsvParser(settings);
    }
}

public UltraCSVRecordReader(CsvParserSettings settings) {
    this.settings = settings;
    csvParser = new CsvParser(settings);
}

/**
 * @param skipNumLines              number of lines to skip
 * @param delimiter                 (default ,): value used to separate individual fields in the input
 * @param quote                     (default "): value used for escaping values where the fields delimiter is part of
 *                                  the value (e.g. the value "a,b" is parse as a , b).
 * @param quoteEscape               (default "): value used for escaping the quote character inside an already escaped value
 *                                  (e.g. the value " "" a,b "" " is parse as " a , b ").
 * @param charToEscapeQuoteEscaping (default \0): value used for escaping the quote escape character, when quote and quote escape are different
 *                                  (e.g. the value “\ " a , b " \” is parsed as \ " a , b " \, if quote = ", quoteEscape = \ and charToEscapeQuoteEscaping = \).
 */
public UltraCSVRecordReader(long skipNumLines, char delimiter, char quote, char quoteEscape,
                            char charToEscapeQuoteEscaping) {
    settings = new CsvParserSettings();
    settings.getFormat().setDelimiter(delimiter);
    settings.getFormat().setQuote(quote);
    settings.getFormat().setQuoteEscape(quoteEscape);
    settings.getFormat().setCharToEscapeQuoteEscaping(charToEscapeQuoteEscaping);
    settings.setNumberOfRowsToSkip(skipNumLines);
    csvParser = new CsvParser(settings);
}

/**
 * @param skipNumLines number of lines to skip
 */
public UltraCSVRecordReader(long skipNumLines) {
    this(skipNumLines, DEFAULT_DELIMITER, DEFAULT_QUOTE, DEFAULT_QUOTE_ESCAPE, DEFAULT_CHAR_TO_ESCAPE_QUOTE_ESCAPING);
}

/**
 * @param skipNumLines number of lines to skip
 * @param delimiter    (default ,): value used to separate individual fields in the input
 */
public UltraCSVRecordReader(long skipNumLines, char delimiter) {
    this(skipNumLines, delimiter, DEFAULT_QUOTE, DEFAULT_QUOTE_ESCAPE, DEFAULT_CHAR_TO_ESCAPE_QUOTE_ESCAPING);
}

/**
 * @param skipNumLines number of lines to skip
 * @param delimiter    (default ,): value used to separate individual fields in the input
 * @param quote        (default "): value used for escaping values where the fields delimiter is part of
 *                     the value (e.g. the value "a,b" is parse as a , b).
 */
public UltraCSVRecordReader(long skipNumLines, char delimiter, char quote) {
    this(skipNumLines, delimiter, quote, DEFAULT_QUOTE_ESCAPE, DEFAULT_CHAR_TO_ESCAPE_QUOTE_ESCAPING);
}

/**
 * @param skipNumLines number of lines to skip
 * @param delimiter    (default ,): value used to separate individual fields in the input
 * @param quote        (default "): value used for escaping values where the fields delimiter is part of
 *                     the value (e.g. the value "a,b" is parse as a , b).
 * @param quoteEscape  (default "): value used for escaping the quote character inside an already escaped value
 *                     (e.g. the value " "" a,b "" " is parse as " a , b ").
 */
public UltraCSVRecordReader(long skipNumLines, char delimiter, char quote, char quoteEscape) {
    this(skipNumLines, delimiter, quote, quoteEscape, DEFAULT_CHAR_TO_ESCAPE_QUOTE_ESCAPING);
}

@Override
public void initialize(InputSplit split) throws IOException, InterruptedException {
    super.initialize(split);
    this.initialize(((FileSplit) split).getRootDir());
}

public UltraCSVRecordReader maxLengthCharactersToParser(int numberCharacters) {
    this.settings.setMaxCharsPerColumn(numberCharacters);
    this.csvParser = new CsvParser(this.settings);
    return this;
}

public void initialize(File file) {
    IterableResult<String[], ParsingContext> iterate = this.csvParser.iterate(file);
    iterator = iterate.iterator();
}

protected List<Writable> parseLine(String line) {
    String[] split;
    split = this.csvParser.parseLine(line);
    List<Writable> values = new ArrayList<>();
    for (String value : split) {
        values.add(new Text(value));
    }
    return values;
}

public List<List<Writable>> next(int num) {
    List<List<Writable>> ret = new ArrayList<>(Math.min(num, 10000));
    int count = 0;

    while (this.hasNext() && count++ < num) {
        ret.add(this.next());
    }
    return ret;
}

public List<Writable> next() {
    String[] valuesSplit = iterator.next();
    List<Writable> values = new ArrayList<>();
    try {
        for (String value : valuesSplit) {
            values.add(new Text(value));
        }
    } catch (NullPointerException ex) {
        ex.printStackTrace();
        System.out.println(values);
        System.out.println("================================");
        System.out.println(Arrays.toString(valuesSplit));
    }

    return values;
}

public boolean batchesSupported() {
    return true;
}

public boolean hasNext() {
    return iterator.hasNext();
}

public Record nextRecord() {
    List<Writable> next = this.next();
    URI uri = this.locations != null && this.locations.length >= 1 ? this.locations[this.splitIndex] : null;
    RecordMetaData meta = new RecordMetaDataLine(this.lineIndex - 1, uri, UltraCSVRecordReader.class);
    return new org.datavec.api.records.impl.Record(next, meta);
}

public Record loadFromMetaData(RecordMetaData recordMetaData) throws IOException {
    return this.loadFromMetaData(Collections.singletonList(recordMetaData)).get(0);
}

public List<Record> loadFromMetaData(List<RecordMetaData> recordMetaDatas) throws IOException {
    List<Record> list = super.loadFromMetaData(recordMetaDatas);

    for (Record r : list) {
        String line = r.getRecord().get(0).toString();
        r.setRecord(this.parseLine(line));
    }

    return list;
}

public void reset() {
    super.reset();
}

public CsvParser getCsvParser() {
    return csvParser;
}
}

Пример НАБОР ДАННЫХ

«uuid», «ord_in_thread», «автор», «опубликовано», «название», «текст», «язык», «сканировано», «site_url», «страна», «domain_rank», «thread_title», «spam_score ","main_img_url","ответы_количество","участники_количество","лайки","комментарии","расшаривания","тип" "6a175f46bcd24d39b3e962ad0f29936721db70db",0,"Бригада Барракуда","2016-10-26T21:41:00.000 +03:00", "Мусульмане ПОЛУЧЕНЫ: они украли миллионы государственных пособий", "Печать Они должны вернуть все деньги плюс проценты. Всю семью и всех, кто прибыл с ними, нужно депортировать как можно скорее. Почему потребовалось два года, чтобы их арестовать? Ну вот и снова… еще одна группа, ворующая у правительства и налогоплательщиков! Группа сомалийцев украла более четырех миллионов государственных пособий всего за 10 месяцев! Мы сообщали о многочисленных случаях, подобных этому. где беженцы/иммигранты-мусульмане совершают мошенничество, обманывая нашу систему… Это выходит из-под контроля! США", 25689, "Мусульмане ПОЛУЧЕНЫ: они украли Миллионы государственных пособий",0,"http://bb4sp.com/wp-content/uploads/2016/10/Fullscreen-capture-10262016-83501-AM.bmp.jpg",0,1,0,0, 0, "предвзятость"

Это мой процесс трансформации

package cu.desoft.cav.preprocessing;

import cu.desoft.cav.RecordReader.UltraCSVRecordReader;
import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.datavec.api.records.reader.RecordReader;
import org.datavec.api.records.reader.impl.csv.CSVRecordReader;
import org.datavec.api.transform.TransformProcess;
import org.datavec.api.transform.schema.Schema;
import org.datavec.api.writable.Writable;
import org.datavec.spark.transform.SparkTransformExecutor;
import org.datavec.spark.transform.misc.StringToWritablesFunction;
import org.datavec.spark.transform.misc.WritablesToStringFunction;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

/**
 * author: acosta
 * email: [email protected]
 * Created on: 2/3/20
 */
public class FakeNewsTransformation {
    private final String DATSETS_PATH = "data/FakeNews/";

    public void transform(boolean useSparkLocal) {
        Schema schema = new Schema.Builder()
                .addColumnString("uuid")
                .addColumnInteger("ord_in_thread")
                .addColumnString("author")
                .addColumnString("published")
                .addColumnsString("title","text","language","crawled","site_url","country")
                .addColumnInteger("domain_rank")
                .addColumnString("thread_title")
                .addColumnsInteger("spam_score","main_img_url","replies_count","participants_count","likes","comments","shares")
                .addColumnCategorical("type", Arrays.asList("bias", "bs","conspiracy","fake","hate","junksci","satire","state"))
                .build();

        TransformProcess tp = new TransformProcess.Builder(schema)
                .removeColumns("uuid", "ord_in_thread","author","published","site_url","country","thread_title")
                .categoricalToInteger("type")
                .build();

        int numActions = tp.getActionList().size();
        for (int i = 0; i < numActions; i++) {
            System.out.println("\n\n===============================");
            System.out.println("--- Schema after step " + i +
                    " (" + tp.getActionList().get(i) + ")--");
            System.out.println(tp.getSchemaAfterStep(i));
        }

        SparkConf sparkConf = new SparkConf();
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        sparkConf.set("spark.kryo.registrator", "org.nd4j.Nd4jRegistrator");
        if (useSparkLocal) {
            sparkConf.setMaster("local[*]");
        }

        sparkConf.setAppName("Fake News Spanish Corpus dataset transformation");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);
        //Load our data using Spark
        JavaRDD<String> lines = sc.textFile(DATSETS_PATH + "fake.csv");
        int skipNumLines = 1;
        //We first need to parse this format. It's comma-delimited (CSV) format, so let's parse it using CSVRecordReader:
        RecordReader rr = new UltraCSVRecordReader();
//        RecordReader rr = new CSVRecordReader();
        JavaRDD<List<Writable>> parsedInputData = lines.map(new StringToWritablesFunction(rr));

        //Now, let's execute the transforms we defined earlier:
        JavaRDD<List<Writable>> processedData = SparkTransformExecutor.execute(parsedInputData, tp);

        //For the sake of this example, let's collect the data locally and print it:
        JavaRDD<String> processedAsString = processedData.map(new WritablesToStringFunction(","));
        System.out.println("<<<<<<<<<<<<<<<PATH>>>>>>>>>>>>>");
        File dataset = new File("dataset/FakeNews");
        if (dataset.exists()) {
            try {
                FileUtils.deleteDirectory(dataset);
                System.out.println("DELETE THE DIRECTORY");
            } catch (IOException e) {
                System.out.println("The directory was not delete");
                e.printStackTrace();
            }
        }
        System.out.println(dataset.getAbsolutePath());
        System.out.println("<<<<<<<<<<<<<<<END-PATH>>>>>>>>>>>>>");
        processedAsString.saveAsTextFile("file://" + dataset.getAbsolutePath());   //To save locally
        //processedAsString.saveAsTextFile("hdfs://your/hdfs/save/path/here");   //To save to hdfs

        List<String> processedCollected = processedAsString.collect();
        List<String> inputDataCollected = lines.collect();


    }

    public static void main(String[] args) {
        new FakeNewsTransformation().transform(true);
    }
}

Это ошибка вывода, когда я использую CSVRecordReader (DataVec)

    java.lang.RuntimeException: java.io.IOException: Un-terminated quoted field at end of CSV line
    at org.datavec.api.records.reader.impl.csv.CSVRecordReader.parseLine(CSVRecordReader.java:183)
    at org.datavec.api.records.reader.impl.csv.CSVRecordReader.next(CSVRecordReader.java:175)
    at org.datavec.spark.transform.misc.StringToWritablesFunction.call(StringToWritablesFunction.java:41)
    at org.datavec.spark.transform.misc.StringToWritablesFunction.call(StringToWritablesFunction.java:33)
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply$mcV$sp(PairRDDFunctions.scala:1211)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1210)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13$$anonfun$apply$7.apply(PairRDDFunctions.scala:1210)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1218)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1197)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Un-terminated quoted field at end of CSV line
    at org.datavec.api.records.reader.impl.csv.SerializableCSVParser.parseLine(SerializableCSVParser.java:276)
    at org.datavec.api.records.reader.impl.csv.SerializableCSVParser.parseLine(SerializableCSVParser.java:186)
    at org.datavec.api.records.reader.impl.csv.CSVRecordReader.parseLine(CSVRecordReader.java:181)
    ... 21 more

И это проблема сериализации, когда я использую свой собственный CSVRecordReader с однозначным парсером csv (эта библиотека не реализует Serializable)

  Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.map(RDD.scala:369)
    at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:93)
    at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:45)
    at cu.desoft.cav.preprocessing.FakeNewsTransformation.transform(FakeNewsTransformation.java:71)
    at cu.desoft.cav.preprocessing.FakeNewsTransformation.main(FakeNewsTransformation.java:101)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMainV2.main(AppMainV2.java:131)
Caused by: java.io.NotSerializableException: com.univocity.parsers.csv.CsvParser
Serialization stack:
    - object not serializable (class: com.univocity.parsers.csv.CsvParser, value: com.univocity.parsers.csv.CsvParser@75b6dd5b)
    - field (class: cu.desoft.cav.RecordReader.UltraCSVRecordReader, name: csvParser, type: class com.univocity.parsers.csv.CsvParser)
    - object (class cu.desoft.cav.RecordReader.UltraCSVRecordReader, cu.desoft.cav.RecordReader.UltraCSVRecordReader@1fedf0a4)
    - field (class: org.datavec.spark.transform.misc.StringToWritablesFunction, name: recordReader, type: interface org.datavec.api.records.reader.RecordReader)
    - object (class org.datavec.spark.transform.misc.StringToWritablesFunction, org.datavec.spark.transform.misc.StringToWritablesFunction@465b38e6)
    - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
    - object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    ... 18 more

person Yuniel Acosta    schedule 03.02.2020    source источник
comment
Привет, добро пожаловать в Stack Overflow. Я предлагаю вам добавить полную трассировку стека ошибки, которая будет более полезна для понимания, где может быть проблема.   -  person A. Wolf    schedule 03.02.2020
comment
Хорошо, спасибо, я отредактировал вопрос   -  person Yuniel Acosta    schedule 05.02.2020


Ответы (1)


Вы либо должны исправить строку в CSV, вызывающую вашу ошибку. Или сделайте средство чтения записей сериализуемым.
На основе этот univocity-parsers "обрабатывает неэкранированные кавычки, и вы можете настроить его так, чтобы он вызывал исключение, если оно найдено." Может быть, попробовать это?

person Susan Eraly    schedule 04.02.2020
comment
Я уже реализовал свой собственный RecordReader с помощью синтаксического анализатора univocity csv, но ни один класс в библиотеке не реализует сериализуемый интерфейс, поэтому, когда я использую его со искрой, он выдает вторую ошибку. Мой CSVRecordReader отлично работает сам по себе, но не вместе со искрой. - person Yuniel Acosta; 05.02.2020
comment
Верно. Поэтому вам нужно либо изменить его для работы со искрой, либо исправить строку в файле csv. Ссылка выше говорит вам, как распечатать исключения для неэкранированных кавычек. Таким образом, вы можете исправить файл. - person Susan Eraly; 06.02.2020