I spent all day trying to figure out how to solve this issue. The goal is to insert several sequences of strings into a single column of a table.
I have a method like this:
case class Column(strings: Seq[String])

def insertColumns(columns: Seq[Column]) = for {
  _ <- Future.sequence(columns.map(col => insert(col)))
} yield ()

private def insert(column: Column) =
  db.run(stringTable ++= column.strings) // Slick batch insert
This works up to a point. I tested with a sequence of 2100 columns (with 100 strings in each), and it works fine. But as soon as I increase the number of columns to 3100+, I get this error:
Task slick.basic.BasicBackend$DatabaseDef$$anon$3@293ce053 rejected from slick.util.AsyncExecutor$$anon$1$$anon$2@3e423930[Running, pool size = 10, active threads = 10, queued tasks = 1000, completed tasks = 8160]
I have read in several places that doing something like this would help:
case class Column(strings: Seq[String])

def insertColumns(columns: Seq[Column]) = {
  val f = Future.sequence(columns.map(col => insert(col)))
  for {
    _ <- f
  } yield ()
}

private def insert(column: Column) =
  db.run(stringTable ++= column.strings) // Slick batch insert
It does not.
I tried several combinations of changes inside insert:
Future.sequence(
  rows.grouped(500).toSeq.map(group => db.run(DBIO.seq(stringTable ++= group)))
)
Source(rows)
  .buffer(500, OverflowStrategy.backpressure)
  .via(
    Slick.flow(row => stringTable += row)
  )
  .log("nr-of-inserted-rows")
  .runWith(Sink.ignore)
Source(rows)
  .runWith(Slick.sink(1, row => stringTable += row))
I also tried:

- reWriteBatchedInserts=true in my config
- the (dataColumnStringsTable ++= rows).transactionally option
- implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1)) to try to execute the futures sequentially

I don't have any other idea than reworking my subscriber to receive and block my messages (sequences of strings) and handle the backpressure on the message-queue side.
I am using Slick 3.3.3 (with alpakka-slick) / HikariCP 3.2.0 / PostgreSQL 13.2.
My config is as follows:
slick {
  profile = "slick.jdbc.PostgresProfile$"
  db {
    connectionPool = "HikariCP"
    dataSourceClass = "slick.jdbc.DriverDataSource"
    properties = {
      driver = "org.postgresql.Driver"
      user = "postgres"
      password = "password"
      url = "jdbc:postgresql://"${slick.db.host}":5432/slick?reWriteBatchedInserts=true"
    }
    host = "localhost"
    numThreads = 10
    maxConnections = 100
    minConnections = 1
  }
}
Thank you for your help.
You shouldn't use Future.sequence with collections of more than a few elements. Every Future is a computation running in the background. So when you run this for a collection of, say, 3000 columns:

Future.sequence(columns.map(col => insert(col)))

you effectively spawn 3000 insert operations at once. Slick's AsyncExecutor queues the ones it cannot run immediately, and once that queue is full (the default queueSize is 1000, which matches the "queued tasks = 1000" in your error), it starts rejecting new tasks.
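If you don't want to bring in streams, a minimal workaround is to chain the inserts so that each one starts only after the previous one has completed. This is just a sketch, assuming your existing insert(col) (returning a Future) is in scope:

import scala.concurrent.{ExecutionContext, Future}

// Chain inserts sequentially: at most one db.run task is submitted
// to the executor at any time, so its queue can never overflow.
def insertSequentially(columns: Seq[Column])(implicit ec: ExecutionContext): Future[Unit] =
  columns.foldLeft(Future.successful(())) { (acc, col) =>
    acc.flatMap(_ => insert(col)).map(_ => ())
  }

This trades throughput for safety; the streaming approach below gives you bounded concurrency instead.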
The solution is to process the input collection with Akka Streams. In your case, this means creating a Source from columns (not from rows). This will ensure that the executor is not overwhelmed with too many parallel operations. I haven't used alpakka-slick, but looking at the docs, the solution should look something like this:
Source(columns)
  .via(
    Slick.flow(column => stringTable ++= column.strings)
  )
  // further processing here
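If one-at-a-time inserts turn out to be too slow, the alpakka docs also show an overload of Slick.flow that takes a parallelism argument, which caps the number of inserts in flight instead of leaving it unbounded. A sketch, assuming the implicit SlickSession and ActorSystem that alpakka-slick needs are in scope:

Source(columns)
  .via(
    // At most 4 batch inserts run concurrently; stream backpressure
    // keeps the executor's task queue from overflowing.
    Slick.flow(4, column => stringTable ++= column.strings)
  )
  .runWith(Sink.ignore)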
What's more, if "columns" are coming from a message queue, it's possible that you don't even need an intermediate Seq[Column]. You may simply need to define a Source of Column that reads from the queue, and process it with a Slick flow.
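For example, with a plain Akka Streams Source.queue as the bridge between your subscriber and the database (a sketch, assuming an implicit ActorSystem/Materializer is in scope; the buffer size of 256 is an arbitrary assumption):

// Materialize a bounded queue that the message-queue subscriber pushes into.
// OverflowStrategy.backpressure delays offer() completion while the buffer is full.
val queue =
  Source.queue[Column](bufferSize = 256, overflowStrategy = OverflowStrategy.backpressure)
    .via(Slick.flow(column => stringTable ++= column.strings))
    .to(Sink.ignore)
    .run()

// In the subscriber callback:
// queue.offer(column) // Future[QueueOfferResult]; wait for it before acking the message

This way the backpressure you were planning to handle manually on the message-queue side falls out of the stream itself.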