I have a list of tables (across different categories) in an RDBMS that I want to extract and save in Hive, and I want to parameterize the job so that the category name is attached to the output location in Hive. For example, for a category "employee", I want to save the extracted RDBMS table in the format "hive_db.employee_some_other_random_name".
My code is below:
val category = "employee"
val tableList = List("schema.table_1", "schema.table_2", "schema.table_3")
val tableMap = Map("schema.table_1" -> "table_1",
  "schema.table_2" -> "table_2",
  "schema.table_3" -> "table_3")
val queryMap = Map("table_1" -> "(select * from table_1) tble",
  "table_2" -> "(select * from table_2) tble",
  "table_3" -> "(select * from table_3) tble")
val tableBucketMap = Map("table_1" -> "bucketBy(80,\"EMPLOY_ID\",\"EMPLOYE_ST\").sortBy(\"EMPLOY_ST\").format(\"parquet\")",
  "table_2" -> "bucketBy(80, \"EMPLOY_ID\").sortBy(\"EMPLOY_ID\").format(\"parquet\")",
  "table_3" -> "bucketBy(80, \"EMPLOY_ID\", \"SAL_ID\", \"DEPTS_ID\").sortBy(\"EMPLOY_ID\").format(\"parquet\")")
for (table <- tableList) {
  val tableName = tableMap(table)
  val print_start = "STARTING THE EXTRACTION PROCESSING FOR TABLE: %s"
  val print_statement = print_start.format(tableName)
  println(print_statement)

  val extract_query = queryMap(tableName)
  val query_statement_non = "Query to extract table %s is: "
  val query_statement = query_statement_non.format(tableName)
  println(query_statement + extract_query)

  val extracted_table = spark.read.format("jdbc")
    .option("url", jdbcURL)
    .option("driver", driver_type)
    .option("dbtable", extract_query)
    .option("user", username)
    .option("password", password)
    .option("fetchsize", "20000")
    .option("queryTimeout", "0")
    .load()
  extracted_table.show(5, false)

  //saving extracted table in hive
  val tableBucket = tableBucketMap(tableName)
  val output_loc = "hive_db.%s_table_extracted_for_%s"
  val hive_location = output_loc.format(category, tableName)
  println(hive_location)

  val saving_table = "%s.write.%s.saveAsTable(\"%s\")"
  saving_table.format(extracted_table, tableBucket, hive_location)
  println(saving_table.format(extracted_table, tableBucket, hive_location))

  val print_end = "COMPLETED EXTRACTION PROCESS FOR TABLE: %s"
  val print_end_statement = print_end.format(tableName)
  println(print_end_statement)
}
I have the result below for the first table; the same pattern applies to the other tables.
STARTING THE EXTRACTION PROCESSING FOR TABLE: table_1
Query to extract table table_1 is: (select * from table_1) tble
+---------+--------------------+
|EMPLOY_ID|EMPLOYE_NM |
+---------+--------------------+
|1 |WELLINGTON |
|2 |SMITH |
|3 |CURLEY |
|4 |PENDRAGON |
|5 |KEESLER |
+---------+--------------------+
only showing top 5 rows
hive_db.employee_table_extracted_for_table_1
[EMPLOY_ID: int, EMPLOYE_NM: string].write.bucketBy(80, "EMPLOY_ID", "EMPLOYE_NO").sortBy("EMPLOY_ID").format("parquet").saveAsTable("hive_db.employee_table_extracted_for_table_1")
COMPLETED EXTRACTION PROCESS FOR TABLE: table_1
Instead of writing the extracted DataFrame into Hive, it just printed the DataFrame's schema string:
[EMPLOY_ID: int, EMPLOYE_NM: String].write............saveAsTable("hive_db.employee_table_extracted_for_table_1")
How can I make it actually write the DF into the Hive table?
Can you try this approach? The problem is that saving_table.format(...) only builds a string: String.format calls the DataFrame's toString, which produces that schema string, so the write is never actually executed. Call the write directly instead, and keep only the bucket parameters in the map. Change your bucket map like this (I have done it for table_1; please do the same for table_2 and table_3):
val tableBucketMap = Map("table_1" -> "80,employe_st")
and call df.write.bucketBy() directly with real arguments, matching its signature (numBuckets: Int, colName: String, colNames: String*):
val stringArr = tableBucket.split(",")
val numBuckets = stringArr(0).toInt   // first token is the bucket count
val colName = stringArr(1)            // second token is the bucket column
extracted_table.write.mode("append").bucketBy(numBuckets, colName).format("parquet").saveAsTable(hive_location)
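A note on the write options: mode("append") adds rows if the table already exists; if each run should replace the previous extract, mode("overwrite") is the usual alternative. Also, bucketBy combined with saveAsTable records the bucketing spec in the metastore in Spark's own format, so Spark can exploit it on later reads, though Hive itself may not treat it as a Hive-bucketed table.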
This approach should sort out the issue you mentioned, where only
[EMPLOY_ID: int, EMPLOYE_NM: String].write............saveAsTable("hive_db.employee_table_extracted_for_table_1")
was printed instead of the table being written.
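If some of your tables bucket on more than one column and also need a sortBy, as in your original tableBucketMap, a sketch along the lines below could extend the same idea. It assumes each map value lists the bucket count first and then the bucket columns, and it keeps the sort column in a separate map; tableSortMap and its contents are illustrative, not from your code:
// assumed layout of each value: "numBuckets,bucketCol1,bucketCol2,..."
val tableBucketMap = Map(
  "table_1" -> "80,EMPLOY_ID,EMPLOYE_ST",
  "table_2" -> "80,EMPLOY_ID",
  "table_3" -> "80,EMPLOY_ID,SAL_ID,DEPTS_ID")
// hypothetical map holding the sort column for each table
val tableSortMap = Map(
  "table_1" -> "EMPLOY_ST",
  "table_2" -> "EMPLOY_ID",
  "table_3" -> "EMPLOY_ID")

val parts = tableBucketMap(tableName).split(",")
val numBuckets = parts(0).toInt
val bucketCols = parts.drop(1)   // one or more bucket columns
extracted_table.write
  .mode("append")
  .bucketBy(numBuckets, bucketCols.head, bucketCols.tail: _*)   // varargs take the extra columns
  .sortBy(tableSortMap(tableName))
  .format("parquet")
  .saveAsTable(hive_location)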