Размер выборки в PGConnection.getNotifications

Функция в моей базе данных postgresql отправляет уведомление при обновлении таблицы. Я опрашиваю эту базу данных postgresql с помощью scalikejdbc, чтобы получить все уведомления, а затем что-то с ними сделать. Этот процесс объясняется здесь . Типичная реактивная система для обновлений таблиц sql. Я получаю PGConnection из java.sql.Connection. И после этого я получаю уведомления таким образом:

val notifications = Option(pgConnection.getNotifications).getOrElse(Array[PGNotification]())

Я пытаюсь получать уведомления порциями по 1000, устанавливая размер выборки на 1000 и отключая автоматическую фиксацию. Но свойство размера выборки игнорируется.

Любые идеи, как я мог это сделать? Я бы не хотел обрабатывать сотни тысяч уведомлений на одной карте по моему набору данных уведомлений.

pgConnection.getNotifications.size может быть огромным, и поэтому этот код не будет хорошо масштабироваться.

Спасибо!!!


person josele    schedule 10.07.2018    source источник
comment
Может быть, использовать akka-streams и .group?   -  person lasekio    schedule 10.07.2018


Ответы (1)


Для лучшего масштабирования рассмотрите возможность использования postgresql-async и Akka Streams: первая — это библиотека, которая может асинхронно получать уведомления PostgreSQL. , а первый представляет собой реализацию Reactive Streams, обеспечивающую обратное давление (что устраняет необходимость в пейджинге). . Например:

import akka.actor._
import akka.stream._
import akka.stream.scaladsl._

import com.github.mauricio.async.db.postgresql.PostgreSQLConnection
import com.github.mauricio.async.db.postgresql.util.URLParser

import scala.concurrent.duration._
import scala.concurrent.Await

class DbActor(implicit materializer: ActorMaterializer) extends Actor with ActorLogging {
  private implicit val ec = context.system.dispatcher

  val queue =  
    Source.queue[String](Int.MaxValue, OverflowStrategy.backpressure)
      .to(Sink.foreach(println))
      .run()

  val configuration = URLParser.parse("jdbc:postgresql://localhost:5233/my_db?user=dbuser&password=pwd")
  val connection = new PostgreSQLConnection(configuration)
  Await.result(connection.connect, 5 seconds)

  connection.sendQuery("LISTEN my_channel")
  connection.registerNotifyListener { message =>
    val msg = message.payload
    log.debug("Sending the payload: {}", msg)
    self ! msg
  }

  def receive = {
    case payload: String =>
      queue.offer(payload).pipeTo(self)
    case QueueOfferResult.Dropped =>
      log.warning("Dropped a message.")
    case QueueOfferResult.Enqueued =>
      log.debug("Enqueued a message.")
    case QueueOfferResult.Failure(t) =>
      log.error("Stream failed: {}", t.getMessage)
    case QueueOfferResult.QueueClosed =>
      log.debug("Stream closed.")
  }
}

Приведенный выше код просто выводит уведомления от PostgreSQL по мере их появления; вы можете заменить Sink.foreach(println) другим Sink. Чтобы запустить его:

import akka.actor._
import akka.stream.ActorMaterializer

object Example extends App {
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()
  system.actorOf(Props(classOf[DbActor], materializer))
}
person Jeffrey Chung    schedule 10.07.2018