How do you configure the environment to submit a PyDeequ job to Spark/YARN (client mode) from a Jupyter notebook? There is no comprehensive explanation beyond guides that assume an AWS-managed environment, so how should the environment be set up for a non-AWS environment?
Simply following an example such as Testing data quality at scale with PyDeequ causes errors like TypeError: 'JavaPackage' object is not callable:
from pydeequ.analyzers import *
analysisResult = AnalysisRunner(spark) \
    .onData(df) \
    .addAnalyzer(Size()) \
    .addAnalyzer(Completeness("review_id")) \
    .addAnalyzer(ApproxCountDistinct("review_id")) \
    .addAnalyzer(Mean("star_rating")) \
    .addAnalyzer(Compliance("top star_rating", "star_rating >= 4.0")) \
    .addAnalyzer(Correlation("total_votes", "star_rating")) \
    .addAnalyzer(Correlation("total_votes", "helpful_votes")) \
    .run()
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
/tmp/ipykernel_499599/1388970492.py in <module>
1 from pydeequ.analyzers import *
----> 2 analysisResult = AnalysisRunner(spark) \
3 .onData(df) \
4 .addAnalyzer(Size()) \
5 .addAnalyzer(Completeness("review_id")) \
~/home/repository/git/oonisim/aws/venv/lib/python3.8/site-packages/pydeequ/analyzers.py in onData(self, df)
50 """
51 df = ensure_pyspark_df(self._spark_session, df)
---> 52 return AnalysisRunBuilder(self._spark_session, df)
53
54
~/home/repository/git/oonisim/aws/venv/lib/python3.8/site-packages/pydeequ/analyzers.py in __init__(self, spark_session, df)
122 self._jspark_session = spark_session._jsparkSession
123 self._df = df
--> 124 self._AnalysisRunBuilder = self._jvm.com.amazon.deequ.analyzers.runners.AnalysisRunBuilder(df._jdf)
125
126 def addAnalyzer(self, analyzer: _AnalyzerObject):
TypeError: 'JavaPackage' object is not callable
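The error means that the Deequ classes are not on the classpath of the JVM backing the Spark driver, so py4j resolves com.amazon.deequ.analyzers.runners.AnalysisRunBuilder to a generic JavaPackage placeholder instead of a Java class. A quick way to confirm this from the notebook (a sketch, assuming a live SparkSession named spark):
# If the Deequ JAR is missing from the driver classpath, py4j returns a
# JavaPackage placeholder for the class name instead of a callable JavaClass.
from py4j.java_gateway import JavaClass

cls = spark.sparkContext._jvm.com.amazon.deequ.analyzers.runners.AnalysisRunBuilder
print(type(cls))                    # py4j.java_gateway.JavaPackage -> JAR not loaded
print(isinstance(cls, JavaClass))   # True once the JAR is on the classpath
The steps below set up HADOOP_CONF_DIR, the PySpark modules, and the Deequ JAR so that this lookup succeeds.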
Copy the contents of $HADOOP_HOME/etc/hadoop from the Hadoop/YARN master node to the local host and set the HADOOP_CONF_DIR environment variable to point to that directory.
Ensure that HADOOP_CONF_DIR or YARN_CONF_DIR points to the directory which contains the (client side) configuration files for the Hadoop cluster. These configs are used to write to HDFS and connect to the YARN ResourceManager. The configuration contained in this directory will be distributed to the YARN cluster so that all containers used by the application use the same configuration.
os.environ['HADOOP_CONF_DIR'] = "/opt/hadoop/hadoop-3.2.2/etc/hadoop"
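A quick sanity check (a sketch) that the client-side configs copied from the cluster are actually present in that directory before creating the SparkSession:
import os
from pathlib import Path

# The configs copied from $HADOOP_HOME/etc/hadoop on the master node should be here.
conf_dir = Path(os.environ["HADOOP_CONF_DIR"])
for name in ("core-site.xml", "hdfs-site.xml", "yarn-site.xml"):
    assert (conf_dir / name).is_file(), f"missing {name} in {conf_dir}"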
The PySpark Python modules also need to be importable. Either install pyspark with pip or conda, which bundles the Spark runtime libraries (for standalone use), or copy the PySpark modules under $SPARK_HOME/python/lib from the Spark installation.
Ensure the SPARK_HOME environment variable points to the directory where the tar file has been extracted. Update PYTHONPATH environment variable such that it can find the PySpark and Py4J under SPARK_HOME/python/lib. One example of doing this is shown below:
export PYTHONPATH=$(ZIPS=("$SPARK_HOME"/python/lib/*.zip); IFS=:; echo "${ZIPS[*]}"):$PYTHONPATH
# Or, equivalently, from inside the notebook:
import sys

sys.path.extend([
    "/opt/spark/spark-3.1.2/python/lib/py4j-0.10.9-src.zip",
    "/opt/spark/spark-3.1.2/python/lib/pyspark.zip",
])
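If SPARK_HOME is set, a hedged variant of the same idea that picks up whatever PySpark/Py4J zip archives ship with the installation, instead of hard-coding version-specific paths:
import os
import sys
from glob import glob

# Add every zip archive under $SPARK_HOME/python/lib (pyspark.zip, py4j-*.zip).
spark_home = os.environ["SPARK_HOME"]
sys.path.extend(glob(os.path.join(spark_home, "python", "lib", "*.zip")))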
Install pydeequ with pip or conda. Note that this alone is not enough to use PyDeequ: it also needs the Deequ JAR file. Download the one matching your Spark/Deequ versions from the Maven repository under com.amazon.deequ.
import os
import sys

# Download the Deequ release built for Spark 3.1 into {root}/jar/.
root = os.path.dirname(os.path.realpath(os.getcwd()))
deequ_jar = "https://repo1.maven.org/maven2/com/amazon/deequ/deequ/2.0.0-spark-3.1/deequ-2.0.0-spark-3.1.jar"
classpath = f"{root}/jar/deequ-2.0.0-spark-3.1.jar"
!wget -q -O $classpath $deequ_jar
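It can also help to confirm that the JAR landed where expected and that its version lines up with the Maven coordinate the installed pydeequ release advertises (a sketch; pydeequ.deequ_maven_coord is the same attribute used in the session config below):
import os
import pydeequ

# Coordinate pydeequ expects vs. the JAR downloaded above; the versions should match.
print(pydeequ.deequ_maven_coord)
print(classpath, os.path.isfile(classpath))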
Specify the downloaded Deequ JAR via the Spark classpath/jars properties when creating the SparkSession:
from pyspark.sql import SparkSession
import pydeequ

spark = SparkSession.builder \
    .master('yarn') \
    .config('spark.submit.deployMode', 'client') \
    .config("spark.driver.extraClassPath", classpath) \
    .config("spark.jars.packages", pydeequ.deequ_maven_coord) \
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord) \
    .config('spark.debug.maxToStringFields', 100) \
    .config('spark.executor.memory', '2g') \
    .getOrCreate()
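With the session up, the same lookup that failed before should now resolve to a callable Java class (a sketch mirroring the earlier diagnostic):
from py4j.java_gateway import JavaClass

cls = spark.sparkContext._jvm.com.amazon.deequ.analyzers.runners.AnalysisRunBuilder
assert isinstance(cls, JavaClass), f"Deequ not on the driver classpath: {type(cls)}"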
Load an excerpt of the Amazon product review data:
df = spark.read.csv(
path=f"file:///{root}/data/amazon_product_reviews.csv.gz",
header=True,
)
df.printSchema()
-----
root
|-- review_id: string (nullable = true)
|-- marketplace: string (nullable = true)
|-- product_id: string (nullable = true)
|-- year: string (nullable = true)
|-- star_rating: string (nullable = true)
|-- total_votes: string (nullable = true)
|-- helpful_votes: string (nullable = true)
|-- product_category: string (nullable = true)
from pydeequ.analyzers import *
analysisResult = AnalysisRunner(spark) \
    .onData(df) \
    .addAnalyzer(Size()) \
    .addAnalyzer(Completeness("review_id")) \
    .addAnalyzer(ApproxCountDistinct("review_id")) \
    .addAnalyzer(Mean("star_rating")) \
    .addAnalyzer(Compliance("top star_rating", "star_rating >= 4.0")) \
    .addAnalyzer(Correlation("total_votes", "star_rating")) \
    .addAnalyzer(Correlation("total_votes", "helpful_votes")) \
    .run()
analysisResult_df = AnalyzerContext.successMetricsAsDataFrame(spark, analysisResult)
analysisResult_df.show()
-----
21/08/16 11:17:00 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
+-------+---------------+-------------------+------+
| entity| instance| name| value|
+-------+---------------+-------------------+------+
| Column| review_id| Completeness| 1.0|
| Column| review_id|ApproxCountDistinct|1040.0|
|Dataset| *| Size|1000.0|
| Column|top star_rating| Compliance| 0.657|
+-------+---------------+-------------------+------+
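Note that the Mean and Correlation metrics do not show up in the result. All columns were read as strings (see the schema above), and Deequ's numeric analyzers expect numeric columns, so those metrics are most likely skipped. A hedged sketch that casts the numeric columns before rerunning the same analysis:
from pyspark.sql.functions import col

# Cast the columns used by Mean/Correlation/Compliance to numeric types.
df_typed = (df
    .withColumn("star_rating", col("star_rating").cast("double"))
    .withColumn("total_votes", col("total_votes").cast("long"))
    .withColumn("helpful_votes", col("helpful_votes").cast("long")))
Rerunning the AnalysisRunner on df_typed should then also produce the Mean and Correlation rows.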