I have a custom JdbcDialect, and I am trying to get it registered in my Spark (v3.5.1) cluster. The dialect works perfectly fine in local mode. Additionally, the canHandle method is implemented to always return true for now; this is how I excluded the DB URL and dialect matching from the equation. The Spark cluster also successfully retrieves data from the DB whenever I execute queries that don't require dialect customizations; this is how I made sure the cluster is healthy.
I have prepared a minimal, reproducible example.
Dialect implementation:
import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.jdbc.JdbcDialect;
import org.apache.spark.sql.jdbc.MySQLDialect;
import scala.Option;

public class MemSQL5Dialect extends JdbcDialect {

    private static class SQLBuilder extends MySQLDialect.MySQLSQLBuilder {
        // My customizations
    }

    @Override
    public Option<String> compileExpression(Expression expr) {
        try {
            return Option.apply(new SQLBuilder().build(expr));
        } catch (Exception e) {
            return Option.empty();
        }
    }

    @Override
    public boolean canHandle(String url) {
        return true;
    }
}
Application implementation:
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.jdbc.JdbcDialects;
import java.net.Inet4Address;
import java.util.Properties;
import static org.apache.spark.sql.functions.col;

public class Application {

    public static void main(String[] args) throws Exception {
        // Cluster is running using Docker, so I use IPv4.
        var hostAddress = Inet4Address.getLocalHost().getHostAddress();
        // The JAR with the dialect is copied into the folder inside the Docker container
        // where Bitnami Spark keeps all other JARs. I double-checked that the JAR is
        // there and that it includes the dialect!
        var applicationJarWithDialect = "/opt/bitnami/spark/jars/spark-playground-SNAPSHOT.jar";
        var sparkSession = SparkSession.builder()
                .appName("Spark Playground")
                // When set to "local[*]", it works perfectly fine!
                .master("spark://localhost:7077")
                .config("spark.driver.host", hostAddress)
                // I also tried the 2 properties below - did not help.
                // .config("spark.driver.extraClassPath", applicationJarWithDialect)
                // .config("spark.executor.extraClassPath", applicationJarWithDialect)
                .getOrCreate();
        // Trying to register the dialect on the driver.
        JdbcDialects.registerDialect(new MemSQL5Dialect());
        var jdbcUrl = String.format("jdbc:mariadb://%s:3306/db", hostAddress);
        var jdbcUsername = "root";
        var jdbcPassword = "root";
        var tableName = "phonebook";
        var properties = new Properties();
        properties.put("user", jdbcUsername);
        properties.put("password", jdbcPassword);
        properties.put("driver", "org.mariadb.jdbc.Driver");
        try {
            sparkSession.read().jdbc(jdbcUrl, tableName, properties)
                    // If I change the filter to something that doesn't require
                    // dialect changes, it works fine as well!
                    .filter(col("first_name").startsWith("J"))
                    .collectAsList()
                    .forEach(System.out::println);
        } finally {
            sparkSession.close();
        }
    }
}
Please help me understand what I am missing.
It turned out there were only 2 things that really mattered:
The application JAR with the custom dialect had to be accessible to the Spark workers, and the same goes for the DB driver. I put both into the folder where all the other Spark dependency JARs are located.
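Instead of copying the JARs into the cluster's jars folder by hand, the same effect can presumably be achieved at submit time with --jars, which distributes the listed JARs and puts them on the driver and executor classpaths (the paths below are placeholders, not the ones from my setup):

```shell
# Ship the JDBC driver to every node at submit time; the application JAR
# itself (containing the dialect) is distributed by spark-submit as well.
spark-submit \
  --master spark://localhost:7077 \
  --jars /path/to/mariadb-java-client.jar \
  --class Application \
  /path/to/spark-playground-SNAPSHOT.jar
```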
(The key thing I missed) The dialect registration had to occur on every worker, so I replaced
JdbcDialects.registerDialect(new MemSQL5Dialect());
with (imports added for completeness)
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import java.util.Iterator;
import java.util.List;

new JavaSparkContext(sparkSession.sparkContext())
        .parallelize(List.of(1))
        .foreachPartition((VoidFunction<Iterator<Integer>>) integerIterator -> {
            JdbcDialects.registerDialect(new MemSQL5Dialect());
        });