pyspark · apache-spark-sql · hdfs · delta-lake · johnsnowlabs-spark-nlp

Trying to use a John Snow Labs pretrained pipeline on a Spark DataFrame, but unable to read a Delta file in the same session


I am using the code below to read the Spark DataFrame from HDFS:

from delta import *
from pyspark.sql import SparkSession


builder = SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

# change file path here
delta_df = spark.read.format("delta").load('hdfs://localhost:9000/final_project/data/2022-03-30/')

delta_df.show(10, truncate=False)

and the code below to use the pretrained pipeline:

from sparknlp.pretrained import PipelineModel
from pyspark.sql import SparkSession
import sparknlp

# spark session one way
spark = SparkSession.builder \
    .appName("Spark NLP")\
    .master("local[4]")\
    .config("spark.driver.memory","16G")\
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.kryoserializer.buffer.max", "2000M")\
    .config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp-spark32_2.12:3.4.2").getOrCreate()


# alternate way: uncomment below to use
# spark = sparknlp.start(spark32=True)


# unzip the file and change path here
pipeline = PipelineModel.load("/home/sidd0613/final_project/classifierdl_bertwiki_finance_sentiment_pipeline_en_3.3.0_2.4_1636617651675")


print("-------")

# creating a Spark DataFrame from the sentence
df = spark.createDataFrame([["As interest rates have increased, housing rents have also increased."]]).toDF('text')

# passing the DataFrame to the pipeline to derive sentiment
result = pipeline.transform(df)

# printing the result
print(result)

print("DONE!!!")

I wish to merge these two pieces of code, but the two Spark sessions do not merge and do not work for both tasks together. Please help!

I tried merging the .config() options of both Spark sessions and it did not work. I also tried creating two separate Spark sessions, but that did not work either.

A normal Spark session is enough to read files in other formats, but to read a Delta file I strictly had to use this option: configure_spark_with_delta_pip(builder).

Is there any way to bypass this, or to make the code run?


Solution

  • The configure_spark_with_delta_pip function is just a shortcut to set up the correct parameters on the SparkSession. If you look into its source code, you'll see that everything it does is configure spark.jars.packages. But because you set spark.jars.packages separately for Spark NLP, you're overwriting Delta's value.
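
    For illustration, here is a minimal sketch of the idea (not the actual library source; the function name and Delta version below are assumed):

    # rough sketch: the helper builds spark.jars.packages from the delta-core
    # coordinate plus any extra packages passed in
    def configure_spark_with_delta_pip_sketch(builder, extra_packages=None):
        packages = ["io.delta:delta-core_2.12:1.1.0"]  # version assumed for illustration
        packages += list(extra_packages or [])
        # any value previously set for spark.jars.packages is overwritten here
        return builder.config("spark.jars.packages", ",".join(packages))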

    Update 14.04.2022: this wasn't released at the time the answer was written, but it is available in version 1.2.0.

    To handle such situations, configure_spark_with_delta_pip has an additional parameter, extra_packages, for specifying additional packages to be configured. So in your case the code should look as follows:

    builder = SparkSession.builder.appName("MyApp") \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog",
                "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .config("spark.driver.memory", "16G") \
        .config("spark.driver.maxResultSize", "0") \
        .config("spark.kryoserializer.buffer.max", "2000M")

    my_packages = ["com.johnsnowlabs.nlp:spark-nlp-spark32_2.12:3.4.2"]

    spark = configure_spark_with_delta_pip(builder, extra_packages=my_packages) \
        .getOrCreate()
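
    With a single session configured this way, both parts of the original code should run together; for example (paths and imports as in the question, assuming the Delta table has the text column the pipeline expects):

    delta_df = spark.read.format("delta").load('hdfs://localhost:9000/final_project/data/2022-03-30/')
    delta_df.show(10, truncate=False)

    pipeline = PipelineModel.load("/home/sidd0613/final_project/classifierdl_bertwiki_finance_sentiment_pipeline_en_3.3.0_2.4_1636617651675")
    # transform the Delta data directly instead of a hand-built DataFrame
    result = pipeline.transform(delta_df)
    result.show(truncate=False)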
    

    Until the version with that extra parameter is released, you need to avoid using that function and simply configure all parameters yourself, like this:

    scala_version = "2.12"
    delta_version = "1.1.0"
    all_packages = ["com.johnsnowlabs.nlp:spark-nlp-spark32_2.12:3.4.2",
                    f"io.delta:delta-core_{scala_version}:{delta_version}"]

    spark = SparkSession.builder.appName("MyApp") \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog",
                "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .config("spark.driver.memory", "16G") \
        .config("spark.driver.maxResultSize", "0") \
        .config("spark.kryoserializer.buffer.max", "2000M") \
        .config("spark.jars.packages", ",".join(all_packages)) \
        .getOrCreate()
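
    Either way, a quick sanity check (not part of the original code) is to confirm that both the Spark NLP and delta-core coordinates ended up in the session configuration:

    # should print both package coordinates, comma-separated
    print(spark.sparkContext.getConf().get("spark.jars.packages"))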