pyspark, palantir-foundry, foundry-code-repositories, foundry-python-transform, foundry-contour

How do I identify the value of a skewed task of my Foundry job?


I've looked into my job and have identified that I do indeed have a skewed task. How do I determine what the actual value is inside this task that is causing the skew?

My Python Transforms code looks like this:

from transforms.api import Input, Output, transform


@transform(
  ...
)
def my_compute_function(...):
  ...
  df = df.join(df_2, ["joint_col"])
  ...


Solution

  • Theory

    Skew problems originate from anything that causes an exchange (a shuffle) in your job. Operations that cause exchanges include, but are not limited to: joins, window functions, and groupBys.

    These operations move rows between your Executors based on the values found in the column(s) dictating the exchange. When a DataFrame has many repeated values in that column, all of the rows sharing a value end up in the same task, inflating its size.
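
    To see this routing concretely, here is a minimal, hypothetical PySpark sketch (standalone, not part of the original transform) that hash-partitions a toy DataFrame on its key column, the way a join or groupBy would, and counts rows per partition; every key_1 row lands in the same partition, i.e. the same task:

      from pyspark.sql import SparkSession, functions as F

      spark = SparkSession.builder.getOrCreate()

      # Toy DataFrame with one hot key, mirroring the example below
      df = spark.createDataFrame(
          [("key_1", 1), ("key_1", 2), ("key_1", 3), ("key_2", 1)],
          ["col_1", "col_2"],
      )

      # Hash-partition on col_1 (what an exchange does), then count
      # the rows that each partition/task receives
      (
          df.repartition(8, "col_1")
          .withColumn("partition_id", F.spark_partition_id())
          .groupBy("partition_id")
          .count()
          .show()
      )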

    Example

    Let's consider the following example distribution of data for your join:

    DataFrame 1 (df1)
    
    | col_1 | col_2 |
    |-------|-------|
    | key_1 | 1     |
    | key_1 | 2     |
    | key_1 | 3     |
    | key_1 | 1     |
    | key_1 | 2     |
    | key_2 | 1     |
    
    DataFrame 2 (df2)
    
    | col_1 | col_2 |
    |-------|-------|
    | key_1 | 1     |
    | key_1 | 2     |
    | key_1 | 3     |
    | key_1 | 1     |
    | key_2 | 2     |
    | key_3 | 1     |
    

    When these DataFrames are joined on col_1, each key is routed to its own task, giving the following distribution of data across the executors:

    | Task | Key   | Input rows (df1 + df2) | Output rows |
    |------|-------|------------------------|-------------|
    | 1    | key_1 | 5 + 4 = 9              | 5 × 4 = 20  |
    | 2    | key_2 | 1 + 1 = 2              | 1 × 1 = 1   |
    | 3    | key_3 | 0 + 1 = 1              | 0 × 1 = 0   |

    If you look at the counts of input and output rows per task, you'll see that Task 1 has far more data than the others. This task is skewed.

    Identification

    The question now becomes: how do we identify that key_1 is the culprit of the skew? The key value itself isn't visible in Spark (the underlying engine powering the join); the Spark UI shows task sizes, not the values behind them.

    Looking at the above example, all we need are the actual row counts per key of the join column. This means we can:

    1. Aggregate each side of the join on the joint key and count the rows per key
    2. Multiply the counts of each side of the join to determine the output row counts
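
    If you'd rather compute this in code than in the UI, the two steps above can be sketched directly in PySpark (a hedged sketch; df1, df2, and joint_col are stand-ins mirroring the question's snippet):

      from pyspark.sql import functions as F

      # Step 1: row counts per key on each side of the join
      df1_counts = df1.groupBy("joint_col").agg(F.count("*").alias("df1_count"))
      df2_counts = df2.groupBy("joint_col").agg(F.count("*").alias("df2_count"))

      # Step 2: a full join keeps keys present on only one side, then
      # multiply the per-side counts to get the output row count per key
      skew_report = (
          df1_counts.join(df2_counts, "joint_col", "full")
          .na.fill(0, ["df1_count", "df2_count"])
          .withColumn("output_row_count", F.col("df1_count") * F.col("df2_count"))
          .orderBy(F.col("output_row_count").desc())
      )

      skew_report.show()  # the top rows are the keys driving the skew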

    If you prefer a point-and-click route, the easiest way is to open the Analysis (Contour) tool in Foundry and perform the following analysis:

    1. Add df1 as input to a first path

    2. Add a Pivot Table board, using col_1 as the rows and Row count as the aggregate


    3. Click the ⇄ Switch to pivoted data button


    4. Use the Multi-Column Editor board to keep only col_1 and the COUNT column, prefixing each with df1_, so the path's output is only df1_col_1 and df1_COUNT.

    5. Add df2 as input to a second path

    6. Add a Pivot Table board, again using col_1 as the rows and Row count as the aggregate


    7. Click the ⇄ Switch to pivoted data button


    8. Use the Multi-Column Editor board to keep only col_1 and the COUNT column, prefixing each with df2_, so the path's output is only df2_col_1 and df2_COUNT.

    9. Create a third path, using the result of the first path (df1_col_1 and df1_COUNT)

    10. Add a Join board, making the right side of the join the result of the second path (df2_col_1 and df2_COUNT). Ensure the join type is Full join


    11. Add all columns from the right side (you don't need to add a prefix; all the column names are unique)

    12. Configure the join board to join on df1_col_1 equals df2_col_1


    13. Add an Expression board to create a new column, output_row_count, which multiplies the two COUNT columns together


    14. Add a Sort board that sorts on output_row_count descending


    15. If you now preview the resulting data, you will have a sorted list of the keys from both sides of the join that are causing the skew, worst offenders first.
