Tags: java, apache-spark, apache-spark-sql, spark-structured-streaming, rocksdb

Spark Structured Streaming StateStore Exception with RocksDBStateStoreProvider


I'm developing a Spark Structured Streaming application with stream-stream join logic, using RocksDB as the backend for storing state. Kafka is the source for both streams, and I'm running Spark 3.5.0 in local mode on Java 17, with a Windows directory as the checkpoint directory. I configured the state store provider by setting spark.sql.streaming.stateStore.providerClass to org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider.

After running multiple batches, I hit the exception below. I cleaned the checkpoint directory and reran the app several times, and the exception occurred every time. There is a many-to-one relationship between the join IDs in my source streams, and I couldn't find any data corruption in the source data.

I also tried Spark 3.4.0, but the exception still occurred. I'd appreciate any insights or guidance on resolving this issue. Thank you!
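For reference, this is roughly how the session and join are set up (a minimal sketch; the topic names, join keys, watermarks, and time constraint are simplified placeholders, not my actual code):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.expr;

public class StreamStreamJoinApp {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("stream-stream-join")
                // RocksDB-backed state store, as described above
                .config("spark.sql.streaming.stateStore.providerClass",
                        "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
                .getOrCreate();

        // Placeholder Kafka sources; topics and column names are illustrative
        Dataset<Row> left = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "left-topic")
                .load()
                .selectExpr("CAST(key AS STRING) AS joinId", "timestamp AS leftTs");

        Dataset<Row> right = spark.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "right-topic")
                .load()
                .selectExpr("CAST(key AS STRING) AS joinId2", "timestamp AS rightTs");

        // Stream-stream join; many right rows can match one left row
        Dataset<Row> joined = left
                .withWatermark("leftTs", "10 minutes")
                .join(right.withWatermark("rightTs", "10 minutes"),
                      expr("joinId = joinId2 AND rightTs BETWEEN leftTs AND leftTs + INTERVAL 1 HOUR"));

        joined.writeStream()
                .format("console")
                // Windows checkpoint directory, as in the question
                .option("checkpointLocation", "C:/tmp/checkpoints/join-app")
                .start()
                .awaitTermination();
    }
}
```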

```
org.rocksdb.RocksDBException: Mismatch in unique ID on table file 27. Expected: {7568779327299478048,6781697239205038417} Actual: {7568779327299478056,2042577083335893353} in file C:\Windows\Temp\spark-fed38dd1-aae2-45ab-ad2f-8e0f02ea223b\StateStoreId(opId=0,partId=1,name=left-keyWithIndexToValue)-a692f8b0-e03c-4475-8925-0d589f67d628\workingDir-c71dca44-dcbc-4057-91d2-afce3d7cb7b4/MANIFEST-000005
    at org.rocksdb.RocksDB.open(Native Method) ~[rocksdbjni-8.3.2.jar!/:?]
    at org.rocksdb.RocksDB.open(RocksDB.java:249) ~[rocksdbjni-8.3.2.jar!/:?]
    at org.apache.spark.sql.execution.streaming.state.RocksDB.openDB(RocksDB.scala:584) ~[spark-sql_2.12-3.5.0.jar!/:3.5.0]
    at org.apache.spark.sql.execution.streaming.state.RocksDB.load(RocksDB.scala:154) ~[spark-sql_2.12-3.5.0.jar!/:3.5.0]
    at org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider.getStore(RocksDBStateStoreProvider.scala:194) ~[spark-sql_2.12-3.5.0.jar!/:3.5.0]
    at org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:507) ~[spark-sql_2.12-3.5.0.jar!/:3.5.0]
    at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$StateStoreHandler.getStateStore(SymmetricHashJoinStateManager.scala:417) ~[spark-sql_2.12-3.5.0.jar!/:3.5.0]
    at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$KeyWithIndexToValueStore.<init>(SymmetricHashJoinStateManager.scala:600) ~[spark-sql_2.12-3.5.0.jar!/:3.5.0]
    at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager.<init>(SymmetricHashJoinStateManager.scala:386) ~[spark-sql_2.12-3.5.0.jar!/:3.5.0]
    at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner.<init>(StreamingSymmetricHashJoinExec.scala:529) ~[spark-sql_2.12-3.5.0.jar!/:3.5.0]
    at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.processPartitions(StreamingSymmetricHashJoinExec.scala:276) ~[spark-sql_2.12-3.5.0.jar!/:3.5.0]
    at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.$anonfun$doExecute$1(StreamingSymmetricHashJoinExec.scala:241) ~[spark-sql_2.12-3.5.0.jar!/:3.5.0]
    at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec.$anonfun$doExecute$1$adapted(StreamingSymmetricHashJoinExec.scala:241) ~[spark-sql_2.12-3.5.0.jar!/:3.5.0]
    at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinHelper$StateStoreAwareZipPartitionsRDD.compute(StreamingSymmetricHashJoinHelper.scala:295) ~[spark-sql_2.12-3.5.0.jar!/:3.5.0]
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) ~[spark-core_2.12-3.5.0.jar!/:3.5.0]
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) ~[spark-core_2.12-3.5.0.jar!/:3.5.0]
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.5.0.jar!/:3.5.0]
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) ~[spark-core_2.12-3.5.0.jar!/:3.5.0]
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) ~[spark-core_2.12-3.5.0.jar!/:3.5.0]
    at org.apache.spark.sql.execution.SQLExecutionRDD.compute(SQLExecutionRDD.scala:55) ~[spark-sql_2.12-3.5.0.jar!/:3.5.0]
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) ~[spark-core_2.12-3.5.0.jar!/:3.5.0]
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) ~[spark-core_2.12-3.5.0.jar!/:3.5.0]
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.5.0.jar!/:3.5.0]
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) ~[spark-core_2.12-3.5.0.jar!/:3.5.0]
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) ~[spark-core_2.12-3.5.0.jar!/:3.5.0]
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.5.0.jar!/:3.5.0]
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) ~[spark-core_2.12-3.5.0.jar!/:3.5.0]
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) ~[spark-core_2.12-3.5.0.jar!/:3.5.0]
    at org.apache.spark.sql.execution.SQLExecutionRDD.compute(SQLExecutionRDD.scala:55) ~[spark-sql_2.12-3.5.0.jar!/:3.5.0]
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) ~[spark-core_2.12-3.5.0.jar!/:3.5.0]
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) ~[spark-core_2.12-3.5.0.jar!/:3.5.0]
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.5.0.jar!/:3.5.0]
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364) ~[spark-core_2.12-3.5.0.jar!/:3.5.0]
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:328) ~[spark-core_2.12-3.5.0.jar!/:3.5.0]
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93) ~[spark-core_2.12-3.5.0.jar!/:3.5.0]
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161) ~[spark-core_2.12-3.5.0.jar!/:3.5.0]
    at org.apache.spark.scheduler.Task.run(Task.scala:141) ~[spark-core_2.12-3.5.0.jar!/:3.5.0]
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620) ~[spark-core_2.12-3.5.0.jar!/:3.5.0]
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64) ~[spark-common-utils_2.12-3.5.0.jar!/:3.5.0]
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61) ~[spark-common-utils_2.12-3.5.0.jar!/:3.5.0]
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94) ~[spark-core_2.12-3.5.0.jar!/:3.5.0]
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623) [spark-core_2.12-3.5.0.jar!/:3.5.0]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
```

Solution

  • My use case is quite similar: I'm using Kafka as a source for my streams, and I'm doing a lot of stateful operations.

    I had been running Spark 3.3.2 in production for a year and everything worked fine, until last week when I tried to upgrade from 3.3.2 to 3.5.0. The streams ran fine in our test environments, but they ended up crashing in production after a few hundred batches (probably because of the volume of data) with the exact same error.

    I tried a few things, but nothing worked. I ended up downgrading Spark to 3.3.2, and everything is working again.

    If you really want to stick with 3.5.0, I think that setting spark.shuffle.service.db.backend to LEVELDB and removing the RocksDBStateStoreProvider should fix your problem; see the sketch below.
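
    A minimal, untested sketch of that workaround. Both property names are real Spark settings (spark.shuffle.service.db.backend was introduced in Spark 3.4); whether this actually avoids the unique-ID mismatch is my assumption, not something I've verified on 3.5.0:

    ```java
    import org.apache.spark.sql.SparkSession;

    public class WorkaroundSketch {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .master("local[*]")
                    .appName("workaround")
                    // Use LevelDB instead of RocksDB for the external shuffle service DB
                    .config("spark.shuffle.service.db.backend", "LEVELDB")
                    // spark.sql.streaming.stateStore.providerClass is deliberately NOT set,
                    // so Spark falls back to the default HDFSBackedStateStoreProvider
                    // instead of RocksDBStateStoreProvider.
                    .getOrCreate();
        }
    }
    ```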