This is more of a clarification of my understanding. I am getting the error "Cannot broadcast the table that is larger than 8GB".
Here is my pseudocode:
import org.apache.spark.sql.functions.broadcast

val riskDF = broadcast(someDF) // I want riskDF to be broadcast as part of each join, as it is small (< 1 GB)
val processDF1 = ProcessAndJoin(riskDF) // read from another source and join with riskDF
val processDF2 = ProcessAndJoin(riskDF) // read from another source and join with riskDF
val processDF3 = ProcessAndJoin(riskDF) // read from another source and join with riskDF
// union processDF1, processDF2 and processDF3, then write the output to a bucket
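For completeness, ProcessAndJoin and the final union/write are roughly of the following shape. This is only a hypothetical sketch: the source path, join key and output path are made up for illustration; the part that matters is that each call does its own join against the broadcast-hinted riskDF.

import org.apache.spark.sql.DataFrame

// Hypothetical sketch of ProcessAndJoin; the path and join key are illustrative only
def ProcessAndJoin(riskDF: DataFrame): DataFrame = {
  val sourceDF = spark.read.parquet("gs://some-bucket/some-source/") // read from another source
  sourceDF.join(riskDF, Seq("risk_id")) // riskDF carries the broadcast hint, so this join is planned as a broadcast join
}

val resultDF = processDF1.union(processDF2).union(processDF3)
resultDF.write.mode("overwrite").parquet("gs://some-bucket/output/")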
I have spark.sql.adaptive.enabled=true.
Understanding & Questions
I thought that riskDF would be broadcast only once, but based on the DAG it is being read and broadcast multiple times (once per join).
I think there is a fixed 8 GB limit per broadcast, as per this code reference. I am not sure whether that 8 GB limit applies to a single broadcast join or to the cumulative size of all broadcast joins across the entire pipeline (see the paraphrased check below).
I do not think this specific error is related to spark.sql.autoBroadcastJoinThreshold (in my case I am not setting it; it is left at the default). As per the documentation, the "broadcast" hint is independent of spark.sql.autoBroadcastJoinThreshold.
Is my understanding right?
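For reference, the check in Spark's BroadcastExchangeExec is roughly of the following shape. This is a paraphrase of the linked code, not the exact source, but as far as I can tell it runs per broadcast exchange (per broadcasted relation), not cumulatively across the pipeline.

import org.apache.spark.SparkException

// Approximate paraphrase of the size check in BroadcastExchangeExec (not the exact Spark source).
// Each broadcast exchange builds its own relation and checks that relation's size against a
// hard-coded cap, so the limit appears to apply per broadcast, not to the sum of all broadcasts.
val MAX_BROADCAST_TABLE_BYTES: Long = 8L << 30 // 8 GB, hard-coded
val dataSize: Long = ??? // size of the single relation being broadcast, computed by the exchange

if (dataSize >= MAX_BROADCAST_TABLE_BYTES) {
  throw new SparkException(
    s"Cannot broadcast the table that is larger than 8GB: ${dataSize >> 30} GB")
}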
I am going to set spark.sql.adaptive.enabled=false and rerun the use case. Will update here soon.
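(For the rerun I am just toggling it at the session level, assuming the SparkSession is named spark:)

// Disable adaptive query execution for the rerun; this is a session-level SQL conf
spark.conf.set("spark.sql.adaptive.enabled", "false")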
Update: I understood the cause of the error.
We are experimenting on Dataproc Serverless, and it sets spark.sql.autoBroadcastJoinThreshold=16g. Because of this, the joins are being planned as broadcast joins. But the Spark code, here, checks against a limit of 8 GB. This check fails because the broadcasted data is, unsurprisingly, larger than 8 GB.
Ideally, Spark should enforce a maximum for spark.sql.autoBroadcastJoinThreshold (= 8 GB); anything set higher should be reset to 8 GB.
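One workaround is to cap the threshold explicitly in the session. The property name is standard Spark; the 1g value is just an illustrative choice that stays under the 8 GB hard limit.

// Cap automatic broadcast joins well below Spark's hard 8 GB broadcast limit
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "1g")

// Or disable size-based automatic broadcasting entirely and rely only on explicit broadcast hints
// spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")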