scala, apache-spark, singleton, connection-pool

Add Configurations to a Singleton Object in Scala


I am trying to set up a connection pool to Redis in a singleton Scala object so that I can read/write to Redis while mapping partitions of a DataFrame. I want to be able to configure the host, along with the other connection pool variables, when I run my main method. However, with the configuration below I don't get my configured REDIS_HOST; I get localhost.

While writing this I referenced the "One instance per executor" section of https://able.bio/patrickcording/sharing-objects-in-spark--58x4gbf.

What is the best way to achieve configuring the host while maintaining one RedisClient instance per executor?

import java.io.File
import com.typesafe.config.ConfigFactory

object Main {
  def main(args: Array[String]): Unit = {
    val parsedConfig = ConfigFactory.parseFile(new File(args(0)))
    val config = ConfigFactory.load(parsedConfig)
    RedisClient.host = config.getString("REDIS_HOST")
    val main = new Main()
    main.runMain()
  }
}

class Main {

  def runMain(): Unit = {
    val df = Seq(...).toDF()

    df.mapPartitions(partitions => {
      partitions.foreach(row => {
        val count = RedisClient.getIdCount(row.getAs[String]("id"))
        // do something
      })
    })

    df.write.save
    RedisClient.close()
  }
}

import com.redis.RedisClientPool  // assuming the scala-redis client library

object RedisClient {
  var host: String = "localhost"

  private val pool = new RedisClientPool(host, 6379)

  def getIdCount(id: String): Option[String] = {
    pool.withClient(client => {
      client.get(id)
    })
  }

  def close(): Unit = {
    pool.close()
  }
}

Solution

  • In Spark, main only runs on the driver, not on the executors. RedisClient does not exist on any given executor until something on that executor first touches it, and when that happens it is simply initialized with its default values (host = "localhost").

    Accordingly, the only way to ensure that it has the correct host is to, in the same RDD/DF operation, ensure that host is set, e.g.:

    df.mapPartitions(partitions => {
      RedisClient.host = config.getString("REDIS_HOST")
      partitions.foreach(row => {
        ...
      })
    })
    

    Of course, since main doesn't run on the executors, you'll probably also want to broadcast the config to the executors:

    // after setting up the SparkContext
    val sc: SparkContext = ???
    val broadcastConfig = sc.broadcast(config)
    

    Then you'll pass broadcastConfig around and use broadcastConfig.value in place of config, so the above would become:

    df.mapPartitions(partitions => {
      RedisClient.host = broadcastConfig.value.getString("REDIS_HOST")
      partitions.foreach(row => {
        ...
      })
    })
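
    Wiring that together on the driver side might look something like this (a sketch; the changed runMain signature and the variable names are just illustrative, and broadcasting the whole Config assumes your Config implementation is serializable — if it isn't, broadcast just the strings you need, e.g. config.getString("REDIS_HOST")):

    import java.io.File
    import com.typesafe.config.ConfigFactory
    import org.apache.spark.sql.SparkSession

    object Main {
      def main(args: Array[String]): Unit = {
        val config = ConfigFactory.load(ConfigFactory.parseFile(new File(args(0))))

        val spark = SparkSession.builder().getOrCreate()
        // broadcast once on the driver; executors read it via broadcastConfig.value
        val broadcastConfig = spark.sparkContext.broadcast(config)

        new Main().runMain(broadcastConfig)
      }
    }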
    

    As long as you take care to always assign the same value to RedisClient.host, and to set it before doing anything else with RedisClient, you should be safe. One caveat with the object as written: pool is a plain val, so it is constructed (with whatever host holds at that moment, i.e. "localhost") as soon as the object is first touched on an executor, even if that first touch is the host assignment itself. Declaring pool as a lazy val defers its construction until the first getIdCount call, by which point host has been set.
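
    A minimal sketch of that lazy-initialized variant, assuming the same scala-redis RedisClientPool as in the question:

    import com.redis.RedisClientPool

    object RedisClient {
      var host: String = "localhost"

      // lazy: the pool is only built on the first getIdCount call on an executor,
      // i.e. after host has already been assigned inside mapPartitions
      private lazy val pool = new RedisClientPool(host, 6379)

      def getIdCount(id: String): Option[String] =
        pool.withClient(client => client.get(id))
    }

    With that in place, the first thing each executor does with RedisClient is the host assignment, and the pool is only created afterwards, so it connects to the configured host rather than localhost.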