Tags: postgresql, akka, scalikejdbc

Fetch size in PGConnection.getNotifications


A function in my PostgreSQL database sends a notification when a table is updated. I'm polling that database with scalikejdbc to collect the notifications and then do something with them. The process is explained here. A typical reactive system for SQL table updates. I get the PGConnection from the java.sql.Connection, and after that I get the notifications this way:

val notifications = Option(pgConnection.getNotifications).getOrElse(Array[PGNotification]())

I'm trying to get the notifications in chunks of 1000 by setting the fetch size to 1000 and disabling auto-commit, but the fetch size property is ignored.

Any ideas how I could do that? I wouldn't want to handle hundreds of thousands of notifications in a single map over my notifications dataset.

pgConnection.getNotifications.size could be huge, so this code wouldn't scale well.
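To make the concern concrete: chunking the returned array after the fact with Scala's grouped only bounds each batch, not the array itself, which getNotifications has already fully materialized. A sketch with a stand-in array of hypothetical payloads:

```scala
object ChunkSketch extends App {
  // Stand-in for pgConnection.getNotifications: hypothetical payload strings.
  val notifications: Array[String] = (1 to 2500).map(i => s"payload-$i").toArray

  // grouped yields batches of at most 1000, but the whole array is still
  // held in memory first -- which is exactly the scaling problem.
  notifications.grouped(1000).foreach { batch =>
    println(s"handling ${batch.length} notifications") // 1000, 1000, 500
  }
}
```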

Thanks!!!


Solution

  • To better scale, consider using postgresql-async and Akka Streams: the former is a library that can obtain PostgreSQL notifications asynchronously, and the latter is a Reactive Streams implementation that provides backpressure (which would obviate the need for paging). For example:

    import akka.actor._
    import akka.stream._
    import akka.stream.scaladsl._
    import akka.pattern.pipe // needed for the pipeTo call below
    
    import com.github.mauricio.async.db.postgresql.PostgreSQLConnection
    import com.github.mauricio.async.db.postgresql.util.URLParser
    
    import scala.concurrent.duration._
    import scala.concurrent.Await
    
    class DbActor(implicit materializer: ActorMaterializer) extends Actor with ActorLogging {
      private implicit val ec = context.system.dispatcher
    
      val queue =  
        Source.queue[String](Int.MaxValue, OverflowStrategy.backpressure)
          .to(Sink.foreach(println))
          .run()
    
      val configuration = URLParser.parse("jdbc:postgresql://localhost:5233/my_db?user=dbuser&password=pwd")
      val connection = new PostgreSQLConnection(configuration)
      Await.result(connection.connect, 5.seconds)
    
      connection.sendQuery("LISTEN my_channel")
      connection.registerNotifyListener { message =>
        val msg = message.payload
        log.debug("Sending the payload: {}", msg)
        self ! msg
      }
    
      def receive = {
        case payload: String =>
          queue.offer(payload).pipeTo(self)
        case QueueOfferResult.Dropped =>
          log.warning("Dropped a message.")
        case QueueOfferResult.Enqueued =>
          log.debug("Enqueued a message.")
        case QueueOfferResult.Failure(t) =>
          log.error("Stream failed: {}", t.getMessage)
        case QueueOfferResult.QueueClosed =>
          log.debug("Stream closed.")
      }
    }
    

    The code above simply prints notifications from PostgreSQL as they occur; you can replace the Sink.foreach(println) with another Sink. To run it:

    import akka.actor._
    import akka.stream.ActorMaterializer
    
    object Example extends App {
      implicit val system = ActorSystem()
      implicit val materializer = ActorMaterializer()
      system.actorOf(Props(classOf[DbActor], materializer))
    }
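    If you still want the notifications delivered in batches of up to 1000, a groupedWithin stage can be inserted before the sink; backpressure then bounds memory without any fetch-size tuning. A sketch of an alternative queue definition for the DbActor above (the 1-second flush interval is an arbitrary choice, and the surrounding imports and materializer are assumed to be the same as in the actor):

```scala
// Emits batches of at most 1000 payloads; groupedWithin also flushes a
// partial batch after the timeout, so quiet periods don't stall processing.
val queue =
  Source.queue[String](Int.MaxValue, OverflowStrategy.backpressure)
    .groupedWithin(1000, 1.second)
    .to(Sink.foreach(batch => println(s"got ${batch.size} notifications")))
    .run()
```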