
So, I have a DataFrame in Spark which looks like this (it has 30 columns; only some of them are shown here):

[ABCD,color,NORMAL,N,2015-02-20,1]
[XYZA,color,NORMAL,N,2015-05-04,1]
[GFFD,color,NORMAL,N,2015-07-03,1]
[NAAS,color,NORMAL,N,2015-08-26,1]
[LOWW,color,NORMAL,N,2015-09-26,1]
[KARA,color,NORMAL,N,2015-11-08,1]
[ALEQ,color,NORMAL,N,2015-12-04,1]
[VDDE,size,NORMAL,N,2015-12-23,1]
[QWER,color,NORMAL,N,2016-01-18,1]
[KDSS,color,NORMAL,Y,2015-08-29,1]
[KSDS,color,NORMAL,Y,2015-08-29,1]
[ADSS,color,NORMAL,Y,2015-08-29,1]
[BDSS,runn,NORMAL,Y,2015-08-29,1]
[EDSS,color,NORMAL,Y,2015-08-29,1]

So, I have to convert this DataFrame into a key-value collection in Scala, using some of the columns of the DataFrame as the key and assigning each distinct key a unique value from 0 to count - 1 (the number of distinct keys).

For example, using the case above, I want the output in a Scala map (key -> value) collection like this:

    ([ABCD_color_NORMAL_N_1->0]
    [XYZA_color_NORMAL_N_1->1]
    [GFFD_color_NORMAL_N_1->2]
    [NAAS_color_NORMAL_N_1->3]
    [LOWW_color_NORMAL_N_1->4]
    [KARA_color_NORMAL_N_1->5]
    [ALEQ_color_NORMAL_N_1->6]
    [VDDE_size_NORMAL_N_1->7]
    [QWER_color_NORMAL_N_1->8]
    [KDSS_color_NORMAL_Y_1->9]
    [KSDS_color_NORMAL_Y_1->10]
    [ADSS_color_NORMAL_Y_1->11]
    [BDSS_runn_NORMAL_Y_1->12]
    [EDSS_color_NORMAL_Y_1->13]
    )

I'm new to Scala and Spark, and I tried something like this:

    var map: Map[String, Int] = Map()
    var i = 0
    dataframe.foreach { record =>
      // Is there a better way of creating a key?
      val key = record(0).toString + record(1) + record(2) + record(3)
      map += (key -> i)
      i += 1
    }

But this is not working :/ the Map is still empty after this completes.

– Abhinav Bhardwaj

1 Answer


The main issue in your code is that you are trying to modify a variable created on the driver side from within code executed on the workers. When using Spark, driver-side variables can be used within RDD transformations only as "read-only" values.

Specifically:

  • The map is created on the driver machine
  • The map (with its initial, empty value) is serialized and sent to worker nodes
  • Each node might change the map (locally)
  • The result is simply thrown away when foreach completes; it is not sent back to the driver (see the sketch below).
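
To see this in isolation, here is a minimal sketch of the pitfall (assuming a live SparkContext named sc; it mirrors the "closures" example in the Spark programming guide):

var counter = 0                    // lives on the driver
val rdd = sc.parallelize(1 to 100)

rdd.foreach(_ => counter += 1)     // each task increments its own deserialized copy of counter

println(counter)                   // still 0 on the driver (and not guaranteed even in local mode)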

To fix this, you should use a transformation that returns a changed RDD (e.g. map) to create the keys, use zipWithIndex to add the running "ids", and then use collectAsMap to get all the data back to the driver as a Map:

val result: Map[String, Long] = dataframe
  .map(record => record(0).toString + record(1) + record(2) + record(3))  // build the key per row
  .zipWithIndex()    // attach a running index, starting at 0
  .collectAsMap()    // bring the (key, index) pairs back to the driver
  .toMap             // collectAsMap returns a collection.Map; convert to an immutable Map

As for the key creation itself - assuming you want to include the first 5 columns with a separator (_) between them, you can use:

record => record.toSeq.take(5).mkString("_")  // Row has toSeq, not toList
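
Putting the two pieces together, here is a minimal end-to-end sketch (assuming Spark 1.6-style APIs and that dataframe holds the sample rows above; note that take(5) includes the date column, so adjust the selection to the exact columns your key needs):

val keyToIndex: Map[String, Long] = dataframe
  .map(record => record.toSeq.take(5).mkString("_"))  // e.g. "ABCD_color_NORMAL_N_2015-02-20"
  .zipWithIndex()                                     // (key, 0), (key, 1), ...
  .collectAsMap()
  .toMap

keyToIndex.foreach { case (key, index) => println(s"$key -> $index") }

One caveat: zipWithIndex numbers rows, not distinct keys, so if two rows produce the same key, collectAsMap keeps only one of the indices.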
– Tzach Zohar
  • Thanks for the great answer! I am new to spark/scala and I am trying to do the same in my code, the only difference is that I have two columns in my Dataframe and I am trying to make it into a map with one column being the key and the other as the value. ex: column1, column2 => Map("column1" -> "column2", "column1" -> "column2",....). Is there a way to do this? Any help would be appreciated. – Hemanth Annavarapu Oct 19 '17 at 19:31
  • see `org.apache.spark.sql.functions.map` to create a Map column – Tzach Zohar Oct 19 '17 at 19:35
  • thanks for the reply! Is it available in spark 1.6? That's the version I am using.. I tried to import it but it seems like it's not available :( . I also tried in the same way as given in the posted question. But it gives me this error `java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;` at the line where I am trying to take the second column element as a string.. – Hemanth Annavarapu Oct 19 '17 at 19:59
  • Right, I'm afraid that function was only added in version 2.0. I'm afraid I won't be able to help further - feel free to post a separate question with all the details (the code you've tried, the exact expected output) and I/someone might be able to help. – Tzach Zohar Oct 19 '17 at 20:25
  • https://stackoverflow.com/q/46838313/8690528 Posted the question. Please take a look. Thanks! – Hemanth Annavarapu Oct 19 '17 at 20:30
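
As a side note on the thread above: on Spark 1.6, where functions.map is not available (it was added in 2.0), the two-column case from the comments can be handled with the same collectAsMap pattern. A minimal sketch, assuming a DataFrame df whose first two columns are strings (names are hypothetical):

val pairMap: Map[String, String] = df
  .map(row => (row.getString(0), row.getString(1)))  // (column1, column2) per row
  .collectAsMap()                                    // back to the driver
  .toMap

As before, duplicate keys collapse: only one value per distinct column1 survives.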