I've looked into my job and have identified that I do indeed have a skewed task. How do I determine what the actual value is inside this task that is causing the skew?
My Python Transforms code looks like this:
from transforms.api import Input, Output, transform
@transform(
...
)
def my_compute_function(...):
...
df = df.join(df_2, ["joint_col"])
...
Skew problems originate from anything that causes an exchange in your job. Things that cause exchanges include but are not limited to: join
s, window
s, groupBy
s.
These operations result in data movement across your Executors based upon the found values inside the DataFrames used. This means that when a used DataFrame has many repeated values on the column dictating the exchange, those rows all end up in the same task, thus increasing its size.
Let's consider the following example distribution of data for your join:
DataFrame 1 (df1)
| col_1 | col_2 |
|-------|-------|
| key_1 | 1 |
| key_1 | 2 |
| key_1 | 3 |
| key_1 | 1 |
| key_1 | 2 |
| key_2 | 1 |
DataFrame 2 (df2)
| col_1 | col_2 |
|-------|-------|
| key_1 | 1 |
| key_1 | 2 |
| key_1 | 3 |
| key_1 | 1 |
| key_2 | 2 |
| key_3 | 1 |
These DataFrames when joined together on col_1
will have the following data distributed across the executors:
key_1
from df1key_1
from df2key_2
from df1key_2
from df2key_3
from df2If you therefore look at the counts of input and output rows per task, you'll see that Task 1 has far more data than the others. This task is skewed.
The question now becomes how we identify that key_1
is the culprit of the skew since this isn't visible in Spark (the underlying engine powering the join).
If we look at the above example, we see that all we need to know is the actual counts per key of the joint column. This means we can:
The easiest way to do this is by opening the Analysis (Contour) tool in Foundry and performing the following analysis:
Add df1
as input to a first path
Add Pivot Table
board, using col_1
as the rows, and Row count
as the aggregate
Click the ā Switch to pivoted data
button
Use the Multi-Column Editor
board to keep only col_1
and the COUNT
column. Prefix each of them with df1_
, resulting in an output from the path which is only df1_col_1
and df1_COUNT
.
Add df2
as input to a second path
Add Pivot Table
board, again using col_1
as the rows, and Row count
as the aggregate
Click the ā Switch to pivoted data
button
Use the Multi-Column Editor
board to keep only col_1
and the COUNT
column. Prefix each of them with df2_
, resulting in an output from the path which is only df2_col_1
and df2_COUNT
.
Create a third path, using the result of the first path (df1_col_1
and df1_COUNT1
)
Add a Join
board, making the right side of the join the result of the second path (df2_col_1
and df2_col_1
). Ensure the join type is Full join
Add all columns from the right side (you don't need to add a prefix, all the columns are unique
Configure the join board to join on df1_col_1
equals df2_col_1
Add an Expression
board to create a new column, output_row_count
which multiplies the two COUNT
columns together
Add a Sort
board that sorts on output_row_count
descending
If you now preview the resultant data, you will have a sorted list of keys from both sides of the join that are causing the skew