Tags: scala, apache-spark, fastutil

How to compare two datasets?


I am running a Spark application that reads data from a few Hive tables (of IP addresses) and compares each element (IP address) in a dataset with all other elements (IP addresses) from the other datasets. The end result would be something like:

+---------------+--------+---------------+---------------+---------+----------+--------+----------+
|     ip_address|dataset1|dataset2       |dataset3       |dataset4 |dataset5  |dataset6|      date|
+---------------+--------+---------------+---------------+---------+----------+--------+----------+
| xx.xx.xx.xx.xx|     1  |              1|              0|        0|         0|      0 |2017-11-06|
| xx.xx.xx.xx.xx|     0  |              0|              1|        0|         0|      1 |2017-11-06|
| xx.xx.xx.xx.xx|     1  |              0|              0|        0|         0|      1 |2017-11-06|
| xx.xx.xx.xx.xx|     0  |              0|              1|        0|         0|      1 |2017-11-06|
| xx.xx.xx.xx.xx|     1  |              1|              0|        1|         0|      0 |2017-11-06|
+---------------+--------+---------------+---------------+---------+----------+--------+----------+

To do the comparison, I am converting the DataFrames returned by the hiveContext.sql("query") statement into fastutil objects, like this:

val df = hiveContext.sql("query")
val dfBuffer = new it.unimi.dsi.fastutil.objects.ObjectArrayList[String](df.map(r => r(0).toString).collect())

Then, I iterate over each collection and write the rows to a file using a FileWriter.

val dfIterator = dfBuffer.iterator()
while (dfIterator.hasNext) {
  val p = dfIterator.next().toString
  // comparison logic; results are written to a file with a FileWriter
}

I am running the application with --num-executors 20 --executor-memory 16g --executor-cores 5 --driver-memory 20g
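For reference, the executor resources can equivalently be set programmatically; a minimal sketch (driver memory is left out because it must be given at launch time, before the driver JVM starts):

import org.apache.spark.SparkConf

// The executor resources from the flags above, as SparkConf settings.
val conf = new SparkConf()
  .set("spark.executor.instances", "20")
  .set("spark.executor.memory", "16g")
  .set("spark.executor.cores", "5")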

The process runs for about 18-19 hours in total for about 4-5 million records with one-to-one comparisons, and it runs on a daily basis.

However, when I checked the Application Master UI, I noticed that no activity takes place after the initial conversion of the DataFrames to fastutil collection objects is done (this takes only a few minutes after the job is launched). I can see the count and collect statements used in the code producing new jobs until the conversion is done; after that, no new jobs are launched while the comparison is running.

Any help would be appreciated, Thank you!


Solution

  • After the line:

    val dfBuffer = new it.unimi.dsi.fastutil.objects.ObjectArrayList[String](df.map(r => r(0).toString).collect())

    and especially this part of it:

    df.map(r => r(0).toString).collect()

    in which collect is the key thing to notice, no Spark jobs are ever performed on dfBuffer (which is a regular local, single-JVM data structure).

    Does it mean that the distributed processing is not happening at all?

    Correct. collect brings all the data to the single JVM where the driver runs (which is exactly why you should not use it unless you know what you are doing and what problems it may cause).
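
    To see the difference, compare an operation on the collected buffer with one on the original DataFrame (using the names from your code):

    // Runs entirely inside the driver JVM; no Spark job is triggered,
    // which is why nothing new shows up in the Application Master UI.
    val localCount = dfBuffer.size()

    // Runs as a distributed Spark job across the executors.
    val distributedCount = df.count()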

    I think the above answers all the other questions.


    A possible solution to your problem of comparing two datasets (in Spark and in a distributed fashion) would be to join a dataset with the reference dataset and count the result to check whether the number of records changed, as sketched below.
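
    As a rough sketch of that idea, assuming hypothetical Hive tables ips_1 through ips_6, each exposing an ip_address column (adjust the names to your schema), the flag table from the question can be built entirely with distributed joins:

    import org.apache.spark.sql.functions.{coalesce, col, current_date, lit}

    // One DataFrame per source table, each carrying a 0/1 membership flag.
    val datasets = (1 to 6).map { i =>
      hiveContext.sql(s"SELECT DISTINCT ip_address FROM ips_$i")
        .withColumn(s"dataset$i", lit(1))
    }

    // Full outer joins keep every IP address seen in any of the datasets;
    // a missing flag means the address was absent from that dataset.
    val joined = datasets.reduce(_.join(_, Seq("ip_address"), "outer"))

    // Replace the nulls with 0 and stamp the run date; nothing is ever
    // collected to the driver, so the work stays on the executors.
    val result = (1 to 6).foldLeft(joined) { (acc, i) =>
      acc.withColumn(s"dataset$i", coalesce(col(s"dataset$i"), lit(0)))
    }.withColumn("date", current_date())

    Writing result out (e.g. with result.write) then replaces the driver-side FileWriter loop.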