
I have a dataframe that looks like this:

column1_ID column2 column3 column4
A_123      12      A       1
A_123      12      B       2
A_123      23      A       1 
B_456      56      DB      4 
B_456      56      BD      5
B_456      60      BD      3

I would like to convert the above dataframe/RDD into the output below, keyed by column1_ID, with value type HashMap(Long, HashMap(String, Long)):

'A_123': {12 : {'A': 1, 'B': 2}, 23: {'A': 1} }, 
'B_456': {56 : {'DB': 4, 'BD': 5}, 60: {'BD': 3} }
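
Expressed as Scala types, the target is roughly (a sketch of the shape described above):

import scala.collection.immutable.HashMap

val expected: HashMap[String, HashMap[Long, HashMap[String, Long]]] = HashMap(
  "A_123" -> HashMap(12L -> HashMap("A" -> 1L, "B" -> 2L), 23L -> HashMap("A" -> 1L)),
  "B_456" -> HashMap(56L -> HashMap("DB" -> 4L, "BD" -> 5L), 60L -> HashMap("BD" -> 3L)))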

I tried reduceByKey and groupByKey, but couldn't shape the output as expected.

Vivek Narayanasetty

2 Answers


This can be done by creating a complex structure from the last three columns and then applying a UDF:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
import spark.implicits._

val data = List(
  ("A_123", 12, "A", 1),
  ("A_123", 12, "B", 2),
  ("A_123", 23, "A", 1),
  ("B_456", 56, "DB", 4),
  ("B_456", 56, "BD", 5),
  ("B_456", 60, "BD", 3))
val df = data.toDF("column1_ID", "column2", "column3", "column4")

// Pack the last two columns into a struct.
val twoLastCompacted = df.withColumn("lastTwo", struct($"column3", $"column4"))
twoLastCompacted.show(false)

// Group by the first two columns and collect the structs into an array.
val groupedByTwoFirst = twoLastCompacted.groupBy("column1_ID", "column2")
  .agg(collect_list("lastTwo").alias("lastTwoArray"))
groupedByTwoFirst.show(false)

// Pack column2 together with the collected array.
val threeLastCompacted = groupedByTwoFirst.withColumn("lastThree", struct($"column2", $"lastTwoArray"))
threeLastCompacted.show(false)

// Group by the key column and collect the nested structs.
val groupedByFirst = threeLastCompacted.groupBy("column1_ID")
  .agg(collect_list("lastThree").alias("lastThreeArray"))
groupedByFirst.printSchema()
groupedByFirst.show(false)

// Convert the nested Row structure into Map[Int, Map[String, Int]].
val structToMap = (value: Seq[Row]) =>
  value.map(v => v.getInt(0) ->
    v.getSeq(1).asInstanceOf[Seq[Row]].map(r => r.getString(0) -> r.getInt(1)).toMap)
    .toMap
val structToMapUDF = udf(structToMap)
groupedByFirst.select($"column1_ID", structToMapUDF($"lastThreeArray")).show(false)

Output:

+----------+-------+-------+-------+-------+
|column1_ID|column2|column3|column4|lastTwo|
+----------+-------+-------+-------+-------+
|A_123     |12     |A      |1      |[A,1]  |
|A_123     |12     |B      |2      |[B,2]  |
|A_123     |23     |A      |1      |[A,1]  |
|B_456     |56     |DB     |4      |[DB,4] |
|B_456     |56     |BD     |5      |[BD,5] |
|B_456     |60     |BD     |3      |[BD,3] |
+----------+-------+-------+-------+-------+

+----------+-------+----------------+
|column1_ID|column2|lastTwoArray    |
+----------+-------+----------------+
|B_456     |60     |[[BD,3]]        |
|A_123     |12     |[[A,1], [B,2]]  |
|B_456     |56     |[[DB,4], [BD,5]]|
|A_123     |23     |[[A,1]]         |
+----------+-------+----------------+

+----------+-------+----------------+---------------------------------+
|column1_ID|column2|lastTwoArray    |lastThree                        |
+----------+-------+----------------+---------------------------------+
|B_456     |60     |[[BD,3]]        |[60,WrappedArray([BD,3])]        |
|A_123     |12     |[[A,1], [B,2]]  |[12,WrappedArray([A,1], [B,2])]  |
|B_456     |56     |[[DB,4], [BD,5]]|[56,WrappedArray([DB,4], [BD,5])]|
|A_123     |23     |[[A,1]]         |[23,WrappedArray([A,1])]         |
+----------+-------+----------------+---------------------------------+

root
 |-- column1_ID: string (nullable = true)
 |-- lastThreeArray: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- column2: integer (nullable = false)
 |    |    |-- lastTwoArray: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- column3: string (nullable = true)
 |    |    |    |    |-- column4: integer (nullable = false)

+----------+--------------------------------------------------------------+
|column1_ID|lastThreeArray                                                |
+----------+--------------------------------------------------------------+
|B_456     |[[60,WrappedArray([BD,3])], [56,WrappedArray([DB,4], [BD,5])]]|
|A_123     |[[12,WrappedArray([A,1], [B,2])], [23,WrappedArray([A,1])]]   |
+----------+--------------------------------------------------------------+

+----------+----------------------------------------------------+
|column1_ID|UDF(lastThreeArray)                                 |
+----------+----------------------------------------------------+
|B_456     |Map(60 -> Map(BD -> 3), 56 -> Map(DB -> 4, BD -> 5))|
|A_123     |Map(12 -> Map(A -> 1, B -> 2), 23 -> Map(A -> 1))   |
+----------+----------------------------------------------------+
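
As an aside, on Spark 2.4+ the UDF can be avoided entirely: the built-in map_from_entries turns an array of key/value structs into a map. A minimal sketch over the same df (an alternative approach, not part of the steps above):

import org.apache.spark.sql.functions._

// Inner map: column3 -> column4 per (column1_ID, column2) group.
val innerMaps = df
  .groupBy("column1_ID", "column2")
  .agg(map_from_entries(collect_list(struct($"column3", $"column4"))).alias("innerMap"))

// Outer map: column2 -> innerMap per column1_ID.
innerMaps
  .groupBy("column1_ID")
  .agg(map_from_entries(collect_list(struct($"column2", $"innerMap"))).alias("nestedMap"))
  .show(false)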
pasha701
  • thanks pasha, I have a lookup scenario like this: 1) if a particular column_1 contains some "x", then column_2 and column_3 have to be looked up in a table; 2) this lookup table/db pulls records containing a map, i.e. fields of column_2 and column_3, and converts the respective mapping fields. How can this be achieved? – BdEngineer Jul 16 '20 at 09:32
  • any advice on how to solve this in Spark? https://stackoverflow.com/questions/62933135/dataframe-look-up-and-optimization – BdEngineer Jul 16 '20 at 10:55
  • @BdEngineer the answer in your question by Someshwar Kale looks fine. – pasha701 Jul 16 '20 at 15:46
  • thanks pasha for your quick reply... one generic question: how can I get alerts on SO for any Spark question postings? – BdEngineer Jul 16 '20 at 16:19
  • @BdEngineer that is a harder question; maybe SO has a special help page for this. – pasha701 Jul 16 '20 at 16:51
  • what is the enrolment process for the same? – BdEngineer Jul 16 '20 at 17:54
  • any suggestion on this? https://stackoverflow.com/questions/63074569/copy-current-row-modify-it-and-add-a-new-row-in-spark – BdEngineer Jul 24 '20 at 13:31

You can convert the DF to an RDD and apply the operations like below:

scala> import scala.collection.immutable.HashMap
import scala.collection.immutable.HashMap

scala> case class Data(col1: String, col2: Int, col3: String, col4: Int)
defined class Data

scala> var x: Seq[Data] = List(Data("A_123",12,"A",1), Data("A_123",12,"B",2), Data("A_123",23,"A",1), Data("B_456",56,"DB",4), Data("B_456",56,"BD",5), Data("B_456",60,"BD",3))
x: Seq[Data] = List(Data(A_123,12,A,1), Data(A_123,12,B,2), Data(A_123,23,A,1), Data(B_456,56,DB,4), Data(B_456,56,BD,5), Data(B_456,60,BD,3))

scala> sc.parallelize(x).groupBy(_.col1).map{a => (a._1, HashMap(a._2.groupBy(_.col2).map{b => (b._1, HashMap(b._2.groupBy(_.col3).map{c => (c._1, c._2.map(_.col4).head)}.toArray: _*))}.toArray: _*))}.toDF()
res26: org.apache.spark.sql.DataFrame = [_1: string, _2: map<int,map<string,int>>]

I initialized an RDD with the same data structure as in your case via sc.parallelize(x).
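
If you start from the original DataFrame instead of a hand-built list, a minimal sketch of producing the same RDD[Data] (assuming the column order from the question):

// Map each Row of the DataFrame onto the Data case class.
val dataRdd = df.rdd.map(r => Data(r.getString(0), r.getInt(1), r.getString(2), r.getInt(3)))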

Shubham Jain
  • Thanks for sharing the code. It works fine after removing the "toDF()". When converting it to a dataframe I get: "Error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases." Any idea? – Vivek Narayanasetty Sep 13 '18 at 10:26
  • you need to import spark.implicits._ above your code, then it will work fine. – Teena Vashist Sep 13 '18 at 13:53
  • Tried that already... but it still gives the same error. Not sure if I am missing anything. – Vivek Narayanasetty Sep 13 '18 at 16:35
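
For reference, a minimal sketch of the pattern Teena describes; the implicits must come from the active SparkSession and be in scope before the .toDF() call (whether this resolves the error above is an assumption, since the thread ends here):

val spark = org.apache.spark.sql.SparkSession.builder().getOrCreate()
import spark.implicits._  // supplies the Encoders needed by .toDF()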