
I am trying to see if I can create new columns from the values in one of the columns of a DataFrame using Spark/Scala. I have a DataFrame with the following data in it:

df.show()

+---+-----------------------+
|id |allvals                |
+---+-----------------------+
|1  |col1,val11|col3,val31  |
|3  |col3,val33|col1,val13  |
|2  |col2,val22             |
+---+-----------------------+

In the above data, col1/col2/col3 are column names, each followed by its value. A column name and its value are separated by a comma (`,`), and the name/value pairs are separated by a pipe (`|`).
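For illustration, a single allvals entry parses into (name, value) pairs like this (a plain-Scala sketch of the format just described, not part of the original question):

val pairs = "col1,val11|col3,val31"
  .split('|')                             // -> Array("col1,val11", "col3,val31")
  .map(_.split(','))                      // -> Array(Array("col1", "val11"), Array("col3", "val31"))
  .collect { case Array(k, v) => k -> v } // -> Array(("col1", "val11"), ("col3", "val31"))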

Now, I want to achieve a result like this:

+---+----------------------+------+------+------+
|id |allvals               |col1  |col2  |col3  |
+---+----------------------+------+------+------+
|1  |col1,val11|col3,val31 |val11 |null  |val31 |
|3  |col3,val33|col1,val13 |val13 |null  |val33 |
|2  |col2,val22            |null  |val22 |null  |
+---+----------------------+------+------+------+

Appreciate any help.


2 Answers


You can convert the column to a Map with a udf:

import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
  (1, "col1,val11|col3,val31"), (2, "col3,val33|col3,val13"), (2, "col2,val22")
).toDF("id", "allvals")

// udf that parses "k1,v1|k2,v2|..." into Map(k1 -> v1, k2 -> v2, ...)
val to_map = udf((s: String) => s.split('|').map(_.split(",")).collect {
  case Array(k, v) => (k, v)
}.toMap)

val dfWithMap = df.withColumn("allvalsmap", to_map($"allvals"))
val keys = dfWithMap.select($"allvalsmap").as[Map[String, String]].flatMap(_.keys.toSeq).distinct.collect

keys.foldLeft(dfWithMap)((df, k) => df.withColumn(k, $"allvalsmap".getItem(k))).drop("allvalsmap").show
// +---+--------------------+-----+-----+-----+
// | id|             allvals| col3| col1| col2|
// +---+--------------------+-----+-----+-----+
// |  1|col1,val11|col3,v...|val31|val11| null|
// |  2|col3,val33|col3,v...|val13| null| null|
// |  2|          col2,val22| null| null|val22|
// +---+--------------------+-----+-----+-----+

Inspired by this answer by user6910411.

Alper t. Turker
  • Thank You! This is nice. I encountered the following issue with .as[Map[String,String]]... scala> val keys = dfWithMap.select($"allvalsmap").as[Map[String, String]].flatMap(_.keys.toSeq).distinct.collect <console>:40: error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases. – Lux May 04 '18 at 21:30
  • You have to `import spark.implicits._` where `spark` is a `SparkSession` – Alper t. Turker May 04 '18 at 21:39
  • I imported that but still had the issue. – Lux May 04 '18 at 21:50
  • I cannot reproduce that. Which Spark version do you use? – Alper t. Turker May 04 '18 at 22:55
  • I am using spark 2.2 and scala 2.11 – Lux May 05 '18 at 11:43
  • How about `flatMap(_.keys.toArray)`? – Alper t. Turker May 05 '18 at 12:37
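The encoder error reported in the comments above comes from the .as[Map[String, String]] step; if that implicit encoder is not available (as appears to be the case on Spark 2.2), the distinct keys can be collected without it by exploding the map column into key/value rows first. A minimal sketch, not part of the original answer, reusing the imports and dfWithMap defined above:

// explode turns the MapType column into "key" and "value" columns,
// so only the built-in String encoder is needed to collect the keys
val keys = dfWithMap
  .select(explode($"allvalsmap"))
  .select($"key").as[String]
  .distinct
  .collect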

You can transform the DataFrame using split, explode and groupBy/pivot/agg, as follows:

val df = Seq(
  (1, "col1,val11|col3,val31"),
  (2, "col3,val33|col1,val13"),
  (3, "col2,val22")
).toDF("id", "allvals")

import org.apache.spark.sql.functions._

df.withColumn("temp", split($"allvals", "\\|")).
  withColumn("temp", explode($"temp")).
  withColumn("temp", split($"temp", ",")).
  select($"id", $"allvals", $"temp".getItem(0).as("k"), $"temp".getItem(1).as("v")).
  groupBy($"id", $"allvals").pivot("k").agg(first($"v"))

// +---+---------------------+-----+-----+-----+
// |id |allvals              |col1 |col2 |col3 |
// +---+---------------------+-----+-----+-----+
// |1  |col1,val11|col3,val31|val11|null |val31|
// |3  |col2,val22           |null |val22|null |
// |2  |col3,val33|col1,val13|val13|null |val33|
// +---+---------------------+-----+-----+-----+
Leo C
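If the set of column names is known up front, the extra pass that pivot makes to compute the distinct key values can be avoided by passing them explicitly. A small variation on the answer above (not part of the original answer; the column list is assumed from the sample data):

df.withColumn("temp", split($"allvals", "\\|")).
  withColumn("temp", explode($"temp")).
  withColumn("temp", split($"temp", ",")).
  select($"id", $"allvals", $"temp".getItem(0).as("k"), $"temp".getItem(1).as("v")).
  groupBy($"id", $"allvals").pivot("k", Seq("col1", "col2", "col3")).agg(first($"v")).
  show(false)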