
I have an RDD like this: RDD[(Any, Array[(Any, Any)])] and I just want to convert it into a DataFrame, so I use this schema:

val schema = StructType(Array(
  StructField("C1", StringType, true),
  StructField("C4", ArrayType(StringType, false), false)))

val df = Seq(
  ("A",1,"12/06/2012"),
  ("A",2,"13/06/2012"),
  ("B",3,"12/06/2012"),
  ("B",4,"17/06/2012"),
  ("C",5,"14/06/2012")).toDF("C1", "C2","C3")
df.show(false)

val rdd = df.map(line => (line(0), (line(1), line(2))))
  .groupByKey()
  .mapValues(i => i.toList).foreach(println) // note: foreach returns Unit, so rdd is Unit here

val output_df = sqlContext.createDataFrame(rdd, schema)

My RDD looks like this:

(B,List((3,12/06/2012), (4,17/06/2012)))    
(A,List((1,12/06/2012), (2,13/06/2012)))    
(C,List((5,14/06/2012)))

or like this, if I use .mapValues(i => i.toArray) instead:

(A,[Lscala.Tuple2;@3e8f27c9)
(C,[Lscala.Tuple2;@6f22defb)
(B,[Lscala.Tuple2;@1b8692ec)

I already tried this:

val output_df = sqlContext.createDataFrame(rdd, schema)

But I get:

Error:(40, 32) overloaded method value createDataFrame with alternatives:
  (data: java.util.List[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
  (rdd: org.apache.spark.api.java.JavaRDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
  (rdd: org.apache.spark.rdd.RDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
  (rows: java.util.List[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and>
  (rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and>
  (rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame
 cannot be applied to (Unit, org.apache.spark.sql.types.StructType)
    val output_df = sqlContext.createDataFrame(rdd, schema)

To Raphael Roth: I tried the second method, which does not work. I get:

Error:(41, 24) No TypeTag available for MySchema
    val newdf = rdd.map(line => MySchema(line._1.toString, line._2.asInstanceOf[List[(Int, String)]])).toDF()

The first method works fine, but I lose the first element of my tuple with .mapValues(i => i.map(_._2)).

Do you know if I can complete the first method to keep both elements?

I resolved it by converting my tuple to a string, but in my opinion this is not an elegant solution, because I will have to split the string tuple to read the columns:

val rdd = df.map(line => (line(0), (line(1), line(2))))
  .groupByKey()
  .mapValues(i => i.map(w => (w._1, w._2).toString))
  .map(i => Row(i._1, i._2))
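
For reference, reading such a stringified tuple back means undoing Tuple2.toString by hand, as in this minimal sketch (the parsing approach is just one assumption of how it could be done):

// hypothetical read-back of one element of the column, e.g. "(3,12/06/2012)"
val s = "(3,12/06/2012)"
val Array(c2, c3) = s.stripPrefix("(").stripSuffix(")").split(",", 2)
// c2 == "3", c3 == "12/06/2012" -- both come back as plain strings, the Int type is lost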

Thank you for the help.

  • Possible duplicate of [How to convert rdd object to dataframe in spark](http://stackoverflow.com/questions/29383578/how-to-convert-rdd-object-to-dataframe-in-spark) – cheseaux Nov 09 '16 at 19:11
  • I think it will help if you add the error to the question – maasg Nov 09 '16 at 19:30
  • @a.moussa to resolve `No TypeTag available for MySchema`, you have to define the case class outside of the main method (if you have any) – Raphael Roth Nov 10 '16 at 12:01

1 Answer


groupByKey gives you a Seq of tuples, which you did not take into account in your schema. Further, sqlContext.createDataFrame needs an RDD[Row], which you didn't provide.

This should work using your schema:

val rdd = df.map(line => (line(0), (line(1), line(2))))
  .groupByKey()
  .mapValues(i => i.map(_._2)) // keep only the second tuple element, matching ArrayType(StringType)
  .map(i => Row(i._1, i._2))   // wrap in Row, since createDataFrame needs an RDD[Row]

val output_df = sqlContext.createDataFrame(rdd, schema)
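
To keep both tuple elements instead of dropping the first one (the follow-up in the question), a minimal sketch, assuming Spark 1.x and the same df as above (the pairSchema and fullSchema names are mine), would declare C4 as an array of structs:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// hypothetical schema: C4 holds (C2, C3) structs instead of plain strings
val pairSchema = StructType(Array(
  StructField("C2", IntegerType, true),
  StructField("C3", StringType, true)))

val fullSchema = StructType(Array(
  StructField("C1", StringType, true),
  StructField("C4", ArrayType(pairSchema, false), false)))

val pairRdd = df.map(line => (line(0), (line(1), line(2))))
  .groupByKey()
  .map { case (key, values) =>
    // each (C2, C3) tuple becomes a nested Row matching pairSchema
    Row(key, values.map { case (c2, c3) => Row(c2, c3) }.toSeq)
  }

val output_both = sqlContext.createDataFrame(pairRdd, fullSchema)
output_both.show(false)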

You can also use a case class, which can be used to map the tuples (I'm not sure whether tuple schemas can be created programmatically):

val df = Seq(
  ("A", 1, "12/06/2012"),
  ("A", 2, "13/06/2012"),
  ("B", 3, "12/06/2012"),
  ("B", 4, "17/06/2012"),
  ("C", 5, "14/06/2012")).toDF("C1", "C2", "C3")
df.show(false)

val rdd = df.map(line => (line(0), (line(1), line(2))))
  .groupByKey()
  .mapValues(i => i.toList)

// this should be placed outside of main()
case class MySchema(C1: String, C4: List[(Int, String)])

val newdf = rdd.map(line => MySchema(line._1.toString, line._2.asInstanceOf[List[(Int, String)]])).toDF()
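
As a usage note, a sketch assuming a standalone application (the Example object is mine): the case class has to live at the top level, outside main(), for the compiler to find a TypeTag for it, and printSchema should then show the tuples encoded as structs with fields _1 and _2:

// top level: outside any method, so a TypeTag is available
case class MySchema(C1: String, C4: List[(Int, String)])

object Example {
  def main(args: Array[String]): Unit = {
    // ... build rdd as in the answer above, then:
    // val newdf = rdd.map(line => MySchema(line._1.toString,
    //   line._2.asInstanceOf[List[(Int, String)]])).toDF()
    // newdf.printSchema() // C4 should appear as array<struct<_1:int,_2:string>>
  }
}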
  • Hi, thank you for your answer. It does not work; I completed my question with your remark. If you have any idea, it would be really helpful. – a.moussa Nov 10 '16 at 11:50
  • Thank you very much, it works very well when I move MySchema outside of my method – a.moussa Nov 10 '16 at 13:16