Tags: java, apache-spark, user-defined-functions

Stop the Application in the Driver when a Worker Fails


There is an Application that runs in the Driver.

This Application initializes a SparkSession:

   SparkSession sparkSession;

And a UDF is registered on it:

    sparkSession.udf().register(BASE64_CLOB_DECODER, (UDF2<String, String, String>) (base64, charSet) -> decode2String(base64, charSet), DataTypes.StringType);

The Application runs operations in the SparkSession: it reads, transforms and writes a dataframe.

During the transform operations the UDF is called, and the result of the transformation is then used in the write operations. Here is the implementation of the function:

    public String decode2String(String inputString, String charSet) {
        try {
            byte[] byteArray = decode2Bytes(inputString);
            return new String(byteArray, charSet);
        } catch (Exception e) {
            sparkSession.close();
            // sparkSession.stop();
            throw new DecodeFormatException(charSet);
        }
    }
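For context, here is a self-contained sketch of such a decoder. It assumes `decode2Bytes` is plain Base64 decoding (an assumption; the original helper is not shown) and rethrows decode failures as an unchecked exception, as the question's code does:

```java
import java.io.UnsupportedEncodingException;
import java.util.Base64;

public class Base64ClobDecoder {

    // Hypothetical stand-in for the original decode2Bytes helper;
    // plain Base64 decoding is assumed here.
    static byte[] decode2Bytes(String inputString) {
        return Base64.getDecoder().decode(inputString);
    }

    public static String decode2String(String inputString, String charSet) {
        try {
            byte[] byteArray = decode2Bytes(inputString);
            return new String(byteArray, charSet);
        } catch (UnsupportedEncodingException | IllegalArgumentException e) {
            // An unknown charset or malformed Base64 ends up here;
            // rethrow unchecked (standing in for DecodeFormatException).
            throw new RuntimeException("cannot decode with charset " + charSet, e);
        }
    }

    public static void main(String[] args) {
        // "SGVsbG8=" is Base64 for "Hello"
        System.out.println(decode2String("SGVsbG8=", "UTF-8"));
    }
}
```

Note that `new String(byte[], String)` throws a checked `UnsupportedEncodingException` for an unknown charset name, which is one concrete way this UDF can fail on a worker.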

The problem is that, when this function throws an exception, execution on the worker stops with:

    org.apache.spark.SparkException: Job aborted due to stage failure: executor 2): org.apache.spark.SparkException: Task failed while writing rows. …
    Caused by: org.apache.spark.SparkException: Failed to execute user defined function ….
    Caused by: exceptions.decode.DecodeFormatException:

But the Application in the Driver keeps running. As a result, the Application hangs.

Neither sparkSession.close() nor sparkSession.stop() helps to stop the Application. Is there any way to stop the Application programmatically?


Solution

  • You're trying to close your SparkSession from inside a UDF. This does not make sense, because the UDF is executed on the executors, not in the driver.

    It should be the driver that closes your SparkSession.

    It seems like you do not want your decode2String UDF's task to be retried after it fails. To this end, try limiting the number of task retries (in Spark, the spark.task.maxFailures setting controls this) so that the job fails fast.

    Then you can catch the exception in your driver, so not inside your decode2String UDF but right after the action that triggers it, and call sparkSession.stop() there. (Note that sparkSession.close() is equivalent: it simply delegates to stop().)