scalikejdbc потоковая передача с оракулом

Я реализую источник потоковой передачи с помощью ScalikeJDBC, и мне нужно, чтобы он работал с несколькими типами БД, включая. Оракл, Сибас и др.

Документация немного сбивает с толку и не уверена, что это вариант:

На данный момент scalikejdbc-streams изначально поддерживает MySQL и PostgreSQL. При обычном использовании фабричного метода итератора SQL# ScalikeJDBC автоматически включает необходимые настройки для использования функции курсора. Если вам не нравится такое поведение, вы можете вместо этого настроить атрибуты DBSession.

Можно ли обрабатывать потоковое чтение через другие БД, кроме MySQL и PostgreSQL?


person Evan M.    schedule 02.08.2017    source источник


Ответы (1)


(Поскольку ваш вопрос касается создания источника потоковой передачи, этот ответ касается только стороны издателя поддержки потоковой передачи и игнорирует сторону подписчика.)

Для поддержки потоковой передачи требуется, чтобы запрос возврата базы данных возвращал несколько строк за раз, обычно на основе курсора, а не все сразу. В разных базах данных есть разные способы включить это. ScalikeJDBC изначально поддерживает использование метода потоковой передачи iterator для драйверов MySQL и PostgreSQL. То есть с MySQL и драйвером PostgreSQL работает следующее:

import scalikejdbc._
import scalikejdbc.streams._

// set up a connection pool

import scala.concurrent.ExecutionContext.Implicits.global

val publisher: DatabasePublisher[Int] = DB.readOnlyStream {
  sql"select id from users order by id".map(r => r.get[Int]("id")).iterator
}

Вышеприведенное работает для MySQL и PostgreSQL из-за это:

/**
 * Forcibly changes the database session to be cursor query ready.
 */
val defaultDBSessionForceAdjuster: DBSessionForceAdjuster = (session) => {

  // setup required settings to enable cursor operations
  session.connectionAttributes.driverName match {
    case Some(driver) if driver == "com.mysql.jdbc.Driver" && session.fetchSize.exists(_ > 0) =>
      /*
       * MySQL - https://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-implementation-notes.html
       *
       * StreamAction.StreamingInvoker prepares the following required settings in advance:
       *
       * - java.sql.ResultSet.TYPE_FORWARD_ONLY
       * - java.sql.ResultSet.CONCUR_READ_ONLY
       *
       * If the fetchSize is set as 0 or less, we need to forcibly change the value with the Int min value.
       */
      session.fetchSize(Int.MinValue)

    case Some(driver) if driver == "org.postgresql.Driver" =>
      /*
       * PostgreSQL - https://jdbc.postgresql.org/documentation/94/query.html
       *
       * - java.sql.Connection#autocommit false
       * - java.sql.ResultSet.TYPE_FORWARD_ONLY
       */
      session.conn.setAutoCommit(false)

    case _ =>
  }
}

Обратите внимание, что последнее предложение case означает, что ScalikeJDBC по умолчанию не поддерживает потоковую передачу iterator с драйверами, отличными от драйверов для MySQL и PostgreSQL.

Это не означает, что нельзя использовать другие драйверы для потоковой передачи. В разделе документации, который вы цитируете, есть следующий пример кода:

val publisher: DatabasePublisher[Int] = DB readOnlyStream {
  sql"select id from users".map(r => r.int("id"))
    .iterator
    .withDBSessionForceAdjuster(session => {
      session.conn.setAutoCommit(true)
    })
}

В документации говорится, что для включения потоковой передачи для баз данных, отличных от MySQL и PostgreSQL, вам необходимо настроить атрибуты DBSession, как в приведенном выше примере, чтобы была включена поддержка курсора. Что именно влечет за собой эта настройка (например, настройка fetchSize или отключение autoCommit в соединении), зависит от драйвера (при условии, что драйвер поддерживает извлечение результатов запроса по небольшому количеству строк за раз).

person Jeffrey Chung    schedule 14.08.2017