Как передать данные, измененные из потока gunzip, в поток gzip?

Мне нужно запустить через HTTP-запрос процесс, в котором я загружаю некоторые данные из S3, архивирую их, изменяю поток, сжимаю его и отправляю в другое ведро в S3.

Пока я мог:

  1. Скачать
  2. Gunzip
  3. Изменить (отфильтровать) данные
  4. вернуть данные

Or:

  1. Скачать
  2. Gunzip
  3. Gzip
  4. Загрузите неизмененные данные и получите URL-адрес объекта.

Моя первая попытка заключалась в использовании события on ('data') из потока gunzip для изменения данных; затем, когда генерируется событие end, я могу вернуть его браузеру, сделав запрос.

var accumulator = [];

gunzip.on('data', chunk=>{
    var lines = chunk.toString('utf-8').split(\n);
    lines.forEach(line=>{
       if(shouldBeFiltered(line)){
         accumulator.push(line);
       }
    })
})

gunzip.on('end', ()=>{
    res.send(accumulator);
})

getS3.pipe(gunzip) 

Если вместо возврата результата (res.send) я попытаюсь передать gunzip в gzip, фильтр игнорируется. Это имеет смысл, поскольку у меня есть массив аккумулятор, который я возвращаю (в предыдущем случае), когда генерируется конечное событие.

Затем, покопавшись, я нашел ссылку, в которой предлагалось отправить данные, и попробовал следующее, но не сработало:

gunzip.on('data', chunk=>{
    var lines = chunk.toString('utf-8').split(\n);
    lines.forEach(line=>{
       if(shouldBeFiltered(line)){
         gunzip.push(line);
       }
    })
})

// the end event no longer mattered
// gunzip.on('end', ()=>{
//    res.send(accumulator);
// })

getS3.pipe(gunzip).pipe(gzip).pipe(putS3(putS3param.Key, putS3param.Bucket)); 

Затем я попытался создать поток преобразования (это чрезвычайно упрощено, поскольку я пробовал концепцию), но затем у меня возникла внутренняя ошибка:

const stream = require('stream');
const Transform = stream.Transform;

function filter(pipeline) {
    var the_filter = new Transform({
        transform(chunk, encoding, next) {
            console.log();
            chunk += Buffer('Modified', 'utf-8');
            this.push(chunk);
            next();
        }
    });
    pipeline.pipe(the_filter);
}

У меня больше нет идей, кроме создания файла, его сжатия и загрузки.

Спасибо за любую помощь!


person OJEP    schedule 11.11.2020    source источник


Ответы (1)


После долгих поисков я наконец нашел ответ в этом страница

Похоже, что не хватало настройки Transform как objectMode, кроме этого, я не вижу ничего значимого.

    var stream = require('stream')
    var liner = new stream.Transform( { objectMode: true } )
    
    liner._transform = function (chunk, encoding, done) {
         var data = chunk.toString()
         if (this._lastLineData) data = this._lastLineData + data
    
         var lines = data.split('\n')
         this._lastLineData = lines.splice(lines.length-1,1)[0]
    
         lines.forEach(this.push.bind(this))
         done()
    }
    
    liner._flush = function (done) {
         if (this._lastLineData) this.push(this._lastLineData)
         this._lastLineData = null
         done()
    }
    
    module.exports = liner
person OJEP    schedule 30.11.2020