My join looks like this:
SELECT
    left.*,
    right.*
FROM `/foo/bar/baz` AS left
JOIN `/foo2/bar2/baz2` AS right
    ON left.something = right.something
Dataset: /foo/bar/baz
+-----------+-------+
| something | val_1 |
+-----------+-------+
| a         | 1     |
| a         | 2     |
| a         | 3     |
| a         | 4     |
| a         | 5     |
| a         | 6     |
| a         | ...   |
| a         | 10K   |
| b         | 1     |
| b         | 2     |
| b         | 3     |
+-----------+-------+
Dataset: /foo2/bar2/baz2
+-----------+-------+
| something | val_2 |
+-----------+-------+
| a         | 1     |
| a         | 2     |
| b         | 1     |
| b         | 2     |
| b         | 3     |
+-----------+-------+
I am getting OOM errors on my executors and I don't want to throw more memory at the executors unnecessarily. How do I ensure this join executes successfully without burning extra resources?
One tactic for getting this join to succeed is a technique known as salting the join.
Salting works in Spark by splitting the rows of the table with many entries per key (here, the left table) into smaller portions, while exploding the smaller table into an equivalent number of copies. The join result is identical to that of the unsalted join, but each task handles a smaller slice of the larger table, which lowers the risk of OOM errors. To salt a join, add a column of random integers from 0 through N - 1 to the left table and make N copies of the right table, one tagged with each salt value. When you add the salt column to the join condition, the largest bucket shrinks to roughly 1/N of its previous size.
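To make the bucket-size claim concrete, here is a small pure-Python sketch (not Spark code) that counts rows per join bucket before and after salting. It assumes a stand-in for /foo/bar/baz in which key `a` has 10,000 rows and key `b` has 3. The helper `bucket_sizes` is hypothetical and exists only for this illustration. With N = 8, the largest bucket should drop from 10,000 rows to roughly 10,000 / 8 = 1,250.

```python
import random
from collections import Counter

N = 8  # number of salt values; the example query below also uses 8

# Hypothetical stand-in for /foo/bar/baz: key "a" has 10,000 rows, key "b" has 3.
left = [("a", v) for v in range(1, 10_001)] + [("b", v) for v in range(1, 4)]


def bucket_sizes(rows, n_salts=None):
    """Count rows per join bucket.

    Without salting, a bucket is just the join key; with salting it is
    (key, salt), where salt is drawn uniformly from 0..n_salts-1.
    """
    if n_salts is None:
        return Counter(key for key, _ in rows)
    # random.randrange(n) returns an int in 0..n-1, like FLOOR(RAND() * n).
    return Counter((key, random.randrange(n_salts)) for key, _ in rows)


random.seed(0)
unsalted = bucket_sizes(left)
salted = bucket_sizes(left, n_salts=N)

print("largest bucket without salt:", max(unsalted.values()))  # 10000
print("largest bucket with salt:   ", max(salted.values()))
```

The salted maximum varies slightly with the seed, because the salt is random. That is also why the reduction is "roughly" 1/N rather than exact.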
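The key is the EXPLODE function. It emits one row per element of an array, so exploding an array of N salt values against the right table is effectively a cross product that yields N copies of every row: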
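SELECT
    left.*,
    right.*
FROM
    (
        SELECT
            *,
            -- one random salt in 0..7 for each row of the large table
            FLOOR(RAND() * 8) AS salt
        FROM `/foo/bar/baz`
    ) AS left
JOIN
    (
        SELECT
            *,
            -- one copy of each row of the small table per salt value 0..7
            EXPLODE(ARRAY(0, 1, 2, 3, 4, 5, 6, 7)) AS salt
        FROM `/foo2/bar2/baz2`
    ) AS right
ON
    left.something = right.something
    AND left.salt = right.salt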
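As a sanity check that salting changes only the task sizes and not the result, the following pure-Python sketch mirrors the query above on scaled-down, made-up rows. It assumes 100 `a` rows on the left instead of 10K. The helpers `hash_join` and `without_salt` are hypothetical. Each left row gets one random salt, each right row is copied once per salt value, and the salted join is compared with the plain join after the salt columns are dropped.

```python
import random
from collections import Counter, defaultdict

N = 8  # must equal the number of elements passed to EXPLODE(ARRAY(...))


def hash_join(left_rows, right_rows, left_key, right_key):
    """In-memory equi-join: every (left_row, right_row) pair whose keys match."""
    index = defaultdict(list)
    for rrow in right_rows:
        index[right_key(rrow)].append(rrow)
    return [(lrow, rrow) for lrow in left_rows for rrow in index.get(left_key(lrow), [])]


def without_salt(pairs):
    """Drop any trailing salt column from both sides and count each output row."""
    return Counter((lrow[:2], rrow[:2]) for lrow, rrow in pairs)


# Scaled-down (something, val) rows shaped like the two datasets in the question.
left = [("a", v) for v in range(1, 101)] + [("b", v) for v in range(1, 4)]
right = [("a", 1), ("a", 2), ("b", 1), ("b", 2), ("b", 3)]

# ON left.something = right.something
plain = hash_join(left, right, lambda row: row[0], lambda row: row[0])

# FLOOR(RAND() * 8) AS salt: one random salt in 0..N-1 per left row
salted_left = [row + (random.randrange(N),) for row in left]
# EXPLODE(ARRAY(0, ..., 7)) AS salt: N copies of each right row, one per salt
exploded_right = [row + (salt,) for row in right for salt in range(N)]

# ON left.something = right.something AND left.salt = right.salt
salted = hash_join(
    salted_left,
    exploded_right,
    lambda row: (row[0], row[2]),
    lambda row: (row[0], row[2]),
)

print(len(plain), len(salted), without_salt(plain) == without_salt(salted))
# prints: 209 209 True
```

The output is the same for every random seed. Each left row matches exactly one exploded copy of each right row with the same key: the copy whose salt equals its own. This guarantee holds only because the salts drawn on the left are a subset of the values exploded on the right.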
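Because RAND() returns values in [0, 1), FLOOR(RAND() * N) gives integers from 0 through N - 1, while CEIL(RAND() * N) gives integers from 1 through N (0 only if RAND() returns exactly 0). Make sure the array you explode contains exactly the values your salt expression can produce. Here, FLOOR(RAND() * 8) pairs with ARRAY(0, 1, ..., 7). Any left row whose salt has no matching exploded copy silently drops out of the join.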
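The sketch below checks these ranges empirically in plain Python. It uses `random.random()`, which, like Spark's RAND(), draws from [0.0, 1.0), together with `math.floor` / `math.ceil` as stand-ins for FLOOR / CEIL. The seed and sample count are arbitrary. It also estimates how many left rows an inner join would lose if CEIL salts were joined against an exploded ARRAY(0, ..., N - 1): every row salted with N has no partner, which is about 1/N of the rows (12.5% for N = 8).

```python
import math
import random

N = 8
random.seed(42)
# random.random() draws from [0.0, 1.0), the same range as Spark's RAND().
samples = [random.random() for _ in range(100_000)]

floor_salts = {math.floor(x * N) for x in samples}  # FLOOR(RAND() * N)
ceil_salts = {math.ceil(x * N) for x in samples}    # CEIL(RAND() * N)
exploded = set(range(N))                            # EXPLODE(ARRAY(0, ..., N-1))

print("FLOOR salts:", sorted(floor_salts))
print("CEIL salts: ", sorted(ceil_salts))

# Fraction of left rows whose CEIL salt has no exploded copy to match (salt == N).
lost = sum(1 for x in samples if math.ceil(x * N) not in exploded) / len(samples)
print(f"rows lost with CEIL + ARRAY(0..N-1): {lost:.1%}")
```

If you prefer CEIL, explode ARRAY(1, ..., N) instead. Either pairing works as long as both sides use the same set of N values.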