Привет всем, сегодня в главе «Как сделать счастливым пользователя бизнес-аналитики» мы сделаем возможным скачивать «большие данные», не подвергая опасности наш собственный сервер.

Для этого мы будем использовать нашу любимую базу данных PostgreSQL и nodejs с помощью двух пакетов, pg и pg-copy-streams.

Основная проблема загрузки или обработки всех этих данных - их размер. Поэтому, чтобы избежать одновременной загрузки всей таблицы в память, поскольку она может быть довольно большой, мы используем потоки nodejs.

Node.js Streams имеет массу преимуществ, например, у них мало места в памяти, они потребляются и обрабатываются буферизованными фрагментами и, помимо прочего, они не блокируют поток.

А теперь давайте начнем с самого интересного.

Прежде всего, представьте, что у нас есть таблица с миллионами строк, которая называется super_big_table. Наш пользователь захочет отфильтровать и загрузить его. Лучший способ получить быстрый вывод из PostgreSQL - использовать оператор COPY, но у него есть одна проблема. Операторы COPY не допускают параметров. Одно из решений - создать темпоральную таблицу и вставить желаемые данные.

CREATE TEMPORARY TABLE temp_csv_table AS
  SELECT
    t.id, t.value
  FROM
    super_big_table t
  WHERE
    ${customFilters}

Затем нам просто нужно выполнить запрос COPY внутри pg-copy-streams. Код будет примерно таким:

const copyTo = require('pg-copy-streams').to
const pg = require('pg')
const client = new pg.Client()

await client.connect()
const q = `COPY temp_csv_table to STDOUT with csv DELIMITER ';'`
const dataStream = client.query(copyTo(q))

dataStream.on('error', async function (err) {
  // Here we can controll stream errors
  await client.end()
})
dataStream.on('end', async function () {
  await client.end()
})

Теперь представьте, что вам нужно включить в этот поток заголовки. Мы могли бы использовать Трансформировать поток. С его помощью мы могли бы изменить все данные, или, как в этом случае, просто добавив строку в начале.

Мы можем создать наш собственный поток Tranformer, расширив Transform и создав наш собственный класс.

const {Transform} = require('stream')

class PrefixedStream extends Transform {
  constructor (prefixRow) {
    super()
    this.prefixRow = prefixRow
    this.isFirstChunk = true
  }

  _transform (chunk, encoding, callback) {
    if (this.isFirstChunk) {
      this.isFirstChunk = false
      this.push(this.prefixRow)
      this.push('\n')
    }
    this.push(chunk)
    callback()
  }
}

И просто соединяем оба потока.

const csvHeaders = ['Big table id', 'My value']
const csvWithHeadersStream = new streamUtils.PrefixedStream(csvHeaders)
dataStream.pipe(csvWithHeadersStream)

Наконец, нам просто нужно снова передать этот поток, но на этот раз с ответом.

csvWithHeadersStream.pipe(res)

Итак, наконец, чтобы воспользоваться этим преимуществом, нам нужно будет предоставить пользователю возможность загрузить его.

Нам понадобится немного кода на стороне клиента, благодаря хранителю файлов.

const {saveAs} = require('file-saver') 
const blob = new Blob([response.data], {type: 'application/octet-stream'}) 
saveAs(blob, 'super_big_table_filtered.csv')

Итак, как мы только что видели, очень легко загрузить пользовательские данные в CSV и сделать счастливыми клиентов бизнес-аналитики!

Теперь вы больше не боитесь Streams, наслаждайтесь его мощью!

Хосе Луис Пилладо «Фофи» - ведущий инженер-программист