I'm using Spark SQL to run a simple query against my Iceberg table. Some info about the table itself, because it might be useful (state at the moment of posting this question):
Code:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

long start = System.nanoTime();
SparkSession spark = getSparkSession();
System.out.println("Session creation took (ms): " + (System.nanoTime() - start) / 1000000);

// Time window with timestamps in the future, so the query should match 0 rows
Dataset<Row> data = spark.sql("SELECT * FROM myschema.mytable WHERE time BETWEEN CAST('2024-07-10 14:00:00.0' AS TIMESTAMP) AND CAST('2024-07-10 15:40:00.0' AS TIMESTAMP)");
System.out.println("Count: " + data.count()); // count() triggers the actual scan
System.out.println("Partitions: " + data.rdd().getNumPartitions());
System.out.println("Execution took (ms): " + (System.nanoTime() - start) / 1000000);
spark.stop();
Output:
Session creation took (ms): 4333
Count: 0
Partitions: 143
Execution took (ms): 107029
Important note: this number of partitions keeps going up as I load more and more data into the source table. If I run the same query again after some time, the count remains 0, but the number of partitions is higher.
Two big questions:
1. Why is this simple query so slow (~100 seconds) even when I deliberately select 0 rows from the source table by pointing at timestamps in the future? When I run the same query via Trino, it takes 1-2 seconds. Also, when I set proper timestamps and pull e.g. 500 rows, it makes no difference; it still runs for ~100 seconds.
2. What is this number of partitions, and why does it keep increasing? Why is it, for example, 143 when the table has 28 partitions?
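In case it helps, Iceberg exposes metadata tables that show the physical layout, so the partition count Spark reports can be compared with the number of data files. A hedged sketch (I'm assuming here that the scan's partition count tracks the number of data files / scan tasks rather than the table's 28-partition spec; only myschema.mytable and getSparkSession() come from the snippets above):
SparkSession spark = getSparkSession();

// Total number of data files currently in the table
spark.sql("SELECT count(*) AS data_files FROM myschema.mytable.files").show();

// Per-partition breakdown (file and record counts for each table partition)
spark.sql("SELECT * FROM myschema.mytable.partitions").show(false);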
UPDATE (10.07.2024):
When I change my query to a plain "SELECT * FROM myschema.mytable", it executes roughly 10-12x faster.
It turned out Spark had a problem with my "SELECT ..." statement for some reason, while I had been focused mostly on fixing the partitioning.
I formatted my timestamps as "yyyy-MM-dd HH:mm:ss" and replaced "spark.sql(...)" with:
spark.read().format("iceberg").load(getSourceTable()).filter(col("time").between(startTime, endTime));
After this, everything works as expected.
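In case it's useful, here is a slightly fuller sketch of the reworked read. The timestamp literals are only an example (the same window as above, reformatted as "yyyy-MM-dd HH:mm:ss"), col is the static import from org.apache.spark.sql.functions, and getSourceTable() is the same helper as before, assumed to return the qualified table name:
import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

String startTime = "2024-07-10 14:00:00";
String endTime = "2024-07-10 15:40:00";

SparkSession spark = getSparkSession();

// Read the Iceberg table through the DataFrame API and filter on the time column
Dataset<Row> data = spark.read()
        .format("iceberg")
        .load(getSourceTable()) // e.g. "myschema.mytable"
        .filter(col("time").between(startTime, endTime));

System.out.println("Count: " + data.count());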