I'm following the Apache Hudi documentation to write and read a Hudi table. Here's the code I'm using to create a PySpark DataFrame and save it to Azure Data Lake Storage Gen2:
tableName = "my_hudi_table"
basePath = <<table_path>>
dataGen = sc._jvm.org.apache.hudi.QuickstartUtils.DataGenerator()
inserts = sc._jvm.org.apache.hudi.QuickstartUtils.convertToStringList(dataGen.generateInserts(10))
df_hudi = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
hudi_options = {
    'hoodie.table.name': tableName,
    'hoodie.datasource.write.recordkey.field': 'uuid',
    'hoodie.datasource.write.partitionpath.field': 'partitionpath',
    'hoodie.datasource.write.table.name': tableName,
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.datasource.write.precombine.field': 'ts',
    'hoodie.upsert.shuffle.parallelism': 2,
    'hoodie.insert.shuffle.parallelism': 2
}
df_hudi.write.format("hudi").options(**hudi_options).mode("overwrite").save(basePath)
This works, and it generates the expected folder structure based on the partition field partitionpath. This field takes values such as "americas/brazil/sao_paulo", and matching subfolders are created under the base path.
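For reference, a quick way to confirm the layout on Databricks is dbutils.fs.ls; the listing below is only a sketch, and the partition values are the ones the quickstart DataGenerator typically produces:
# Illustrative check of the generated layout (exact paths depend on the generated data)
for entry in dbutils.fs.ls(basePath):
    print(entry.path)
# Expected top-level entries: .hoodie/ (table metadata) plus the first partition level,
# e.g. americas/ and asia/, with leaf partitions such as americas/brazil/sao_paulo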
However, when I try to read this table I get an empty DataFrame. Interestingly, the empty DataFrame has the correct schema:
partition_column = "partitionpath"
hudi_read_options = {
    'hoodie.datasource.read.partitionpath.field': partition_column,
    'hoodie.file.index.enable': 'false'
}
df_hudi = spark.read.format("hudi").options(**hudi_read_options).load(basePath)
df_hudi.printSchema()
root
|-- _hoodie_commit_time: string (nullable = true)
|-- _hoodie_commit_seqno: string (nullable = true)
|-- _hoodie_record_key: string (nullable = true)
|-- _hoodie_partition_path: string (nullable = true)
|-- _hoodie_file_name: string (nullable = true)
|-- begin_lat: double (nullable = true)
|-- begin_lon: double (nullable = true)
|-- driver: string (nullable = true)
|-- end_lat: double (nullable = true)
|-- end_lon: double (nullable = true)
|-- fare: double (nullable = true)
|-- partitionpath: string (nullable = true)
|-- rider: string (nullable = true)
|-- ts: long (nullable = true)
|-- uuid: string (nullable = true)
But if I read a specific partition then I get the data:
df = spark.read.format("parquet").load(basePath + "/americas/brazil/sao_paulo/")
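As a sanity check, here is an illustrative comparison of the two reads, matching what I observe:
# The Hudi read of the base path returns no rows, while a direct Parquet read of one partition does
print(spark.read.format("hudi").options(**hudi_read_options).load(basePath).count())        # 0
print(spark.read.format("parquet").load(basePath + "/americas/brazil/sao_paulo/").count())  # > 0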
I feel like I might be missing some additional parameter when reading, since I can query the data using Spark Structured Streaming and it works fine:
spark.readStream.format("hudi").load(basePath)
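For completeness, this is roughly how I verify the streaming read (the memory sink and the query name are arbitrary and only used for inspection):
stream_df = spark.readStream.format("hudi").load(basePath)
query = (stream_df.writeStream
         .format("memory")               # in-memory sink, only used here to inspect the stream
         .queryName("hudi_stream_check")
         .outputMode("append")
         .start())
query.processAllAvailable()              # wait until the available data has been processed
spark.sql("SELECT uuid, rider, fare, partitionpath FROM hudi_stream_check").show(truncate=False)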
I'm running this code on an Azure Databricks cluster with the following specs:
Databricks Runtime Version - 9.1 LTS (includes Apache Spark 3.1.2, Scala 2.12)
Python version - Python 3.8.10
I have installed the following package from the Maven repository: "hudi_spark3_1_2_bundle_2_12_0_10_1.jar"
I'm setting 'hoodie.file.index.enable': 'false' when reading, as suggested in this GitHub issue, because otherwise I get the error:
NoSuchMethodError: org.apache.spark.sql.execution.datasources.FileStatusCache.putLeafFiles(Lorg/apache/hadoop/fs/Path;[Lorg/apache/hadoop/fs/FileStatus;)V
Am I missing something? Thanks in advance.
spark.read.format("hudi").load(basePath)
might not work because of the Databricks issue you mention. If you then refer to the Hudi documentation about the file index:
Since 0.9.0, Hudi has supported a built-in file index, HoodieFileIndex, for querying Hudi tables, which supports partition pruning and metadata-table listing for queries. This helps improve query performance. It also supports non-global query paths, which means users can query the table by the base path without specifying "*" in the query path.
Since you have to disable the file index in your setup, the glob syntax should help:
hudi_read_options = {
    'hoodie.file.index.enable': 'false'
}
spark.read.format("hudi").options(**hudi_read_options).load(basePath + "/*/*/*")
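Note that the number of "*" levels has to match the partition depth (three here, because partitionpath values look like americas/brazil/sao_paulo). As an illustrative follow-up, you can then query the loaded DataFrame as usual:
# Load via glob paths, then run a quick aggregation to confirm rows come back per partition
df_hudi = spark.read.format("hudi").options(**hudi_read_options).load(basePath + "/*/*/*")
df_hudi.createOrReplaceTempView("hudi_trips")
spark.sql("SELECT partitionpath, COUNT(*) AS cnt FROM hudi_trips GROUP BY partitionpath").show(truncate=False)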