apache-spark palantir-foundry foundry-code-repositories foundry-code-workbooks

How do I make my Spark job run faster using executors?


I know my code is free from antipatterns since I don't have any warnings in my Authoring code editor, so I know my code is doing PySpark operations that are distributed and scalable.

My current job has 2 executors assigned to it with 2 cores each, and it runs with task parallelism of 16 as seen on the Spark Details page.

How do I make this job run faster?


Solution

  • Your Executors are the pieces of Spark infrastructure assigned to 'execute' your work. As such, the more of these 'workers' you have, the more work you are able to do in parallel and the faster your job will be.

    There's a limit to how much faster your job can get, however, and that limit is set by the maximum number of tasks in your stages. Note: with Adaptive Query Execution (AQE), your maximum number of tasks will increase as you increase your executor count, so you will notice the task counts rising up to a certain point.

    For instance, if your data scale is such that you only ever have a maximum of 8 tasks (let's assume AQE is controlling this), assigning enough executors to run more than 8 tasks at once will waste resources and won't increase your job speed (see the note above: AQE may adjust your task counts as you add executors, since it detects that more work can be run in parallel).

    The job defaults in most Foundry environments are 2 executors with 2 cores each, and 1 core per task. This means your job has 4 cores in total and can therefore run 4 tasks at a time.
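
As a rough sketch of that arithmetic (the function and parameter names are made up for illustration and are not a Foundry or Spark API), the number of tasks a job can run at once works out like this:

```python
def concurrent_task_slots(executors: int, cores_per_executor: int, cores_per_task: int = 1) -> int:
    """Return how many tasks can run at the same time.

    Hypothetical helper for illustration only. Each executor can host
    cores_per_executor // cores_per_task tasks at once.
    """
    if cores_per_task < 1 or cores_per_task > cores_per_executor:
        raise ValueError("cores_per_task must be between 1 and cores_per_executor")
    return executors * (cores_per_executor // cores_per_task)


# Defaults described above: 2 executors, 2 cores each, 1 core per task.
print(concurrent_task_slots(executors=2, cores_per_executor=2))  # 4
```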

    So if the maximum task count per stage in your job is 4, you won't benefit from boosting your number of executors. If, however, you observe that your stages have, for instance, 16 tasks, then you can work out how far to increase the number of executors as follows:

    16 max tasks, 1 core per task. -> 16 cores needed.
    2 cores per executor -> 8 executors max.

    We could therefore jump this example job up to 8 executors for maximum performance.
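
The same calculation can be written as a small Python sketch. The helper name and its parameters are hypothetical, chosen only to mirror the numbers in this example:

```python
import math


def executors_needed(max_tasks: int, cores_per_executor: int, cores_per_task: int = 1) -> int:
    """Return the executor count at which every task in the largest stage runs at once.

    Hypothetical helper for illustration only; anything above this count
    would sit idle for a stage with max_tasks tasks.
    """
    slots_per_executor = cores_per_executor // cores_per_task
    if slots_per_executor < 1:
        raise ValueError("an executor must have at least cores_per_task cores")
    return math.ceil(max_tasks / slots_per_executor)


# 16 max tasks, 1 core per task, 2 cores per executor.
print(executors_needed(max_tasks=16, cores_per_executor=2))  # 8

# With only 4 max tasks, the default 2 executors are already enough.
print(executors_needed(max_tasks=4, cores_per_executor=2))  # 2
```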

    For the original question, you would bump the number of executors to 8 for maximum performance, provided AQE doesn't increase your task counts after the change.

    When AQE re-examines your job and your new count of Executors, it will detect that more tasks can be run in parallel and will therefore increase your task counts to try to match the infrastructure. However, when it does this, you might end up with tasks that are smaller than you would like.

    The way AQE decides how big to make the tasks (and therefore how many tasks it will run with) is based on the setting spark.sql.adaptive.advisoryPartitionSizeInBytes and the total number of cores available in your job. If you have more cores than are worth parallelizing across (i.e. the shuffle partitions are too small), these small partitions will be coalesced into fewer partitions, which leaves you with the same wasted-executor problem you would have without AQE.
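
To get a feel for where that ceiling sits, here is a deliberately simplified model. It is not Spark's actual coalescing algorithm: it assumes partitions are never made smaller than the advisory size, assumes 1 core per task, and ignores every other AQE setting. The helper name, the 1 GiB shuffle size, and the 64 MiB advisory size are all example assumptions, so check the values in your own environment.

```python
import math

MIB = 1024 * 1024


def max_useful_executors(shuffle_bytes: int, advisory_partition_bytes: int, cores_per_executor: int) -> int:
    """Estimate the executor count beyond which extra executors are wasted.

    Simplified, hypothetical model: the shuffle is split into partitions of
    roughly advisory_partition_bytes each, with one task (one core) per partition.
    """
    partitions = max(1, math.ceil(shuffle_bytes / advisory_partition_bytes))
    return math.ceil(partitions / cores_per_executor)


# Assumed example: a 1 GiB shuffle with a 64 MiB advisory partition size
# gives about 16 partitions, so about 8 executors with 2 cores each.
print(max_useful_executors(1024 * MIB, 64 * MIB, cores_per_executor=2))  # 8
```

Under this model, a smaller shuffle or a larger advisory size lowers the number of executors you can usefully add.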

    AQE will do the best it can with the executor count you've given it, so you may see the job get faster and faster with more executors, up to a point. Once adding executors no longer makes the job faster, your partitions have become too small to be worth splitting into smaller tasks, and you've started wasting executors.
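
The diminishing return can be illustrated by counting how many "waves" of tasks a stage needs. This sketch assumes the task count has stopped growing at 16 (i.e. AQE is no longer adding tasks) and that all tasks take about the same time. The helper name is hypothetical.

```python
import math


def task_waves(tasks: int, executors: int, cores_per_executor: int = 2, cores_per_task: int = 1) -> int:
    """Return how many rounds of tasks a stage needs with the given executors.

    Hypothetical helper for illustration only; fewer waves roughly means a
    faster stage, and 1 wave is the best possible.
    """
    slots = executors * (cores_per_executor // cores_per_task)
    return math.ceil(tasks / slots)


# A stage fixed at 16 tasks, with 2 cores per executor and 1 core per task.
for executors in (2, 4, 8, 16):
    print(executors, "executors ->", task_waves(16, executors), "wave(s)")
# 2 executors -> 4 wave(s)
# 4 executors -> 2 wave(s)
# 8 executors -> 1 wave(s)
# 16 executors -> 1 wave(s)
```

Going from 8 to 16 executors doesn't reduce the number of waves, so the extra 8 executors add cost without adding speed.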