Правильный способ использования нескольких ConnectionPool(ов)

В моем приложении я должен взаимодействовать (только для чтения) с несколькими MySQL БД один за другим. Для каждой БД мне нужно определенное количество соединений. Взаимодействие с БД не происходит однократно: я запрашиваю БД, некоторое время занимаюсь обработкой результатов, снова запрашиваю БД, снова обрабатываю результат и так далее.

Для каждого из этих взаимодействий требуется несколько подключений [Я запускаю несколько запросов одновременно], поэтому мне нужен ConnectionPool, который появляется, когда я начинаю взаимодействовать с БД, и живет до тех пор, пока я не закончу сделано со всеми запросами к этой БД (включая промежуточные интервалы времени, когда я не запрашиваю, а только обрабатываю результаты).


Я могу успешно создать ConnectionPool с желаемым количеством соединений и получить implicit session, как показано ниже.

def createConnectionPool(poolSize: Int): DBSession = {
 implicit val session: AutoSession.type = AutoSession

 ConnectionPool.singleton(
   url = "myUrl",
   user = "myUser",
   password = "***",
   settings = ConnectionPoolSettings(initialSize = poolSize)
 )

 session
}

Затем я передаю этот implicit session всем методам, где мне нужно взаимодействовать с БД. Таким образом, я могу запускать poolSize ни одного запроса одновременно, используя этот session. Справедливо.

def methodThatCallsAnotherMethod(implicit session: DBSession): Unit = {
  ...
  methodThatInteractsWithDb
  ...
}

def methodThatInteractsWithDb(implicit session: DBSession): Unit = {
  ...
  getResultsParallely(poolSize = 32, fetchSize = 2000000)
  ...
}

def getResultsParallely(poolSize: Int, fetchSize: Int)(implicit session: DBSession): Seq[ResultClass] = {
  import java.util.concurrent.Executors
  import scala.concurrent.ExecutionContext
  import scala.concurrent.duration._

  implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(poolSize))

  val resultsSequenceFuture: Seq[Future[ResultClass]] = {
    (0 until poolSize).map { i =>
      val limit: Long = fetchSize
      val offset: Long = i * fetchSize

      Future(methodThatMakesSingleQuery(limit, offset))
    }
  }
  val resultsFutureSequence: Future[Seq[ResultClass]] = Future.sequence(resultsSequenceFuture)

  Await.result(resultsFuture, 2.minutes)
}

У этой техники есть 2 проблемы:

  1. Мое приложение довольно большое и имеет много вызовов вложенных методов, поэтому передача implicit session через все подобные методы (см. ниже) невозможна.
  2. В дополнение к указанным взаимодействиям с разными БД один за другим, мне также необходимо одно соединение с другой (фиксированной) БД на протяжении всего жизненного цикла моего приложения. Это соединение будет использоваться для выполнения небольшой операции записиn (логирования хода моих взаимодействий с другими БД) через каждые несколько минут. Поэтому мне нужно несколько ConnectionPool, по одному на каждую БД

Из того, что я смог сделать из документов ScalikeJdbc, я придумал следующий способ: это не требует от меня передачи implicit session везде.

def createConnectionPool(poolName: String, poolSize: Int): Unit = {
  ConnectionPool.add(
    name = poolName,
    url = "myUrl",
    user = "myUser",
    password = "***",
    settings = ConnectionPoolSettings(initialSize = poolSize)
  )
}

def methodThatInteractsWithDb(poolName: String): Unit = {
  ...
  (DB(ConnectionPool.get(poolName).borrow())).readOnly { implicit session: DBSession =>
    // interact with DB
    ...
  }
  ...
}

Хотя это работает, но я больше не могу распараллеливать взаимодействие с базой данных. Такое поведение очевидно, поскольку я использую метод borrow(), который получает одно соединение из пула. Это, в свою очередь, заставляет меня задаться вопросом, почему эта штука с AutoSession работала раньше: почему я мог запускать несколько запросов одновременно, используя один implicit session? И если эта штука работала, то почему не работает эта? Но я не нашел примеров того, как получить DBSession из ConnectionPool, поддерживающего множественные соединения.


Подводя итог, у меня есть 2 проблемы и 2 решения: по одному для каждой проблемы. Но мне нужно единое (общее) решение, которое решает обе проблемы.

Ограниченная документация ScalikeJdbc не предлагает большой помощи, а блоги/статьи по ScalikeJdbc практически отсутствуют. Пожалуйста, предложите правильный способ/некоторый обходной путь.


Версии фреймворка

  • Scala 2.11.11
  • "org.scalikejdbc" %% "scalikejdbc" % "3.2.0"

person y2k-shubham    schedule 06.04.2018    source источник
comment
Вот ссылка на мой пост в ScalikeJdbc Google User Group по этому вопросу   -  person y2k-shubham    schedule 06.04.2018


Ответы (1)


Благодаря @Dennis Hunziker я смог выяснить правильный способ освободить соединения, заимствованные из ConnectionPool ScalikeJdbc. Это можно сделать следующим образом:

import scalikejdbc.{ConnectionPool, using}
import java.sql.Connection

using(ConnectionPool.get("poolName").borrow()) { (connection: Connection) =>
    // use connection (only once) here
}
// connection automatically returned to pool

Благодаря этому теперь я могу распараллелить взаимодействие с пулом.


Чтобы решить мою проблему управления несколькими ConnectionPool и использования соединений между несколькими classes, я написал ConnectionPoolManager, полный код для которого можно найти здесь. Разгрузив задачи

  • создание пулов
  • заимствование соединений из пулов
  • удаление пулов

к объекту singleton, который я мог использовать где угодно в своем проекте, я смог очистить много беспорядка и избавился от необходимости проходить implicit session через цепочку методов.


EDIT-1

Хотя я уже связал полный код для ConnectionPoolManager, вот краткий намек на то, как вы можете пойти об этом

Следующий метод ConnectionPoolManager позволяет заимствовать соединения из ConnectionPools

def getDB(dbName: String, poolNameOpt: Option[String] = None): DB = {
  // create a pool for db (only) if it doesn't exist
  addPool(dbName, poolNameOpt)

  val poolName: String = poolNameOpt.getOrElse(dbName)
  DB(ConnectionPool.get(poolName).borrow())
}

После этого во всем коде вы можете использовать описанный выше метод для заимствования соединений из пулов и выполнения запросов.

def makeQuery(dbName: String, poolNameOpt: Option[String]) = {
  ConnectionPoolManager.getDB(dbName, poolNameOpt).localTx { implicit session: DBSession =>
    // perform ScalikeJdbc SQL query here
  }
}
person y2k-shubham    schedule 06.09.2018