Я создаю HTTP-функцию Firebase, которая выполняет запрос BigQuery и возвращает измененную версию результатов запроса. Запрос потенциально возвращает миллионы строк, поэтому я не могу сохранить весь результат запроса в памяти, прежде чем ответить HTTP-клиенту. Я пытаюсь использовать потоки Node.js, и, поскольку мне нужно изменить результаты перед их отправкой клиенту, я пытаюсь использовать поток преобразования. Однако, когда я пытаюсь направить поток запроса через поток преобразования, функция Firebase падает со следующим сообщением об ошибке: finished with status: 'response error'
.
Мой минимальный воспроизводимый пример выглядит следующим образом. Я использую буфер, потому что не хочу обрабатывать по одной строке (фрагменту) за раз, так как мне нужно делать асинхронные сетевые вызовы для преобразования данных.
return new Promise((resolve, reject) => {
const buffer = new Array(5000)
let bufferIndex = 0
const [job] = await bigQuery.createQueryJob(options)
const bqStream = job.getQueryResultsStream()
const transformer = new Transform({
writableObjectMode: true,
readableObjectMode: false,
transform(chunk, enc, callback) {
buffer[bufferIndex] = chunk
if (bufferIndex < buffer.length - 1) {
bufferIndex++
}
else {
this.push(JSON.stringify(buffer).slice(1, -1)) // Transformation should happen here.
bufferIndex = 0
}
callback()
},
flush(callback) {
if (bufferIndex > 0) {
this.push(JSON.stringify(buffer.slice(0, bufferIndex)).slice(1, -1))
}
this.push("]")
callback()
},
})
bqStream
.pipe(transform)
.pipe(response)
bqStream.on("end", () => {
resolve()
})
}