apache-spark, pyspark, apache-hudi

Hudi with Spark performs very slowly when trying to write data to the filesystem


I'm trying out Apache Hudi with Spark with a very simple demo:

from pyspark.sql import SparkSession

with SparkSession.builder.appName("Hudi Test").getOrCreate() as spark:
    # Read the source parquet files
    df = spark.read.option('mergeSchema', 'true').parquet('s3://an/existing/directory/')
    hudi_options = {
        'hoodie.table.name': 'users_activity',
        'hoodie.datasource.write.recordkey.field': 'users_activity_id',
        'hoodie.datasource.write.partitionpath.field': 'users_activity_id',
        'hoodie.datasource.write.table.name': 'users_activity_result',
        'hoodie.datasource.write.operation': 'upsert',
        'hoodie.datasource.write.precombine.field': 'users_activity_create_date',
    }
    # Upsert the records into the Hudi table
    df.write.format('hudi').options(**hudi_options).mode('append').save('s3://htm-hawk-data-lake-test/flink_test/copy/users_activity/')

There are about 10 parquet files in the directory; their total size is 1 GB, roughly 6 million records. But Hudi takes a very long time to write, and after 2 hours it failed with org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 1409413 tasks (1024.0 MiB) is bigger than spark.driver.maxResultSize (1024.0 MiB).
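(For reference, the limit named in that error is the ordinary spark.driver.maxResultSize setting. A minimal sketch of how it could be raised when building the session is below; the "2g" value is just an example, and as the solution explains, the real problem here turns out to be the partitioning rather than the limit itself.)

from pyspark.sql import SparkSession

# Sketch only: raises the cap on results collected back to the driver.
# "2g" is an arbitrary example value, not a recommendation.
spark = (
    SparkSession.builder
    .appName("Hudi Test")
    .config("spark.driver.maxResultSize", "2g")
    .getOrCreate()
)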

I have checked the Spark History Server, and it shows the following: [Spark UI screenshot] It seems to be collecting all the records from the parquet files to the driver and serializing them. Is that expected behavior? How can I improve the write performance?


Solution

  • Hudi seems to write the data without any problem, but it fails at the indexing step, which tries to collect a list of (partition path, file id) pairs.

    You are using the field users_activity_id as both the partition path and the Hudi record key. If the cardinality of this field is high, you will end up with a huge number of partitions and therefore a very long list of (partition, file_id) pairs, especially since this field is the record key, which is supposed to be unique (6M records = 6M partitions).
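    As an illustration only (the derived column name is an assumption, not something from the original post), partitioning on a coarser, date-level field keeps the partition count small while users_activity_id stays the unique record key:

    from pyspark.sql import functions as F

    # Derive a day-level partition column from the create date, assuming
    # users_activity_create_date is a timestamp column in the source data.
    df_partitioned = df.withColumn('create_date_day', F.to_date('users_activity_create_date'))

    hudi_options = {
        'hoodie.table.name': 'users_activity',
        'hoodie.datasource.write.recordkey.field': 'users_activity_id',
        # Low-cardinality partition path instead of the unique record key
        'hoodie.datasource.write.partitionpath.field': 'create_date_day',
        'hoodie.datasource.write.table.name': 'users_activity_result',
        'hoodie.datasource.write.operation': 'upsert',
        'hoodie.datasource.write.precombine.field': 'users_activity_create_date',
    }

    df_partitioned.write.format('hudi').options(**hudi_options).mode('append').save('s3://htm-hawk-data-lake-test/flink_test/copy/users_activity/')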