I have a simple Java/Spark app where I try to push CSV data in Iceberg format to my locally running storage (MinIO) using Iceberg's Nessie catalog.
This is my entire code (Read CSV -> Create Table -> Write To Table):
package com.comarch;

import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class SparkIcebergNessie {

    private static final String WAREHOUSE_PATH = "s3a://data/test";
    private static final String NAMESPACE_NAME = "default";
    private static final String TABLE_NAME = "twamp";
    private static final String TABLE_PATH = WAREHOUSE_PATH + "/" + NAMESPACE_NAME + "/" + TABLE_NAME;

    public static void main(String[] args) {
        SparkSession spark = createSparkSession();

        Dataset<Row> csv = spark.read()
                .option("inferSchema", "true")
                .option("delimiter", ",")
                .option("header", "true")
                .csv("src/main/resources/csv/twamp.csv");

        try {
            // Create table
            HadoopTables tables = new HadoopTables(spark.sparkContext().hadoopConfiguration());
            Schema tableSchema = SparkSchemaUtil.convert(csv.schema());
            PartitionSpec partitionSpec = PartitionSpec.builderFor(tableSchema).build();
            tables.create(tableSchema, partitionSpec, TABLE_PATH);

            // Write data to table
            csv.write().format("iceberg").mode(SaveMode.Append).save(TABLE_PATH);
            System.out.println("END");
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private static SparkSession createSparkSession() {
        return SparkSession.builder()
                .appName("Java Spark Iceberg Example")
                .master("local")
                .config("spark.ui.enabled", "false")
                // Filesystem config
                .config("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
                .config("fs.s3a.endpoint", "http://127.0.0.1:9000")
                .config("fs.s3a.access.key", "VUtRVIf0hg7szCp3k0Pz")
                .config("fs.s3a.secret.key", "lHzYClEjh2AH5mEfRdPS720pMl3UZl7riR3uL4pL")
                .config("spark.sql.warehouse.dir", WAREHOUSE_PATH)
                // Nessie catalog config
                .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.13:1.3.0,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.13:0.77.1")
                .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions")
                .config("spark.sql.catalog.nessie", "org.apache.iceberg.spark.SparkCatalog")
                .config("spark.sql.catalog.nessie.catalog-impl", "org.apache.iceberg.nessie.NessieCatalog")
                .config("spark.sql.catalog.nessie.authentication.type", "NONE")
                .config("spark.sql.catalog.nessie.uri", "http://localhost:19120/api/v1")
                .config("spark.sql.catalog.nessie.ref", "main")
                .config("spark.sql.defaultCatalog", "nessie")
                .config("spark.sql.catalog.nessie.warehouse", "/test")
                .getOrCreate();
    }
}
The code executes successfully (I can see the data in MinIO), but nothing happens in Nessie: no new branches, no commits, nothing.
I've tried playing with the Nessie catalog properties, but nothing worked. For example, when I remove the last property, "spark.sql.catalog.nessie.warehouse", I get this error:
Exception in thread "main" java.lang.IllegalStateException: Parameter 'warehouse' not set, Nessie can't store data.
So it feels like Spark is indeed trying to use (or actually using) this catalog, so why can't I see any metadata there after the code executes?
Try this instead. Your code creates and writes the table through HadoopTables and a raw s3a:// path, and path-based tables bypass the session catalogs entirely, so Spark talks straight to MinIO and Nessie never sees a commit. Write through the catalog by using a table identifier instead of a path; since spark.sql.defaultCatalog is set to nessie, the identifier default.twamp resolves to the Nessie catalog:
spark.sql("CREATE DATABASE IF NOT EXISTS default");
Dataset<Row> csv =
spark
.read()
.option("inferSchema", "true")
.option("delimiter", ",")
.option("header", "true")
.csv("src/main/resources/csv/twamp.csv");
csv.writeTo("default.twamp").createOrReplace();