apache-spark, apache-spark-sql, listener

Get all read/write SQL queries made in spark job using Spark Listener


I want to get a list of all read/write queries that have been made (using the Dataset API) in the current Spark job. For example,

Dataset<Row> readDataFrame = spark.read()
            .format("jdbc")
            .option("url", drivingUrl)
            .option("dbtable", "Select * from A where country_code='US'")
            .option("driver", driver)
            .load();

I expect to capture the query: Select * from A where country_code='US'. I tried using listeners for this so that I can capture this info for any spark-submit job I am running without having to alter the main code itself.

What I tried

  1. QueryExecutionListener
    @Override
    public void onSuccess(String funcName, QueryExecution qe, long durationNs) {
        SparkPlan sparkPlan = qe.executedPlan();
        // Tried to search the methods/properties inside it, but couldn't find anything
    }

I searched the SQLMetrics, the child Spark plans, etc., but couldn't find the info I was looking for.

  2. SparkListenerSQLExecutionStart
    @Override
    public void onOtherEvent(SparkListenerEvent event) {
        if (event instanceof SparkListenerSQLExecutionStart) {
            SparkListenerSQLExecutionStart sparkListenerSQLExecutionStart = (SparkListenerSQLExecutionStart) event;
            SparkPlanInfo sparkPlanInfo = sparkListenerSQLExecutionStart.sparkPlanInfo();

            // Print the fields that looked most likely to contain the query text
            System.out.println(sparkListenerSQLExecutionStart.description());
            System.out.println(sparkListenerSQLExecutionStart.details());
            System.out.println(sparkListenerSQLExecutionStart.physicalPlanDescription());
        }
    }

Here too, these fields (and the others I looked at) didn't have the query info I was looking for.

I believe it's possible to capture this info, as I have seen projects like SparkSplineAgent and questions on StackOverflow like this one do it, but I haven't been able to figure out how.

Can anyone please help me out here?


Solution

  • After a lot of trial and error, I finally found a way to do this. In the listener that implements QueryExecutionListener, I added:

    @Override
    public void onSuccess(String funcName, QueryExecution qe, long durationNs) {
        //qe.analyzed() returns the analyzed logical plan (not the executed physical plan)
        LogicalPlan analyzedPlan = qe.analyzed();
    
        //maintain a queue to keep track of plans to process
        Queue<LogicalPlan> queue = new LinkedList<>();
        queue.add(analyzedPlan);
    
        while (!queue.isEmpty()) {
            //get the first plan from queue
            LogicalPlan curPlan = queue.remove();
    
            if (curPlan instanceof LogicalRelation) {
                LogicalRelation logicalRelation = (LogicalRelation) curPlan;
                BaseRelation baseRelation = logicalRelation.relation();
    
                if (baseRelation instanceof JDBCRelation) {
                    JDBCRelation jdbcRelation = (JDBCRelation) baseRelation;
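                    //prints the value passed as "dbtable"; on newer Spark versions
                    //this accessor may be tableOrQuery() instead of table()
                    System.out.println(jdbcRelation.jdbcOptions().table());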
                }
                System.out.println(logicalRelation.relation());
               
            }
    
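            //add all child plans to the queue; children() is a Scala Seq,
            //so Iterator here is scala.collection.Iterator, not java.util.Iterator
            Iterator<LogicalPlan> childItr = curPlan.children().iterator();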
            while (childItr.hasNext()) {
                LogicalPlan logicalPlan = childItr.next();
                queue.add(logicalPlan);
            }
        }
    }
    

    This gave me the desired output of

    Select * from A where country_code='US'
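
    For anyone who wants to see the traversal on its own, here is a minimal sketch of the same breadth-first walk with Spark taken out of the picture. Everything in it is a hypothetical stand-in: PlanNode plays the role of LogicalPlan, JdbcScanNode plays the role of a LogicalRelation wrapping a JDBCRelation, and collectJdbcTables is a helper name I made up. None of these are Spark classes, so treat it as an illustration of the pattern rather than code to drop into a listener.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-ins for Spark's plan classes; none of these are Spark types.
class PlanWalkSketch {

    /** A node in a plan tree; plays the role of LogicalPlan. */
    static class PlanNode {
        final String name;
        final List<PlanNode> children;

        PlanNode(String name, List<PlanNode> children) {
            this.name = name;
            this.children = children;
        }
    }

    /** A leaf that reads over JDBC; plays the role of LogicalRelation + JDBCRelation. */
    static class JdbcScanNode extends PlanNode {
        final String tableOrQuery;

        JdbcScanNode(String tableOrQuery) {
            super("JdbcScan", Collections.emptyList());
            this.tableOrQuery = tableOrQuery;
        }
    }

    /** Breadth-first walk that collects the table/query string of every JDBC leaf. */
    static List<String> collectJdbcTables(PlanNode root) {
        List<String> found = new ArrayList<>();
        Queue<PlanNode> queue = new ArrayDeque<>();
        queue.add(root);

        while (!queue.isEmpty()) {
            // take the next plan node off the front of the queue
            PlanNode cur = queue.remove();

            if (cur instanceof JdbcScanNode) {
                found.add(((JdbcScanNode) cur).tableOrQuery);
            }

            // enqueue the children so that every level of the tree is visited
            queue.addAll(cur.children);
        }
        return found;
    }
}
```

    Collecting the strings into a list instead of printing them makes it easier to forward them to whatever sink you use (logs, a metrics store, etc.). Because the walk is breadth-first, relations closer to the root of the plan are reported before deeper ones.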