amazon-web-services apache-spark join pyspark

Join of two 100k-row tables taking longer than half an hour


I am using pyspark to join two tables with 100k rows each (so this is not a skewed join). It takes more than 30 minutes, sometimes even an hour, which makes me think something is wrong. The code is just a regular join:

a = b.join(c, b.id == c.id, "inner").drop(c.id)

I searched for and tried a number of things, but none of them worked.

My question is: if both tables (pyspark.sql.DataFrame objects) were produced using a UDF, does that matter? This is the only difference compared with common usage.

I used the following udf logic to prepare the tables:

from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

def func(row):
    # "id,<f1 parts...>,f2" -> (id, f1, f2)
    parts = row.split(",")
    id = int(parts[0])
    f1, f2 = ", ".join(parts[1:-1]), int(parts[-1])
    return (id, f1, f2)

func_udf = udf(func,
               StructType([
                   StructField("id", IntegerType(), True),
                   StructField("f1", StringType(), True),
                   StructField("f2", IntegerType(), True)
               ]))

df = df.withColumn("Result", func_udf(col("data")))
df = df.drop(df.data).select("Result.*")

df is the table used for the join.

Any troubleshooting ideas are appreciated. Thank you.

P.S. Table b has 3 columns and table c has 6 columns, so they are not wide. Also, if I shrink the size to 10k rows, the join works as expected.


Solution

  • I figured it out. I would like to share my experience below:

    1. In my case, the fix was to join the tables first and only parse the columns with the UDF afterwards. A UDF-parsed table clearly joins poorly (as Jonathan pointed out below, using the Spark API instead of a UDF can be a better option; see the Spark API sketch after this list).
    2. Even though the tables do not have many columns, some column values are very large (long strings). This also hurts join performance.
    3. df.cache() seems to help (see the cache/explain sketch after this list).
    4. Checking the logical plan is also a good idea.
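
    Spark API sketch: a minimal, hypothetical alternative to the Python UDF from the question, parsing the same "id,<f1 parts>,f2" string with built-in column functions so Spark can optimize the whole plan. It assumes the raw column is named data, as in the question, and Spark 3.x (where slice accepts column arguments); it is not the answerer's actual code.

    from pyspark.sql import functions as F

    # Split the raw "id,<f1 parts...>,f2" string once into an array column.
    parts = F.split(F.col("data"), ",")

    df = (
        df
        .withColumn("id", parts.getItem(0).cast("int"))
        # middle tokens re-joined with ", ", matching the UDF's f1
        .withColumn("f1", F.array_join(F.slice(parts, 2, F.size(parts) - 2), ", "))
        # last token as an integer, matching the UDF's f2
        .withColumn("f2", F.element_at(parts, -1).cast("int"))
        .drop("data")
    )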
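
    Cache/explain sketch: one way to apply points 3 and 4, caching the parsed inputs before the join and inspecting the plan. The b, c, and id names are the ones from the question; the count() calls are just one way to force materialization of the cache.

    # Cache both parsed tables so the UDF/parsing work is not redone
    # every time the join is evaluated.
    b = b.cache()
    c = c.cache()
    b.count()  # an action to materialize the cache
    c.count()

    a = b.join(c, b.id == c.id, "inner").drop(c.id)

    # Print the logical and physical plans to see how Spark will run the
    # join (e.g. SortMergeJoin vs. BroadcastHashJoin) and where the UDF sits.
    a.explain(True)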