apache-spark, pyspark, lzo

Can't import lzo files in pyspark


I have a csv file compressed in lzo format and I want to import it into a pyspark dataframe. Were the file not compressed, I would simply do:

import pyspark as ps

spark = ps.sql.SparkSession.builder.master("local[2]").getOrCreate()
data = spark.read.csv(fp, schema=SCHEMA, sep="\t")

where the file path fp and schema SCHEMA are properly defined elsewhere. When the file is compressed with lzo, however, this returns a dataframe filled with null values.

I have installed lzop on my machine and can decompress the file from the terminal and then import it using pyspark. However, that's not a feasible solution due to hard disk space and time constraints (I have tons of lzo files).
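
For reference, the manual workaround looks roughly like this (the file names are placeholders):

import subprocess

# decompress data.csv.lzo to data.csv (lzop keeps the original .lzo file by default)
subprocess.run(["lzop", "-d", "data.csv.lzo"], check=True)
# then read the uncompressed file as usual
data = spark.read.csv("data.csv", schema=SCHEMA, sep="\t")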


Solution

  • It took me a long time but I found a solution. I took inspiration from this answer and tried to reproduce by hand what Maven does with Java.

    These are the steps to follow:

    1. Find the pyspark home folder: one way to do it on Ubuntu is to run the command locate pyspark/find_spark_home.py from a terminal; if that fails, make sure you installed pyspark and run sudo updatedb before trying locate again. (Make sure you select the correct installation of pyspark: you might have more than one, especially if you use virtual environments.) Another way to find this folder from Python is sketched right after step 4.
    2. Download the hadoop-lzo jar from this maven repository and place it inside the $pyspark_home/jars folder.
    3. Create the folder $pyspark_home/conf.
    4. Inside this folder, create a core-site.xml file containing the following text:

      <configuration>
          <property>
              <name>io.compression.codecs</name>
              <value>
                  org.apache.hadoop.io.compress.DefaultCodec,
                  com.hadoop.compression.lzo.LzoCodec,
                  com.hadoop.compression.lzo.LzopCodec,
                  org.apache.hadoop.io.compress.GzipCodec,
                  org.apache.hadoop.io.compress.BZip2Codec
              </value>
          </property>
          <property>
              <name>io.compression.codec.lzo.class</name>
              <value>com.hadoop.compression.lzo.LzoCodec</value>
          </property>
      </configuration>
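
    Referring back to step 1: as a minimal sketch (assuming pyspark is importable in the environment you actually use), this is another way to find the pyspark home folder and the jars/conf paths derived from it:

      import os
      import pyspark

      # directory of the installed pyspark package in this environment
      pyspark_home = os.path.dirname(pyspark.__file__)
      print(pyspark_home)                         # the pyspark home folder
      print(os.path.join(pyspark_home, "jars"))   # where the hadoop-lzo jar goes
      print(os.path.join(pyspark_home, "conf"))   # where core-site.xml goes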
      

    Now the code in the question should work properly.
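
    As a quick check (the .lzo path is a placeholder and SCHEMA is the schema from the question), reading a compressed file should now return actual values instead of nulls:

      import pyspark as ps

      # start a fresh session so the new jar and core-site.xml are picked up
      spark = ps.sql.SparkSession.builder.master("local[2]").getOrCreate()
      data = spark.read.csv("data.csv.lzo", schema=SCHEMA, sep="\t")
      data.show(5)  # should display decoded rows rather than nulls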