Tags: apache-spark, pyspark, optimization, iteration

Is there another way to construct this column in pyspark that avoids an iterative algorithm?


I have two dataframes, df1:

  | attribute1 | attribute2  |
  | --------   | --------    |
  | value1     | value2      |
  | value1     | value4      | 

This is a dataframe with around 20 million rows.

And df2:

  | attribute1 | attribute2 | account |
  | --------   | --------   | ------- |
  | value1     | value2     |  101    |
  | value1     | whatever   |  102    |

This has around 1k entries. df2 is supposed to act like a function in the following sense: if a row in df1 has value1 and value2 as its attributes, it should be assigned account 101; if a row in df1 has attribute1 = value1 and anything other than value2 in attribute2, it should be assigned account 102.

So I need to end up with the following dataframe (df1 with the account column retrieved):

  | attribute1 | attribute2  | account |
  | --------   | --------    | ------- |
  | value1     |   value2    |  101    |
  | value1     |   value4    |  102    |

What I need is to recover the "account" data based on attribute1 and attribute2 in df1.

I have accomplished this using the following cascading (Fibonacci-like) algorithm:

join_condition1 = ["attribute1", "attribute2"]
join_condition2 = ["attribute1"]

(in general I have around 10 join conditions, obtained by ordering them by specificity, meaning a condition is more specific than another if it has more non-null attributes)

piece1 = df1.join(df2, join_condition1, "left").filter("account is not null")
# select only the join keys plus account from df2 so the pieces have matching columns
piece2 = (df1.subtract(piece1.select(df1.columns))
             .join(df2.select(join_condition2 + ["account"]), join_condition2, "left")
             .filter("account is not null"))

(in general piece3, piece4, ..., piece10)

total_piece = piece1.union(piece2)  # union piece3, etc.

I have written the code like this because the user only needs to update df2 with, say, another row (value1, value3, 103), and the code automatically uses that info to retrieve the account column for df1; the application code does not need to be touched.
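
For reference, this cascading approach can also be made fully data-driven, so that adding join conditions (not just rows of df2) requires no code changes. A minimal sketch, assuming the conditions are listed from most to least specific and that df2 contains the join keys plus the account column:

```python
from functools import reduce

# Hypothetical list of join conditions, ordered from most to least specific.
join_conditions = [
    ["attribute1", "attribute2"],
    ["attribute1"],
]

remaining = df1
pieces = []
for cond in join_conditions:
    # Join the rows that are still unmatched against df2 on the current condition.
    matched = (
        remaining.join(df2.select(cond + ["account"]), cond, "left")
                 .filter("account is not null")
    )
    pieces.append(matched)
    # Drop the rows that already got an account before the next, less specific pass.
    remaining = remaining.subtract(matched.select(remaining.columns))

total_piece = reduce(lambda a, b: a.unionByName(b), pieces)
```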

The problem is the computational resources this requires: my application dies because of a timeout.

Is there another way to do this that is more efficient?

Sorry if the format is not very good; this is my first question.

Thanks in advance.


Solution

  • I think a combination of create_map from pyspark.sql.functions, chain from itertools, and a mapping dict might get you a more performant approach.

    This requires that you are able to map your algorithm to a dict, as per this SO answer: https://stackoverflow.com/a/42983199/2186184

    Since you have multiple conditions you might need nested dicts; I'm not sure whether that is allowed in create_map, though.
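
    One way around nested dicts is to flatten each composite key into a single string and build one create_map per specificity level, combining the lookups with coalesce. A minimal sketch of that idea (the dicts, the "|" separator and the column names are illustrative assumptions; in practice the dicts would be built by collecting the small df2 to the driver):

```python
from itertools import chain
from pyspark.sql import functions as F

# Hypothetical dicts derived from df2, one per specificity level
# (df2 only has ~1k rows, so collecting it to the driver is cheap).
specific = {("value1", "value2"): 101}   # keyed on attribute1 + attribute2
fallback = {"value1": 102}               # keyed on attribute1 only

# Flatten composite keys with a separator so create_map can handle them.
specific_map = F.create_map(*chain.from_iterable(
    (F.lit(f"{a1}|{a2}"), F.lit(acc)) for (a1, a2), acc in specific.items()
))
fallback_map = F.create_map(*chain.from_iterable(
    (F.lit(a1), F.lit(acc)) for a1, acc in fallback.items()
))

# Look up the most specific key first, then fall back to the broader one.
df1_with_account = df1.withColumn(
    "account",
    F.coalesce(
        specific_map[F.concat_ws("|", "attribute1", "attribute2")],
        fallback_map[F.col("attribute1")],
    ),
)
```

    This replaces the repeated join/subtract passes with a single projection over df1, which should be considerably cheaper for 20 million rows, provided the separator does not occur in the attribute values.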