scala, apache-spark, rdd, custom-object

Updating RDD of Objects


I have just started learning Scala and am facing some issues with manipulating an RDD of objects.

I have the same problem as the one described in the link below:

Update the internal state of RDD elements

Is there any other way to solve the problem stated in the link above? Also, is it possible to use a Dataset or DataFrame to achieve what we are trying to do?


Solution

  • Immutability is one of the key concepts of functional programming. You can't change an RDD or the data inside it, but you can create a new RDD based on the data from the old one.

    I modified the example from the link in your question to show what such a transformation usually looks like.

    // just a case class with foo and bar fields that can be empty
    case class Test(foo: Option[Double], bar: Option[Double], someOtherVal: String)
    
    // as you can see, this is not actually an "update":
    // it creates a new Test with "updated" foo and bar fields
    // NOTE: this logic usually lives outside the data object
    def updateFooBar(t: Test) = Test(Some(Math.random()), Some(Math.random()), t.someOtherVal)
    
    val testList = Array.fill(5)(Test(None, None, "someString"))
    val testRDD = sc.parallelize(testList)
    
    // creates a new RDD based on the old one by applying updateFooBar to each element
    val newRdd = testRDD.map { x => updateFooBar(x) }
    // or just: val newRdd = testRDD.map(updateFooBar)
    
    newRdd.collect().foreach { x => println(x.foo + "~" + x.bar + "~" + x.someOtherVal) }
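
    The snippets here assume a Spark shell (or a notebook) where sc is already defined; in a standalone application you would create the SparkSession and SparkContext yourself. A minimal sketch of that setup (the app name is just illustrative):

    import org.apache.spark.sql.SparkSession
    
    // local-mode session for experimenting; "rdd-update-example" is an arbitrary name
    val spark = SparkSession.builder()
      .appName("rdd-update-example")
      .master("local[*]")
      .getOrCreate()
    val sc = spark.sparkContext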
    

    You can transform a Dataset in exactly the same way as an RDD:

    // toDS() needs the SparkSession implicits in scope (imported automatically in spark-shell)
    import spark.implicits._
    
    val newDs = testRDD.toDS().map(x => updateFooBar(x))
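
    If you start from a plain Scala collection rather than an existing RDD, you can also build the Dataset directly and skip the RDD entirely. A small sketch reusing Test and updateFooBar from above:

    // toDS() on a local Seq builds the Dataset without going through an RDD
    val ds = Seq.fill(5)(Test(None, None, "someString")).toDS()
    val updatedDs = ds.map(updateFooBar)
    updatedDs.show()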
    

    or using a DataFrame:

    import org.apache.spark.sql.functions.typedLit
    
    // typedLit builds a literal column from Scala types such as Option
    val newDf = testRDD.toDF()
      .withColumn("foo", typedLit(Some(Math.random())))
      .withColumn("bar", typedLit(Some(Math.random())))