I have a large indexed LZO file in HDFS that I would like to read into Spark DataFrames. The file contains one JSON document per line.
posts_dir = '/data/2016/01'
posts_dir has the following:
/data/2016/01/posts.lzo
/data/2016/01/posts.lzo.index
The following works, but it doesn't make use of the index, so the whole file is read by a single mapper and it takes a long time:
posts = spark.read.json(posts_dir)
Is there a way to make it utilize the index?
I solved this by first creating an RDD that recognizes the index and then using the from_json function to turn each line into a StructType, effectively producing results similar to spark.read.json(...):
from pyspark.sql import Row, functions as F

# LzoTextInputFormat (from hadoop-lzo) honours the .lzo.index file, so the
# read is split across many mappers instead of one.
posts_rdd = sc.newAPIHadoopFile(posts_dir,
                                'com.hadoop.mapreduce.LzoTextInputFormat',
                                'org.apache.hadoop.io.LongWritable',
                                'org.apache.hadoop.io.Text')

# posts_schema is a StructType describing the JSON documents (defined elsewhere).
posts_df = posts_rdd.map(lambda x: Row(x[1]))\
    .toDF(['raw'])\
    .select(F.from_json('raw', schema=posts_schema).alias('json'))\
    .select('json.*')
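If you don't already have posts_schema on hand, one option (just a sketch; the sample fraction is an arbitrary assumption, adjust it for your data) is to let Spark infer the schema from a sample of the same RDD, so only a fraction of the lines has to be parsed:

# Take the Text values (the raw JSON strings) and infer the schema
# from a sample of them rather than from every line.
sample = posts_rdd.values().sample(withReplacement=False, fraction=0.001)
posts_schema = spark.read.json(sample).schema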
I am not aware of a better or more straightforward way.
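As a sanity check that the index is actually being picked up, you can look at the partition count of the RDD:

# With the index, the file is splittable, so this should be well above 1;
# reading the same file without the index yields a single partition.
print(posts_rdd.getNumPartitions())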