Tags: scala, apache-spark, dataframe, collect, take

Spark: Difference between collect(), take() and show() outputs after conversion toDF


I am using Spark 1.5.

I have a column of 30 ids which I am loading as integers from a database:

val numsRDD = sqlContext
     .table(constants.SOURCE_DB + "." + IDS)
     .select("id")
     .distinct
     .map(row=>row.getInt(0))

This is the output of numsRDD:

numsRDD.collect.foreach(println(_))

643761
30673603
30736590
30773400
30832624
31104189
31598495
31723487
32776244
32801792
32879386
32981901
33469224
34213505
34709608
37136455
37260344
37471301
37573190
37578690
37582274
37600896
37608984
37616677
37618105
37644500
37647770
37648497
37720353
37741608

Next, I want to produce all combinations of 3 of those ids, save each combination as a tuple of the form (tripletID: String, triplet: Array[Int]), and convert the result into a DataFrame, which I do as follows:

// |combinationsDF| = 4060 combinations
val combinationsDF = sc
  .parallelize(numsRDD
     .collect
     .combinations(3)
     .toArray
     .map(row => row.sorted)
     .map(row => (
        List(row(0), row(1), row(2)).mkString(","), 
        List(row(0), row(1), row(2)).toArray)))
  .toDF("tripletID","triplet")
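
As a cross-check on the 4060 figure that does not involve Spark at all, the combination step can be sketched in plain Scala. The ids here are placeholder values (1 to 30), not the real ids from the database, and the names `TripletSketch` and `triplets` are made up for this illustration:

```scala
object TripletSketch {
  // Placeholder ids (assumption): 30 distinct integers standing in for the real ids.
  val ids: Array[Int] = (1 to 30).toArray

  // Build (tripletID, triplet) pairs the same way as the snippet above, minus Spark.
  def triplets(xs: Array[Int]): Array[(String, Array[Int])] =
    xs.combinations(3)
      .map(_.sorted)
      .map(t => (t.mkString(","), t))
      .toArray

  def main(args: Array[String]): Unit = {
    val result = triplets(ids)
    println(result.length) // 4060 = C(30, 3)
  }
}
```

If the count and the ids look right here, the combination logic itself is unlikely to be what drops the leading id.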

Right after that, I print some of the contents of combinationsDF to make sure everything is as it should be:

combinationsDF.show

which returns:

+--------------------+--------------------+
|           tripletID|             triplet|
+--------------------+--------------------+
|,37136455,3758227...|[32776244, 371364...|
|,37136455,3761667...|[32776244, 371364...|
|,32776244,3713645...|[31723487, 327762...|
|,37136455,3757869...|[32776244, 371364...|
|,32776244,3713645...|[31598495, 327762...|
|,37136455,3760089...|[32776244, 371364...|
|,37136455,3764849...|[32776244, 371364...|
|,37136455,3764450...|[32776244, 371364...|
|,37136455,3747130...|[32776244, 371364...|
|,32981901,3713645...|[32776244, 329819...|
|,37136455,3761810...|[32776244, 371364...|
|,34213505,3713645...|[32776244, 342135...|
|,37136455,3726034...|[32776244, 371364...|
|,37136455,3772035...|[32776244, 371364...|
|2776244,37136455...|[643761, 32776244...|
|,37136455,3764777...|[32776244, 371364...|
|,37136455,3760898...|[32776244, 371364...|
|,32879386,3713645...|[32776244, 328793...|
|,32776244,3713645...|[31104189, 327762...|
|,32776244,3713645...|[30736590, 327762...|
+--------------------+--------------------+
only showing top 20 rows

As is evident, the first element of every tripletID is missing. Just to be 100% sure, I use take(20) as follows:

combinationsDF.take(20).foreach(println(_))

which prints the rows in full:

[,37136455,37582274,WrappedArray(32776244, 37136455, 37582274)]
[,37136455,37616677,WrappedArray(32776244, 37136455, 37616677)]
[,32776244,37136455,WrappedArray(31723487, 32776244, 37136455)]
[,37136455,37578690,WrappedArray(32776244, 37136455, 37578690)]
[,32776244,37136455,WrappedArray(31598495, 32776244, 37136455)]
[,37136455,37600896,WrappedArray(32776244, 37136455, 37600896)]
[,37136455,37648497,WrappedArray(32776244, 37136455, 37648497)]
[,37136455,37644500,WrappedArray(32776244, 37136455, 37644500)]
[,37136455,37471301,WrappedArray(32776244, 37136455, 37471301)]
[,32981901,37136455,WrappedArray(32776244, 32981901, 37136455)]
[,37136455,37618105,WrappedArray(32776244, 37136455, 37618105)]
[,34213505,37136455,WrappedArray(32776244, 34213505, 37136455)]
[,37136455,37260344,WrappedArray(32776244, 37136455, 37260344)]
[,37136455,37720353,WrappedArray(32776244, 37136455, 37720353)]
[2776244,37136455,WrappedArray(643761, 32776244, 37136455)]
[,37136455,37647770,WrappedArray(32776244, 37136455, 37647770)]
[,37136455,37608984,WrappedArray(32776244, 37136455, 37608984)]
[,32879386,37136455,WrappedArray(32776244, 32879386, 37136455)]
[,32776244,37136455,WrappedArray(31104189, 32776244, 37136455)]
[,32776244,37136455,WrappedArray(30736590, 32776244, 37136455)]

So now I am sure that the first id of each tripletID is being dropped for some reason. But if I use collect instead of take(20):

combinationsDF.collect.foreach(println(_))

everything goes back to being fine again (!!!):

[32776244,37136455,37582274,WrappedArray(32776244, 37136455, 37582274)]
[32776244,37136455,37616677,WrappedArray(32776244, 37136455, 37616677)]
[31723487,32776244,37136455,WrappedArray(31723487, 32776244, 37136455)]
[32776244,37136455,37578690,WrappedArray(32776244, 37136455, 37578690)]
[31598495,32776244,37136455,WrappedArray(31598495, 32776244, 37136455)]
[32776244,37136455,37600896,WrappedArray(32776244, 37136455, 37600896)]
[32776244,37136455,37648497,WrappedArray(32776244, 37136455, 37648497)]
[32776244,37136455,37644500,WrappedArray(32776244, 37136455, 37644500)]
[32776244,37136455,37471301,WrappedArray(32776244, 37136455, 37471301)]
[32776244,32981901,37136455,WrappedArray(32776244, 32981901, 37136455)]
[32776244,37136455,37618105,WrappedArray(32776244, 37136455, 37618105)]
[32776244,34213505,37136455,WrappedArray(32776244, 34213505, 37136455)]
[32776244,37136455,37260344,WrappedArray(32776244, 37136455, 37260344)]
[32776244,37136455,37720353,WrappedArray(32776244, 37136455, 37720353)]
[643761,32776244,37136455,WrappedArray(643761, 32776244, 37136455)]
[32776244,37136455,37647770,WrappedArray(32776244, 37136455, 37647770)]
[32776244,37136455,37608984,WrappedArray(32776244, 37136455, 37608984)]
[32776244,32879386,37136455,WrappedArray(32776244, 32879386, 37136455)]
[31104189,32776244,37136455,WrappedArray(31104189, 32776244, 37136455)]
[30736590,32776244,37136455,WrappedArray(30736590, 32776244, 37136455)]
...

1. I have exhaustively checked the steps just before I parallelize the array of combinations into an RDD, and everything is OK.
2. I have also printed the output right after parallelize is applied, and again everything is OK.
3. The problem appears to be related to the conversion to a DataFrame of the combinations built from numsRDD, and despite my best efforts I cannot resolve it.
4. I was also unable to reproduce the problem with mock data using the same code snippet.
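
Checks like these could be made mechanical by comparing each tripletID with the string rebuilt from its triplet. The following is only a plain-Scala sketch: `TripletCheck` and `isConsistent` are hypothetical names, and the two sample rows are copied from the take(20) and collect outputs above.

```scala
object TripletCheck {
  // Hypothetical helper: a row is consistent when its id is exactly the
  // comma-joined form of its triplet.
  def isConsistent(tripletID: String, triplet: Seq[Int]): Boolean =
    tripletID == triplet.mkString(",")

  // First row as printed by take(20) above (leading id missing).
  val fromTake: (String, Seq[Int]) =
    (",37136455,37582274", Seq(32776244, 37136455, 37582274))

  // The same row as printed by collect above.
  val fromCollect: (String, Seq[Int]) =
    ("32776244,37136455,37582274", Seq(32776244, 37136455, 37582274))

  def main(args: Array[String]): Unit = {
    println(isConsistent(fromTake._1, fromTake._2))       // false
    println(isConsistent(fromCollect._1, fromCollect._2)) // true
  }
}
```

On the DataFrame itself, the same predicate could presumably be applied to each Row by reading the columns with `row.getString(0)` and `row.getAs[Seq[Int]](1)`; I have not verified that on Spark 1.5, so treat it as an assumption.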

So, first: what is causing this problem? And second: how do I fix it?


Solution

  • I would check your original numsRDD; it looks like you might have an empty string or a null value in there. This works for me:

    scala> val numsRDD = sc.parallelize(0 to 30)
    numsRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:27
    
    scala> :pa
    // Entering paste mode (ctrl-D to finish)
    
    val combinationsDF = sc
      .parallelize(numsRDD
         .collect
         .combinations(3)
         .toArray
         .map(row => row.sorted)
         .map(row => (
            List(row(0), row(1), row(2)).mkString(","),
            List(row(0), row(1), row(2)).toArray)))
      .toDF("tripletID","triplet")
    
    // Exiting paste mode, now interpreting.
    
    combinationsDF: org.apache.spark.sql.DataFrame = [tripletID: string, triplet: array<int>]
    
    scala> combinationsDF.show
    +---------+----------+
    |tripletID|   triplet|
    +---------+----------+
    |    0,1,2| [0, 1, 2]|
    |    0,1,3| [0, 1, 3]|
    |    0,1,4| [0, 1, 4]|
    |    0,1,5| [0, 1, 5]|
    |    0,1,6| [0, 1, 6]|
    |    0,1,7| [0, 1, 7]|
    |    0,1,8| [0, 1, 8]|
    |    0,1,9| [0, 1, 9]|
    |   0,1,10|[0, 1, 10]|
    |   0,1,11|[0, 1, 11]|
    |   0,1,12|[0, 1, 12]|
    |   0,1,13|[0, 1, 13]|
    |   0,1,14|[0, 1, 14]|
    |   0,1,15|[0, 1, 15]|
    |   0,1,16|[0, 1, 16]|
    |   0,1,17|[0, 1, 17]|
    |   0,1,18|[0, 1, 18]|
    |   0,1,19|[0, 1, 19]|
    |   0,1,20|[0, 1, 20]|
    |   0,1,21|[0, 1, 21]|
    +---------+----------+
    only showing top 20 rows
    

    The only other thing I can think of is mkString not working the way you would expect. Try this string interpolation instead (there is also no need to recreate the List):

    val combinationsDF = sc
      .parallelize(numsRDD
         .collect
         .combinations(3)
         .toArray
         .map(row => row.sorted)
         // combinations on an Array yields Arrays, so match on Array, not List
         .map { case Array(a, b, c) => (
            s"$a,$b,$c",
            Array(a, b, c)) })
      .toDF("tripletID","triplet")
    
    scala> combinationsDF.show
    +---------+----------+
    |tripletID|   triplet|
    +---------+----------+
    |    0,1,2| [0, 1, 2]|
    |    0,1,3| [0, 1, 3]|
    |    0,1,4| [0, 1, 4]|
    |    0,1,5| [0, 1, 5]|
    |    0,1,6| [0, 1, 6]|
    |    0,1,7| [0, 1, 7]|
    |    0,1,8| [0, 1, 8]|
    |    0,1,9| [0, 1, 9]|
    |   0,1,10|[0, 1, 10]|
    |   0,1,11|[0, 1, 11]|
    |   0,1,12|[0, 1, 12]|
    |   0,1,13|[0, 1, 13]|
    |   0,1,14|[0, 1, 14]|
    |   0,1,15|[0, 1, 15]|
    |   0,1,16|[0, 1, 16]|
    |   0,1,17|[0, 1, 17]|
    |   0,1,18|[0, 1, 18]|
    |   0,1,19|[0, 1, 19]|
    |   0,1,20|[0, 1, 20]|
    |   0,1,21|[0, 1, 21]|
    +---------+----------+
    only showing top 20 rows
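
For what it's worth, on a plain Scala array both ways of building the id should give the same string, so the swap is more of a readability change than a known fix. A small sketch, using three ids from the question; `JoinSketch` and its two methods are names invented for this example:

```scala
object JoinSketch {
  // Join with mkString, as in the original snippet.
  def viaMkString(t: Array[Int]): String = t.mkString(",")

  // Join with string interpolation after destructuring the array.
  def viaInterpolation(t: Array[Int]): String = t match {
    case Array(a, b, c) => s"$a,$b,$c"
    case _              => throw new IllegalArgumentException("expected exactly three ids")
  }

  def main(args: Array[String]): Unit = {
    val triplet = Array(643761, 32776244, 37136455)
    println(viaMkString(triplet))      // 643761,32776244,37136455
    println(viaInterpolation(triplet)) // 643761,32776244,37136455
  }
}
```

If the two strings agree outside Spark but the DataFrame still shows a missing first id, the string construction is probably not the cause.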