slick-3.0 akka-stream

Is it possible to create an "infinite" stream from a database table using Akka Stream?


I'm playing with Akka Streams 2.4.2 and am wondering if it's possible to set up a stream that uses a database table as a source, so that whenever a record is added to the table it is materialized and pushed downstream.

UPDATE: 2/23/16

I've implemented the solution from @PH88. Here's my table definition:

case class Record(id: Int, value: String)

class Records(tag: Tag) extends Table[Record](tag, "my_stream") {
  def id = column[Int]("id")
  def value = column[String]("value")
  def * = (id, value) <> (Record.tupled, Record.unapply)
}

Here's the implementation:

implicit val system = ActorSystem("Publisher")
implicit val materializer = ActorMaterializer()
val db = Database.forConfig("pg-postgres")

try {
  val newRecStream = Source.unfold((0, List[Record]())) { n =>
    try {
      val q = for (r <- TableQuery[Records].filter(row => row.id > n._1)) yield r
      val r = Source.fromPublisher(db.stream(q.result)).collect {
        case rec => println(s"${rec.id}, ${rec.value}"); rec
      }.runFold((n._1, List[Record]())) {
        case ((id, xs), current) => (current.id, current :: xs)
      }

      val answer: (Int, List[Record]) = Await.result(r, 5.seconds)
      Option(answer, None)
    }
    catch { case e: Exception => println(e); Option(n, e) }
  }

  Await.ready(newRecStream.throttle(1, 1.second, 1, ThrottleMode.shaping).runForeach(_ => ()), Duration.Inf)
}
finally {
  system.shutdown
  db.close
}

But my problem is that when I attempt to call flatMapConcat, the element type I get is Serializable.
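
Presumably the widening happens because the two branches of the unfold function emit different element types - the try branch emits None while the catch branch emits the Exception - so the compiler falls back to their common supertype for the stream's element type. A standalone illustration of that inference (the names here are made up for the demo):

object LubDemo extends App {
  // Mirrors the two branches of the unfold function above:
  def poll(fail: Boolean) =
    if (fail) (0, new Exception("query failed")) // failure branch: element is an Exception
    else (0, None)                               // success branch: element is None

  // The inferred result type is roughly (Int, java.io.Serializable),
  // which is why flatMapConcat downstream only sees Serializable.
  println(poll(fail = false))
  println(poll(fail = true))
}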

UPDATE: 2/24/16

Updated to try db.run suggestion from @PH88:

implicit val system = ActorSystem("Publisher")
implicit val ec = system.dispatcher // ExecutionContext for mapping over the Future from db.run
implicit val materializer = ActorMaterializer()
val db = Database.forConfig("pg-postgres")
val disableAutoCommit = SimpleDBIO(_.connection.setAutoCommit(false)) // defined here but not applied in this snippet
val queryLimit = 1

try {
  val newRecStream = Source.unfoldAsync(0) { n =>
      val q = TableQuery[Records].filter(row => row.id > n).take(queryLimit)
      db.run(q.result).map { recs =>
        Some(recs.last.id, recs)
      }
    }
    .throttle(1, 1.second, 1, ThrottleMode.shaping)
    .flatMapConcat { recs =>
      Source.fromIterator(() => recs.iterator)
    }
    .runForeach { rec =>
      println(s"${rec.id}, ${rec.value}")
    }

  Await.ready(newRecStream, Duration.Inf)
}
catch {
  case ex: Throwable => println(ex)
}
finally {
  system.shutdown
  db.close
}

This works (I changed the query limit to 1 since I only have a couple of items in my database table currently), except that once it prints the last row in the table the program exits. Here's my log output:

17:09:27,982 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could NOT find resource [logback.groovy]
17:09:27,982 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could NOT find resource [logback-test.xml]
17:09:27,982 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Found resource [logback.xml] at [file:/Users/xxxxxxx/dev/src/scratch/scala/fpp-in-scala/target/scala-2.11/classes/logback.xml]
17:09:28,062 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - About to instantiate appender of type [ch.qos.logback.core.ConsoleAppender]
17:09:28,064 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - Naming appender as [STDOUT]
17:09:28,079 |-INFO in ch.qos.logback.core.joran.action.NestedComplexPropertyIA - Assuming default type [ch.qos.logback.classic.encoder.PatternLayoutEncoder] for [encoder] property
17:09:28,102 |-INFO in ch.qos.logback.classic.joran.action.LoggerAction - Setting level of logger [application] to DEBUG
17:09:28,103 |-INFO in ch.qos.logback.classic.joran.action.RootLoggerAction - Setting level of ROOT logger to INFO
17:09:28,103 |-INFO in ch.qos.logback.core.joran.action.AppenderRefAction - Attaching appender named [STDOUT] to Logger[ROOT]
17:09:28,103 |-INFO in ch.qos.logback.classic.joran.action.ConfigurationAction - End of configuration.
17:09:28,104 |-INFO in ch.qos.logback.classic.joran.JoranConfigurator@4278284b - Registering current configuration as safe fallback point
17:09:28.117 [main] INFO  com.zaxxer.hikari.HikariDataSource - pg-postgres - is starting.
1, WASSSAAAAAAAP!
2, WHAAAAT?!?
3, booyah!
4, what!
5, This rocks!
6, Again!
7, Again!2
8, I love this!
9, Akka Streams rock
10, Tuning jdbc
17:09:39.000 [main] INFO  com.zaxxer.hikari.pool.HikariPool - pg-postgres - is closing down.

Process finished with exit code 0

Found the missing piece - need to replace this:

Some(recs.last.id, recs)

with this:

 val lastId = if(recs.isEmpty) n else recs.last.id
 Some(lastId, recs)

The call to recs.last.id was throwing java.lang.UnsupportedOperationException: empty.last when the result set was empty.
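
The same guard can also be written with lastOption, if you prefer (equivalent behaviour, just a different style):

 val lastId = recs.lastOption.map(_.id).getOrElse(n)
 Some(lastId, recs)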


Solution

  • In general, a SQL database is a 'passive' construct and does not actively push changes the way you describe. You can only 'simulate' the 'push' with periodic polling, along the lines of the sketch below (a compilable version of the inner per-batch stage follows it):

    val newRecStream = Source
    
      // Query for table changes
      .unfold(initState) { lastState =>
        // query for new data since lastState and save the current state into newState...
        Some((newState, newRecords))
      }
    
      // Throttle to limit the poll frequency
      .throttle(...)  
    
      // breaks down into individual records...
      .flatMapConcat { newRecords =>
        Source.unfold(newRecords) { pendingRecords =>
          if (pendingRecords.isEmpty) {
            None
          } else {
            // take one record from pendingRecords and save to newRec.  Save the rest into remainingRecords.
            Some(remainingRecords, newRec)
          }
        }
      }
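
    The inner Source.unfold stage above can be made compilable on its own. A minimal version, assuming the batch is just a List of already-fetched records (illustrative only; the concrete updates below use Source.fromIterator for the same purpose):

    import akka.NotUsed
    import akka.stream.scaladsl.Source

    // Emit the elements of one polled batch one at a time.
    def recordsSource[A](newRecords: List[A]): Source[A, NotUsed] =
      Source.unfold(newRecords) {
        case Nil            => None                 // batch exhausted, complete this inner source
        case newRec :: rest => Some((rest, newRec)) // emit one record, keep the remainder as state
      }

    Plugged into flatMapConcat this behaves the same as the Source.fromIterator(() => recs.iterator) used in the examples below.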
    

    Updated: 2/24/2016

    Pseudo code example based on the 2/23/2016 updates of the question:

    implicit val system = ActorSystem("Publisher")
    implicit val ec = system.dispatcher // ExecutionContext for mapping over the Future from db.run
    implicit val materializer = ActorMaterializer()
    val db = Database.forConfig("pg-postgres")
    val queryLimit = 10
    try {
      val completion = Source
        .unfoldAsync(0) { lastRowId =>
          val q = TableQuery[Records].filter(row => row.id > lastRowId).take(queryLimit)
          db.run(q.result).map { recs =>
            Some(recs.last.id, recs)
          }
        }
        .throttle(1, 1.second, 1, ThrottleMode.shaping)
        .flatMapConcat { recs =>
          Source.fromIterator(() => recs.iterator)
        }
        .runForeach { rec =>
          println(s"${rec.id}, ${rec.value}")
        }
    
      // Block forever
      Await.ready(completion, Duration.Inf)
    
    } catch {
      case ex: Throwable => println(ex)
    } finally {
      system.shutdown
      db.close
    }
    

    It will repeatedly execute the query in unfoldAsync against the DB, retrieving at most 10 (queryLimit) records at a time and sending the records downstream (-> throttle -> flatMapConcat -> runForeach). The Await at the end will block forever.
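
    To keep it polling through idle periods once it has caught up, the state update can incorporate the empty-result guard from the question's 2/24 update. A sketch of just that stage, reusing the names above (on an empty batch flatMapConcat emits nothing and the next poll retries from the same id):

    .unfoldAsync(0) { lastRowId =>
      val q = TableQuery[Records].filter(row => row.id > lastRowId).take(queryLimit)
      db.run(q.result).map { recs =>
        // Keep the previous offset when nothing new has arrived so polling continues.
        val nextId = recs.lastOption.map(_.id).getOrElse(lastRowId)
        Some((nextId, recs))
      }
    }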

    Updated: 2/25/2016

    Executable 'proof-of-concept' code:

    import akka.actor.ActorSystem
    import akka.stream.{ThrottleMode, ActorMaterializer}
    import akka.stream.scaladsl.Source
    import scala.concurrent.duration.Duration
    import scala.concurrent.{Await, Future}
    import scala.concurrent.duration._
    
    object Infinite extends App{
      implicit val system = ActorSystem("Publisher")
      implicit val ec = system.dispatcher
      implicit val materializer = ActorMaterializer()
      case class Record(id: Int, value: String)
      try {
        val completion = Source
          .unfoldAsync(0) { lastRowId =>
            Future {
              val recs = (lastRowId to lastRowId + 10).map(i => Record(i, s"rec#$i"))
              Some(recs.last.id, recs)
            }
          }
          .throttle(1, 1.second, 1, ThrottleMode.Shaping)
          .flatMapConcat { recs =>
            Source.fromIterator(() => recs.iterator)
          }
          .runForeach { rec =>
            println(rec)
          }
    
        Await.ready(completion, Duration.Inf)
    
      } catch {
        case ex: Throwable => println(ex)
      } finally {
        system.shutdown
      }
    }