python · pyspark · databricks

How to use the ntile() window function (or similar) on hundreds of columns in AWS Databricks


I have a table with 20 million rows and 400+ columns. Aside from the first column, I need to convert every other column into evenly distributed deciles, each column independently of the others. My data reside in AWS Databricks. I'm running the Python notebook as a job using a job cluster with the following configuration:

Multi-Node with Enable Autoscaling checked
Driver: i3en.3xlarge · Workers: i3en.3xlarge · 2-10 workers · DBR: 14.3 LTS (includes Apache Spark 3.5.0, Scala 2.12)
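
For reference, here is a minimal sketch of that job cluster expressed as a Databricks Jobs API new_cluster block; the field values simply mirror the description above, and the spec itself is my assumption rather than part of the original job definition.

# Hypothetical sketch: the job cluster above as a Jobs API "new_cluster" block
new_cluster = {
    "spark_version": "14.3.x-scala2.12",                 # DBR 14.3 LTS (Spark 3.5.0, Scala 2.12)
    "node_type_id": "i3en.3xlarge",                      # worker instance type
    "driver_node_type_id": "i3en.3xlarge",               # driver instance type
    "autoscale": {"min_workers": 2, "max_workers": 10},  # Enable Autoscaling, 2-10 workers
}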

So the sample data and desired output (the last two columns are the desired output) may look like this:

person_id  col2  col400  col2_decile  col400_decile
a          0.01  0.10    1            1
b          0.15  0.25    2            2
c          0.20  0.30    3            3
d          0.25  0.30    3            3
e          0.30  0.40    4            4
f          0.35  0.45    4            5
g          0.40  0.50    5            6
h          0.45  0.55    6            6
i          0.50  0.60    6            8
j          0.55  0.62    7            8
k          0.55  0.33    8            4
l          0.56  0.64    8            9
n          0.05  0.59    1            7
o          0.19  0.22    2            1
p          0.42  0.49    5            5
q          0.51  0.23    7            2
r          0.75  0.63    10           9
s          0.82  1.02    10           10
t          0.59  0.76    9            10
u          0.57  0.57    9            7

Attempt 1: My first attempt used pandas' qcut feature, but unsurprisingly it errored when converting to a pandas DataFrame.

import pandas as pd
from pyspark.sql import SparkSession

#import data
df = spark.sql("select * from my_space.table1")

# Convert to a pandas DataFrame
pandas_df = df.toPandas()

# List of columns to calculate deciles
columns = pandas_df.columns[1:]

# Loop through each column and calculate deciles (1 to 10)
for col in columns:
    # Use qcut to assign a decile rank (1 to 10)
    pandas_df[col + '_decile'] = pd.qcut(pandas_df[col], 10, labels=False, duplicates='drop') + 1  # Deciles 1-10

# Convert the DataFrame to a Spark DataFrame
spark_df = spark.createDataFrame(pandas_df)    

# Select the 'person_id' column along with columns that end with '_decile'
decile_columns = ['person_id'] + [col for col in spark_df.columns if col.endswith('_decile')]

# Create a new DataFrame with only 'person_id' and decile columns
decile_df = spark_df.select(decile_columns)

#some minor stuff here where I saved a new df as df_renamed and renamed some columns

# Write it to a table
df_renamed.write.mode("overwrite").saveAsTable("my_space.my_new_table")

This is the error when converting to a pandas DataFrame. From my understanding, increasing spark.driver.maxResultSize may not even help, because pandas is not very performant with data this large.

SparkException: 
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 164 tasks (23.4 GiB) is bigger than local result size limit 23.3 GiB, to address it, set spark.driver.maxResultSize bigger than your dataset result size.
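
For reference, this is roughly where that limit would be raised; spark.driver.maxResultSize is read when the driver starts, so it belongs in the cluster's Spark config rather than a runtime spark.conf.set call. The 32g value below is just an illustrative assumption, and as noted above it likely wouldn't fix the underlying pandas scalability issue.

# Sketch only: on Databricks this line would go in the cluster's Spark config box:
#
#   spark.driver.maxResultSize 32g
#
# Outside Databricks, the equivalent when building the session yourself:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.driver.maxResultSize", "32g")  # assumption: any value above the result size
    .getOrCreate()
)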

Attempt 2: Using PySpark's ntile() window function, but the job is still running after 28 hours.

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# import data
df = spark.sql("select * from my_space.table1")

# List of columns to calculate deciles
columns = df.columns[1:]

# define number of deciles
num_deciles = 10

# List to hold the transformations for each column
decile_columns = []

for col in columns:
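    # Note: this window has no partitionBy(), so Spark has to move every row into a
    # single partition to evaluate it, i.e. one full global sort per column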
    window_spec = Window.orderBy(col)
    
    # Apply the ntile function to create deciles based on the column's values
    decile_columns.append(F.ntile(num_deciles).over(window_spec).alias(f'{col}_decile'))

# Apply the transformations to the DataFrame
df_with_deciles = df.select('*', *decile_columns)

# Select the 'person_id' column along with columns that end with '_decile'
decile_columns2 = ['person_id'] + [col for col in df_with_deciles.columns if col.endswith('_decile')]

# Create a new DataFrame with only 'person_id' and decile columns along with their values
decile_df = df_with_deciles.select(decile_columns2)

#some minor stuff here where I saved a new df as df_renamed and renamed some columns 

# Write it to a table
df_renamed.write.mode("overwrite").saveAsTable("my_space.my_new_table")

Question: Aside from increasing the cluster size or splitting up the columns for computing deciles and then stitching back together, is there anything else worth trying?


Solution

  • I doubled my cluster size and it still wouldn't complete after letting it run nearly all day, so I decided to split this up and write 20 columns at a time to individual tables. Not ideal, but it worked. Each batch of 20 columns took about 12 minutes to run. Then at the end I simply joined all the tables by ID and created a new table, which took just a few minutes (a sketch of that final join follows the code below).

    #import functions
    from pyspark.sql import functions as F
    from pyspark.sql.window import Window
    
    # import data 
    df = spark.sql("select * from schema.my_table")
    
    ########### BEGIN COL SET 1 ###########
    
    # Define which columns to grab (id plus 20 other columns)
    get_columns = df.columns[:21]
    
    # Select the desired columns from the DataFrame
    df_subset = df.select(*get_columns)
    
    # Select columns 2 to 21 of the subset 
    columns_to_process = df_subset.columns[1:21]  
    
    # Define a Window specification for each column, ordered by column value
    # Each column's rank will be computed independently, sorted individually
    decile_df = df.select("id", *[
        F.ntile(10).over(Window.orderBy(F.col(col))).alias(f'{col}_decile')
        for col in columns_to_process
    ])
    
    # Rename columns to remove the "_decile" suffix
    decile_df = decile_df.select(
        *[
            F.col(col).alias(col.replace("_decile", ""))
            for col in decile_df.columns
        ]
    )
    
    # Write col set 1 to a table
    decile_df.write.mode("overwrite").saveAsTable("tempdbase.set1")
    
    ########### END COL SET 1 ###########
    
    ########### BEGIN COL SET 2 ###########
    
    # Define next set of columns: first column plus next 20 columns
    get_columns = [df.columns[0]] + df.columns[21:41]
    
    # Select the desired columns from the DataFrame
    df_subset = df.select(*get_columns)
    
    # Select columns 2 to 21 of the subset 
    columns_to_process = df_subset.columns[1:21]  
    
    # Run the ntile function
    decile_df = df.select("id", *[
        F.ntile(10).over(Window.orderBy(F.col(col))).alias(f'{col}_decile')
        for col in columns_to_process
    ])
    
    # Rename columns to remove the "_decile" suffix
    decile_df = decile_df.select(
        *[
            F.col(col).alias(col.replace("_decile", ""))
            for col in decile_df.columns
        ]
    )
    
    # Write col set 2 to a table
    decile_df.write.mode("overwrite").saveAsTable("tempdbase.set2")
    
    ########### END COL SET 2 ###########
    
    ### REPEAT ###
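
For reference, a minimal sketch of the final stitch-back step mentioned above: join the per-batch tables on the id column and write one combined table. The table names follow the examples above, but the number of sets (assumed here to be 21, covering 400+ columns at 20 per set) and the final table name are illustrative assumptions.

    ########### FINAL JOIN (sketch) ###########
    
    # Read the first batch, then join the remaining batches on "id"
    final_df = spark.table("tempdbase.set1")
    for i in range(2, 22):  # assumption: sets 2 through 21
        final_df = final_df.join(spark.table(f"tempdbase.set{i}"), on="id")
    
    # Write the combined decile table (hypothetical target name)
    final_df.write.mode("overwrite").saveAsTable("schema.my_decile_table")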