apache-spark, pyspark, apache-spark-sql, parquet, data-partitioning

Spark write speed performance test while loading data from Teradata to S3 in parquet format


I have a requirement to migrate tables from Teradata to Dell ECS S3, with the data written in Parquet format. I have been given a Spark cluster with a single 1 GB worker node and a 2 GB driver. I am testing the performance of my Spark code on a small dataset first: I was able to load a 0.3 GB table to S3 in about 1 minute 3 seconds. This is the best performance I have achieved for this dataset, where I simply fetched the data in a single partition, cached it, and wrote it back to S3 as a single partition. I also tried partitioning the data on the source and target side, but all of those approaches took longer than 1 minute 3 seconds.

I would like to know whether there is a way to improve the performance further, or whether this is the maximum I can get with the compute resources at my disposal.

This is roughly the code I have written to get it done (pretty straightforward, since it's just a data lift and shift):

df = spark.read.format("jdbc")\
.option("user", "user_name")\
.option("password","pwd")\
.option("url","jdbc:teradata://servername/LOGMECH=TD2")\
.option("driver","com.teradata.jdbc.TeraDriver")\
.option("query","sql_query_for_source_table").load()

df.cache()

filepath = 's3a://s3_bucket_name/prefix/tablename'

# Write to S3 as Snappy-compressed Parquet
df.write.mode("overwrite").format("parquet").option("compression","snappy").save(filepath)
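
As a quick sanity check (not part of the original code), you can confirm how many partitions the JDBC read produced; with no partitioning options it comes back as a single partition, which is why the write emits a single Parquet file:

# Expecting 1 here, since the read above specifies no partitioning options
print(df.rdd.getNumPartitions())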

The code I used when partitioning at the source/target level looks like this:

df = spark.read.format("jdbc")\
.option("user", "user_name")\
.option("password","pwd")\
.option("url","jdbc:teradata://servername/LOGMECH=TD2")\
.option("driver","com.teradata.jdbc.TeraDriver")\
.option("dbtable","db_name.table_name")\
.option("partitionColumn","Business_Date")\
.option("LowerBound","lowest_value_of_business_date")\
.option("upperBound","highest_value_of_business_date")\
.option("numPartitions","This value was based on different granular level of the business date column, so could be the number of years in the tables or number of quarters or months").load()

df.cache()

# Added another column, derived at the same granularity as numPartitions, to reduce the number of output partitions since the data volume is small

tgt_df = df.withColumn("part_col", {logic to extract the year/month/quarter from the Business_Date column})
tgt_df.write.partitionBy("part_col").mode("overwrite").format("parquet")\
    .option("compression", "snappy").save(filepath)
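
A minimal sketch of what that derived column could look like, assuming Business_Date is a date/timestamp column and yearly granularity (swap F.year for F.quarter or F.month for the other granularities tested):

from pyspark.sql import functions as F

# Hypothetical example: derive the partition column at yearly granularity;
# F.quarter(...) or F.month(...) give the quarterly/monthly variants.
tgt_df = df.withColumn("part_col", F.year(F.col("Business_Date")))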

Just to add: I got numPartitions values of 4, 31 and 11 when partitioning the Business_Date column at the yearly, monthly and quarterly level respectively.

Apologies for the long post; I just wanted to include every detail.


Solution

  • The way to make this faster is to add more worker nodes and give the job more data to parallelise over. Use .coalesce() instead of .repartition() or .partitionBy() when you only need to reduce the number of partitions, since it avoids a full shuffle (see the sketch below). But with only one worker the ceiling is low: without parallelism it is effectively one machine pulling the table over a single JDBC connection and writing a single Parquet file to S3.
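
A rough sketch of the coalesce approach, reusing the df and filepath from the question (the target of 1 output partition is an assumption, chosen because the dataset is small):

# Collapse to a single output partition without a full shuffle, then write
# Snappy-compressed Parquet to S3 (one file per output partition)
df.coalesce(1).write.mode("overwrite").format("parquet")\
    .option("compression", "snappy").save(filepath)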