scalaapache-sparkintellij-idea-2016

How to use correctly mapPartitions function


I'm doing a program with big data and that's why I'm using Spark and Scala. I need to partition the database and for this I use

var data0 = conf.dataBase.repartition (8) .persist (StorageLevel.MEMORY_AND_DISK_SER)

but then I need to do things in the partition before proceeding to work with the piece of database corresponding to that partition and for that I use

var tester = data0.mapPartitions {x =>
   configFuzzyPredProblem ()
   Strategy.getStrategy.executeStrategy (conf.iterByRun, 5, GeneratorType.HillClimbing)
 } .persist (StorageLevel.MEMORY_AND_DISK_SER)

Within the method executeStrategy() I use the database but I do not know if it is the global one or the one corresponding to that partition. How can I know which one I am using and then only perform the partition processing with the database of that partition?


Solution

  • Here is a simple example using mapPartitionsWithIndex which follows the same rules of mapPartitions - excluding the index aspect.

    You can see that inside mapPartitions you need to process an interable, an Interator Int in this example. In this case 3 partitions are processed, in your case 8, with either some entries or possibly zero entries.

    val rdd1 = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3)
    def myfunc(index: Int, iter: Iterator[Int]) : Iterator[String] = {
        iter.map(x => index + "," + x)
    }
    val rdd2 = rdd1.mapPartitionsWithIndex(myfunc)
    

    I cannot see inside your function but I assume it is OK and it will process a partition - part of your database.