I have a heavily loaded RabbitMQ queue that may hold up to several million messages. The MQ consumer reads messages from the queue and writes them to an MS SQL database. I tried to make the writes non-blocking and concurrent by using a goroutine:
for m := range msgs {
    //......
    se := &sqlEntity{
        body:      string(m.Body),
        cnt:       m.MessageCount,
        timeStamp: fmt.Sprintf("%v", m.Timestamp.Format("2006-01-02 15:04:05")),
        uuid:      u,
    }
    go func(se *sqlEntity) {
        writeSQL(se)
    }(se)
    //.........
}

func writeSQL(se *sqlEntity) {
    result, err := db.Exec(cmd, args...)
    //.......
}
So the write function does not block reading from MQ. But if there are too many messages, the writes exhaust all available connections on the MS SQL server. I therefore tried to set up a pool and set the number of connections explicitly with DB.SetMaxOpenConns. I was sure that the database/sql driver would manage the connections, but it does not. If the connections are exhausted (say, with SetMaxOpenConns = 256), the writeSQL() call does not wait for a free connection in the pool; the result, err := db.Exec(cmd, args...) call inside it simply returns a connection error. So how can I design my application to call writeSQL() concurrently, but strictly within the pool limits? Right now I simply lose data if the pool is exhausted, or the DB is overloaded if there is no pool limit.
One thing you can do is use a buffered channel, with a capacity equal to the maximum number of connections in the pool, to limit the concurrency of the writeSQL function.
Every time writeSQL is called, it first sends a value into the channel. While the channel is full, that send blocks, so the call waits before executing db.Exec until another call finishes and receives a value back out, which signals that a connection is free.
This caps the number of concurrent writeSQL calls so that it never exceeds the maximum number of connections in the pool.
You won't lose any data when the pool is exhausted.
Using the code you've provided, it should look like:
// Buffered channel used as a semaphore; its capacity equals the maximum
// number of connections in the pool. Declared at package level so that
// writeSQL can see it.
var connPool = make(chan struct{}, maxOpenConns)

for m := range msgs {
    // ...
    se := &sqlEntity{
        body:      string(m.Body),
        cnt:       m.MessageCount,
        timeStamp: fmt.Sprintf("%v", m.Timestamp.Format("2006-01-02 15:04:05")),
        uuid:      u,
    }
    go func(se *sqlEntity) {
        writeSQL(se)
    }(se)
    // ...
}

func writeSQL(se *sqlEntity) {
    connPool <- struct{}{} // acquire a slot; blocks while maxOpenConns writes are in flight
    defer func() {
        <-connPool // release the slot once writeSQL is done
    }()
    result, err := db.Exec(cmd, args...)
    // handle error and return
}