Tags: scala, apache-spark, serialization, writable

Issue with RDD of Custom Objects in Spark


I am passing a type into a flatMap like so:

 val customData: RDD[(Text/String, Custom)] = origRDD.flatMap { case(x) => parse(x)}

This returns a key-value pair of String and Custom (I have used Text in place of String as well, to no avail; that is what 'Text/String' means). The Custom class extends Serializable and is registered with Kryo.

When I try to run the program it just runs and never ends. By "never ends" I mean I have left it running for 18 hours and it did not finish. If I change the value to an Int (a counter) with a Text (Hadoop IO) key instead of the custom object, it finishes very quickly, in about 30 minutes. The data it runs through is the same, and the same parse method (and the same flatMap) is used in both cases, so both runs go through exactly the same logic. The behavior only degrades when I change the output from (Text, Int) to (Text/String, Custom).

I would like to know what I need to add to make this work. Does the class need to be a Writable?

Example of the Custom class implementation (obviously not the exact class, but it mimics it very well):

import java.util

class Custom(dateAsLong: java.lang.Long, typeOfTransaction: util.HashSet[java.lang.Long], isCustomer: Boolean, amount: String, customerMatch: ObjectMatch) extends Serializable {
  // has getters and setters here

  val startDate = dateAsLong
  val transType = typeOfTransaction
  val customer = isCustomer
  val cost = amount
  val matchedData = customerMatch

  def getStartDate(): java.lang.Long = startDate
  def getTransType(): util.HashSet[java.lang.Long] = transType
  def getCustomer(): Boolean = customer
  def getCost(): String = cost
  def getMatchedData(): ObjectMatch = matchedData
}

Example of the parse method, inside an object that extends java.io.Serializable:

object Paser extends Serializable {
  def parse(transaction: Transaction, customerList: util.HashMap[String, String], storeList: util.HashMap[String, String]): List[(Text, Custom)] = { // List because the flatMap emits 0, 1 or 2
    // adds items to the list depending on conditions
    var list: List[(Text, Custom)] = List()
    var isCustomer = false
    val typeOfTransaction = getType(transaction)
    val dateAsLong = getDate(transaction)
    val amount = getAmount(transaction)
    val customerMatch = getCustomerMatch(transaction, customerList)
    val storeMatch = getStoreMatch(transaction, storeList)
    // more fields parsed

    if (customerMatch != Some(null)) {
      isCustomer = true
      val transaction: Custom = getTransaction(dateAsLong, typeOfTransaction, isCustomer, amount, customerMatch)
      val transactionId = hash(transaction.getCustomer(), transaction.getTransType(), transaction.getMatchedData().getItem())
      list = list :+ ((new Text(transactionId), transaction))
    }
    if (storeMatch != Some(null)) {
      isCustomer = false
      val transaction: Custom = getTransaction(dateAsLong, typeOfTransaction, isCustomer, amount, storeMatch)
      val transactionId = hash(transaction.getCustomer(), transaction.getTransType(), transaction.getMatchedData().getItem())
      list = list :+ ((new Text(transactionId), transaction))
    }
    list
  }
}

Kryo serialization is set up like so:

 conf.registerKryoClasses(Array(classOf[Custom]))

Any help is appreciated, with code examples or links to examples.

Spark UI for (Text/String, Custom) run

Spark UI Total Job

Bottom progress bar with 1/11 tasks is the flatMap; the top one is saveAsNewAPIHadoopFile

The flatMap stage 0 is saveAsNewAPIHadoopFile -> filter x7 -> flatMap

Summary Metrics

Tasks

Run with (Text, Int)

Main Job Page

Completed Stages

Summary Metrics

Tasks

The Spark UI for the slow (Text/String, Custom) run says 1.1 h, but I have let it run for 18 hours. When it runs that long it does progress slowly, but letting it run for a day is not acceptable; something is very wrong. Again, the parse method is used in both runs, so they go through exactly the same logic, even though the faster run does not output the custom value and instead emits Text keys with Int values.

Not sure if this is helpful, but whatever is wrong also changes how the scanning in Accumulo looks. The (Text, Int) run shows a normal increase in scans, stays at roughly the same scans/s for the 30 minutes, then drops. When I run with the custom object, the scan rate increases and then immediately drops in a ^-like fashion, then drags on at a lower rate for hours.

Spark Version: 1.6.2, Scala 2.11


Solution

  • You should not use var list: List[(Text, Custom)] = List(). Every execution of list = list :+ (new Text(transactionId), transaction) creates a brand-new list (it does not just add to the existing one), because List in Scala is immutable. You should replace it with val myListBuffer = ListBuffer[(Text, Custom)]() and append to that instead; see the sketch below. I am not sure this is the only problem, but the change should help if your lists get large.
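
    For example, the accumulation in parse could look roughly like this (a sketch only, assuming the helpers from your question such as getType, getTransaction and hash stay as they are; I also dropped the isCustomer var and pass the flag directly):

        import scala.collection.mutable.ListBuffer
        import org.apache.hadoop.io.Text

        def parse(transaction: Transaction,
                  customerList: java.util.HashMap[String, String],
                  storeList: java.util.HashMap[String, String]): List[(Text, Custom)] = {
          // append to a mutable buffer instead of copying an immutable List on every :+
          val buffer = ListBuffer[(Text, Custom)]()

          val typeOfTransaction = getType(transaction)
          val dateAsLong = getDate(transaction)
          val amount = getAmount(transaction)
          val customerMatch = getCustomerMatch(transaction, customerList)
          val storeMatch = getStoreMatch(transaction, storeList)

          if (customerMatch != Some(null)) {
            val t = getTransaction(dateAsLong, typeOfTransaction, true, amount, customerMatch)
            buffer += ((new Text(hash(t.getCustomer(), t.getTransType(), t.getMatchedData().getItem())), t))
          }
          if (storeMatch != Some(null)) {
            val t = getTransaction(dateAsLong, typeOfTransaction, false, amount, storeMatch)
            buffer += ((new Text(hash(t.getCustomer(), t.getTransType(), t.getMatchedData().getItem())), t))
          }

          buffer.toList // the flatMap still gets back an immutable List
        }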

    Also, a couple of comments on writing code in Scala: there is no need to write getters and setters in a Scala class; the members are public, immutable vals anyway. Think really hard before you use a var in Scala. Immutability will make your code more resilient, improve readability, and make it easier to change.
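
    For instance, a class like your Custom could be written as a case class with public, immutable fields and no hand-written getters (a sketch; I am guessing the field types from your example):

        import java.util

        // the constructor parameters of a case class are public vals,
        // and case classes are Serializable by default
        case class Custom(startDate: java.lang.Long,
                          transType: util.HashSet[java.lang.Long],
                          isCustomer: Boolean,
                          cost: String,
                          matchedData: ObjectMatch)

        // usage: custom.startDate, custom.cost, ... replace getStartDate(), getCost(), ...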