I am passing a type into a flatmap like so:
val customData: RDD[(Text/String, Custom)] = origRDD.flatMap { case (x) => parse(x) }
This returns a key-value pair of String and Custom (I have used Text in place of String as well, to no avail; that is what 'Text/String' means). The Custom class extends Serializable and is registered in Kryo.
When I try to run the program it just runs and never ends; I have left it running for 18 hours and it did not finish. If I change the output to Text (hadoop io) with an Int (a counter) instead of the custom object, it finishes quickly, in about 30 minutes. Both runs go through the same data and use the same parse method in the same flatmap, so the logic is identical. The behavior degrades only when I change the output from (Text, Int) to (Text/String, Custom).
I would like to know what I need to add to make this work. Does Custom need to be a Writable?
Example of the Custom class implementation (obviously not the exact code, but it mimics it very closely):
import java.util

class Custom(dateAsLong: java.lang.Long, typeOfTransaction: util.HashSet[java.lang.Long], isCustomer: Boolean, amount: String, customerMatch: ObjectMatch) extends Serializable {
  // has getters and setters here
  val startDate = dateAsLong
  val transType = typeOfTransaction
  val customer = isCustomer
  val cost = amount
  val matchedData = customerMatch

  def getStartDate(): java.lang.Long = startDate
  def getTransType(): util.HashSet[java.lang.Long] = transType
  def getCustomer(): Boolean = customer
  def getCost(): String = cost
  def getMatchedData(): ObjectMatch = matchedData
}
Example of the parse method, inside an object that extends java Serializable:
import java.util
import org.apache.hadoop.io.Text

object Paser extends Serializable {
  def parse(transaction: Transaction, customerList: util.HashMap[String, String], storeList: util.HashMap[String, String]): List[(Text, Custom)] = { // List because flatmap emits 0, 1 or 2
    // adds items to the list depending on conditions
    var list: List[(Text, Custom)] = List()
    var isCustomer = false
    val typeOfTransaction = getType(transaction)
    val dateAsLong = getDate(transaction)
    val amount = getAmount(transaction)
    val customerMatch = getCustomerMatch(transaction, customerList)
    val storeMatch = getStoreMatch(transaction, storeList)
    // more fields parsed

    if (customerMatch != Some(null)) {
      isCustomer = true
      val transaction: Custom = getTransaction(dateAsLong, typeOfTransaction, isCustomer, amount, customerMatch)
      val transactionId = hash(transaction.getCustomer(), transaction.getTransType(), transaction.getMatchedData().getItem())
      list = list :+ (new Text(transactionId), transaction)
    }
    if (storeMatch != Some(null)) {
      isCustomer = false
      val transaction: Custom = getTransaction(dateAsLong, typeOfTransaction, isCustomer, amount, storeMatch)
      val transactionId = hash(transaction.getCustomer(), transaction.getTransType(), transaction.getMatchedData().getItem())
      list = list :+ (new Text(transactionId), transaction)
    }
    list
  }
}
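(In the real job the flatmap closes over the two lookup maps; parse(x) at the top is shorthand. Roughly:)

val customData: RDD[(Text, Custom)] =
  origRDD.flatMap { transaction => Paser.parse(transaction, customerList, storeList) }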
Kryo serialization is registered like so:
conf.registerKryoClasses(Array(classOf[Custom]))
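For context, the surrounding conf setup looks roughly like this (the app name is made up for the example):

import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName("transaction-parse") // hypothetical app name

// registerKryoClasses also sets spark.serializer to KryoSerializer.
// Unregistered classes (e.g. ObjectMatch, the HashSet inside Custom) still
// serialize, but Kryo writes their fully qualified class name with each object.
conf.registerKryoClasses(Array(classOf[Custom]))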
Any help is appreciated, with code examples or links to examples.
Spark UI for the (Text/String, Custom) run [screenshot]:
The bottom progress bar with 1/11 tasks is the flatmap; the top is saveAsNewAPIHadoopFile. Stage 0 is saveAsNewAPIHadoopFile -> filter x7 -> flatmap.
Run with (Text, Int) [screenshot]:
The slow (Text/String, Custom) run shows 1.1h in the UI, but I have let it run for 18 hours. Over that time it does progress, just very slowly, and letting it run for a day is not workable; something is very wrong. Again, the parse method is used in both runs, so both go through exactly the same logic, even though the faster run outputs (Text, Int) keys instead of the custom value.

Not sure if this is helpful, but whatever is wrong also changes the scanning pattern in Accumulo. The (Text, Int) run shows a normal increase in scans, stays at roughly the same scans/s for the 30 minutes, then drops. With the custom object the scan rate increases and then immediately drops, in a ^-like fashion, then drags on at a lower scan rate for hours.
Spark Version: 1.6.2, Scala 2.11
You should not use var list: List[(Text, Custom)] = List(). Every execution of list = list :+ (new Text(transactionId), transaction) creates a new list rather than appending to the existing one: List in Scala is immutable, and :+ copies every element accumulated so far, so building a long list this way is quadratic. You should replace it with val myListBuffer = ListBuffer[(Text, Custom)](). I am not sure that is the only problem, but this change should help if your list is huge.
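A self-contained illustration of the difference (plain Scala, nothing from your codebase):

import scala.collection.mutable.ListBuffer

// Appending to an immutable List copies the whole list each time (O(n) per append):
var slow: List[Int] = List()
(1 to 5).foreach { i => slow = slow :+ i } // O(n^2) overall

// ListBuffer appends in constant time, then converts to a List once at the end:
val buf = ListBuffer[Int]()
(1 to 5).foreach { i => buf += i }
val fast: List[Int] = buf.toList

Inside parse, the same pattern applies: accumulate into a ListBuffer[(Text, Custom)] and return buffer.toList, so the flatmap still receives a List[(Text, Custom)].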
Also, a couple of comments on writing code in Scala: there is no need to write getters and setters in a Scala class; the members you declared are public, immutable vals and can be accessed directly. You should think really hard before using a var in Scala. Immutability will make your code more resilient, improve readability, and make it easier to change.
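For example, a minimal sketch of Custom as a plain case class (ObjectMatch being your own type); the fields are public immutable vals, equals/hashCode come for free, and case classes are Serializable by default:

import java.util

case class Custom(
  startDate: java.lang.Long,
  transType: util.HashSet[java.lang.Long],
  isCustomer: Boolean,
  cost: String,
  matchedData: ObjectMatch)

// Access fields directly: custom.startDate, custom.cost, etc.,
// instead of getStartDate()/getCost().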