Tags: java, apache-spark, apache-spark-sql

Custom Spark JdbcDialect is not used in cluster mode


I have a custom JdbcDialect, and I am trying to get it registered in my Spark (v3.5.1) cluster.

The dialect works perfectly fine in local mode. Additionally, the canHandle method is implemented to always return true for now, which takes URL matching (and thus dialect selection) out of the equation.

The Spark cluster also successfully retrieves data from the DB whenever I execute queries that don't require dialect customizations, which is how I made sure the cluster itself is healthy.
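For example, a read with no pushed-down string predicate runs fine on the cluster (a hypothetical sanity check, reusing the connection settings from the example below):

// No dialect-specific SQL is generated for a plain read, so this
// succeeds on the cluster even without the custom dialect.
sparkSession.read().jdbc(jdbcUrl, tableName, properties).show();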

I have prepared a minimal, reproducible example.

Dialect implementation:

import org.apache.spark.sql.connector.expressions.Expression;
import org.apache.spark.sql.jdbc.JdbcDialect;
import org.apache.spark.sql.jdbc.MySQLDialect;
import scala.Option;

public class MemSQL5Dialect extends JdbcDialect {

    private static class SQLBuilder extends MySQLDialect.MySQLSQLBuilder {
        // My customizations
    }

    @Override
    public Option<String> compileExpression(Expression expr) {
        try {
            return Option.apply(new SQLBuilder().build(expr));
        } catch (Exception e) {
            return Option.empty();
        }
    }

    @Override
    public boolean canHandle(String url) {
        return true;
    }
}

Application implementation:

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.jdbc.JdbcDialects;

import java.net.Inet4Address;
import java.util.Properties;

import static org.apache.spark.sql.functions.col;

public class Application {

    public static void main(String[] args) throws Exception {
        // Cluster is running using Docker, so I use IPv4.
        var hostAddress = Inet4Address.getLocalHost().getHostAddress();

        // JAR with dialect is copied to the folder inside Docker container, where Bitnami Spark
        // keeps all other JARs. Double-checked JAR is there and JAR includes the dialect!
        var applicationJarWithDialect = "/opt/bitnami/spark/jars/spark-playground-SNAPSHOT.jar";

        var sparkSession = SparkSession.builder()
                .appName("Spark Playground")
                // When set to "local[*]" works perfectly fine!
                .master("spark://localhost:7077")
                .config("spark.driver.host", hostAddress)
                // I tried also these 2 properties below - did not help.
                // .config("spark.driver.extraClassPath", applicationJarWithDialect)
                // .config("spark.executor.extraClassPath", applicationJarWithDialect)
                .getOrCreate();

        // Trying to register the dialect on driver.
        JdbcDialects.registerDialect(new MemSQL5Dialect());

        var jdbcUrl = String.format("jdbc:mariadb://%s:3306/db", hostAddress);
        var jdbcUsername = "root";
        var jdbcPassword = "root";
        var tableName = "phonebook";

        var properties = new Properties();
        properties.put("user", jdbcUsername);
        properties.put("password", jdbcPassword);
        properties.put("driver", "org.mariadb.jdbc.Driver");

        try {
            sparkSession.read().jdbc(jdbcUrl, tableName, properties)
                    // If I change filter to something that doesn't require
                    // dialect changes -- works fine as well!
                    .filter(col("first_name").startsWith("J"))
                    .collectAsList()
                    .forEach(System.out::println);
        } finally {
            sparkSession.close();
        }
    }
}

Please help me understand what I am missing.


Solution

  • It turned out that only two things really mattered:

    1. The application JAR with the custom dialect had to be accessible to the Spark workers, and the same goes for the JDBC driver. I put both into the folder where all the other Spark dependency JARs live (see the sketch after this list for a config-based alternative).

    2. (The key thing I missed) The dialect registration had to occur in every executor JVM, not only on the driver: JdbcDialects keeps its registry in a per-JVM static, and the executors resolve the dialect again from their own registry when they read partitions. So I replaced

      JdbcDialects.registerDialect(new MemSQL5Dialect());
      

      with

      // Note: this snippet additionally needs org.apache.spark.api.java.JavaSparkContext,
      // org.apache.spark.api.java.function.VoidFunction, java.util.Iterator and java.util.List.
      new JavaSparkContext(sparkSession.sparkContext())
          .parallelize(List.of(1))
          .foreachPartition((VoidFunction<Iterator<Integer>>) integerIterator -> {
              JdbcDialects.registerDialect(new MemSQL5Dialect());
          });
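
For completeness, here is how the two fixes fit together in one place. This is a minimal sketch, not a definitive recipe: the class name DialectBootstrap and the partition count are illustrative, spark.jars is shown as a config-based alternative to copying the JAR into the workers' jars folder by hand, and running the registration via foreachPartition over a small dummy RDD only makes it very likely (not guaranteed) that every executor executes it:

import java.util.Iterator;
import java.util.List;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.jdbc.JdbcDialects;

public class DialectBootstrap {

    public static void main(String[] args) {
        // Same JAR as in the question; the path is illustrative.
        var applicationJarWithDialect = "/opt/bitnami/spark/jars/spark-playground-SNAPSHOT.jar";

        var sparkSession = SparkSession.builder()
                .appName("Spark Playground")
                .master("spark://localhost:7077")
                // Config-based alternative to copying the JAR by hand:
                // spark.jars ships it to the driver and executor classpaths.
                .config("spark.jars", applicationJarWithDialect)
                .getOrCreate();

        // 1. Register on the driver, where the query is planned.
        JdbcDialects.registerDialect(new MemSQL5Dialect());

        // 2. Register in every executor JVM. A dummy RDD with a few
        //    partitions makes it very likely each executor runs this once.
        new JavaSparkContext(sparkSession.sparkContext())
                .parallelize(List.of(1, 2, 3, 4), 4)
                .foreachPartition((VoidFunction<Iterator<Integer>>) ignored ->
                        JdbcDialects.registerDialect(new MemSQL5Dialect()));
    }
}

One caveat: an executor that joins after this bootstrap (for example, under dynamic allocation) will not have run the registration, so in such setups it has to be repeated before queries that depend on the dialect.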