apache-kafka, jvm, apache-kafka-streams, rocksdb

Kafka Streams unexpectedly high memory usage


I'm running a Kafka Streams application in Kubernetes that simply reads a topic into a GlobalKTable and then exposes an HTTP API to look up the key-value pairs. The memory usage of this pod is unexpectedly high, and I cannot figure out why it uses so much memory.

I have configured a BoundedMemoryRocksDBConfigSetter according to https://docs.confluent.io/platform/current/streams/developer-guide/memory-mgmt.html#rocksdb

It looks roughly like this:

import java.util

import com.typesafe.scalalogging.LazyLogging
import org.apache.kafka.streams.state.RocksDBConfigSetter
import org.rocksdb.{BlockBasedTableConfig, CompactionOptionsFIFO, Options}

abstract class BoundedMemoryRocksDBConfig(blockCacheSize: Long = 50 * BoundedMemoryRocksDBConfig.Mi,
                                          numPartitions: Int = 10,
                                          ttl: Long = 180 * 24 * 60 * 60L)
  extends RocksDBConfigSetter
    with LazyLogging {

  import BoundedMemoryRocksDBConfig._ // shared cache state lives in the companion object
 
  // All stores share one cache of size numPartitions * blockCacheSize; the
  // memtables (write buffers) are accounted for within that same cache.
  private val totalOffHeapMemory = numPartitions * blockCacheSize
  private val totalMemTableMemory = totalOffHeapMemory
  assert(totalMemTableMemory <= totalOffHeapMemory)

  override def setConfig(storeName: String, options: Options, configs: util.Map[String, AnyRef]): Unit =
    BoundedMemoryRocksDBConfig.synchronized {
      if (!initialized) {
        sharedCache = Some(new org.rocksdb.LRUCache(totalOffHeapMemory))
        sharedWriteBufferManager = Some(new org.rocksdb.WriteBufferManager(totalOffHeapMemory, sharedCache.get))
        initialized = true
      }

      val tableConfig = options.tableFormatConfig.asInstanceOf[BlockBasedTableConfig]
      // These three options in combination will limit the memory used by RocksDB to the size passed
      // to the block cache (totalOffHeapMemory)
      tableConfig.setBlockCache(sharedCache.get)
      tableConfig.setCacheIndexAndFilterBlocks(true)
      options.setWriteBufferManager(sharedWriteBufferManager.get)
      options.setTableFormatConfig(tableConfig)

      // FIFO compaction combined with a TTL (in seconds) drops SST files once
      // they are older than `ttl` (180 days by default here).
      val compactionOptions = new CompactionOptionsFIFO()
      options.setCompactionOptionsFIFO(compactionOptions)
      options.setTtl(ttl)
    }

  override def close(storeName: String, options: Options): Unit = {
    // do not close cache or writeBufferManager because they are shared among all stream tasks
  }
}
object BoundedMemoryRocksDBConfig {
  private var initialized = false
  private var sharedCache: Option[org.rocksdb.LRUCache] = None
  private var sharedWriteBufferManager: Option[org.rocksdb.WriteBufferManager] = None

  val Ki: Long = 1024L
  val Mi: Long = Ki * Ki
}
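
For reference, a minimal sketch of how such a config setter could be wired into the Streams configuration. The concrete subclass, application id, bootstrap servers, and topic name below are placeholders; Kafka Streams loads the class given by rocksdb.config.setter via reflection, so it needs a no-arg constructor:

import java.util.Properties

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig}

// Hypothetical concrete subclass: all constructor defaults of the abstract
// class apply, and the no-arg constructor allows reflective instantiation.
class DefaultBoundedMemoryRocksDBConfig extends BoundedMemoryRocksDBConfig()

object GlobalKTableApp {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "global-ktable-api")   // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092")       // placeholder
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, classOf[Serdes.StringSerde])
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, classOf[Serdes.StringSerde])
    // Register the bounded-memory RocksDB config setter for all state stores.
    props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
              classOf[DefaultBoundedMemoryRocksDBConfig])

    val builder = new StreamsBuilder()
    builder.globalTable[String, String]("input-topic") // placeholder topic; the HTTP/query layer is omitted

    val streams = new KafkaStreams(builder.build(), props)
    streams.start()
    sys.addShutdownHook(streams.close())
  }
}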

There are 10 partitions in the topic, so the shared LRUCache is sized at 10 * 50 MB = 500 MB and covers both the block cache and the memtables/write buffers (via the WriteBufferManager). In addition, Kafka Streams has a record cache that is 10 MB by default for the entire topology, plus producer/consumer buffers, TCP send/recv buffers and a deserialization buffer of negligible size. According to our metrics, the JVM in the pod has 200 MB of committed heap and another 160 MB of non-heap memory. Thus, I would expect the pod memory usage to be at most 500 MB block cache & write buffers + 10 MB record cache + 360 MB JVM memory = 870 MB, yet the pod memory usage is well above 2 GB and constantly rising.

Note that the following RocksDB metrics collected by Kafka Streams are reported per task_id (and here sometimes aggregated as the sum over task_ids). I assume the reported values are overstated and should really be about 1/10 as high: there are 10 partitions/tasks sharing a single LRUCache, and RocksDB does not know the cache is shared, so it reports 500 MB of block cache usage for each task separately even though it is the same 500 MB buffer for all of them.

[Chart of pod and RocksDB memory metrics over time: cri mem rss, block cache usage, block cache capacity, size all mem tables, estimate table readers mem, JVM non-heap used, JVM heap committed]
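
To double-check this, one can dump the relevant metrics directly from the running application and compare them per task. A rough sketch, assuming `streams` is the running KafkaStreams instance and using the RocksDB metric names documented for Kafka Streams:

import org.apache.kafka.streams.KafkaStreams

import scala.jdk.CollectionConverters._

object RocksDbMetricsDump {
  // Prints the per-task RocksDB memory metrics so any double counting of the
  // shared cache becomes visible.
  def dump(streams: KafkaStreams): Unit = {
    val interesting = Set("block-cache-usage", "block-cache-capacity",
                          "size-all-mem-tables", "estimate-table-readers-mem")
    streams.metrics().asScala
      .filter { case (name, _) => interesting(name.name()) }
      .foreach { case (name, metric) =>
        // If each task reports the full size of the shared LRUCache, summing
        // these values over tasks overstates actual usage (~10x here).
        println(s"${name.tags().get("task-id")} ${name.name()} = ${metric.metricValue()}")
      }
  }
}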


Solution

  • Unless you are using Direct IO (https://github.com/facebook/rocksdb/wiki/Direct-IO), shared file cache pages from the OS page cache can show up in your RSS. If you have a way of tracking anonymous memory usage instead, that will be a more accurate gauge of actual "memory usage" (a sketch of one way to do that from inside the JVM follows below).

    By the way, RocksDB does not typically use the mmapped OS file cache directly (only indirectly, through ordinary buffered reads), because it generally has to decompress data and verify checksums before admitting it to the RocksDB block cache.
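
    On Linux, one way to track anonymous memory from inside the pod is to read RssAnon from /proc/self/status (available on reasonably recent kernels). A minimal sketch:

    import scala.io.Source
    import scala.util.Using

    object AnonMemory {
      // Reads RssAnon (anonymous resident memory, in kB) from /proc/self/status.
      // Unlike VmRSS, this excludes file-backed pages such as the OS page cache
      // entries for RocksDB's SST files. Linux-only; returns None if unavailable.
      def rssAnonKb(): Option[Long] =
        Using(Source.fromFile("/proc/self/status")) { src =>
          src.getLines()
            .find(_.startsWith("RssAnon:"))
            .map(_.split("\\s+")(1).toLong)
        }.toOption.flatten
    }

    At the container level, the same distinction shows up in the cgroup memory statistics (the anonymous vs. file-backed counters in memory.stat), which is what most Kubernetes memory dashboards typically read.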