java apache-spark spark-java

java.lang.ClassCastException: class java.lang.String cannot be cast to class org.apache.spark.unsafe.types.VariantVal


I am evaluating Spark 4's try_variant_get function for handling VARIANT data. First, I wrote some example SQL statements.

CREATE TABLE family (
  id INT,
  data VARIANT
);


INSERT INTO family (id, data)
VALUES
(1, PARSE_JSON('{"name":"Alice","age":30}')),
(2, PARSE_JSON('[1,2,3,4,5]')),
(3, PARSE_JSON('42'));

These statements execute without errors. The following SELECT then uses try_variant_get:

SELECT
  id,
  try_variant_get(data, '$.name', 'STRING') AS name,
  try_variant_get(data, '$.age', 'INT') AS age
FROM
  family
WHERE 
  try_variant_get(data, '$.name', 'STRING') IS NOT NULL;

The query returns the expected output. I then translated these SQL statements into Java API code.

SparkSession spark = SparkSession.builder().master("local[*]").appName("VariantExample").getOrCreate();

StructType schema = new StructType()
       .add("id", DataTypes.IntegerType)
       .add("data", DataTypes.VariantType);

Dataset<Row> df = spark.createDataFrame(
       Arrays.asList(
            RowFactory.create(1, "{\"name\":\"Alice\",\"age\":30}"),
            RowFactory.create(2, "[1,2,3,4,5]"),
            RowFactory.create(3, "42")
       ),
       schema
);

 Dataset<Row> df_sel = df.select(
            col("id"),
            try_variant_get(col("data"), "$.name", "String").alias("name"),
            try_variant_get(col("data"), "$.age", "Integer").alias("age")
        ).where("name IS NOT NULL");

df_sel.printSchema();
df_sel.show();

But this Java code prints the schema and then throws the following exception.

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)

Exception in thread "main" java.lang.ClassCastException: class java.lang.String cannot be cast to class org.apache.spark.unsafe.types.VariantVal (java.lang.String is in module java.base of loader 'bootstrap'; org.apache.spark.unsafe.types.VariantVal is in unnamed module of loader 'app')
        at org.apache.spark.sql.catalyst.expressions.variant.VariantGet.nullSafeEval(variantExpressions.scala:282)
        at org.apache.spark.sql.catalyst.expressions.BinaryExpression.eval(Expression.scala:692)
        at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:159)
        at org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(InterpretedMutableProjection.scala:89)
        at org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation$$anonfun$apply$48.$anonfun$applyOrElse$83(Optimizer.scala:2231)
        at scala.collection.immutable.List.map(List.scala:247)
        at scala.collection.immutable.List.map(List.scala:79).....

I suspect the "String" parameter of try_variant_get is the problem, but I cannot tell what is wrong with this Java code. How can I fix this error?


Solution

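  • The schema declares the data column as VariantType, but every row passed to createDataFrame holds a plain java.lang.String in that column. Spark does not convert those strings for you, so when show() evaluates the query, VariantGet casts the value to VariantVal and throws the ClassCastException. The "String" and "Integer" type arguments are not the cause: printSchema() had already resolved name as string and age as integer before the failure.

    To fix it, declare the column as StringType and convert it with try_parse_json(), which parses a JSON string into a VARIANT value that try_variant_get can read.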

    Solution:

    import org.apache.spark.sql.*;
    import org.apache.spark.sql.types.*;
    import static org.apache.spark.sql.functions.*;
    
    import java.util.Arrays;
    
    public class VariantExample {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("VariantExample")
                .getOrCreate();
    
            // Define schema with String type for input data
            StructType schema = new StructType()
                .add("id", DataTypes.IntegerType)
                .add("data", DataTypes.StringType);
    
            // Create DataFrame with raw string data
            Dataset<Row> df = spark.createDataFrame(
                Arrays.asList(
                    RowFactory.create(1, "{\"name\":\"Alice\",\"age\":30}"),
                    RowFactory.create(2, "[1,2,3,4,5]"),
                    RowFactory.create(3, "42")
                ),
                schema
            );
    
            // Parse each JSON string into a VARIANT value (try_parse_json yields NULL for invalid JSON)
            Dataset<Row> dfWithParsedJson = df.withColumn("data", expr("try_parse_json(data)"));
    
            // Use try_variant_get to extract fields
            Dataset<Row> df_sel = dfWithParsedJson.select(
                    col("id"),
                    expr("try_variant_get(data, '$.name', 'STRING')").alias("name"),
                    expr("try_variant_get(data, '$.age', 'INT')").alias("age")
                )
                .where("name IS NOT NULL");
    
            // Show results
            df_sel.printSchema();
            df_sel.show();
        }
    }
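
As for why the mismatch only surfaces at show() and not at createDataFrame: a generic row stores its values as plain objects, and the declared schema is only trusted later, when an expression casts the value. The sketch below reproduces that pattern without Spark. VariantLike, UntypedRow, readAsVariant, and tryRead are hypothetical stand-ins invented for illustration, not Spark classes or APIs, and the real internals may differ.

```java
public class SchemaMismatchSketch {
    // Hypothetical stand-in for a variant value; NOT Spark's VariantVal.
    static final class VariantLike {
        final String json;
        VariantLike(String json) { this.json = json; }
    }

    // Hypothetical row: the value is held as Object, the declared type is only metadata.
    static final class UntypedRow {
        final Class<?> declaredType;
        final Object value;
        UntypedRow(Class<?> declaredType, Object value) {
            this.declaredType = declaredType;
            this.value = value;
        }
    }

    // Mimics an expression that trusts the declared type and casts unconditionally.
    static String readAsVariant(UntypedRow row) {
        VariantLike v = (VariantLike) row.value; // throws if the value is really a String
        return v.json;
    }

    // Returns "ok" if the read succeeds, or "ClassCastException" if the cast fails.
    static String tryRead(UntypedRow row) {
        try {
            readAsVariant(row);
            return "ok";
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        // Declared as VariantLike but holding a String: construction succeeds, the read fails.
        UntypedRow bad = new UntypedRow(VariantLike.class, "{\"name\":\"Alice\"}");
        // Declared and stored as VariantLike: the read succeeds.
        UntypedRow good = new UntypedRow(VariantLike.class, new VariantLike("{\"name\":\"Alice\"}"));
        System.out.println(tryRead(bad));
        System.out.println(tryRead(good));
    }
}
```

Building the mismatched row succeeds and only the later read fails, which matches the question's output: printSchema() worked and the exception appeared during show().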
    

    Good luck!