I am using spark-3.4.1-hadoop3 on Windows 11, and I am trying to generate the schema to pass as the schema parameter of the from_csv function. Below is my code.
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_csv;
import static org.apache.spark.sql.functions.not;
import java.util.HashMap;
import java.util.Map;
SparkSession spark = SparkSession.builder().appName("FromCsvStructExample").getOrCreate();
Dataset<Row> df = spark.read().format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("/path/to/csv/file");
Map<String, String> options = new HashMap<String, String>();
String schemaString = "name string, age int, job string";
Column schema = from_csv(col("csv"), col(schemaString), options);
Dataset<Row> parsed = df.select(schema.as("data"));
parsed.printSchema();
spark.close();
But the code throws the following exception.
Exception in thread "main" org.apache.spark.sql.AnalysisException: [INVALID_SCHEMA.NON_STRING_LITERAL] The input schema "name string, age int, job string" is not a valid schema string. The input expression must be string literal and not null.
at org.apache.spark.sql.errors.QueryCompilationErrors$.unexpectedSchemaTypeError(QueryCompilationErrors.scala:1055)
at org.apache.spark.sql.catalyst.expressions.ExprUtils$.evalTypeExpr(ExprUtils.scala:42)
at org.apache.spark.sql.catalyst.expressions.ExprUtils$.evalSchemaExpr(ExprUtils.scala:47)
at org.apache.spark.sql.catalyst.expressions.CsvToStructs.<init>(csvExpressions.scala:72)
at org.apache.spark.sql.functions$.from_csv(functions.scala:4955)
at org.apache.spark.sql.functions.from_csv(functions.scala)
at com.aaa.etl.processor.Test_CSV.main(Test_CSV.java:43)
I am afraid the schemaString is not a valid argument for the org.apache.spark.sql.functions.col function. Kindly let me know how to generate the schema with the org.apache.spark.sql.functions.col function. I know there is an overloaded from_csv function whose schema parameter is of type StructType, but to use that one I would have to write Scala code, and I have no basic knowledge of Scala.
== Updated Part
I tried to use the Java-specific from_csv method:
from_csv(Column e, Column schema, java.util.Map<String,String> options)
As you can see, the type of the schema parameter is not StructType but Column. I am stuck on this part: I have no idea how to generate a Column-typed schema in Java. If you have any reference that shows how to generate a Column-typed schema in Java, kindly let me know.
You are right that you can't directly generate a Column from a DDL string. One way is to use the lit function or the StructType.fromDDL function. As you already mentioned, one signature of the from_csv function accepts a StructType for the schema, so the Scala code would look like this:
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.functions.{col, from_csv}

val options = Map.empty[String, String]
val schema: StructType = StructType.fromDDL("name string, age int, job string")
// StructType(
//   StructField(name,StringType,true),
//   StructField(age,IntegerType,true),
//   StructField(job,StringType,true)
// )
val targetCol = from_csv(col("csv"), schema, options)
The code should be very similar for Java.
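For reference, here is a minimal Java sketch of the same approach. One caveat: this overload of from_csv takes a scala.collection.immutable.Map for the options rather than a java.util.Map, so the sketch below builds an empty Scala map directly (the "csv" column name is carried over from your question):

import org.apache.spark.sql.Column;
import org.apache.spark.sql.types.StructType;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_csv;

// Build the schema from a DDL string, just like the Scala snippet above.
StructType schema = StructType.fromDDL("name string, age int, job string");

// This overload expects a Scala immutable Map; an empty one is enough
// when no CSV parsing options are needed.
scala.collection.immutable.Map<String, String> options =
        scala.collection.immutable.Map$.MODULE$.empty();

Column targetCol = from_csv(col("csv"), schema, options);

If you do need to pass options, it is usually simpler in Java to use the other signature discussed below, which takes a java.util.Map.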
As for the other signature of from_csv, which accepts a Column instead of a StructType, it is used in combination with the lit function, as shown in the corresponding unit test. This is for cases where you prefer passing the schema as a string.
For your case, that would be:
val schema = "name string, age int, job string"
val targetCol = from_csv(col("csv"), lit(schema), options)
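And here is the Java equivalent, a minimal sketch using the Java-specific signature you found, reusing the df and the "csv" column from your question:

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import java.util.HashMap;
import java.util.Map;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_csv;
import static org.apache.spark.sql.functions.lit;

String schemaString = "name string, age int, job string";
Map<String, String> options = new HashMap<>();

// lit() wraps the DDL string in a literal Column, which is what
// from_csv(Column e, Column schema, java.util.Map options) expects.
Column targetCol = from_csv(col("csv"), lit(schemaString), options);

Dataset<Row> parsed = df.select(targetCol.as("data"));
parsed.printSchema();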