Привет всем, сегодня в главе «Как сделать счастливым пользователя бизнес-аналитики» мы сделаем возможным скачивать «большие данные», не подвергая опасности наш собственный сервер.
Для этого мы будем использовать нашу любимую базу данных PostgreSQL и nodejs с помощью двух пакетов, pg и pg-copy-streams.
Основная проблема загрузки или обработки всех этих данных - их размер. Поэтому, чтобы избежать одновременной загрузки всей таблицы в память, поскольку она может быть довольно большой, мы используем потоки nodejs.
Node.js Streams имеет массу преимуществ, например, у них мало места в памяти, они потребляются и обрабатываются буферизованными фрагментами и, помимо прочего, они не блокируют поток.
А теперь давайте начнем с самого интересного.
Прежде всего, представьте, что у нас есть таблица с миллионами строк, которая называется super_big_table. Наш пользователь захочет отфильтровать и загрузить его. Лучший способ получить быстрый вывод из PostgreSQL - использовать оператор COPY, но у него есть одна проблема. Операторы COPY не допускают параметров. Одно из решений - создать темпоральную таблицу и вставить желаемые данные.
CREATE TEMPORARY TABLE temp_csv_table AS SELECT t.id, t.value FROM super_big_table t WHERE ${customFilters}
Затем нам просто нужно выполнить запрос COPY внутри pg-copy-streams. Код будет примерно таким:
const copyTo = require('pg-copy-streams').to const pg = require('pg') const client = new pg.Client() await client.connect() const q = `COPY temp_csv_table to STDOUT with csv DELIMITER ';'` const dataStream = client.query(copyTo(q)) dataStream.on('error', async function (err) { // Here we can controll stream errors await client.end() }) dataStream.on('end', async function () { await client.end() })
Теперь представьте, что вам нужно включить в этот поток заголовки. Мы могли бы использовать Трансформировать поток. С его помощью мы могли бы изменить все данные, или, как в этом случае, просто добавив строку в начале.
Мы можем создать наш собственный поток Tranformer, расширив Transform и создав наш собственный класс.
const {Transform} = require('stream') class PrefixedStream extends Transform { constructor (prefixRow) { super() this.prefixRow = prefixRow this.isFirstChunk = true } _transform (chunk, encoding, callback) { if (this.isFirstChunk) { this.isFirstChunk = false this.push(this.prefixRow) this.push('\n') } this.push(chunk) callback() } }
И просто соединяем оба потока.
const csvHeaders = ['Big table id', 'My value'] const csvWithHeadersStream = new streamUtils.PrefixedStream(csvHeaders) dataStream.pipe(csvWithHeadersStream)
Наконец, нам просто нужно снова передать этот поток, но на этот раз с ответом.
csvWithHeadersStream.pipe(res)
Итак, наконец, чтобы воспользоваться этим преимуществом, нам нужно будет предоставить пользователю возможность загрузить его.
Нам понадобится немного кода на стороне клиента, благодаря хранителю файлов.
const {saveAs} = require('file-saver') const blob = new Blob([response.data], {type: 'application/octet-stream'}) saveAs(blob, 'super_big_table_filtered.csv')
Итак, как мы только что видели, очень легко загрузить пользовательские данные в CSV и сделать счастливыми клиентов бизнес-аналитики!
Теперь вы больше не боитесь Streams, наслаждайтесь его мощью!
Хосе Луис Пилладо «Фофи» - ведущий инженер-программист