Разбор огромных лог-файлов в Node.js — чтение построчно

Мне нужно сделать разбор больших (5-10 Гб) лог-файлов в Javascript/Node.js (я использую Cube).

Логлайн выглядит примерно так:

10:00:43.343423 I'm a friendly log message. There are 5 cats, and 7 dogs. We are in state "SUCCESS".

Нам нужно прочитать каждую строку, выполнить некоторый синтаксический анализ (например, удалить 5, 7 и SUCCESS), а затем загрузить эти данные в Cube (https://github.com/square/cube), используя свой JS-клиент.

Во-первых, каков канонический способ чтения файла в Node построчно?

Кажется, это довольно распространенный вопрос в Интернете:

Многие ответы, похоже, указывают на кучу сторонних модулей:

Тем не менее, это кажется довольно простой задачей - ведь в stdlib есть простой способ читать текстовый файл построчно?

Во-вторых, мне нужно обработать каждую строку (например, преобразовать метку времени в объект Date и извлечь полезные поля).

Как лучше всего это сделать, максимизируя пропускную способность? Есть ли какой-нибудь способ, который не будет блокироваться ни при чтении в каждой строке, ни при отправке в Cube?

В-третьих, я предполагаю, что использование разделения строк, а JS-эквивалент contains (IndexOf != -1?) будет намного быстрее, чем регулярные выражения? У кого-нибудь был большой опыт разбора огромных объемов текстовых данных в Node.js?

Здоровья, Виктор


person victorhooi    schedule 15.04.2013    source источник
comment
Я создал анализатор журнала в узле, который берет кучу строк регулярных выражений со встроенными «захватами» и выводит в JSON. Вы даже можете вызывать функции для каждого захвата, если хотите выполнить расчет. Он может делать то, что вы хотите: npmjs.org/package/logax   -  person Jess    schedule 17.01.2014


Ответы (10)


Я искал решение для построчного анализа очень больших файлов (gbs) с использованием потока. Все сторонние библиотеки и примеры мне не подходили, так как обрабатывали файлы не построчно (типа 1, 2, 3, 4..) или читали весь файл в память

Следующее решение может анализировать очень большие файлы построчно, используя поток и канал. Для тестирования я использовал файл размером 2,1 ГБ с 17 000 000 записей. Использование оперативной памяти не превышало 60 мб.

Сначала установите пакет event-stream:

npm install event-stream

Потом:

var fs = require('fs')
    , es = require('event-stream');

var lineNr = 0;

var s = fs.createReadStream('very-large-file.csv')
    .pipe(es.split())
    .pipe(es.mapSync(function(line){

        // pause the readstream
        s.pause();

        lineNr += 1;

        // process line here and call s.resume() when rdy
        // function below was for logging memory usage
        logMemoryUsage(lineNr);

        // resume the readstream, possibly from a callback
        s.resume();
    })
    .on('error', function(err){
        console.log('Error while reading file.', err);
    })
    .on('end', function(){
        console.log('Read entire file.')
    })
);

введите здесь описание изображения

Пожалуйста, дайте мне знать, как это происходит!

person Gerard    schedule 16.05.2014
comment
К вашему сведению, этот код не является синхронным. Это асинхронно. Если вы вставите console.log(lineNr) после последней строки вашего кода, он не покажет конечное количество строк, потому что файл читается асинхронно. - person jfriend00; 17.06.2015
comment
что такое logMemoryUsage(lineNr); или как получить данные строки Nr в файле, если я попытаюсь напечатать строку Nr с моим файлом, я получаю только номер 2 и есть 6000 записей - person Labeo; 07.10.2015
comment
Привет, Лабео, это была функция, которую я использовал для регистрации использования памяти, но она не включена в код. Вы можете удалить эту строку из кода. - person Gerard; 12.10.2015
comment
Спасибо, это было единственное решение, которое я смог найти, которое действительно приостанавливалось и возобновлялось, когда это должно было быть. Ридлайн этого не сделал. - person Brent; 14.10.2015
comment
Потрясающий пример, и он действительно делает паузу. Кроме того, если вы решите остановить чтение файла раньше, вы можете использовать s.end(); - person zipzit; 23.02.2016
comment
В чем преимущество паузы и возобновления? Я не думаю, что для моего типичного использования это понадобится, если только я что-то не упустил. - person hippietrail; 24.02.2016
comment
Мне было интересно, какой смысл иметь резюме внутри анонимной функции? - person ambodi; 01.03.2016
comment
@ambodi это бесполезно, я удалил эту функцию. - person Gerard; 09.03.2016
comment
@ambodi, я считаю, что цель паузы / возобновления состоит в том, чтобы позволить другим асинхронным процессам происходить, прежде чем продолжить чтение из файла. - person jchook; 23.05.2016
comment
Работал как шарм. Использовал его для индексации 150 миллионов документов в индекс elasticsearch. readline модуль - это боль. Он не останавливается и вызывает сбои каждый раз после 40-50 миллионов. Потерял день. Большое спасибо за ответ. Этот отлично работает - person Mandeep Singh; 07.06.2016
comment
Я использую это для анализа файлов журнала, и для строк, которые я пропускаю, он может опережать сам себя и приводить к переполнению стека. Если нет работы для каждой строки, будьте осторожны с этим, потому что это может закончиться неудачей. - person David; 05.10.2016
comment
Если у вас закончилась память кучи, вы можете использовать следующий узел формата команды --max_old_space_size=4096 app.js - person Prabhat; 16.11.2016
comment
pause и result были бы полезны только в том случае, если resume вызывался из обратного вызова, или если содержащая функция была async, а await использовались между методами pause и resume. - person ; 07.04.2017
comment
Как я буду писать в другой поток. Мое требование - читать тяжелые данные JSON из базы данных в ГБ и записывать их в файл excel. Если вы используете ваш код, мне нужно вставить данные в доступный для записи поток Excel. пожалуйста, помогите @Gerard - person Amulya Kashyap; 25.09.2017
comment
поток событий был скомпрометирован: medium.com/intrinsic/ но 4+, по-видимому, безопасен blog.npmjs.org/ сообщение/180565383195/ - person John Vandivier; 02.09.2019
comment
Ни один из s.end(); s.destroy(); s.emit('end'); не остановит чтение потока. Чтение 1 строки 5-гигабайтного файла с SSD-накопителя занимает 25 секунд (поскольку потоковое чтение не останавливается). См. построчный ответ для решения, которое сделало это за 1 мс (с использованием s.cancel()). - person Perez Lamed van Niekerk; 14.01.2020
comment
Ты бог, пожалуйста, назови мою дочь. - person user2081518; 14.06.2020

Вы можете использовать встроенный пакет readline, см. документацию здесь. Я использую stream для создания нового выходного потока.

var fs = require('fs'),
    readline = require('readline'),
    stream = require('stream');

var instream = fs.createReadStream('/path/to/file');
var outstream = new stream;
outstream.readable = true;
outstream.writable = true;

var rl = readline.createInterface({
    input: instream,
    output: outstream,
    terminal: false
});

rl.on('line', function(line) {
    console.log(line);
    //Do your stuff ...
    //Then write to outstream
    rl.write(cubestuff);
});

Обработка больших файлов займет некоторое время. Скажите, если это работает.

person user568109    schedule 15.04.2013
comment
Как написано, предпоследняя строка не работает, потому что кубический материал не определен. - person Greg; 17.12.2014
comment
Можно ли с помощью readline приостановить/возобновить поток чтения для выполнения асинхронных действий в области выполнения действий? - person jchook; 23.05.2016
comment
@jchook readline доставлял мне много проблем, когда я пытался приостановить/возобновить. Он не приостанавливает поток должным образом, создавая много проблем, если нижестоящий процесс идет медленнее. - person Mandeep Singh; 07.06.2016

Мне очень понравился ответ @gerard, который на самом деле заслуживает быть здесь правильным ответом. Я сделал некоторые улучшения:

  • Код находится в классе (модульном)
  • Парсинг включен
  • Возможность возобновления предоставляется извне в случае, если асинхронное задание связано с чтением CSV, например, вставка в БД или HTTP-запрос.
  • Чтение блоков/размеров пакетов, которые пользователь может объявить. Я также позаботился о кодировании в потоке, если у вас есть файлы в другой кодировке.

Вот код:

'use strict'

const fs = require('fs'),
    util = require('util'),
    stream = require('stream'),
    es = require('event-stream'),
    parse = require("csv-parse"),
    iconv = require('iconv-lite');

class CSVReader {
  constructor(filename, batchSize, columns) {
    this.reader = fs.createReadStream(filename).pipe(iconv.decodeStream('utf8'))
    this.batchSize = batchSize || 1000
    this.lineNumber = 0
    this.data = []
    this.parseOptions = {delimiter: '\t', columns: true, escape: '/', relax: true}
  }

  read(callback) {
    this.reader
      .pipe(es.split())
      .pipe(es.mapSync(line => {
        ++this.lineNumber

        parse(line, this.parseOptions, (err, d) => {
          this.data.push(d[0])
        })

        if (this.lineNumber % this.batchSize === 0) {
          callback(this.data)
        }
      })
      .on('error', function(){
          console.log('Error while reading file.')
      })
      .on('end', function(){
          console.log('Read entirefile.')
      }))
  }

  continue () {
    this.data = []
    this.reader.resume()
  }
}

module.exports = CSVReader

Итак, в основном, вот как вы будете его использовать:

let reader = CSVReader('path_to_file.csv')
reader.read(() => reader.continue())

Я протестировал это с файлом CSV размером 35 ГБ, и это сработало для меня, и поэтому я решил построить его на @gerard. ответ, отзывы приветствуются.

person ambodi    schedule 11.03.2016
comment
сколько времени ушло? - person Bernardo Dal Corno; 19.06.2019
comment
Судя по всему, здесь не хватает вызова pause(), не так ли? - person Vanuan; 02.11.2019
comment
Кроме того, это не вызывает функцию обратного вызова в конце. Таким образом, если batchSize равен 100, размер файлов равен 150, будет обработано только 100 элементов. Я ошибся? - person Vanuan; 02.11.2019

Я использовал https://www.npmjs.com/package/line-by-line за чтение более 1 000 000 строк из текстового файла. При этом занимаемый объем оперативной памяти составлял около 50-60 мегабайт.

    const LineByLineReader = require('line-by-line'),
    lr = new LineByLineReader('big_file.txt');

    lr.on('error', function (err) {
         // 'err' contains error object
    });

    lr.on('line', function (line) {
        // pause emitting of lines...
        lr.pause();

        // ...do your asynchronous line processing..
        setTimeout(function () {
            // ...and continue emitting lines.
            lr.resume();
        }, 100);
    });

    lr.on('end', function () {
         // All lines are read, file is closed now.
    });
person Eugene Ilyushin    schedule 11.07.2016
comment
«построчно» более эффективно использует память, чем выбранный ответ. Для 1 миллиона строк в CSV в выбранном ответе мой процесс узла занимал менее 800 мегабайт. Используя «построчно», он постоянно был ниже 700. Этот модуль также делает код чистым и легко читаемым. Всего мне нужно будет прочитать около 18 миллионов, так что каждый мегабайт на счету! - person Neo; 27.08.2017
comment
жаль, что здесь используется собственная «линия» событий вместо стандартного «чанка», что означает, что вы не сможете использовать «трубу». - person Rene Wooller; 22.09.2017
comment
После нескольких часов тестирования и поиска это единственное решение, которое фактически остановилось на методе lr.cancel(). Читает первые 1000 строк 5-гигабайтного файла за 1 мс. Потрясающий!!!! - person Perez Lamed van Niekerk; 14.01.2020

Документация Node.js предлагает очень элегантный пример использования модуля Readline.

Пример: Построчное чтение файлового потока

const fs = require('fs');
const readline = require('readline');

const rl = readline.createInterface({
    input: fs.createReadStream('sample.txt'),
    crlfDelay: Infinity
});

rl.on('line', (line) => {
    console.log(`Line from file: ${line}`);
});

Примечание: мы используем параметр crlfDelay, чтобы распознавать все экземпляры CR LF ('\r\n') как один разрыв строки.

person Hanuman    schedule 29.08.2019
comment
В моем случае я хочу показать весь текст в HTML, используя innerHTML элемента, но последняя строка всегда обрезается, даже если у меня есть overflow: auto в моем css. Что случилось? - person kakyo; 11.09.2020
comment
Ладно, я понял. Я должен использовать больший параметр padding-bottom, чем мой параметр padding. - person kakyo; 11.09.2020

Помимо чтения большого файла построчно, вы также можете читать его по частям. Дополнительную информацию см. в этой статье.

var offset = 0;
var chunkSize = 2048;
var chunkBuffer = new Buffer(chunkSize);
var fp = fs.openSync('filepath', 'r');
var bytesRead = 0;
while(bytesRead = fs.readSync(fp, chunkBuffer, 0, chunkSize, offset)) {
    offset += bytesRead;
    var str = chunkBuffer.slice(0, bytesRead).toString();
    var arr = str.split('\n');

    if(bytesRead = chunkSize) {
        // the last item of the arr may be not a full line, leave it to the next chunk
        offset -= arr.pop().length;
    }
    lines.push(arr);
}
console.log(lines);
person LF00    schedule 28.09.2017
comment
Может быть, вместо присваивания должно быть сравнение: if(bytesRead = chunkSize)? - person Stefan Rein; 14.01.2020

У меня была такая же проблема. После сравнения нескольких модулей, вроде бы имеющих такую ​​возможность, решил сделать сам, это проще, чем я думал.

суть: https://gist.github.com/deemstone/8279565

var fetchBlock = lineByline(filepath, onEnd);
fetchBlock(function(lines, start){ ... });  //lines{array} start{int} lines[0] No.

Он покрывает файл, открытый в закрытии, возвращаемый fetchBlock() будет извлекать блок из файла, заканчиваться разделением на массив (будет обрабатывать сегмент из последней выборки).

Я установил размер блока 1024 для каждой операции чтения. Здесь могут быть ошибки, но логика кода очевидна, попробуйте сами.

person deemstone    schedule 06.01.2014

node-byline использует потоки, поэтому я бы предпочел его для ваших огромных файлов.

для ваших преобразований даты я бы использовал moment.js.

для максимизации пропускной способности вы можете подумать об использовании программного кластера. есть несколько хороших модулей, которые довольно хорошо оборачивают собственный кластерный модуль узла. мне нравится cluster-master от isaacs. например вы можете создать кластер из x рабочих, которые все вычисляют файл.

для сравнения разделения и регулярных выражений используйте benchmark.js. я не тестировал его до сих пор. Benchmark.js доступен в виде узла-модуля.

person hereandnow78    schedule 15.04.2013

Основываясь на этом ответе на вопросы, я реализовал класс, который вы можете использовать читать файл синхронно построчно с fs.readSync(). Вы можете сделать эту «паузу» и «возобновить», используя обещание Q (jQuery, похоже, требует DOM, поэтому не могу запустить его с nodejs):

var fs = require('fs');
var Q = require('q');

var lr = new LineReader(filenameToLoad);
lr.open();

var promise;
workOnLine = function () {
    var line = lr.readNextLine();
    promise = complexLineTransformation(line).then(
        function() {console.log('ok');workOnLine();},
        function() {console.log('error');}
    );
}
workOnLine();

complexLineTransformation = function (line) {
    var deferred = Q.defer();
    // ... async call goes here, in callback: deferred.resolve('done ok'); or deferred.reject(new Error(error));
    return deferred.promise;
}

function LineReader (filename) {      
  this.moreLinesAvailable = true;
  this.fd = undefined;
  this.bufferSize = 1024*1024;
  this.buffer = new Buffer(this.bufferSize);
  this.leftOver = '';

  this.read = undefined;
  this.idxStart = undefined;
  this.idx = undefined;

  this.lineNumber = 0;

  this._bundleOfLines = [];

  this.open = function() {
    this.fd = fs.openSync(filename, 'r');
  };

  this.readNextLine = function () {
    if (this._bundleOfLines.length === 0) {
      this._readNextBundleOfLines();
    }
    this.lineNumber++;
    var lineToReturn = this._bundleOfLines[0];
    this._bundleOfLines.splice(0, 1); // remove first element (pos, howmany)
    return lineToReturn;
  };

  this.getLineNumber = function() {
    return this.lineNumber;
  };

  this._readNextBundleOfLines = function() {
    var line = "";
    while ((this.read = fs.readSync(this.fd, this.buffer, 0, this.bufferSize, null)) !== 0) { // read next bytes until end of file
      this.leftOver += this.buffer.toString('utf8', 0, this.read); // append to leftOver
      this.idxStart = 0
      while ((this.idx = this.leftOver.indexOf("\n", this.idxStart)) !== -1) { // as long as there is a newline-char in leftOver
        line = this.leftOver.substring(this.idxStart, this.idx);
        this._bundleOfLines.push(line);        
        this.idxStart = this.idx + 1;
      }
      this.leftOver = this.leftOver.substring(this.idxStart);
      if (line !== "") {
        break;
      }
    }
  }; 
}
person Benvorth    schedule 03.02.2016

Я сделал модуль узла для асинхронного чтения большого файла текста или JSON. Проверено на больших файлах.

var fs = require('fs')
, util = require('util')
, stream = require('stream')
, es = require('event-stream');

module.exports = FileReader;

function FileReader(){

}

FileReader.prototype.read = function(pathToFile, callback){
    var returnTxt = '';
    var s = fs.createReadStream(pathToFile)
    .pipe(es.split())
    .pipe(es.mapSync(function(line){

        // pause the readstream
        s.pause();

        //console.log('reading line: '+line);
        returnTxt += line;        

        // resume the readstream, possibly from a callback
        s.resume();
    })
    .on('error', function(){
        console.log('Error while reading file.');
    })
    .on('end', function(){
        console.log('Read entire file.');
        callback(returnTxt);
    })
);
};

FileReader.prototype.readJSON = function(pathToFile, callback){
    try{
        this.read(pathToFile, function(txt){callback(JSON.parse(txt));});
    }
    catch(err){
        throw new Error('json file is not valid! '+err.stack);
    }
};

Просто сохраните файл как file-reader.js и используйте его следующим образом:

var FileReader = require('./file-reader');
var fileReader = new FileReader();
fileReader.readJSON(__dirname + '/largeFile.json', function(jsonObj){/*callback logic here*/});
person Eyal Zoref    schedule 25.06.2016
comment
Похоже, вы скопировали ответ Джерарда. Вы должны отдать должное Джерарду за часть, которую вы скопировали. - person Paul Lynch; 26.10.2016