In our Spark app we use Spark Structured Streaming. It reads from Kafka as the input stream and writes to a Hive table through HiveAcid as the writeStream sink. HiveAcid is provided by an open-source library called spark-acid from Qubole: https://github.com/qubole/spark-acid
Below is our code:
import za.co.absa.abris.avro.functions.from_confluent_avro
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
....
val spark = SparkSession
.builder()
.appName("events")
.config("spark.sql.streaming.metricsEnabled", true)
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
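// Read events from Kafka, starting from explicit per-partition offsets and capping each micro-batch at 10,000 offsets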
val input_stream_df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka:9092")
.option("startingOffsets", '{"events":{"0":2310384922,"1":2280420020,"2":2278027233,"3":2283047819,"4":2285647440}}')
.option("maxOffsetsPerTrigger", 10000)
.option("subscribe", "events")
.load()
// schema registry config
val srConfig = Map(
"schema.registry.url" -> "http://schema-registry:8081",
"value.schema.naming.strategy" -> "topic.name",
"schema.registry.topic" -> "events",
"value.schema.id" -> "latest"
)
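// Deserialize the Confluent Avro payload with ABRiS and derive year/month/day partition columns from the event timestamp (milliseconds)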
val data = input_stream_df
.withColumn("value", from_confluent_avro(col("value"), srConfig))
.withColumn("timestamp_s", from_unixtime($"value.timestamp" / 1000))
.select(
$"value.*",
year($"timestamp_s") as 'year,
month($"timestamp_s") as 'month,
dayofmonth($"timestamp_s") as 'day
)
// format "HiveAcid" is provided by spark-acid lib from Qubole
val output_stream_df = data.writeStream.format("HiveAcid")
.queryName("hiveSink")
.option("database", "default")
.option("table", "events_sink")
.option("checkpointLocation", "/user/spark/events/checkpoint")
.option("spark.acid.streaming.log.metadataDir", "/user/spark/events/checkpoint/spark-acid")
.option("metastoreUri", "thrift://hive-metastore:9083")
.trigger(Trigger.ProcessingTime("30 seconds"))
.start()
output_stream_df.awaitTermination()
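As an aside, when debugging queries like this it can help to register a StreamingQueryListener (before calling start()) so that micro-batch progress and the termination reason are logged on the driver. A minimal sketch, purely illustrative and not part of spark-acid:

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

// Logs batch progress and the reason the query terminates on the driver.
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"query started: ${event.id}")
  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(s"batch ${event.progress.batchId}: ${event.progress.numInputRows} input rows")
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    println(s"query ${event.id} terminated, exception: ${event.exception}")
})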
We were able to deploy the app to production and redeployed it several times (about 10 times) without issue. Then it ran into the following error:
Query hiveSink [id = 080a9f25-23d2-4ec8-a8c0-1634398d6d29, runId = 990d3bba-0f7f-4bae-9f41-b43db6d1aeb3] terminated with exception:
Job aborted due to stage failure: Task 3 in stage 0.0 failed 4 times, most recent failure: Lost task 3.3 in stage 0.0 (TID 42, 10.236.7.228, executor 3):
org.apache.hadoop.fs.FileAlreadyExistsException: /warehouse/tablespace/managed/hive/events/year=2020/month=5/day=18/delta_0020079_0020079/bucket_00003 for client 10.236.7.228 already exists
(...)
at com.qubole.shaded.orc.impl.PhysicalFsWriter.<init>(PhysicalFsWriter.java:95)
at com.qubole.shaded.orc.impl.WriterImpl.<init>(WriterImpl.java:177)
at com.qubole.shaded.hadoop.hive.ql.io.orc.WriterImpl.<init>(WriterImpl.java:94)
at com.qubole.shaded.hadoop.hive.ql.io.orc.OrcFile.createWriter(OrcFile.java:334)
at com.qubole.shaded.hadoop.hive.ql.io.orc.OrcRecordUpdater.initWriter(OrcRecordUpdater.java:602)
at com.qubole.shaded.hadoop.hive.ql.io.orc.OrcRecordUpdater.addSimpleEvent(OrcRecordUpdater.java:423)
at com.qubole.shaded.hadoop.hive.ql.io.orc.OrcRecordUpdater.addSplitUpdateEvent(OrcRecordUpdater.java:432)
at com.qubole.shaded.hadoop.hive.ql.io.orc.OrcRecordUpdater.insert(OrcRecordUpdater.java:484)
at com.qubole.spark.hiveacid.writer.hive.HiveAcidFullAcidWriter.process(HiveAcidWriter.scala:295)
at com.qubole.spark.hiveacid.writer.TableWriter$$anon$1$$anonfun$6.apply(TableWriter.scala:153)
at com.qubole.spark.hiveacid.writer.TableWriter$$anon$1$$anonfun$6.apply(TableWriter.scala:153)
(...)
at com.qubole.spark.hiveacid.writer.TableWriter$$anon$1.apply(TableWriter.scala:153)
at com.qubole.spark.hiveacid.writer.TableWriter$$anon$1.apply(TableWriter.scala:139)
Each time the app is restarted, the error points to a different delta + bucket file that "already exists". However, those files are (most probably) newly created on each start, and we have no clue why the error is thrown.
Any pointers would be much appreciated.
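For context, the delta directories named in the error can be inspected with the Hadoop FileSystem API to see what already exists under the partition when the app restarts. A minimal sketch; the path is the one from the error message above and is specific to our environment:

import org.apache.hadoop.fs.{FileSystem, Path}

// List the delta_* directories under one partition of the sink table.
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
val partition = new Path("/warehouse/tablespace/managed/hive/events/year=2020/month=5/day=18")
fs.listStatus(partition)
  .filter(_.getPath.getName.startsWith("delta_"))
  .foreach(status => println(status.getPath))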
I discovered the actual root cause from the worker's error log. It was due to code changes I made in one of the libraries used, which caused an out-of-memory issue.
What I posted before was the error log from the driver, after several failures on the worker node.
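To make executor-side failures like this visible in the driver log (instead of only the last retry's FileAlreadyExistsException), one option is to register a SparkListener that prints task failure reasons. A minimal sketch; note that an executor killed outright by an OOM may surface as a lost executor rather than an ExceptionFailure, so the worker log remains the authoritative source:

import org.apache.spark.ExceptionFailure
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Print the failure reason of every failed task on the driver.
spark.sparkContext.addSparkListener(new SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = taskEnd.reason match {
    case e: ExceptionFailure =>
      println(s"task ${taskEnd.taskInfo.taskId} failed: ${e.toErrorString}")
    case _ => // ignore successful tasks and other end reasons
  }
})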