Мне нужно запустить через HTTP-запрос процесс, в котором я загружаю некоторые данные из S3, архивирую их, изменяю поток, сжимаю его и отправляю в другое ведро в S3.
Пока я мог:
- Скачать
- Gunzip
- Изменить (отфильтровать) данные
- вернуть данные
Or:
- Скачать
- Gunzip
- Gzip
- Загрузите неизмененные данные и получите URL-адрес объекта.
Моя первая попытка заключалась в использовании события on ('data') из потока gunzip для изменения данных; затем, когда генерируется событие end, я могу вернуть его браузеру, сделав запрос.
var accumulator = [];
gunzip.on('data', chunk=>{
var lines = chunk.toString('utf-8').split(\n);
lines.forEach(line=>{
if(shouldBeFiltered(line)){
accumulator.push(line);
}
})
})
gunzip.on('end', ()=>{
res.send(accumulator);
})
getS3.pipe(gunzip)
Если вместо возврата результата (res.send) я попытаюсь передать gunzip в gzip, фильтр игнорируется. Это имеет смысл, поскольку у меня есть массив аккумулятор, который я возвращаю (в предыдущем случае), когда генерируется конечное событие.
Затем, покопавшись, я нашел ссылку, в которой предлагалось отправить данные, и попробовал следующее, но не сработало:
gunzip.on('data', chunk=>{
var lines = chunk.toString('utf-8').split(\n);
lines.forEach(line=>{
if(shouldBeFiltered(line)){
gunzip.push(line);
}
})
})
// the end event no longer mattered
// gunzip.on('end', ()=>{
// res.send(accumulator);
// })
getS3.pipe(gunzip).pipe(gzip).pipe(putS3(putS3param.Key, putS3param.Bucket));
Затем я попытался создать поток преобразования (это чрезвычайно упрощено, поскольку я пробовал концепцию), но затем у меня возникла внутренняя ошибка:
const stream = require('stream');
const Transform = stream.Transform;
function filter(pipeline) {
var the_filter = new Transform({
transform(chunk, encoding, next) {
console.log();
chunk += Buffer('Modified', 'utf-8');
this.push(chunk);
next();
}
});
pipeline.pipe(the_filter);
}
У меня больше нет идей, кроме создания файла, его сжатия и загрузки.
Спасибо за любую помощь!