apache-spark pyspark amazon-redshift databricks azure-databricks

Error while reading data from databricks jdbc connection to redshift


We use a Databricks cluster (13.3 LTS, which includes Apache Spark 3.4.1 and Scala 2.12) that is shut down after 30 minutes of inactivity. My objective is to read a Redshift table and write it to Snowflake. I am using the following code:

from pyspark.sql.functions import col

df = spark.read \
  .format("redshift") \
  .option("url", jdbc_url) \
  .option("user", user) \
  .option("password", password) \
  .option("dbtable", "schem_name.table_name") \
  .option("partitionColumn", "date_col1")\
  .option("lowerBound", "2023-11-05")\
  .option("upperBound", "2024-03-23")\
  .option("numPartitions", "500")\
  .load()\
  .filter("date_col1>dateadd(month ,-6,current_date)")\
  .filter(col("country_col").isin('India', 'China', 'Japan', 'Germany', 'United Kingdom', 'Brazil', 'United States', 'Canada'))
df1 = df.repartition(900)  # Data is skewed for the partition column, so repartitioning to 1 * num cores in the cluster for even distribution
df1.write.format("snowflake") \
        .option("host", host_sfl) \
        .option("user", user_sfl) \
        .option('role', role_sfl) \
        .option("password", password_sfl) \
        .option("database", database_sfl) \
        .option("sfWarehouse", warehouse_sfl) \
        .option("schema",'schema_name')\
        .option("dbtable",'target_table_name')\
        .mode('Overwrite') \
        .save()
It throws the following error, even though I have not used the 'query' option anywhere in my code:
IllegalArgumentException: requirement failed: 
    Options 'query' and 'partitionColumn' can not be specified together.
Please define the query using `dbtable` option instead and make sure to qualify
the partition columns using the supplied subquery alias to resolve any ambiguity.
Example :
spark.read.format("jdbc")
  .option("url", jdbcUrl)
  .option("dbtable", "(select c1, c2 from t1) as subq")
  .option("partitionColumn", "c1")
  .option("lowerBound", "1")
  .option("upperBound", "100")
  .option("numPartitions", "3")
  .load()
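
For reference, applying the pattern the error message suggests to my read would look roughly like this (just a sketch, reusing the jdbc_url, user and password variables and the table/column names from my code above, with my filters pushed into the dbtable subquery so they run on Redshift):

subquery = """
    (SELECT *
     FROM schem_name.table_name
     WHERE date_col1 > dateadd(month, -6, current_date)
       AND country_col IN ('India', 'China', 'Japan', 'Germany',
                           'United Kingdom', 'Brazil', 'United States', 'Canada')
    ) AS subq
"""

df = spark.read \
  .format("jdbc") \
  .option("url", jdbc_url) \
  .option("user", user) \
  .option("password", password) \
  .option("dbtable", subquery) \
  .option("partitionColumn", "date_col1") \
  .option("lowerBound", "2023-11-05") \
  .option("upperBound", "2024-03-23") \
  .option("numPartitions", "500") \
  .load()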

When I comment out the repartition and the write-to-Snowflake code and just do a count, it returns the correct count.

Here is another observation: if, after doing the count, I change .format("redshift") to .format("jdbc") in the code above, it works.

I don't know what is happening here. The job keeps failing the first time the cluster is restarted, and I have to run a count manually first and then switch the format to JDBC to make it work. Please let me know if I am missing something obvious; I have gone through a lot of documentation and couldn't find what I need.


Solution

  • This is what I ended up doing that worked:

    count_df = spark.read \
                    .format("com.databricks.spark.redshift") \
                    .option("dbtable", tbl1) \
                    .option("url", url) \
                    .option("user", user) \
                    .option("password", pwd) \
                    .load()\
                    .limit(1)
    count_df.count()
    After that, the code in the question started working. So the fix is to run a dummy count action with a different connector name (com.databricks.spark.redshift) before running the original code.
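
    In other words, do a small warm-up read with com.databricks.spark.redshift after every cluster restart, before the partitioned JDBC read and the Snowflake write. A rough sketch of the sequence (warm_up_redshift is just an illustrative name I'm using here, and the variable names are the ones from the question):

    def warm_up_redshift(url, user, password, table):
        # Dummy action: forces the com.databricks.spark.redshift connector to
        # initialise on a freshly restarted cluster; the result is discarded.
        spark.read \
            .format("com.databricks.spark.redshift") \
            .option("url", url) \
            .option("user", user) \
            .option("password", password) \
            .option("dbtable", table) \
            .load() \
            .limit(1) \
            .count()

    warm_up_redshift(jdbc_url, user, password, "schem_name.table_name")
    # ...then run the partitioned read and the Snowflake write from the question.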