Извлечь первую строку файла CSV в Pig

У меня есть несколько файлов CSV, и заголовок всегда является первой строкой в ​​файле. Каков наилучший способ получить эту строку из файла CSV в виде строки в Pig? Предварительная обработка с помощью sed, awk и т. д. невозможна.

Я пытался загрузить файл с помощью обычного PigStorage и CsvLoader из копилки, но мне непонятно, как я могу получить эту первую строку, если вообще смогу.

Я готов написать UDF, если это необходимо.


person Uppsilon    schedule 18.08.2013    source источник
comment
Что вы подразумеваете под «получить эту строку из файла CSV в виде строки»?   -  person Lorand Bendig    schedule 18.08.2013


Ответы (2)


Отказ от ответственности: я плохо разбираюсь в Java.

Вам понадобится UDF. Я не уверен, что именно вы просите, но этот UDF возьмет серию файлов CSV и превратит их в карты, где ключами являются значения в верхней части файла. Надеюсь, этого будет достаточно для скелета, чтобы вы могли изменить его на то, что хотите.

Пара тестов, которые я провел удаленно и локально, показывают, что это сработает.

package myudfs;
import java.io.IOException;
import org.apache.pig.LoadFunc;

import java.util.Map;
import java.util.HashMap;
import java.util.ArrayList;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;

public class ExampleCSVLoader extends LoadFunc {
    protected RecordReader in = null;
    private String fieldDel = "" + '\t';
    private Map<String, String> outputMap = null;
    private TupleFactory mTupleFactory = TupleFactory.getInstance();

    // This stores the fields that are defined in the first line of the file
    private ArrayList<Object> topfields = null;

    public ExampleCSVLoader() {}

    public ExampleCSVLoader(String delimiter) {
        this();
        this.fieldDel = delimiter;
    }

    @Override
    public Tuple getNext() throws IOException {
        try {
            boolean notDone = in.nextKeyValue();
            if (!notDone) {
                outputMap = null;
                topfields = null;
                return null;
            }

            String value = in.getCurrentValue().toString();
            String[] values = value.split(fieldDel);
            Tuple t =  mTupleFactory.newTuple(1);

            ArrayList<Object> tf = new ArrayList<Object>();

            int pos = 0;
            for (int i = 0; i < values.length; i++) {
                if (topfields == null) {
                    tf.add(values[i]);
                } else {
                    readField(values[i], pos);
                    pos = pos + 1;
                }
            }
            if (topfields == null) {
                topfields = tf;
                t = mTupleFactory.newTuple();
            } else {
                t.set(0, outputMap);
            }

            outputMap = null;
            return t;
        } catch (InterruptedException e) {
            int errCode = 6018;
            String errMsg = "Error while reading input";
            throw new ExecException(errMsg, errCode,
                    PigException.REMOTE_ENVIRONMENT, e);
        }

    }

    // Applies foo to the appropriate value in topfields
    private void readField(String foo, int pos) {
        if (outputMap == null) {
            outputMap = new HashMap<String, String>();
        }
        outputMap.put((String) topfields.get(pos), foo);
    }

    @Override
    public InputFormat getInputFormat() {
        return new TextInputFormat();
    }

    @Override
    public void prepareToRead(RecordReader reader, PigSplit split) {
        in = reader;
    }

    @Override
    public void setLocation(String location, Job job)
            throws IOException {
        FileInputFormat.setInputPaths(job, location);
    }
}

Пример вывода при загрузке каталога с помощью:

csv1.in             csv2.in
-------            ---------
A|B|C               D|E|F
Hello|This|is       PLEASE|WORK|FOO
FOO|BAR|BING        OR|EVERYTHING|WILL
BANG|BOSH           BE|FOR|NAUGHT

Производит этот вывод:

A: {M: map[]}
()
([D#PLEASE,E#WORK,F#FOO])
([D#OR,E#EVERYTHING,F#WILL])
([D#BE,E#FOR,F#NAUGHT])
()
([A#Hello,B#This,C#is])
([A#FOO,B#BAR,C#BING])
([A#BANG,B#BOSH])

() — это верхние строки файла. getNext() требует, чтобы мы что-то возвращали, иначе файл перестанет обрабатываться. Поэтому они возвращают нулевую схему.

person mr2ert    schedule 20.08.2013

Если ваш CSV соответствует соглашениям CSV Excel 2007, вы можете использовать уже доступный загрузчик из Piggybank http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/CSVExcelStorage.java?view=markup

У него есть возможность пропустить заголовок CSV SKIP_INPUT_HEADER

person Kris    schedule 26.08.2013