I am doing an ETL in Spark which sometimes takes a lot of time. I want to gracefully shut down the Spark session after a certain time.
I am writing my code in PySpark.
try:
    df_final.write.partitionBy("col1", "col2", "col3").mode("append").format("orc").save(output)
except Exception:
    spark.stop()
I would like to stop Spark after some time in the above code.
Is there a way to gracefully shut down the Spark session after a certain time?
I would suggest using Python's built-in threading.Timer to stop the Spark session gracefully:
import threading

def timer_elapsed():
    print('Timer elapsed')
    if not sc._jsc.sc().isStopped():
        spark.stop()

# wait 0.5 sec for the Spark job to complete
spark_timer = threading.Timer(0.5, timer_elapsed)
spark_timer.start()

try:
    df_final.write.partitionBy("col1", "col2", "col3").mode("append").format("orc").save(output)
    print('Spark job finished successfully.')
except Exception as e:
    spark_timer.cancel()  # stop the timer, no need to wait if an error occurred
    if not sc._jsc.sc().isStopped():
        spark.stop()
Note: We stop the session in two cases: if the time has elapsed or if an exception was caught. Before requesting to stop the Spark context, we check that the context is still active with sc._jsc.sc().isStopped(), which calls the Java API directly.
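For reference, here is a minimal self-contained sketch of the same timer approach. The timeout value, input/output paths, and app name are placeholders, and unlike the snippet above it also cancels the timer in a finally block once the write has returned, since the timeout is only needed while the write is running.

import threading
from pyspark.sql import SparkSession

TIMEOUT_SECONDS = 600              # hypothetical upper bound for the write
output = "/tmp/etl_output"         # placeholder output path

spark = SparkSession.builder.appName("etl-with-timeout").getOrCreate()
sc = spark.sparkContext
df_final = spark.read.orc("/tmp/etl_input")   # placeholder input DataFrame

def stop_spark_if_running():
    # Fires only if the write is still running when the timeout elapses.
    print('Timer elapsed, stopping Spark.')
    if not sc._jsc.sc().isStopped():
        spark.stop()

spark_timer = threading.Timer(TIMEOUT_SECONDS, stop_spark_if_running)
spark_timer.start()

try:
    df_final.write.partitionBy("col1", "col2", "col3").mode("append").format("orc").save(output)
    print('Spark job finished successfully.')
except Exception:
    if not sc._jsc.sc().isStopped():
        spark.stop()
finally:
    spark_timer.cancel()   # the timer is no longer needed once the write returns

Keep in mind that stopping the session while the write is still in flight aborts that job rather than letting it finish, so "gracefully" here means the session itself is shut down cleanly, not that the ETL completes.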