Tags: apache-spark, apache-spark-sql, palantir-foundry, foundry-code-repositories, foundry-code-workbooks

How do I make my highly skewed join complete in Spark SQL?


My join is executing as follows:

SELECT
  l.*,
  r.*
FROM `/foo/bar/baz` AS l
JOIN `/foo2/bar2/baz2` AS r
ON l.something = r.something

Dataset: /foo/bar/baz

+-----------+-------+
| something | val_1 |
+-----------+-------+
| a         |     1 |
| a         |     2 |
| a         |     3 |
| a         |     4 |
| a         |     5 |
| a         |     6 |
| a         |   ... |
| a         |   10K |
| b         |     1 |
| b         |     2 |
| b         |     3 |
+-----------+-------+

Dataset: /foo2/bar2/baz2

+-----------+-------+
| something | val_2 |
+-----------+-------+
| a         |     1 |
| a         |     2 |
| b         |     1 |
| b         |     2 |
| b         |     3 |
+-----------+-------+

I am getting OOM errors on my executors and I don't want to throw more memory at the executors unnecessarily. How do I ensure this join executes successfully without burning extra resources?


Solution

  • Salting the Join

    One tactic for getting this join to execute successfully is what's known as salting the join.

    Salted joins work in Spark by splitting the table with many entries per key into smaller portions while exploding the smaller table into an equivalent number of copies. This produces the same result rows as a normal join, but with smaller tasks for the larger table and therefore a lower risk of OOM errors. You salt a join by adding a column of random integers from 0 through N-1 to the left table and making N copies of the right table, one per salt value. When you add the new salt column to the join condition, each key's rows are spread across N (key, salt) buckets, so the largest bucket shrinks to roughly 1/N of its previous size.
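    To make the mechanics concrete, here is a small pure-Python model of the idea with N = 8 and data shaped like the question's (10,000 `a` rows on the left). It is not Spark code: `inner_join` and `largest_bucket` are hypothetical helpers, and the size of the largest group of rows sharing one join key stands in for the size of the largest task. It checks that the salted join returns exactly the same rows as the plain join while the biggest group shrinks to roughly one-eighth.

```python
import random
from collections import Counter, defaultdict


def inner_join(left, right):
    """Hash inner join of (join_key, value) rows; join_key may be a tuple."""
    index = defaultdict(list)
    for join_key, value in right:
        index[join_key].append(value)
    # Emit (join_key, left_value, right_value) for every matching pair.
    return [(k, lv, rv) for k, lv in left for rv in index[k]]


def largest_bucket(rows):
    """Rows sharing the most common join key (a proxy for the largest task)."""
    return Counter(k for k, _ in rows).most_common(1)[0][1]


N = 8                   # number of salt values
rng = random.Random(0)  # fixed seed so the sketch is reproducible

# Data shaped like the question: key "a" is heavily skewed on the left.
left = [("a", i) for i in range(1, 10_001)] + [("b", i) for i in range(1, 4)]
right = [("a", 1), ("a", 2), ("b", 1), ("b", 2), ("b", 3)]

plain = inner_join(left, right)

# Left: attach a random salt in 0..N-1, making the join key (key, salt).
salted_left = [((k, rng.randrange(N)), v) for k, v in left]
# Right: N copies of every row, one per salt value (the EXPLODE step).
exploded_right = [((k, s), v) for k, v in right for s in range(N)]
# Join on (key, salt), then drop the salt to compare with the plain join.
salted = [(k, lv, rv) for (k, _s), lv, rv in inner_join(salted_left, exploded_right)]

print(len(plain), len(salted), Counter(plain) == Counter(salted))  # 20009 20009 True
print(largest_bucket(left), largest_bucket(salted_left))  # 10000, then roughly 1250
print(len(right), len(exploded_right))  # 5 40
```

    The price is paid on the small side: the right table grows from 5 rows to 40, which is cheap precisely because that side is small.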

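    The key is the EXPLODE function. Applied to a constant array, EXPLODE emits one output row per array element, so it effectively cross-joins every right-table row with the salt values: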

    SELECT
      l.*,
      r.*
    FROM
      (
        -- Large side: tag every row with a random salt in 0..7
        SELECT
          *,
          FLOOR(RAND() * 8) AS salt
        FROM `/foo/bar/baz`
      ) AS l
    JOIN
      (
        -- Small side: EXPLODE emits 8 copies of every row, one per salt value
        SELECT
          *,
          EXPLODE(ARRAY(0, 1, 2, 3, 4, 5, 6, 7)) AS salt
        FROM `/foo2/bar2/baz2`
      ) AS r
    ON
      l.something = r.something
      AND l.salt = r.salt
    
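    The salt count appears twice in this query: as the multiplier in `FLOOR(RAND() * 8)` and as the eight elements of the array. If the multiplier exceeds the array length, left rows whose salt has no matching copy silently drop out of the inner join; if the array is longer, the extra copies are wasted work. A minimal sketch, assuming you assemble the query text in Python (for example in a Foundry code repository), derives both from a single parameter. `salted_join_sql` is a hypothetical helper, and the key name is interpolated unquoted, so it should be a trusted column name.

```python
def salted_join_sql(left_table: str, right_table: str, key: str, n: int) -> str:
    """Return a Spark SQL salted join whose salt range is derived from n."""
    if n < 1:
        raise ValueError("n must be at least 1")
    # "0, 1, ..., n-1": one copy of each right-table row per salt value.
    salts = ", ".join(str(s) for s in range(n))
    # RAND() is in [0, 1), so FLOOR(RAND() * n) is an integer in 0..n-1,
    # which always has a matching element in the exploded array.
    return (
        "SELECT l.*, r.*\n"
        "FROM (\n"
        f"  SELECT *, FLOOR(RAND() * {n}) AS salt FROM `{left_table}`\n"
        ") AS l\n"
        "JOIN (\n"
        f"  SELECT *, EXPLODE(ARRAY({salts})) AS salt FROM `{right_table}`\n"
        ") AS r\n"
        f"ON l.{key} = r.{key} AND l.salt = r.salt"
    )


print(salted_join_sql("/foo/bar/baz", "/foo2/bar2/baz2", "something", 8))
```

    With `n=1` the query degenerates to the plain join (every salt is 0), which is a convenient baseline when comparing salt counts.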

    Tuning

    What to Watch Out For

    The Overhead of Salt