java, scala, apache-spark, parquet

Is it possible to write data from Spark executors in Java Spark?


I have a Java Spark application that gets data from Kafka, performs some work on that data and then saves Parquet files to S3 using Spark's .write() command. Up until this point my app collected all received data on the Spark driver and then saved it using the current Spark session, which works fine.

The simplified, generic code of what I have now is as follows:

Main class

public static void main(String[] args) throws Exception {
  ... // setting configs
  Processing pr = new Processing(...); // initialising all the classes here
  pr.run();
}

Processing class

private DummyClass dummyClass;

// constructor is here
public void run() {
  ... // some work and fetching data
  Dataset<Row> myData = ... // selecting and preparing the data
  myData.collectAsList().forEach(row -> {
    String myField = row.getAs("colName");
    ... //some more work
    dummyClass.createParquet(myField);
  });
}

DummyClass class

private SparkUtility sparkUtility;

// constructor here

public void createParquet(String myField) {
  List<Row> rowVals = new ArrayList<>();
  StructType schema = createSchema();
  ...// some work to populate rowVals list
  String s3Path = "s3a://bucket/key/key/";
  sparkUtility.writeParquet(rowVals,schema,s3Path);
}

private StructType createSchema() {
  StructType structType = new StructType();
  structType = structType.add("col1", DataTypes.StringType, false);
  structType = structType.add("col1w", DataTypes.StringType, false);
  return structType;
}

SparkUtility class

private SparkSession session;

// constructor here

private SparkSession getSparkSession() {
  SparkConf sparkConf = new SparkConf()
                          .setAppName("myName")
                          // further settings here
                          .set("fs.s3a.endpoint", "s3-us-east-1.amazonaws.com");
  return SparkSession.builder().config(sparkConf).getOrCreate();
}

public void writeParquet(List<Row> entries, StructType structType,String path) {
  session.createDataFrame(entries,structType)
    .write().mode("overwrite").format("parquet").save(path);
}

This works and it is fine. However, I now want to change the Processing class as follows:

// constructor is here
public void run() {
  ... // some work and fetching data
  Dataset<Row> myData = ... // selecting and preparing the data
  myData.foreachPartition(partition -> {
    DummyClass dummy = new DummyClass(...); // initialising the classes in the executors
    partition.forEachRemaining(record -> {
      String myField = record.getAs("colName");
      ... //some more work
      dummy.createParquet(myField);
    });
  });
}

The rest of the code is unchanged. The code runs, but it fails to save the data and throws the following exception:

Cannot invoke "scala.Option.map(scala.Function1)" because the return value of "org.apache.spark.sql.SparkSession.parentSessionState()" is null

From what I understand, this is because I am trying to use the Spark session in the executors. So how can I convert a Dataset into Parquet and save it to S3? Is there a way to access the session and tell it to save the data using the .write() method?

I tried to create a fresh session, but that is not permitted, and various attempts to fetch the existing session result in the same error.


Solution

  • You shouldn't collect, or foreachPartition and write; these are very much anti-patterns in Spark. Use Spark's built-in functions for transforming data and save the Parquet files from the Dataset itself.

    It's not clear from your code samples what you are trying to do for each row, but look at the Dataset.map function, and at functions.explode if you want to create more rows from each input row. Ideally, for performance, your transformations should use Spark's functions directly rather than map calls with row-manipulating code.

    Move your logic to Spark's built-in functionality, which is designed to run in a distributed fashion, as sketched below.
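
    For illustration, here is a minimal sketch of that approach, assuming the Dataset<Row> is called myData and holds a colName column. The derived columns, the concat_ws call and the ProcessingSketch class name are placeholder assumptions standing in for the elided per-row work, not the asker's actual logic.

    import static org.apache.spark.sql.functions.col;
    import static org.apache.spark.sql.functions.concat_ws;

    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.Row;

    public class ProcessingSketch {

      public void run(Dataset<Row> myData) {
        String s3Path = "s3a://bucket/key/key/";

        // Express the per-row work with Spark's column functions; these derived
        // columns are placeholders for whatever "some more work" produces.
        Dataset<Row> prepared = myData.select(
            col("colName").as("col1"),
            concat_ws("-", col("colName"), col("colName")).as("col1w"));

        // Write straight from the Dataset. Spark distributes the Parquet write
        // across the executors; no SparkSession is touched inside a lambda.
        prepared.write().mode("overwrite").parquet(s3Path);

        // If some per-row logic really cannot be expressed with column functions,
        // Dataset.map with an explicit Encoder keeps the work distributed as well.
        Dataset<String> fields = myData.map(
            (MapFunction<Row, String>) row -> row.getAs("colName"),
            Encoders.STRING());
        fields.write().mode("overwrite").parquet(s3Path + "fields/");
      }
    }

    Because .write() is called on the Dataset itself, the driver only plans the job while the executors perform the Parquet output in parallel, which is what the foreachPartition attempt was trying to achieve by hand.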