Как правильно работать с результатами потоковой передачи slick 3.0.0 и Postgresql?

Я пытаюсь понять, как работать с гладкой потоковой передачей. Я использую slick 3.0.0 с драйвером postgres.

Ситуация следующая: сервер должен отдавать клиенту последовательности данных, разбитых на чанки, ограниченные по размеру (в байтах). Итак, я написал следующий гладкий запрос:

val sequences = TableQuery[Sequences]
def find(userId: Long, timestamp: Long) = sequences.filter(s ⇒ s.userId === userId && s.timestamp > timestamp).sortBy(_.timestamp.asc).result
val seq = db.stream(find(0L, 0L))

Я объединил seq с akka-streams Source, написал пользовательский PushPullStage, который ограничивает размер данных (в байтах) и завершается вверх по течению, когда он достигает предела размера. Это работает просто отлично. Проблема в том, что когда я смотрю в журналы postgres, я вижу такой запрос select * from sequences where user_id = 0 and timestamp > 0 order by timestamp;

Таким образом, на первый взгляд кажется, что выполняется много (и ненужных) запросов к базе данных, только для использования нескольких байтов в каждом запросе. Как правильно выполнять потоковую передачу с помощью Slick, чтобы свести к минимуму запросы к базе данных и наилучшим образом использовать данные, передаваемые в каждом запросе?


person Tatarinov Nikolay    schedule 10.07.2015    source источник


Ответы (2)


«Правильный способ» потоковой передачи с помощью Slick и Postgres включает в себя три вещи:

  1. Необходимо использовать db.stream()

  2. Необходимо отключить autoCommit в JDBC-драйвере. Один из способов — запустить запрос в транзакции, добавив суффикс .transactionally.

  3. Необходимо установить для fetchSize значение, отличное от 0, иначе postgres передаст клиенту весь набор результатов за один раз.

Ex:

DB.stream(
  find(0L, 0L)
    .transactionally
    .withStatementParameters(fetchSize = 1000)
).foreach(println)

Полезные ссылки:

https://github.com/slick/slick/issues/1038

https://github.com/slick/slick/issues/809

person Rikard    schedule 12.07.2015
comment
Спасибо за ответ, очень полезно. Меня смутило добавление: разве контекст выполнения, который обрабатывает db, не поддерживается отдельно в AsyncExecutor? - person JimN; 04.09.2015
comment
Рад слышать! По поводу дополнения: Да, вы правы. Это должно быть значение по умолчанию. Я удаляю дополнение, так как оно больше сбивает с толку, чем помогает, и, оглядываясь назад, я думаю, что это действительно было связано с механикой обратного давления. Мой потребитель был намного быстрее, чем сеть, поэтому более подходящим решением была обработка фьючерсов сразу после получения результатов. - person Rikard; 04.09.2015
comment
Есть ли аналог для MySQL? - person matanster; 09.03.2016
comment
Зачем на самом деле отключать автоматическую фиксацию? - person matanster; 09.03.2016

Правильный способ потоковой передачи в Slick указан в документации.

val q = for (c <- coffees) yield c.image
val a = q.result
val p1: DatabasePublisher[Blob] = db.stream(a.withStatementParameters(rsType = ResultSetType.ForwardOnly, rsConcurrency = ResultSetConcurrency.ReadOnly, fetchSize = 1000 /*your fetching size*/).transactionally)
person Manjunath A    schedule 03.01.2020