
I'm using Spark 1.6 with Scala. I was looking here but I didn't find a clear answer. I have a big file; after filtering out the first lines, which contain some copyright text, I want to take the header (104 fields) and convert it to a StructType schema. I was thinking of using a class extending the Product trait to define the schema of the DataFrame, and then converting the data to a DataFrame according to that schema.

What is the best way to do it?

This is a sample from my file:

   text  (06.07.03.216)  COPYRIGHT © skdjh 2000-2016
    text  160614_54554.vf Database    53643_csc   Interface   574 zn  65
    Start   Date    14/06/2016  00:00:00:000
    End Date    14/06/2016  00:14:59:999
    State   "s23"

        cin. Nb      Start     End        Event       Con. Duration  IMSI
        32055680    16/09/2010 16:59:59:245 16/09/2016 17:00:00:000 xxxxxxxxxxxxx
        32055680    16/09/2010 16:59:59:245 16/09/2016 17:00:00:000 xxxxxxxxxxxxx
        32055680    16/09/2010 16:59:59:245 16/09/2016 17:00:00:000 xxxxxxxxxxxxx
        32055680    16/09/2010 16:59:59:245 16/09/2016 17:00:00:000 xxxxxxxxxxxxx
        32055680    16/09/2010 16:59:59:245 16/09/2016 17:00:00:000 xxxxxxxxxxxxx

I want to convert it to a Spark SQL DataFrame with a schema like this:

    ------------------------------------------------------------------------------------------
    |  cin_Nb    |  Start       |  End           |  Event       |  Con_Duration  |  IMSI   |
    ------------------------------------------------------------------------------------------
    |  32055680  |  16/09/2010  |  16:59:59:245  |  16/09/2016  |  17:00:00:000  |  xxxxx  |
    |  32055680  |  16/09/2010  |  16:59:59:245  |  16/09/2016  |  17:00:00:000  |  xxxxx  |
    |  32055680  |  16/09/2010  |  16:59:59:245  |  16/09/2016  |  17:00:00:000  |  xxxxx  |
    |  20556800  |  16/09/2010  |  16:59:59:245  |  16/09/2016  |  17:00:00:000  |  xxxxx  |
    |  32055680  |  16/09/2010  |  16:59:59:245  |  16/09/2016  |  17:00:00:000  |  xxxxx  |
    ------------------------------------------------------------------------------------------
    Possible duplicate of [How to convert rdd object to dataframe in spark](http://stackoverflow.com/questions/29383578/how-to-convert-rdd-object-to-dataframe-in-spark) – Avihoo Mamka Mar 31 '17 at 10:31
  • Sorry, but it's different. I was looking there but I didn't get my answer. – Zied Hermi Mar 31 '17 at 10:34

2 Answers


You can't use either case classes or a StructType schema here, unfortunately! The reason is that Scala does not support tuples with more than 22 parts, and both of these methods use tuples behind the scenes. Since you have more than 22 columns, the approach doesn't work.

However, you can still do it; it's just not as nice :) What you need to do is convert your RDD to a single-column DataFrame, and call the column something meaningful like "raw":

val df = rdd.toDF("raw")
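
(For context, a minimal sketch of how rdd could be produced in Spark 1.6, assuming the file is read with sc.textFile and the copyright/metadata lines are simply the first few lines; the path and the number of skipped lines below are assumptions.)

    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc)    // sc is the existing SparkContext; in spark-shell, sqlContext already exists
    import sqlContext.implicits._          // brings .toDF into scope

    // the path and the number of leading lines to skip are assumptions
    val rdd = sc.textFile("path/to/yourfile")
      .zipWithIndex()
      .filter { case (_, idx) => idx > 6 }   // drop the copyright/metadata/header lines
      .map { case (line, _) => line }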

Next, you need to define a function that extracts the value you want for any given column index:

def extractData(idx: Int) = udf[String, String](raw => ???)
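
(One possible way to fill in the ???, assuming the fields in raw are tab-separated; the delimiter is an assumption about the file format.)

    import org.apache.spark.sql.functions.udf

    // Returns the idx-th field of the raw line, or null if the line has fewer fields.
    // The tab delimiter is an assumption.
    def extractData(idx: Int) = udf[String, String] { raw =>
      val fields = raw.split("\t")
      if (idx < fields.length) fields(idx).trim else null
    }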

Now, you need to append each column you want using this function.

val columns = yourColumnNamesList.zipWithIndex

val df2 = columns.foldLeft(df){ case (acc, (cname, cid)) => acc.withColumn(cname, extractData(cid)($"raw")) }
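
(yourColumnNamesList is not defined above; one way to build it, assuming the 104 field names sit on a single tab-separated header line captured earlier into a hypothetical headerLine string, would be:)

    // headerLine is a hypothetical String holding the 104 tab-separated field names
    val yourColumnNamesList: Seq[String] = headerLine.split("\t").map(_.trim).toSeq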

Although the foldLeft looks a bit horrendous, if you look at the plan created by the execution planner, Spark is clever enough to flatten all of this into a single map step, and the throughput is better than you'd expect.
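
You can check the plan yourself with DataFrame.explain, which is available in 1.6:

    df2.explain()      // prints the physical plan
    df2.explain(true)  // also prints the logical and optimized plans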

Finally, you can drop the raw data since it isn't needed any more.

df2.drop("raw")

Alternatively!

If your data is in a delimited format on the file system, you should look at the Databricks CSV parser, which also works in 1.6 :-)
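
(A sketch of what that read could look like, assuming the spark-csv package is on the classpath, for example via --packages com.databricks:spark-csv_2.10:1.5.0, and that the fields are tab-delimited; the path and delimiter are assumptions.)

    val csvDf = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")      // take column names from the header line
      .option("delimiter", "\t")     // assumed delimiter
      .load("path/to/yourfile")

Note that the copyright lines before the header would still need to be stripped beforehand for the header option to pick up the right line.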

Ben Horsburgh

You can use zipWithIndex and then filter out the first lines; then you can use a class extending Product to describe the header:

class convert(var cin_Nb: String, var start: String, var end: String,
              var event: String, var duration: String, var zed: String, ......) extends Product with Serializable {
  def canEqual(that: Any) = that.isInstanceOf[convert]
  def productArity = 104
  def productElement(idx: Int) = idx match {
    case 0 => cin_Nb; case 1 => start; case 2 => end;
    case 3 => event; case 4 => duration; case 5 => zed;
    ..........
  }
}

and then you convert the RDD to a DataFrame using this structure.
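
(A sketch of that last step, assuming dataRdd is the already-filtered RDD[String] of data rows and that fields are tab-separated; toConvert is a hypothetical helper and only the first six of the 104 fields are shown.)

    // Hypothetical helper: split a raw line into its fields and build a convert instance
    def toConvert(line: String): convert = {
      val f = line.split("\t")
      new convert(f(0), f(1), f(2), f(3), f(4), f(5) /* , ...the remaining 98 fields... */)
    }

    // createDataFrame infers the schema from the constructor parameters of convert
    val df = sqlContext.createDataFrame(dataRdd.map(toConvert))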

hermi zied