
How to submit a PyDeequ job from a Jupyter notebook to Spark on YARN


How do I configure the environment to submit a PyDeequ job to Spark on YARN (client mode) from a Jupyter notebook? The only explanations I could find assume an AWS environment. How do I set this up in a non-AWS environment?

Simply following an example such as "Testing data quality at scale with PyDeequ" causes errors such as TypeError: 'JavaPackage' object is not callable:

from pydeequ.analyzers import *
analysisResult = AnalysisRunner(spark) \
    .onData(df) \
    .addAnalyzer(Size()) \
    .addAnalyzer(Completeness("review_id")) \
    .addAnalyzer(ApproxCountDistinct("review_id")) \
    .addAnalyzer(Mean("star_rating")) \
    .addAnalyzer(Compliance("top star_rating", "star_rating >= 4.0")) \
    .addAnalyzer(Correlation("total_votes", "star_rating")) \
    .addAnalyzer(Correlation("total_votes", "helpful_votes")) \
    .run()
                    
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
/tmp/ipykernel_499599/1388970492.py in <module>
      1 from pydeequ.analyzers import *
----> 2 analysisResult = AnalysisRunner(spark) \
      3     .onData(df) \
      4     .addAnalyzer(Size()) \
      5     .addAnalyzer(Completeness("review_id")) \

~/home/repository/git/oonisim/aws/venv/lib/python3.8/site-packages/pydeequ/analyzers.py in onData(self, df)
     50         """
     51         df = ensure_pyspark_df(self._spark_session, df)
---> 52         return AnalysisRunBuilder(self._spark_session, df)
     53 
     54 

~/home/repository/git/oonisim/aws/venv/lib/python3.8/site-packages/pydeequ/analyzers.py in __init__(self, spark_session, df)
    122         self._jspark_session = spark_session._jsparkSession
    123         self._df = df
--> 124         self._AnalysisRunBuilder = self._jvm.com.amazon.deequ.analyzers.runners.AnalysisRunBuilder(df._jdf)
    125 
    126     def addAnalyzer(self, analyzer: _AnalyzerObject):

TypeError: 'JavaPackage' object is not callable

Solution

  • HADOOP_CONF_DIR

    Copy the contents of $HADOOP_HOME/etc/hadoop from the Hadoop/YARN master node to the local host and set the HADOOP_CONF_DIR environment variable to point to the directory.

    Ensure that HADOOP_CONF_DIR or YARN_CONF_DIR points to the directory which contains the (client side) configuration files for the Hadoop cluster. These configs are used to write to HDFS and connect to the YARN ResourceManager. The configuration contained in this directory will be distributed to the YARN cluster so that all containers used by the application use the same configuration.

    import os
    
    # Local copy of the cluster's $HADOOP_HOME/etc/hadoop
    os.environ['HADOOP_CONF_DIR'] = "/opt/hadoop/hadoop-3.2.2/etc/hadoop"
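    PySpark starts the driver JVM through spark-submit, which picks up HADOOP_CONF_DIR from the environment at launch, so the variable has to be set before the first SparkSession is created in the kernel. A minimal sketch that sets it only after checking that the copied directory looks complete; the helper name `set_hadoop_conf_dir` and the choice of `core-site.xml` and `yarn-site.xml` as the required files are assumptions, not requirements stated by Spark or PyDeequ.

```python
import os
from pathlib import Path

# Assumption: the minimum client-side files needed to reach HDFS and the YARN ResourceManager.
REQUIRED_FILES = ("core-site.xml", "yarn-site.xml")


def set_hadoop_conf_dir(conf_dir):
    """Hypothetical helper: point HADOOP_CONF_DIR at conf_dir after checking it.

    Call it before the first SparkSession is created; the driver JVM only
    sees the environment as it was when spark-submit launched it.
    """
    path = Path(conf_dir).expanduser().resolve()
    missing = [name for name in REQUIRED_FILES if not (path / name).is_file()]
    if missing:
        raise FileNotFoundError(f"{path} is missing {', '.join(missing)}")
    os.environ["HADOOP_CONF_DIR"] = str(path)
    return str(path)
```

    For this answer's layout that would be `set_hadoop_conf_dir("/opt/hadoop/hadoop-3.2.2/etc/hadoop")`.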
    

    PYTHONPATH

    pyspark

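    The notebook kernel must be able to import the PySpark modules. Either install pyspark with pip or conda, which also bundles the Spark runtime libraries (enough for standalone use), or use the PySpark modules shipped with the Spark installation under $SPARK_HOME/python/lib. In either case, keep the PySpark version the same as the cluster's Spark version.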

    Ensure the SPARK_HOME environment variable points to the directory where the Spark tar file has been extracted, and update the PYTHONPATH environment variable so that it can find PySpark and Py4J under $SPARK_HOME/python/lib. For example, in the shell that starts Jupyter:

    export PYTHONPATH=$(ZIPS=("$SPARK_HOME"/python/lib/*.zip); IFS=:; echo "${ZIPS[*]}"):$PYTHONPATH
    
    Or, from inside the notebook, before importing pyspark:
    
    import sys
    
    sys.path.extend([
        "/opt/spark/spark-3.1.2/python/lib/py4j-0.10.9-src.zip",
        "/opt/spark/spark-3.1.2/python/lib/pyspark.zip"
    ])
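    Hard-coding the archive names ties the notebook to one Spark and Py4J release. A sketch that globs the archives instead, in the spirit of the shell one-liner; the helper name `add_spark_python_libs` is hypothetical, and it assumes the standard `$SPARK_HOME/python/lib` layout of a Spark binary distribution.

```python
import glob
import os
import sys


def add_spark_python_libs(spark_home=None):
    """Hypothetical helper: append every *.zip under $SPARK_HOME/python/lib to sys.path.

    In a Spark distribution these are pyspark.zip and py4j-<version>-src.zip.
    """
    spark_home = spark_home or os.environ["SPARK_HOME"]
    lib_dir = os.path.join(spark_home, "python", "lib")
    archives = sorted(glob.glob(os.path.join(lib_dir, "*.zip")))
    if not archives:
        raise FileNotFoundError(f"no *.zip archives found in {lib_dir}")
    for archive in archives:
        # Append (like sys.path.extend); a pip-installed pyspark found earlier on sys.path still wins.
        if archive not in sys.path:
            sys.path.append(archive)
    return archives
```

    Calling it twice is harmless, because archives already on sys.path are skipped.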
    

    PyDeequ

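    Install pydeequ with pip or conda. This alone is not enough: the package only contains the Python wrappers, which call the Deequ library running in the Spark JVM, so the Deequ JAR must also be on Spark's classpath.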
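    Since the Python wrappers and the Deequ JAR must agree with the cluster, a quick sanity check once a session exists is that the Deequ coordinate PyDeequ asks Spark to download targets the same Spark major.minor line as the cluster. This sketch assumes Deequ's `<group>:<artifact>:<deequ-version>-spark-<major.minor>` naming on Maven Central (e.g. `2.0.0-spark-3.1`); `deequ_matches_spark` is a hypothetical helper.

```python
def deequ_matches_spark(maven_coord, spark_version):
    """Hypothetical helper: True if a Deequ coordinate targets spark_version's major.minor line.

    Assumes coordinates such as 'com.amazon.deequ:deequ:2.0.0-spark-3.1'.
    """
    _group, _artifact, version = maven_coord.split(":")
    if "-spark-" not in version:
        raise ValueError(f"no Spark suffix in Deequ version {version!r}")
    deequ_spark_line = version.split("-spark-", 1)[1]       # e.g. '3.1'
    spark_line = ".".join(spark_version.split(".")[:2])     # e.g. '3.1.2' -> '3.1'
    return deequ_spark_line == spark_line
```

    Usage, once `spark` exists: `deequ_matches_spark(pydeequ.deequ_maven_coord, spark.version)`. The same check applies to the version of any JAR downloaded by hand.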

    Deequ JAR files

    Deequ jar to the library path

    PyDeequ needs the Deequ JAR file. Download the build that matches your Spark version from the com.amazon.deequ group in the Maven repository.

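    import os
    
    # Keep downloaded JARs in <parent of the notebook directory>/jar
    root = os.path.dirname(os.path.realpath(os.getcwd()))
    deequ_jar = "https://repo1.maven.org/maven2/com/amazon/deequ/deequ/2.0.0-spark-3.1/deequ-2.0.0-spark-3.1.jar"
    classpath = f"{root}/jar/deequ-2.0.0-spark-3.1.jar"
    os.makedirs(f"{root}/jar", exist_ok=True)  # wget -O fails if the directory does not exist
    
    # IPython shell escape: $classpath and $deequ_jar expand to the Python variables
    !wget -q -O $classpath $deequ_jar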
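    If wget is not available on the Jupyter host, the same download can be done with the standard library alone. A minimal sketch; `deequ_jar_url` and `download_deequ_jar` are hypothetical helper names, and the URL layout is the one used by the Maven Central link above.

```python
import os
import urllib.request

MAVEN_CENTRAL = "https://repo1.maven.org/maven2"


def deequ_jar_url(deequ_version):
    """Maven Central URL of the Deequ JAR, e.g. for deequ_version='2.0.0-spark-3.1'."""
    return f"{MAVEN_CENTRAL}/com/amazon/deequ/deequ/{deequ_version}/deequ-{deequ_version}.jar"


def download_deequ_jar(deequ_version, dest_dir):
    """Download the JAR into dest_dir (created if needed) unless it is already there.

    Returns the local path, suitable for spark.driver.extraClassPath.
    """
    os.makedirs(dest_dir, exist_ok=True)
    local_path = os.path.join(dest_dir, f"deequ-{deequ_version}.jar")
    if not os.path.exists(local_path):
        urllib.request.urlretrieve(deequ_jar_url(deequ_version), local_path)
    return local_path
```

    For example, `classpath = download_deequ_jar("2.0.0-spark-3.1", f"{root}/jar")`. Skipping an existing file avoids re-downloading on every kernel restart.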
    

    Spark Session

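    Pass the Deequ JAR to Spark through the classpath and jar-package properties when building the session:

    import pydeequ
    from pyspark.sql import SparkSession
    
    # spark.driver.extraClassPath: the local Deequ JAR, for the driver running in this kernel (client mode)
    # spark.jars.packages: Deequ from Maven, resolved by Spark and added to the driver and executors
    # spark.jars.excludes: the f2j artifact PyDeequ excludes
    # spark.sql.debug.maxToStringFields: the Spark 3 name of this setting, as named in the WARN message
    spark = SparkSession.builder \
        .master('yarn') \
        .config('spark.submit.deployMode', 'client') \
        .config("spark.driver.extraClassPath", classpath) \
        .config("spark.jars.packages", pydeequ.deequ_maven_coord) \
        .config("spark.jars.excludes", pydeequ.f2j_maven_coord) \
        .config('spark.sql.debug.maxToStringFields', 100) \
        .config('spark.executor.memory', '2g') \
        .getOrCreate()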
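    The TypeError: 'JavaPackage' object is not callable in the question is what Py4J produces when a name under the session's JVM view does not resolve to a loaded class: it falls back to a JavaPackage proxy, and calling that proxy fails. A sketch for checking this right after the session is created, before running any analyzer; `deequ_class_loaded` is a hypothetical helper, and it relies on Py4J's internal JavaPackage type name rather than a public API.

```python
def deequ_class_loaded(spark, class_name="com.amazon.deequ.analyzers.runners.AnalysisRunBuilder"):
    """Hypothetical helper: True if class_name resolves to a JVM class via the Py4J gateway.

    Py4J returns a JavaPackage proxy for names it cannot load as a class,
    which is what later raises "'JavaPackage' object is not callable".
    """
    obj = spark.sparkContext._jvm
    for part in class_name.split("."):
        obj = getattr(obj, part)
    return type(obj).__name__ != "JavaPackage"
```

    Usage: `assert deequ_class_loaded(spark), "Deequ JAR is not on the driver classpath"`. If it fails, the JAR is usually missing from the driver classpath, or the session was created before the JAR settings were added (getOrCreate() returns the existing session).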
    

    Deequ Job

    Using an excerpt of the Amazon product review data:

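    # root is an absolute path, so "file://" + root gives a file:/// URL.
    # On YARN the executors read file:// paths locally, so the file must exist at this path on every node.
    df = spark.read.csv(
        path=f"file://{root}/data/amazon_product_reviews.csv.gz",
        header=True,
    )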
    df.printSchema()
    -----
    root
     |-- review_id: string (nullable = true)
     |-- marketplace: string (nullable = true)
     |-- product_id: string (nullable = true)
     |-- year: string (nullable = true)
     |-- star_rating: string (nullable = true)
     |-- total_votes: string (nullable = true)
     |-- helpful_votes: string (nullable = true)
     |-- product_category: string (nullable = true)
    
    from pydeequ.analyzers import *
    analysisResult = AnalysisRunner(spark) \
        .onData(df) \
        .addAnalyzer(Size()) \
        .addAnalyzer(Completeness("review_id")) \
        .addAnalyzer(ApproxCountDistinct("review_id")) \
        .addAnalyzer(Mean("star_rating")) \
        .addAnalyzer(Compliance("top star_rating", "star_rating >= 4.0")) \
        .addAnalyzer(Correlation("total_votes", "star_rating")) \
        .addAnalyzer(Correlation("total_votes", "helpful_votes")) \
        .run()
                        
    analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
    analysisResult_df.show()
    -----
    21/08/16 11:17:00 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                    
    +-------+---------------+-------------------+------+
    | entity|       instance|               name| value|
    +-------+---------------+-------------------+------+
    | Column|      review_id|       Completeness|   1.0|
    | Column|      review_id|ApproxCountDistinct|1040.0|
    |Dataset|              *|               Size|1000.0|
    | Column|top star_rating|         Compliance| 0.657|
    +-------+---------------+-------------------+------+
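    The output has no rows for Mean or the two Correlation analyzers. Those analyzers need numeric columns, and because the CSV was read without a schema every column is a string (see printSchema() above), so Deequ most likely recorded them as failed metrics, which successMetricsAsDataFrame leaves out. A sketch that casts the numeric columns before running the analysis; `numeric_cast_exprs` is a hypothetical helper, and the choice of columns and of DOUBLE is an assumption about this data set.

```python
def numeric_cast_exprs(columns, numeric_columns, sql_type="DOUBLE"):
    """Hypothetical helper: build selectExpr() expressions that cast numeric_columns
    to sql_type and pass every other column through unchanged."""
    numeric = set(numeric_columns)
    unknown = numeric - set(columns)
    if unknown:
        raise ValueError(f"unknown columns: {sorted(unknown)}")
    return [
        f"CAST(`{c}` AS {sql_type}) AS `{c}`" if c in numeric else f"`{c}`"
        for c in columns
    ]


# With the session and DataFrame above (not run here):
# typed_df = df.selectExpr(*numeric_cast_exprs(df.columns, ["star_rating", "total_votes", "helpful_votes"]))
# then pass typed_df to AnalysisRunner(spark).onData(...) instead of df.
```

    Building the expressions as plain strings keeps the column order and names unchanged, so the analyzers can reference the same column names as before.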