I have an RDD like this: RDD[(Any, Array[(Any, Any)])]
I want to convert it into a DataFrame, so I use this schema:
val schema = StructType(Array (StructField("C1", StringType, true), StructField("C4", ArrayType(StringType, false), false)))
val df = Seq(
("A",1,"12/06/2012"),
("A",2,"13/06/2012"),
("B",3,"12/06/2012"),
("B",4,"17/06/2012"),
("C",5,"14/06/2012")).toDF("C1", "C2","C3")
df.show(false)
val rdd = df.map( line => ( line(0), (line(1), line(2))))
.groupByKey()
.mapValues(i => i.toList).foreach(println)
val output_df = sqlContext.createDataFrame(rdd, schema)
My RDD looks like this:
(B,List((3,12/06/2012), (4,17/06/2012)))
(A,List((1,12/06/2012), (2,13/06/2012)))
(C,List((5,14/06/2012)))
or like this, if I use .mapValues(i => i.toArray):
(A,[Lscala.Tuple2;@3e8f27c9)
(C,[Lscala.Tuple2;@6f22defb)
(B,[Lscala.Tuple2;@1b8692ec)
I already tried this:
val output_df = sqlContext.createDataFrame(rdd, schema)
But I get:
Error:(40, 32) overloaded method value createDataFrame with alternatives:
(data: java.util.List[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
(rdd: org.apache.spark.api.java.JavaRDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
(rdd: org.apache.spark.rdd.RDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
(rows: java.util.List[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and>
(rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and>
(rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame
cannot be applied to (Unit, org.apache.spark.sql.types.StructType)
val output_df = sqlContext.createDataFrame(rdd, schema)
To Raphael Roth:
I tried the second method, which does not work. I get:
Error:(41, 24) No TypeTag available for MySchema
val newdf = rdd.map(line => MySchema(line._1.toString, line._2.asInstanceOf[List[(Int, String)]])).toDF()
The first method works fine, but I lose the first element of my tuple with .mapValues(i => i.map(_._2)).
Do you know if I can extend the first method to keep both elements?
I resolved it by converting my tuple to a string, but I don't find this an elegant solution, because I will have to split the string to read the column:
val rdd = df.map(line => ( line(0), (line(1), line(2)))).groupByKey()
.mapValues(i => i.map(w => (w._1,w._2).toString))
.map(i=>Row(i._1,i._2))
Thank you for the help
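groupByKey gives you a collection of tuples per key, and you did not take this into account in your schema. Further, sqlContext.createDataFrame needs an RDD[Row], which you didn't provide. Note also that your rdd ends with .foreach(println), which returns Unit; that is why the compiler reports (Unit, org.apache.spark.sql.types.StructType).
This should work using your schema: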
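import org.apache.spark.sql.Row

val rdd = df.map(line => (line(0), (line(1), line(2))))
.groupByKey()
// keep only the second tuple element (C3) so the values fit ArrayType(StringType)
.mapValues(i => i.map(_._2))
// createDataFrame expects an RDD[Row]
.map(i=>Row(i._1,i._2))
val output_df = sqlContext.createDataFrame(rdd, schema)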
You can also use a case class, which can be used to map tuples (I am not sure whether tuple schemas can be created programmatically):
val df = Seq(
("A", 1, "12/06/2012"),
("A", 2, "13/06/2012"),
("B", 3, "12/06/2012"),
("B", 4, "17/06/2012"),
("C", 5, "14/06/2012")).toDF("C1", "C2", "C3")
df.show(false)
val rdd = df.map(line => (line(0), (line(1), line(2))))
.groupByKey()
.mapValues(i => i.toList)
// this should be placed outside of main()
case class MySchema(C1: String, C4: List[(Int, String)])
val newdf = rdd.map(line => MySchema(line._1.toString, line._2.asInstanceOf[List[(Int, String)]])).toDF()
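Regarding keeping both tuple elements with the first method: as far as I know, a schema for tuples can be built programmatically by nesting a StructType inside the ArrayType and emitting one nested Row per tuple. Below is a minimal sketch, not a tested solution. It assumes the df and sqlContext from the question, and that C2 is an Int and C3 a String; the names nestedSchema, rowRdd and nestedDf are made up for illustration.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Assumed layout: C4 is an array of (C2, C3) structs instead of an array of strings.
val nestedSchema = StructType(Seq(
  StructField("C1", StringType, nullable = true),
  StructField("C4", ArrayType(
    StructType(Seq(
      StructField("C2", IntegerType, nullable = true),
      StructField("C3", StringType, nullable = true))),
    containsNull = false), nullable = false)))

// df and sqlContext are the ones defined in the question.
val rowRdd = df.rdd
  .map(line => (line.getString(0), (line.getInt(1), line.getString(2))))
  .groupByKey()
  .map { case (key, values) =>
    // one nested Row per (C2, C3) tuple, collected into a Seq for the ArrayType column
    Row(key, values.map { case (c2, c3) => Row(c2, c3) }.toSeq)
  }

val nestedDf = sqlContext.createDataFrame(rowRdd, nestedSchema)
nestedDf.printSchema()
```

With this layout each element of C4 is a struct with the fields C2 and C3, so there should be no need to convert the tuple to a string and split it again.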