Загрузка больших файлов данных, сжатых с помощью gzip, в HDFS

У меня есть вариант использования, когда я хочу загрузить большие файлы текстовых данных в формате gzip (~ 60 ГБ) на HDFS.

Мой код ниже занимает около 2 часов, чтобы загрузить эти файлы кусками по 500 МБ. Ниже приведен псевдокод. Я проверял, может ли кто-нибудь помочь мне сократить это время:

i) intfileFetchBuffer = 500000000; System.out.println("буфер выборки файла: " + fileFetchBuffer); инт смещение = 0; число байтовЧтение = -1;

    try {
        fileStream = new FileInputStream (file);    
        if (fileName.endsWith(".gz")) {
            stream = new GZIPInputStream(fileStream);

            BufferedReader reader = new BufferedReader(new InputStreamReader(stream)); 

            String[] fileN = fileName.split("\\.");
            System.out.println("fil 0 : " + fileN[0]);
            System.out.println("fil 1 : " + fileN[1]);
            //logger.info("First line is: " + streamBuff.readLine());

            byte[] buffer = new byte[fileFetchBuffer];

            FileSystem fs = FileSystem.get(conf);

            int charsLeft = fileFetchBuffer;
            while (true) {

                charsLeft = fileFetchBuffer;    



             logger.info("charsLeft outside while: " + charsLeft);

          FSDataOutputStream dos = null;
                while (charsLeft != 0) {
                    bytesRead = stream.read(buffer, 0, charsLeft);
                    if (bytesRead < 0) {
                        dos.flush();
                        dos.close();
                        break;
                    }
                    offset = offset + bytesRead;
                    charsLeft = charsLeft - bytesRead; 
                    logger.info("offset in record: " + offset);
                    logger.info("charsLeft: " + charsLeft);
                    logger.info("bytesRead in record: " + bytesRead);
                    //prettyPrintHex(buffer);

                    String outFileStr = Utils.getOutputFileName(
                            stagingDir,
                            fileN[0],
                            outFileNum);

                    if (dos == null) {
                    Path outFile = new Path(outFileStr);
                    if (fs.exists(outFile)) {
                        fs.delete(outFile, false);
                    }

                    dos = fs.create(outFile);
                    }

                    dos.write(buffer, 0, bytesRead);


                } 

                logger.info("done writing: " + outFileNum);
                dos.flush();
                dos.close();

                if (bytesRead < 0) {
                    dos.flush();
                    dos.close();
                    break;
                }

                outFileNum++;

            }  // end of if


        } else {
            // Assume uncompressed file
            stream = fileStream;
        }           

    } catch(FileNotFoundException e) {
        logger.error("File not found" + e);
    }

person user656189    schedule 22.06.2011    source источник


Ответы (2)


Вам следует рассмотреть возможность использования суперпакета ввода-вывода от Apache< /а>.

У него есть метод

IOUtils.copy( InputStream, OutputStream )

это значительно сократит время, необходимое для копирования ваших файлов.

person Snicolas    schedule 22.06.2011
comment
@Snicolas - как разделить InputStream? Например, 60 Гб должны быть загружены кусками по 1 Гб. Как эта функция узнает, откуда в InputStream копировать? - person user656189; 22.06.2011
comment
Вы можете рассмотреть возможность подкласса FilterInputStream, чтобы создать новый класс, который считывает ваш исходный поток ввода с определенного смещения и не дальше 1 Гб. - person Snicolas; 22.06.2011
comment
Другим вариантом может быть использование FileChannels и метода transferTo, что тоже будет весьма эффективно. - person Snicolas; 22.06.2011
comment
@Snicolas - как я могу создать файловый канал в сжатом входном потоке? - person user656189; 22.06.2011
comment
@user656189 user656189 вам нужно загружать файлы в разархивированном виде? Или вы просто хотите поместить заархивированный файл размером 60 Гб на кусочки по 1 Гб? Это не та же проблема. - person Snicolas; 22.06.2011
comment
@Snicolas - мне подходит распаковка. застегнутый еще лучше. Большой вопрос для меня - это время прямо сейчас. С приведенным выше кодом требуется 2,5 часа, чтобы загрузить заархивированный файл размером 60 ГБ в 770 несжатых файлов по 500 МБ. - person user656189; 22.06.2011
comment
@user656189 user656189, а зачем вам разбивать его на куски по 1 Гб? - person Snicolas; 22.06.2011
comment
@user656189 user656189 а что занимает большую часть времени: передача по сети или операции распаковки? Какая у вас максимальная скорость передачи? - person Snicolas; 22.06.2011
comment
@Snicolas - это требование, чтобы мне было предоставлено разделение на фрагмент размером 1 Гб, так как его необходимо обрабатывать параллельно в hadoop. Большую часть времени выполняется в stream.read() в цикле while выше, поскольку он считывается в каком-то внутреннем размер буфера, и я вижу, что он читает несколько КБ. пока он полностью не достигнет байтового буфера. передача по сети не является узким местом. - person user656189; 22.06.2011
comment
@user656189 user656189, так что большую часть времени вы тратите на распаковку данных из входного потока. Если вы используете не gzippedInputStream, а простой BufferedInputStream, вы можете передать сжатый файл, не распаковывая его. Это было бы гораздо быстрее. - person Snicolas; 22.06.2011
comment
@user656189 user656189 что нужно распараллелить в вашем приложении на стороне удаленного компьютера. Извлечение файла? Распаковка? Обработка данных? - person Snicolas; 22.06.2011
comment
обработка данных после того, как файлы, сжатые gzip, разбиваются на несколько фрагментов фиксированного размера. - person user656189; 22.06.2011
comment
@Snicolas - спасибо, что поделились этим. Я также делаю почти то же самое, что говорит эта ссылка, но моя проблема заключается в разделении 60-гигабайтного сжатого файла на куски по 1 Гб каждый (как я написал в своем первоначальном описании, которое полностью представляет собой код Java). Вот и подумал спросить здесь. Я пытался найти лучший способ, чем использовать in.read() для получения данных с диска. Я думаю, что запись в выходной файл не является узким местом. - person user656189; 22.06.2011
comment
@user656189 user656189 Нет, точно, узким местом, похоже, является распаковка вашего файла размером 60 ГБ. Таким образом, либо вы можете нарезать его (без распаковки) и отправить фрагменты по сети (это означает, что вам придется ждать всей передачи, прежде чем распаковывать), ЛИБО вы разархивируете его, отправляете отдельные файлы (или наборы файлов с задержкой) и передаете их по сети. сети, чтобы каждый клиент в вашем кластере мог выполнять некоторую работу независимо от остальных файлов. - person Snicolas; 23.06.2011
comment
@Snicolas - есть какие-нибудь указания, как нарезать его без распаковки? - person user656189; 23.06.2011
comment
@user656189 user656189 конечно, но тогда вам придется подождать, пока все фрагменты достигнут сервера, а затем снова собрать фрагменты вместе, прежде чем вы сможете их распаковать. Это нормально для вас ? - person Snicolas; 23.06.2011
comment
@Snicolas - да, как я могу это сделать? - person user656189; 23.06.2011

Я пробовал с буферизованным входным потоком и не увидел реальной разницы. Я полагаю, что реализация файлового канала может быть еще более эффективной. Скажи мне, если это не достаточно быстро.

package toto;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;

public class Slicer {

    private static final int BUFFER_SIZE = 50000;

    public static void main(String[] args) {

        try 
        {
            slice( args[ 0 ], args[ 1 ], Long.parseLong( args[2]) );
        }//try
        catch (IOException e) 
        {
            e.printStackTrace();
        }//catch
        catch( Exception ex )
        {
            ex.printStackTrace();
            System.out.println( "Usage :  toto.Slicer <big file> <chunk name radix > <chunks size>" );
        }//catch
    }//met

    /**
     * Slices a huge files in chunks.
     * @param inputFileName the big file to slice.
     * @param outputFileRadix the base name of slices generated by the slicer. All slices will then be numbered outputFileRadix0,outputFileRadix1,outputFileRadix2...
     * @param chunkSize the size of chunks in bytes
     * @return the number of slices.
     */
    public static int slice( String inputFileName, String outputFileRadix, long chunkSize ) throws IOException
    {
        //I would had some code to pretty print the output file names
        //I mean adding a couple of 0 before chunkNumber in output file name
        //so that they all have same number of chars
        //use java.io.File for that, estimate number of chunks, take power of 10, got number of leading 0s

        //just to get some stats
        long timeStart = System.currentTimeMillis();
        long timeStartSlice = timeStart;
        long timeEnd = 0;

        //io streams and chunk counter
        int chunkNumber = 0;
        FileInputStream fis = null;
        FileOutputStream fos = null;

        try 
        {
            //open files
            fis = new FileInputStream( inputFileName );
            fos = new FileOutputStream( outputFileRadix + chunkNumber );

            //declare state variables
            boolean finished = false;
            byte[] buffer = new byte[ BUFFER_SIZE ];
            int bytesRead = 0;
            long bytesInChunk = 0;


            while( !finished )
            {
                //System.out.println( "bytes to read " +(int)Math.min( BUFFER_SIZE, chunkSize - bytesInChunk ) );
                bytesRead = fis.read( buffer,0, (int)Math.min( BUFFER_SIZE, chunkSize - bytesInChunk ) );

                if( bytesRead == -1 )
                    finished = true;
                else
                {
                                            fos.write( buffer, 0, bytesRead );
                    bytesInChunk += bytesRead;
                    if( bytesInChunk == chunkSize )
                    {
                        if( fos != null )
                        {
                            fos.close();
                            timeEnd = System.currentTimeMillis();
                            System.out.println( "Chunk "+chunkNumber + " has been generated in "+ (timeEnd - timeStartSlice) +" ms");
                            chunkNumber ++;
                            bytesInChunk = 0;
                            timeStartSlice = timeEnd;
                            System.out.println( "Creating slice number " + chunkNumber );
                            fos = new FileOutputStream( outputFileRadix + chunkNumber );
                        }//if
                    }//if
                }//else
            }//while
        }
        catch (Exception e) 
        {
            System.out.println( "A problem occured during slicing : " );
            e.printStackTrace();
        }//catch
        finally 
        {
            //whatever happens close all files
            System.out.println( "Closing all files.");
            if( fis != null )
                fis.close();
            if( fos != null )
                fos.close();
        }//fin

        timeEnd = System.currentTimeMillis();
        System.out.println( "Total slicing time : " + (timeEnd - timeStart) +" ms" );
        System.out.println( "Total number of slices "+ (chunkNumber +1) );

        return chunkNumber+1;
    }//met
}//class

Привет, Стефан

person Snicolas    schedule 23.06.2011
comment
@Snicolas - Большое спасибо, что потратили на это время! Разве это не то же самое, что и мое решение? Или это будет быстрее. - person user656189; 23.06.2011
comment
Чрезвычайно быстрее, я не распаковываю tar-файл размером 60 Гб... Пожалуйста, попробуйте. Если мои времена лучше, примите мой ответ;) - person Snicolas; 23.06.2011
comment
Я попробую, но дело в том, что он будет разделяться на границы строк? - person user656189; 23.06.2011
comment
нет, он просто нарезает файл. Поскольку я не распаковываю файл, его невозможно разрезать по каким-либо границам, вы просто получаете фрагменты одинакового и предопределенного размера. Я думал, это то, о чем мы договорились вчера. Это лучшее, что я могу сделать. - person Snicolas; 23.06.2011
comment
@Snicolas - я согласен. я буду запускать его. я чувствую, что это будет быстрее. Итак, теперь у нас есть сжатые слайсы, верно? - person user656189; 23.06.2011
comment
да, но нет возможности распаковать их до того, как вы присоедините их к серверу. - person Snicolas; 23.06.2011