I am trying to do some performance optimization for a Spark job using the bucketing technique. I am reading .parquet and .csv files and doing some transformations. After that I bucket and join two DataFrames. Then I write the joined DF to parquet, but I get an almost empty file of ~500 B instead of ~500 MB.
import org.apache.spark.sql.SaveMode

// Read the parquet input and save it as a bucketed, sorted table
val readParquet = spark.read.parquet(inputP)
readParquet
  .write
  .format("parquet")
  .bucketBy(23, "column")
  .sortBy("column")
  .mode(SaveMode.Overwrite)
  .saveAsTable("bucketedTable1")
val firstTableDF = spark.table("bucketedTable1")
// Read the CSV input, transform it and save it as a second bucketed table
val readCSV = spark.read.csv(inputCSV)
readCSV
  .filter(..)          // filter condition elided in the original
  .orderBy(someColumn)
  .write
  .format("parquet")
  .bucketBy(23, "column")
  .sortBy("column")
  .mode(SaveMode.Overwrite)
  .saveAsTable("bucketedTable2")
val secondTableDF = spark.table("bucketedTable2")
// Join the two bucketed tables and write the result to parquet
val resultDF = secondTableDF
  .join(firstTableDF, Seq("column"), "fullouter")
  // further transformations elided in the original

resultDF
  .coalesce(1)
  .write
  .mode(SaveMode.Overwrite)
  .parquet(output)
When I launch the Spark job from the command line over ssh, I get the correct result: a ~500 MB parquet file which I can see using Hive. If I run the same job with an Oozie workflow, I get an almost empty file (~500 bytes).
When I do .show() on my resultDF I can see the data, but the parquet file written out is still almost empty.
+-----------+---------------+----------+
|       col1|           col2|      col3|
+-----------+---------------+----------+
|33601234567|208012345678910|       LOL|
|33601234567|208012345678910|       LOL|
|33601234567|208012345678910|       LOL|
+-----------+---------------+----------+
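A quick sanity check (this snippet is my addition for illustration, not part of the original job) confirms the DataFrame itself is not empty right before the write:

// Hypothetical check: count the rows Spark sees just before writing,
// to compare with what actually lands on disk.
val rowsBeforeWrite = resultDF.count()
println(s"resultDF rows before write: $rowsBeforeWrite")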
There is no problem writing to parquet when I am not saving the data as a table; it only happens with a DF created from a table (the working direct path is sketched below for comparison).
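As a rough sketch only (my reconstruction, reusing the readCSV, readParquet, "column" and output names from above; the filter and other transformations are left out), the non-table path that writes fine looks like this:

// Direct (non-table) path: join the source DataFrames and write to parquet,
// with no bucketing and no saveAsTable involved.
val directResult = readCSV.join(readParquet, Seq("column"), "fullouter")
directResult
  .coalesce(1)
  .write
  .mode(SaveMode.Overwrite)
  .parquet(output)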
Any suggestions?
Thanks in advance for any thoughts!
I figured it out. For my case I just added the option .option("path", "/sources/tmp_files_path"). Now I can use bucketing and I have data in my output files.
readParquet
  .write
  .option("path", "/sources/tmp_files_path")   // explicit path: the table is created as an external table at this location
  .mode(SaveMode.Overwrite)
  .bucketBy(23, "column")
  .sortBy("column")
  .saveAsTable("bucketedTable1")
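To double-check the fix (this verification step is my addition, assuming the same table name as above), the row count should now be non-zero and DESCRIBE FORMATTED should show the external location and the bucketing spec:

// Optional verification of the bucketed, externally-located table.
println(spark.table("bucketedTable1").count())
spark.sql("DESCRIBE FORMATTED bucketedTable1").show(false)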