I have a Java Spark application that gets data from Kafka, performs some work on that data and then saves Parquet files to S3 using Spark's .write() method. Up until now my app has been collecting all the received data on the Spark driver and then saving it using the current Spark session, which works fine.
The simplified, generic code of what I have now is as follows:
Main class
public static void main(String[] args) throws Exception {
    ... // setting configs
    Processing pr = new Processing(...); // initialising all the classes here
    pr.run();
}
Processing class
private DummyClass dummyClass;
// constructor is here
public void run() {
    ... // some work and fetching data
    Dataset<Row> myData = ... // selecting and preparing the data
    myData.collectAsList().forEach(row -> {
        String myField = row.getAs("colName");
        ... // some more work
        dummyClass.createParquet(myField);
    });
}
DummyClass class
private SparkUtility sparkUtility;
// constructor here
public void createParquet(String myField) {
    List<Row> rowVals = new ArrayList<>();
    StructType schema = createSchema();
    ... // some work to populate rowVals list
    String s3Path = "s3a://bucket/key/key/";
    sparkUtility.writeParquet(rowVals, schema, s3Path);
}
private StructType createSchema() {
    StructType structType = new StructType();
    structType = structType.add("col1", DataTypes.StringType, false);
    structType = structType.add("col1w", DataTypes.StringType, false);
    return structType;
}
SparkUtility class
private SparkSession session;
// constructor here
private SparkSession getSparkSession() {
    SparkConf sparkConf = new SparkConf()
            .setAppName("myName")
            // further settings here
            .set("fs.s3a.endpoint", "s3-us-east-1.amazonaws.com");
    return SparkSession.builder().config(sparkConf).getOrCreate();
}
public void writeParquet(List<Row> entries, StructType structType, String path) {
    session.createDataFrame(entries, structType)
            .write().mode("overwrite").format("parquet").save(path);
}
This works and it is fine. However, I now want to make a change to the Processing class, like so:
// constructor is here
public void run() {
    ... // some work and fetching data
    Dataset<Row> myData = ... // selecting and preparing the data
    myData.foreachPartition(partition -> {
        DummyClass dummy = new DummyClass(...); // initialising the classes on the executors
        partition.forEachRemaining(record -> {
            String myField = record.getAs("colName");
            ... // some more work
            dummy.createParquet(myField);
        });
    });
}
The rest of the code is unchanged. The code runs, but it fails to save the data and throws the following exception:
Cannot invoke "scala.Option.map(scala.Function1)" because the return value of "org.apache.spark.sql.SparkSession.parentSessionState()" is null
From what I understand, this is because I am trying to use the Spark session on the executors. So how can I convert the Dataset to Parquet and save it to S3? Is there a way to access the session and tell it to save the data using the .write()
method?
I tried to create a fresh session, but that is not permitted, and various attempts to fetch the existing session result in the same error.
You shouldn't attempt to collect, or foreachPartition and then write; both are very much anti-patterns in Spark. Use Spark's built-in functions for transforming the data and for saving the Parquet files from the Dataset itself.
It's not clear from your code samples what you are trying to do for each row, but look at the Dataset.map function, and at functions.explode if you want to create more rows from each input row. Ideally, for performance, your transformations should use the Spark functions directly rather than map with Row-changing code.
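For example, a minimal sketch of the idea, assuming the per-row work can be expressed with built-in column functions. "colName" and "col1" are the names from your snippets; the comma split and the intermediate "part" column are assumptions purely for illustration:

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.explode;
import static org.apache.spark.sql.functions.split;

// Derive the output rows with built-in functions instead of collecting Rows on the driver:
// split produces an array column, explode turns each array element into its own row.
Dataset<Row> transformed = myData
        .withColumn("part", explode(split(col("colName"), ",")))
        .select(col("part").alias("col1") /* , further derived columns */);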
Move your logic to Spark's built-in functionality, which is designed to run in a distributed fashion.
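As a sketch of the write step under the same assumptions ("transformed" is the Dataset built above; partitionBy is optional), write straight from the Dataset on the driver and let the executors produce the files in parallel:

// No session access inside lambdas is needed; Spark distributes the actual file writes.
transformed.write()
        .mode("overwrite")
        .partitionBy("col1") // optional: one sub-directory per distinct value
        .format("parquet")
        .save("s3a://bucket/key/key/");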