
I am trying to concatenate multiple columns in Spark using the concat function.

For example, below is the table to which I have to add a new concatenated column

table - **t**
+---+----+  
| id|name|
+---+----+  
|  1|   a|  
|  2|   b|
+---+----+

and below is the table which holds the information about which columns are to be concatenated for a given id (for id 1 the columns id and name need to be concatenated, and for id 2 only id)

table - **r**
+---+-------+
| id|   att |
+---+-------+
|  1|id,name|
|  2|   id  |
+---+-------+

If I join the two tables and do something like below, I am able to concatenate, but not based on table r (the new column has 1,a for the first row, but for the second row it should be 2 only)

t.withColumn("new",concat_ws(",",t.select("att").first.mkString.split(",").map(c => col(c)): _*)).show
+---+----+-------+---+
| id|name|  att  |new|
+---+----+-------+---+
|  1|   a|id,name|1,a|
|  2|   b|  id   |2,b|
+---+----+-------+---+
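
For reference, the table with the att column above comes from joining t and r on id, roughly like below (a sketch; in the snippet above I refer to the joined result simply as t).

// Sketch of the assumed join: attach each row's att specification from r to t.
val joined = t.join(r, Seq("id"), "inner")
joined.show()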

I have to apply a filter before the select in the above query, but I am not sure how to do that in withColumn for each row.

Something like below, if that is possible.

t.withColumn("new",concat_ws(",",t.**filter**("id="+this.id).select("att").first.mkString.split(",").map(c => col(c)): _*)).show

This would require filtering each row based on its id:

scala> t.filter("id=1").select("att").first.mkString.split(",").map(c => col(c))
res90: Array[org.apache.spark.sql.Column] = Array(id, name)

scala> t.filter("id=2").select("att").first.mkString.split(",").map(c => col(c))
res89: Array[org.apache.spark.sql.Column] = Array(id)

Below is the final required result.

+---+----+-------+---+
| id|name|  att  |new|
+---+----+-------+---+
|  1|   a|id,name|1,a|
|  2|   b|  id   |2  |
+---+----+-------+---+
Alejandro
Sunil

2 Answers


This may be done in a UDF:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{array, col, udf}

// All columns of the joined DataFrame and their names, kept in the same order.
val cols: Seq[Column] = dataFrame.columns.map(col(_)).toSeq
val names: Seq[String] = dataFrame.columns.toSeq

val generateNew = udf((values: Seq[String]) => {
  // The row's "att" value lists the column names that should be concatenated.
  val att = values(names.indexOf("att")).split(",")
  // Keep only the values whose column name appears in att.
  val selected = names.zip(values).collect { case (n, v) if att.contains(n) => v }
  selected.mkString(";")
})

// Cast everything to string so the columns can be packed into a single array column.
val dfColumns = array(cols.map(_.cast("string")): _*)
val dNew = dataFrame.withColumn("new", generateNew(dfColumns))

This is just a sketch, but the idea is that you can pass a sequence of items to the user-defined function and select the ones that are needed dynamically.

Note that there are additional types of collections/maps that you can pass; see, for example, How to pass array to UDF.
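
For a quick self-contained test of the sketch above (assuming a spark-shell session so that spark.implicits._ is available; the variable names are only illustrative), the input dataFrame can be rebuilt from the two tables in the question:

import spark.implicits._

// Recreate the question's tables and join them; this is the dataFrame the code above expects.
val t = Seq((1, "a"), (2, "b")).toDF("id", "name")
val r = Seq((1, "id,name"), (2, "id")).toDF("id", "att")
val dataFrame = t.join(r, Seq("id"), "inner")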

Yosi Dahari

We can use a UDF.

Requirement for this logic to work:

The column names of your table t should appear in the same order as they come in the att column of table r.

scala> input_df_1.show
+---+----+
| id|name|
+---+----+
|  1|   a|
|  2|   b|
+---+----+

scala> input_df_2.show
+---+-------+
| id|    att|
+---+-------+
|  1|id,name|
|  2|     id|
+---+-------+

scala> val join_df = input_df_1.join(input_df_2,Seq("id"),"inner")
join_df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> val req_cols = input_df_1.columns
req_cols: Array[String] = Array(id, name)

scala> def new_col_udf = udf((cols : Seq[String],row : String,attr : String) => {
     |     val row_values = row.split(",")
     |     val attrs = attr.split(",")
     |     val req_val = attrs.map{at =>
     |     val index = cols.indexOf(at)
     |     row_values(index)
     |     }
     |     req_val.mkString(",")
     |     })
new_col_udf: org.apache.spark.sql.expressions.UserDefinedFunction

scala>  val intermediate_df = join_df.withColumn("concat_column",concat_ws(",",'id,'name)).withColumn("new_col",new_col_udf(lit(req_cols),'concat_column,'att))
intermediate_df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 3 more fields]

scala> val result_df = intermediate_df.select('id,'name,'att,'new_col)
result_df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 2 more fields]

scala> result_df.show
+---+----+-------+-------+
| id|name|    att|new_col|
+---+----+-------+-------+
|  1|   a|id,name|    1,a|
|  2|   b|     id|      2|
+---+----+-------+-------+

Hope it answers your question.

Ankush Singh