apache-spark, spark-structured-streaming, qubole, spark-hive, spark-checkpoint

Spark Structured Streaming using spark-acid writeStream (with checkpoint) throwing org.apache.hadoop.fs.FileAlreadyExistsException


In our Spark app, we use Spark Structured Streaming. It reads from Kafka as the input stream and writes to a Hive table via a HiveAcid writeStream sink. HiveAcid support comes from the open-source spark-acid library from Qubole: https://github.com/qubole/spark-acid

Below is our code:

import za.co.absa.abris.avro.functions.from_confluent_avro
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
....

val spark = SparkSession
  .builder()
  .appName("events")
  .config("spark.sql.streaming.metricsEnabled", true)
  .enableHiveSupport()
  .getOrCreate()

import spark.implicits._

val input_stream_df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka:9092")
  .option("startingOffsets", '{"events":{"0":2310384922,"1":2280420020,"2":2278027233,"3":2283047819,"4":2285647440}}')
  .option("maxOffsetsPerTrigger", 10000)
  .option("subscribe", "events")
  .load()

// schema registry config
val srConfig = Map(
  "schema.registry.url"           -> "http://schema-registry:8081",
  "value.schema.naming.strategy"  -> "topic.name",
  "schema.registry.topic"         -> "events",
  "value.schema.id"               -> "latest"
)

val data = input_stream_df
  .withColumn("value", from_confluent_avro(col("value"), srConfig))
  .withColumn("timestamp_s", from_unixtime($"value.timestamp" / 1000))
  .select(
    $"value.*",
    year($"timestamp_s")       as 'year,
    month($"timestamp_s")      as 'month,
    dayofmonth($"timestamp_s") as 'day
  )

// format "HiveAcid" is provided by spark-acid lib from Qubole
val output_stream_df = data.writeStream.format("HiveAcid")
  .queryName("hiveSink")
  .option("database", "default")
  .option("table", "events_sink")
  .option("checkpointLocation", "/user/spark/events/checkpoint")
  .option("spark.acid.streaming.log.metadataDir", "/user/spark/events/checkpoint/spark-acid")
  .option("metastoreUri", "thrift://hive-metastore:9083")
  .trigger(Trigger.ProcessingTime("30 seconds"))
  .start()

output_stream_df.awaitTermination()

We were able to deploy the app to production and redeploy it several times (~10 times) without issue. Then it ran into the following error:

Query hiveSink [id = 080a9f25-23d2-4ec8-a8c0-1634398d6d29, runId = 990d3bba-0f7f-4bae-9f41-b43db6d1aeb3] terminated with exception:
Job aborted due to stage failure: Task 3 in stage 0.0 failed 4 times, most recent failure: Lost task 3.3 in stage 0.0 (TID 42, 10.236.7.228, executor 3):
org.apache.hadoop.fs.FileAlreadyExistsException: /warehouse/tablespace/managed/hive/events/year=2020/month=5/day=18/delta_0020079_0020079/bucket_00003 for client 10.236.7.228 already exists
(...)
at com.qubole.shaded.orc.impl.PhysicalFsWriter.<init>(PhysicalFsWriter.java:95)
at com.qubole.shaded.orc.impl.WriterImpl.<init>(WriterImpl.java:177)
at com.qubole.shaded.hadoop.hive.ql.io.orc.WriterImpl.<init>(WriterImpl.java:94)
at com.qubole.shaded.hadoop.hive.ql.io.orc.OrcFile.createWriter(OrcFile.java:334)
at com.qubole.shaded.hadoop.hive.ql.io.orc.OrcRecordUpdater.initWriter(OrcRecordUpdater.java:602)
at com.qubole.shaded.hadoop.hive.ql.io.orc.OrcRecordUpdater.addSimpleEvent(OrcRecordUpdater.java:423)
at com.qubole.shaded.hadoop.hive.ql.io.orc.OrcRecordUpdater.addSplitUpdateEvent(OrcRecordUpdater.java:432)
at com.qubole.shaded.hadoop.hive.ql.io.orc.OrcRecordUpdater.insert(OrcRecordUpdater.java:484)
at com.qubole.spark.hiveacid.writer.hive.HiveAcidFullAcidWriter.process(HiveAcidWriter.scala:295)
at com.qubole.spark.hiveacid.writer.TableWriter$$anon$1$$anonfun$6.apply(TableWriter.scala:153)
at com.qubole.spark.hiveacid.writer.TableWriter$$anon$1$$anonfun$6.apply(TableWriter.scala:153)
(...)
at com.qubole.spark.hiveacid.writer.TableWriter$$anon$1.apply(TableWriter.scala:153)
at com.qubole.spark.hiveacid.writer.TableWriter$$anon$1.apply(TableWriter.scala:139)

Each time the app is restarted, the error names different delta and bucket files that supposedly already exist. However, those files appear to be newly created on each start, so we have no clue why the error is thrown.
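To rule out stale files left over from an earlier run, here is a minimal sketch (not part of our app) that lists the partition directory named in the exception along with modification times; the path is copied verbatim from the error message, and the spark session from the code above is assumed:

import org.apache.hadoop.fs.{FileSystem, Path}

// List the delta directories of the partition from the error and print their
// modification times, to check whether they really predate the restart.
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val partitionDir = new Path("/warehouse/tablespace/managed/hive/events/year=2020/month=5/day=18")
fs.listStatus(partitionDir).foreach { status =>
  println(s"${status.getPath}  modified=${new java.util.Date(status.getModificationTime)}")
}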

Any pointers would be much appreciated.


Solution

  • I discovered the actual root cause in the worker's error log: a code change I had made in one of the libraries we use was causing an out-of-memory error.

    What I posted above is the error log from the driver, which appeared only after several failures on the worker nodes.
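    For context, the "failed 4 times" in the driver log matches Spark's default task retry count (spark.task.maxFailures), which suggests the posted exception came from the final retry after the earlier worker-side failures. Below is a purely hypothetical sketch of memory settings one might raise while chasing an executor OOM; the values are placeholders, not our actual config, and in cluster deploy mode they are normally supplied to spark-submit instead.

    // Hypothetical illustration only: raising executor memory while investigating
    // an OOM on the workers. Values are placeholders; in cluster deploy mode these
    // settings are usually passed via spark-submit, since executor memory is fixed
    // before the driver builds its SparkSession.
    val sparkDebug = SparkSession
      .builder()
      .appName("events")
      .config("spark.executor.memory", "8g")          // executor heap size
      .config("spark.executor.memoryOverhead", "2g")  // off-heap overhead per executor
      .enableHiveSupport()
      .getOrCreate()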