Tags: scala, apache-spark, apache-spark-sql, spark-jdbc

How to parameterize writing a dataframe into a Hive table


I have a list of tables (across different categories) in an RDBMS that I want to extract and save to Hive, and I want to parameterize the job so that the category name is attached to the output location in Hive. For example, for the category "employee", I want to save each table extracted from the RDBMS under a name of the form "hive_db.employee_some_other_random_name".

My code is below:

    val category = "employee"
    val tableList = List("schema.table_1", "schema.table_2", "schema.table_3")

    val tableMap = Map("schema.table_1" -> "table_1",
    "schema.table_2" -> "table_2",
    "schema.table_3" -> "table_3")

    val queryMap = Map("table_1" -> (select * from table_1) tble,
    "table_2" -> (select * from table_2) tble,
    "table_3" -> (select * from table_3) tble)

    val tableBucketMap = Map("table_1" -> "bucketBy(80,\"EMPLOY_ID\",\"EMPLOYE_ST\").sortBy(\"EMPLOY_ST\").format(\"parquet\")",
    "table_2" -> "bucketBy(80, \"EMPLOY_ID\").sortBy(\"EMPLOY_ID\").format(\"parquet\")",
    "table_3" -> "bucketBy(80, \"EMPLOY_ID\", \"SAL_ID\", \"DEPTS_ID\").sortBy(\"EMPLOY_ID\").format(\"parquet\")")

     for (table <- tableList) {
       val tableName = tableMap(table)
       val print_start = "STARTING THE EXTRACTION PROCESSING FOR TABLE: %s"
       val print_statement = print_start.format(tableName)
       println(print_statement)

       val extract_query = queryMap(table)
       val query_statement_non = "Query to extract table %s is: "
       val query_statement = query_statement_non.format(tableName)
       println(query_statement + extract_query)


       val extracted_table = spark.read.format("jdbc")
         .option("url", jdbcURL)
         .option("driver", driver_type)
         .option("dbtable", extract_query)
         .option("user", username)
         .option("password", password)
         .option("fetchsize", "20000")
         .option("queryTimeout", "0")
         .load()

       extracted_table.show(5, false)
       //saving extracted table in hive
       val tableBucket = tableBucketMap(table)
       val output_loc = "hive_db.%s_table_extracted_for_%s"
       val hive_location = output_loc.format(category, tableName)
       println(hive_location)

       // note: this only builds and prints a string; no write is executed
       val saving_table = "%s.write.%s.saveAsTable(\"%s\")"
       println(saving_table.format(extracted_table, tableBucket, hive_location))
  
       val print_end = "COMPLETED EXTRACTION PROCESS FOR TABLE: %s"
       val print_end_statement = print_end.format(tableName)
       println(print_end_statement)
     }

The output for the first table is below; the other tables produce the same pattern.

STARTING THE EXTRACTION PROCESSING FOR TABLE: table_1
Query to extract table table_1 is: (select * from table_1) tble
+---------+--------------------+
|EMPLOY_ID|EMPLOYE_NM          |
+---------+--------------------+
|1        |WELLINGTON          |
|2        |SMITH               |
|3        |CURLEY              |
|4        |PENDRAGON           |
|5        |KEESLER             |
+---------+--------------------+
only showing top 5 rows

hive_db.employee_table_extracted_for_table_1

[EMPLOY_ID: int, EMPLOYE_NM: string].write.bucketBy(80, "EMPLOY_ID", "EMPLOYE_NO").sortBy("EMPLOY_ID").format("parquet").saveAsTable("hive_db.employee_table_extracted_for_table_1")

COMPLETED EXTRACTION PROCESS FOR TABLE: table_1

Instead of writing the extracted dataframe into Hive, it just printed the schema, because String.format only interpolates the dataframe's toString into the template; the resulting string is never executed as code:

[EMPLOY_ID: int, EMPLOYE_NM: String].write............saveAsTable("hive_db.employee_table_extracted_for_table_1")
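
For reference, a direct, hard-coded call (which is what the template string was meant to stand for) would look like the sketch below; what I can't figure out is how to parameterize the bucketBy(...) part per table:

    extracted_table.write
      .bucketBy(80, "EMPLOY_ID")
      .sortBy("EMPLOY_ID")
      .format("parquet")
      .saveAsTable(hive_location)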

How can I make it write the DF into the Hive table?


Solution

  • Can you try this approach? Change your bucket map like this (I have done it for table_1; please do the same for table_2 and table_3),

        val tableBucketMap = Map("table_1" -> "80,\"employe_st\"")
    

    and replace the df.bucketBy() call with the parsed arguments, matching its signature (numBuckets: Int, colName: String, colNames: String*); a sketch extending this to several bucket columns follows at the end of this answer:

       
        val stringArr = tableBucket.split(",")
        val numBuckets = stringArr(0).toInt  // first field: number of buckets
        val colName = stringArr(1)           // second field: bucket column

        extracted_table.write.mode("append").bucketBy(numBuckets, colName).format("parquet").saveAsTable(hive_location)
               
    

    This approach will sort out the issue where the write was only printed as a string:

    [EMPLOY_ID: int, EMPLOYE_NM: String].write............saveAsTable("hive_db.employee_table_extracted_for_table_1")
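
    Building on this, a minimal sketch (assuming the same comma-separated spec format, with the bucket count first and any number of bucket columns after it; the spec strings here are hypothetical) that also covers tables bucketed by several columns, such as table_3:

        // hypothetical spec format: "<numBuckets>,<col1>[,<col2>,...]"
        val tableBucketMap = Map(
          "table_1" -> "80,EMPLOY_ID,EMPLOYE_ST",
          "table_2" -> "80,EMPLOY_ID",
          "table_3" -> "80,EMPLOY_ID,SAL_ID,DEPTS_ID")

        val spec       = tableBucketMap(tableName).split(",").map(_.trim)
        val numBuckets = spec(0).toInt
        val bucketCols = spec.drop(1)

        // bucketBy(numBuckets: Int, colName: String, colNames: String*):
        // pass the first column directly and the rest as varargs
        extracted_table.write
          .mode("append")
          .bucketBy(numBuckets, bucketCols.head, bucketCols.tail: _*)
          .sortBy(bucketCols.head)  // mirrors the original sortBy("EMPLOY_ID")
          .format("parquet")
          .saveAsTable(hive_location)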