python · apache-spark · pyspark · databricks · spark-ui

Spark number of tasks not equal to number of partitions


I have read that the number of partitions is related to the number of tasks. When I read the query plan of any job that is not a file-reading job (for instance, the merge stage of a join), I do see that it gets as many tasks as there are partitions in each table feeding into the job. It also follows the spark.conf.set('spark.sql.shuffle.partitions', X) setting.

But in file-reading jobs, like a Parquet scan, it does not match. For example, I need a full read of a table that is made of 258 Parquet files, but that reading job decided to use 8 tasks, which is not aligned with spark.conf.set('spark.sql.shuffle.partitions', X) (assuming X is not set to 8).

So it seems that on file-reading jobs Spark selects the number of tasks independently of the number of partitions it needs to read. For instance, I have a table where, when I run

df = spark.sql('select * from transaction')
df.rdd.getNumPartitions()

it says 57, and the table is made of 258 Parquet files (so I guess one Parquet file does not equal one partition).
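A minimal way to compare the two numbers, for reference (input_file_name is the built-in PySpark function that reports which file each row came from):

from pyspark.sql.functions import input_file_name

df = spark.sql('select * from transaction')

# Read partitions Spark created for the scan (57 in my case)
print(df.rdd.getNumPartitions())

# Distinct Parquet files backing the table (258 in my case)
print(df.select(input_file_name()).distinct().count())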

But then, when scanning this table for a join that needs all rows (because it groups by afterwards), it uses just 8 tasks.

So why, if the number of files is 258 and the number of partitions is 57, does Spark decide to go with 8 tasks, regardless of what spark.sql.shuffle.partitions says?


Solution

  • From the official Spark docs, the meaning of spark.sql.shuffle.partitions is:

    Configures the number of partitions to use when shuffling data for joins or aggregations

    This means it only applies when data is shuffled, not when files are read. Therefore, it doesn't affect the number of partitions when you read Parquet files. On the other hand, spark.sql.files.maxPartitionBytes does apply here:

    The maximum number of bytes to pack into a single partition when reading files. This configuration is effective only when using file-based sources such as Parquet, JSON and ORC.
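    As a small sketch of how the two settings act on different stages (the 'some_column' grouping column is a placeholder, and the byte value just restates the default):

    # Size cap for each read partition of file-based sources;
    # smaller values generally mean more read tasks for the same files.
    spark.conf.set('spark.sql.files.maxPartitionBytes', '134217728')  # 128 MB in bytes

    # Number of partitions used only after a shuffle (join/aggregation).
    spark.conf.set('spark.sql.shuffle.partitions', 200)

    df = spark.sql('select * from transaction')
    # Driven by file sizes and maxPartitionBytes, not by shuffle.partitions
    print(df.rdd.getNumPartitions())

    # 'some_column' is a placeholder; the groupBy forces a shuffle
    agg = df.groupBy('some_column').count()
    # Reflects spark.sql.shuffle.partitions (with AQE disabled, exactly 200)
    print(agg.rdd.getNumPartitions())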

    The other thing you mentioned is that you get a different number of tasks for a pure file scan vs. a file scan followed by a join and aggregation.

    Setting aside whether AQE is enabled, I think one reason might be that your data is already partitioned by some column, for example a date. Since Parquet files carry column statistics, when the query is optimized (for the join), the query engine realizes that not all of the data needs to be scanned, and therefore the number of partitions differs. You can set up a test case that reads the same table multiple times with no further actions and a constant configuration; you should see a constant number of partitions and the same execution plan, as in the sketch below.
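    A sketch of such a test case, assuming the same transaction table as above (spark.sql.adaptive.enabled is the standard AQE switch):

    # Turn off AQE so runtime coalescing cannot change the partition counts
    spark.conf.set('spark.sql.adaptive.enabled', 'false')

    df1 = spark.sql('select * from transaction')
    df2 = spark.sql('select * from transaction')

    # With a constant configuration, repeated reads of the same table
    # should report the same number of read partitions
    print(df1.rdd.getNumPartitions())
    print(df2.rdd.getNumPartitions())

    # Inspect the plan; a join or aggregation on top of this scan may
    # prune files (via partition/statistics pruning) and so scan less data
    df1.explain()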