(Поскольку ваш вопрос касается создания источника потоковой передачи, этот ответ касается только стороны издателя поддержки потоковой передачи и игнорирует сторону подписчика.)
Для поддержки потоковой передачи требуется, чтобы запрос возврата базы данных возвращал несколько строк за раз, обычно на основе курсора, а не все сразу. В разных базах данных есть разные способы включить это. ScalikeJDBC изначально поддерживает использование метода потоковой передачи iterator
для драйверов MySQL и PostgreSQL. То есть с MySQL и драйвером PostgreSQL работает следующее:
import scalikejdbc._
import scalikejdbc.streams._
// set up a connection pool
import scala.concurrent.ExecutionContext.Implicits.global
val publisher: DatabasePublisher[Int] = DB.readOnlyStream {
sql"select id from users order by id".map(r => r.get[Int]("id")).iterator
}
Вышеприведенное работает для MySQL и PostgreSQL из-за это:
/**
* Forcibly changes the database session to be cursor query ready.
*/
val defaultDBSessionForceAdjuster: DBSessionForceAdjuster = (session) => {
// setup required settings to enable cursor operations
session.connectionAttributes.driverName match {
case Some(driver) if driver == "com.mysql.jdbc.Driver" && session.fetchSize.exists(_ > 0) =>
/*
* MySQL - https://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-implementation-notes.html
*
* StreamAction.StreamingInvoker prepares the following required settings in advance:
*
* - java.sql.ResultSet.TYPE_FORWARD_ONLY
* - java.sql.ResultSet.CONCUR_READ_ONLY
*
* If the fetchSize is set as 0 or less, we need to forcibly change the value with the Int min value.
*/
session.fetchSize(Int.MinValue)
case Some(driver) if driver == "org.postgresql.Driver" =>
/*
* PostgreSQL - https://jdbc.postgresql.org/documentation/94/query.html
*
* - java.sql.Connection#autocommit false
* - java.sql.ResultSet.TYPE_FORWARD_ONLY
*/
session.conn.setAutoCommit(false)
case _ =>
}
}
Обратите внимание, что последнее предложение case
означает, что ScalikeJDBC по умолчанию не поддерживает потоковую передачу iterator
с драйверами, отличными от драйверов для MySQL и PostgreSQL.
Это не означает, что нельзя использовать другие драйверы для потоковой передачи. В разделе документации, который вы цитируете, есть следующий пример кода:
val publisher: DatabasePublisher[Int] = DB readOnlyStream {
sql"select id from users".map(r => r.int("id"))
.iterator
.withDBSessionForceAdjuster(session => {
session.conn.setAutoCommit(true)
})
}
В документации говорится, что для включения потоковой передачи для баз данных, отличных от MySQL и PostgreSQL, вам необходимо настроить атрибуты DBSession
, как в приведенном выше примере, чтобы была включена поддержка курсора. Что именно влечет за собой эта настройка (например, настройка fetchSize
или отключение autoCommit
в соединении), зависит от драйвера (при условии, что драйвер поддерживает извлечение результатов запроса по небольшому количеству строк за раз).
person
Jeffrey Chung
schedule
14.08.2017