I'm running a Kafka Streams application in Kubernetes that simply reads a topic straight into a GlobalKTable
and then offers an HTTP API to get the key-value pairs. The memory usage of this pod is unexpectedly high, and I cannot figure out why it uses so much memory.
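Roughly, the topology and the lookup path behind the HTTP API look like this (a minimal sketch; topic, store and type names are just illustrative):

import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.kstream.{Consumed, Materialized}
import org.apache.kafka.streams.state.{KeyValueStore, QueryableStoreTypes}
import org.apache.kafka.streams.{KafkaStreams, StoreQueryParameters, StreamsBuilder}

val builder = new StreamsBuilder()
// Materialize the whole topic into a RocksDB-backed global store named "kv-store"
builder.globalTable(
  "input-topic",
  Consumed.`with`(Serdes.String(), Serdes.String()),
  Materialized.as[String, String, KeyValueStore[Bytes, Array[Byte]]]("kv-store"))

// The HTTP layer only does point lookups against the materialized store
def lookup(streams: KafkaStreams, key: String): Option[String] = {
  val store = streams.store(
    StoreQueryParameters.fromNameAndType("kv-store", QueryableStoreTypes.keyValueStore[String, String]()))
  Option(store.get(key))
}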
I have configured a BoundedMemoryRocksDBConfig (a RocksDBConfigSetter) according to https://docs.confluent.io/platform/current/streams/developer-guide/memory-mgmt.html#rocksdb, roughly as follows:
import java.util

import com.typesafe.scalalogging.LazyLogging
import org.apache.kafka.streams.state.RocksDBConfigSetter
import org.rocksdb.{BlockBasedTableConfig, CompactionOptionsFIFO, Options}

abstract class BoundedMemoryRocksDBConfig(blockCacheSize: Long = 50 * BoundedMemoryRocksDBConfig.Mi,
                                          numPartitions: Int = 10,
                                          ttl: Long = 180 * 24 * 60 * 60L)
    extends RocksDBConfigSetter
    with LazyLogging {

  import BoundedMemoryRocksDBConfig._

  // One shared off-heap budget for block cache and memtables/write buffers across all stores
  private val totalOffHeapMemory  = numPartitions * blockCacheSize
  private val totalMemTableMemory = totalOffHeapMemory
  assert(totalMemTableMemory <= totalOffHeapMemory)

  override def setConfig(storeName: String, options: Options, configs: util.Map[String, AnyRef]): Unit =
    BoundedMemoryRocksDBConfig.synchronized {
      if (!initialized) {
        sharedCache = Some(new org.rocksdb.LRUCache(totalOffHeapMemory))
        sharedWriteBufferManager = Some(new org.rocksdb.WriteBufferManager(totalOffHeapMemory, sharedCache.get))
        initialized = true
      }

      val tableConfig = options.tableFormatConfig.asInstanceOf[BlockBasedTableConfig]
      // These three options in combination will limit the memory used by RocksDB to the size passed
      // to the block cache (totalOffHeapMemory)
      tableConfig.setBlockCache(sharedCache.get)
      tableConfig.setCacheIndexAndFilterBlocks(true)
      options.setWriteBufferManager(sharedWriteBufferManager.get)
      options.setTableFormatConfig(tableConfig)

      // Expire old records via FIFO compaction after the TTL
      val compactionOptions = new CompactionOptionsFIFO()
      options.setCompactionOptionsFIFO(compactionOptions)
      options.setTtl(ttl)
    }

  override def close(storeName: String, options: Options): Unit = {
    // do not close the cache or the WriteBufferManager because they are shared among all stream tasks
  }
}

object BoundedMemoryRocksDBConfig {
  private var initialized = false
  private var sharedCache: Option[org.rocksdb.LRUCache] = None
  private var sharedWriteBufferManager: Option[org.rocksdb.WriteBufferManager] = None

  val Ki: Long = 1024L
  val Mi: Long = Ki * Ki
}
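The setter is registered via rocksdb.config.setter in the Streams configuration; the concrete no-arg subclass and the property values below are just for illustration:

import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

// RocksDBConfigSetter is instantiated reflectively, so a concrete no-arg class is needed
class DefaultBoundedMemoryRocksDBConfig extends BoundedMemoryRocksDBConfig()

val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kv-api")
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092")
props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, classOf[DefaultBoundedMemoryRocksDBConfig])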
There are 10 partitions in the topic, so the shared LRUCache is sized at 10 * 50 MB = 500 MB and is shared between the block cache and the memtables/write buffers. In addition, Kafka Streams has a record cache that defaults to 10 MB for the entire topology, plus producer/consumer buffers, TCP send/receive buffers, and a deserialization buffer of negligible size. According to our metrics, the JVM in the pod has 200 MB of committed heap and another 160 MB of non-heap memory. Thus, I would expect the pod's memory usage to be at most 500 MB block cache & write buffers + 10 MB record cache + 360 MB JVM memory = 870 MB, yet it is well above 2 GB and constantly rising.
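Spelled out as a back-of-the-envelope calculation (all numbers taken from the configuration and JVM metrics above):

// Expected upper bound on pod memory, in MB
val blockCacheAndWriteBuffers = 10 * 50 // shared LRUCache: numPartitions * blockCacheSize
val recordCache               = 10      // cache.max.bytes.buffering default for the whole topology
val jvmHeapCommitted          = 200     // committed heap, from JVM metrics
val jvmNonHeap                = 160     // non-heap, from JVM metrics
val expectedMaxMb = blockCacheAndWriteBuffers + recordCache + jvmHeapCommitted + jvmNonHeap
// expectedMaxMb == 870, but the pod's actual usage is above 2000 MB and still growing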
Note that the RocksDB metrics collected by Kafka Streams are reported per task_id (and here sometimes aggregated as the sum over task_ids). I assume the reported values are inflated and should really be a tenth as high: there are 10 partitions/tasks sharing one LRUCache, and RocksDB does not know that it is shared, so each task reports 500 MB of block cache usage when in reality it is the same 500 MB buffer for all of them.
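For reference, this is roughly how I pull those metrics out of the running application (a sketch; I'm assuming the RocksDB property metrics block-cache-usage and size-all-mem-tables in the stream-state-metrics group and the task-id tag):

import scala.jdk.CollectionConverters._
import org.apache.kafka.streams.KafkaStreams

// Print the per-task RocksDB memory metrics exposed by Kafka Streams.
// Every task reports the shared LRUCache as if it owned it exclusively,
// which is where the apparent 10x over-counting comes from.
def dumpRocksDBMemoryMetrics(streams: KafkaStreams): Unit =
  streams.metrics().asScala.values
    .filter { m =>
      m.metricName().group() == "stream-state-metrics" &&
        Set("block-cache-usage", "size-all-mem-tables").contains(m.metricName().name())
    }
    .foreach { m =>
      val taskId = m.metricName().tags().get("task-id")
      println(s"task=$taskId ${m.metricName().name()}=${m.metricValue()}")
    }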
Unless you are using Direct IO (https://github.com/facebook/rocksdb/wiki/Direct-IO), shared file cache pages from the OS can increase your RSS. If you have a way of tracking anonymous memory usage, that is a more accurate gauge of "memory usage".
By the way, RocksDB does not typically use the mmapped file cache directly (only through other functions), because it has to decompress and verify checksums on data before admitting it to the RocksDB block cache.
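If you need a quick way to separate anonymous memory from file-backed pages inside the pod, something along these lines works (a Linux-only sketch; RssAnon/RssFile are fields of /proc/self/status on reasonably recent kernels):

import scala.io.Source

// RssAnon is the anonymous part of the resident set, RssFile the file-backed (page cache) part.
// A large RssFile with a modest RssAnon points at OS caching of RocksDB SST files
// rather than a leak inside the process itself.
def rssBreakdown(): Map[String, String] =
  Source.fromFile("/proc/self/status").getLines()
    .filter(line => line.startsWith("VmRSS") || line.startsWith("RssAnon") || line.startsWith("RssFile"))
    .map { line =>
      val parts = line.split(":", 2)
      parts(0) -> parts(1).trim
    }
    .toMap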