#!/usr/bin/python3.9
from pyspark.sql import SparkSession
iceberg_spark_jar = '/AKE/iceberg-spark-runtime-3.5_2.12-1.9.0.jar'
warehouse_path = '/user/hive/warehouse'
catalog = 'ake_catalog'
# Initialize Spark session with Hive support and Iceberg configuration
spark = SparkSession.builder \
    .appName("PySpark Hive Iceberg Example") \
    .enableHiveSupport() \
    .config(f"spark.sql.catalog.{catalog}", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config(f"spark.sql.catalog.{catalog}.type", "hive") \
    .config(f"spark.sql.catalog.{catalog}.uri", "thrift://localhost:9083") \
    .config(f"spark.sql.catalog.{catalog}.warehouse", warehouse_path) \
    .config("spark.sql.defaultCatalog", catalog) \
    .config("spark.jars", iceberg_spark_jar) \
    .getOrCreate()
# Example DataFrame
data = [("Alice", 34), ("Bob", 45), ("Cathy", 29)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data, columns)
# Create the database
spark.sql("CREATE DATABASE IF NOT EXISTS my_database")
spark.sql("SHOW DATABASES").show()
spark.sql("SHOW TABLES IN my_database").show()
# Create an Iceberg table directly in the database
spark.sql("""
CREATE TABLE IF NOT EXISTS my_database.iceberg_table (
name STRING,
age INT
)
USING iceberg
""")
df.write.format("iceberg").mode("overwrite").saveAsTable("my_database.iceberg_table")
iceberg_df = spark.read.format("iceberg").load("my_database.iceberg_table")
iceberg_df.show()
spark.stop()
Here is my hive-site.xml:
<configuration>
<property>
<name>hive.server2.thrift.bind.host</name>
<value>0.0.0.0</value> <!-- Bind to all interfaces -->
</property>
<property>
<name>hive.server2.thrift.port</name>
<value>10000</value> <!-- Default port for HiveServer2 -->
</property>
<property>
<name>hive.server2.authentication</name>
<value>NONE</value> <!-- Authentication mode (NONE, KERBEROS, etc.) -->
</property>
<property>
<name>hive.server2.enable.doAs</name>
<value>FALSE</value>
<description>
Setting this property to true will have HiveServer2 execute
Hive operations as the user making the calls to it.
</description>
</property>
<!-- <property>
<name>hive.metastore.uris</name>
<value>thrift://0.0.0.0:9083</value>
</property> -->
<property>
<name>javax.jdo.option.ConnectionURL</name>
<value>jdbc:postgresql://localhost:5432/metastore_db?createDatabaseIfNotExist=true</value>
<description>JDBC connect string for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionDriverName</name>
<value>org.postgresql.Driver</value>
<description>Driver class name for a JDBC metastore</description>
</property>
<property>
<name>javax.jdo.option.ConnectionUserName</name>
<value>hiveuser</value>
<description>Username to use against metastore database</description>
</property>
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>[PASSWORD]</value>
<description>Password to use against metastore database</description>
</property>
<property>
<name>datanucleus.autoCreateSchema</name>
<value>false</value>
</property>
</configuration>
# Drop the Iceberg table from the database
spark.sql("DROP TABLE IF EXISTS my_database.iceberg_table")
This fails with the following exception:
Caused by: org.datanucleus.exceptions.NucleusException: Attempt to invoke the "BONECP" plugin to create a ConnectionPool gave an error : The specified datastore driver ("org.postgresql.Driver") was not found in the CLASSPATH. Please check your CLASSPATH specification, and the name of the driver.
at org.datanucleus.store.rdbms.ConnectionFactoryImpl.generateDataSources(ConnectionFactoryImpl.java:232)
at org.datanucleus.store.rdbms.ConnectionFactoryImpl.initialiseDataSources(ConnectionFactoryImpl.java:117)
at org.datanucleus.store.rdbms.ConnectionFactoryImpl.<init>(ConnectionFactoryImpl.java:82)
... 164 more
Caused by: org.datanucleus.store.rdbms.connectionpool.DatastoreDriverNotFoundException: The specified datastore driver ("org.postgresql.Driver") was not found in the CLASSPATH. Please check your CLASSPATH specification, and the name of the driver.
I have found the solution.
Solution 1
# Add these two lines to the SparkSession.builder:
.config("spark.driver.extraClassPath", "/path/to/postgresql-42.7.3.jar") \
.config("spark.executor.extraClassPath", "/path/to/postgresql-42.7.3.jar") \
Solution 2
Copy your PostgreSQL JDBC driver JAR, e.g. "postgresql-42.7.4.jar", into the jars directory of the pyspark package under your Python site-packages:
/usr/local/lib/python3.9/site-packages/pyspark/jars
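If you are not sure where that jars directory lives on your machine, this small snippet (assuming pyspark is importable) prints the path bundled with the installed package:

import os
import pyspark

# The pyspark package ships its JARs in a "jars" directory next to the module.
print(os.path.join(os.path.dirname(pyspark.__file__), "jars"))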