apache-spark, apache-spark-sql, cassandra, spark-cassandra-connector, codahale-metrics

How to retrieve Metrics like Output Size and Records Written from Spark UI?


How do I collect these metrics on the console (Spark shell or a spark-submit job) right after the task or job finishes?

We are using Spark to load data from MySQL into Cassandra, and the data set is quite large (roughly 200 GB and 600M rows). When the task is done, we want to verify exactly how many rows Spark processed. We can get the number from the Spark UI, but how can we retrieve that number ("Output Records Written") from the Spark shell or in a spark-submit job?

Sample command to load from MySQL into Cassandra:

val pt = sqlcontext.read.format("jdbc")
  .option("url", "jdbc:mysql://...:3306/...")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "payment_types")
  .option("user", "hadoop")
  .option("password", "...")
  .load()

pt.save("org.apache.spark.sql.cassandra", SaveMode.Overwrite, options = Map("table" -> "payment_types", "keyspace" -> "test"))
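
(For reference: on Spark 1.4 or later, the same write can also be expressed through the DataFrameWriter API. This is only a sketch of the equivalent call with the same connector options; it is not required for the metrics question below.)

pt.write
  .format("org.apache.spark.sql.cassandra")   // spark-cassandra-connector data source
  .mode(SaveMode.Overwrite)
  .options(Map("table" -> "payment_types", "keyspace" -> "test"))
  .save()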

I want to retrieve all of the Spark UI metrics for the above task, mainly Output Size and Records Written.

Please help.

Thanks for your time!


Solution

  • Found the answer. You can get the stats by using SparkListener.

    If your job has no input or output metrics, you might get None.get exceptions; you can safely avoid them by guarding each read with an if statement, as in the snippet below.

    // inputRecords and outputWritten are vars declared on the driver (see the full example below)
    sc.addSparkListener(new SparkListener() {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
        val metrics = taskEnd.taskMetrics
        // In Spark 1.x inputMetrics/outputMetrics are Options, so check them before calling .get
        if (metrics.inputMetrics.isDefined) {
          inputRecords += metrics.inputMetrics.get.recordsRead
        }
        if (metrics.outputMetrics.isDefined) {
          outputWritten += metrics.outputMetrics.get.recordsWritten
        }
      }
    })
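
    The question also asks for Output Size. In Spark 1.x, OutputMetrics exposes bytesWritten alongside recordsWritten, so the same guarded pattern can accumulate it. This is a minimal sketch assuming an extra driver-side var (outputBytes) that is not in the original code:

    var outputBytes = 0L   // hypothetical accumulator for "Output Size" in bytes

    sc.addSparkListener(new SparkListener() {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
        val metrics = taskEnd.taskMetrics
        if (metrics.outputMetrics.isDefined) {
          outputBytes += metrics.outputMetrics.get.bytesWritten
        }
      }
    })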
    

    Please find the complete example below.

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    import com.datastax.spark.connector._
    import org.apache.spark.sql._
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
    
    val conf = new SparkConf()
    .set("spark.cassandra.connection.host", "...")
    .set("spark.driver.allowMultipleContexts","true")
    .set("spark.master","spark://....:7077")
    .set("spark.driver.memory","1g")
    .set("spark.executor.memory","10g")
    .set("spark.shuffle.spill","true")
    .set("spark.shuffle.memoryFraction","0.2")
    .setAppName("CassandraTest")
    sc.stop   // stop the SparkContext created by spark-shell before building a new one
    val sc = new SparkContext(conf)
    val sqlcontext = new org.apache.spark.sql.SQLContext(sc)
    
    var inputRecords = 0L
    var outputWritten = 0L
    
    sc.addSparkListener(new SparkListener() {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
        val metrics = taskEnd.taskMetrics
        // Guard the Options so tasks without input/output metrics do not throw None.get
        if (metrics.inputMetrics.isDefined) {
          inputRecords += metrics.inputMetrics.get.recordsRead
        }
        if (metrics.outputMetrics.isDefined) {
          outputWritten += metrics.outputMetrics.get.recordsWritten
        }
      }
    })
    
    val bp = sqlcontext.read.format("jdbc")
      .option("url", "jdbc:mysql://...:3306/...")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("dbtable", "bucks_payments")
      .option("partitionColumn", "id")
      .option("lowerBound", "1")
      .option("upperBound", "14596")
      .option("numPartitions", "10")
      .option("fetchSize", "100000")
      .option("user", "hadoop")
      .option("password", "...")
      .load()
    bp.save("org.apache.spark.sql.cassandra",SaveMode.Overwrite,options = Map( "table" -> "bucks_payments", "keyspace" -> "test"))
    
    println("outputWritten",outputWritten)
    

    Result:

    scala> println("outputWritten",outputWritten)
    (outputWritten,16383)
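
    Note: on Spark 2.x and later the TaskMetrics API changed and inputMetrics/outputMetrics are no longer wrapped in Option, so the guards go away. A minimal sketch of the same listener for Spark 2.x, assuming it runs in spark-shell where sc already exists:

    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

    var recordsWritten = 0L
    var bytesWritten = 0L

    sc.addSparkListener(new SparkListener() {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
        // In Spark 2.x these accessors return the metrics directly (no Option)
        val out = taskEnd.taskMetrics.outputMetrics
        recordsWritten += out.recordsWritten
        bytesWritten += out.bytesWritten
      }
    })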