1

I have a dataframe like this.

+---+---+---+---+
|  M| c2| c3| d1|
+---+---+---+---+
|  1|2_1|4_3|1_2|
|  2|3_4|4_5|1_2|
+---+---+---+---+

I have to transform this df should look like below. Here, c_max = max(c2,c3) after splitting with _.ie, all the columns (c2 and c3) have to be splitted with _ and then getting the max.

In the actual scenario, I have 50 columns ie, c2,c3....c50 and need to take the max from this.

+---+---+---+---+------+
|  M| c2| c3| d1|c_Max |
+---+---+---+---+------+
|  1|2_1|4_3|1_2|  4   |
|  2|3_4|4_5|1_2|  5   |
+---+---+---+---+------+
abc_spark
  • 383
  • 3
  • 19

3 Answers3

4

Here is one way using expr and build-in array functions for Spark >= 2.4.0:

import org.apache.spark.sql.functions.{expr, array_max, array}

val df = Seq(
  (1, "2_1", "3_4", "1_2"),
  (2, "3_4", "4_5", "1_2")
).toDF("M", "c2", "c3", "d1")

// get max c for each c column 
val c_cols = df.columns.filter(_.startsWith("c")).map{ c =>
  expr(s"array_max(cast(split(${c}, '_') as array<int>))")
}

df.withColumn("max_c", array_max(array(c_cols:_*))).show

Output:

+---+---+---+---+-----+
|  M| c2| c3| d1|max_c|
+---+---+---+---+-----+
|  1|2_1|3_4|1_2|    4|
|  2|3_4|4_5|1_2|    5|
+---+---+---+---+-----+

For older versions use the next code:

val c_cols = df.columns.filter(_.startsWith("c")).map{ c =>
  val c_ar = split(col(c), "_").cast("array<int>")
  when(c_ar.getItem(0) > c_ar.getItem(1), c_ar.getItem(0)).otherwise(c_ar.getItem(1))
}

df.withColumn("max_c", greatest(c_cols:_*)).show
abiratsis
  • 7,051
  • 3
  • 28
  • 46
3

Use greatest function:

val df = Seq((1, "2_1", "3_4", "1_2"),(2, "3_4", "4_5", "1_2"),
).toDF("M", "c2", "c3", "d1")

// get all `c` columns and split by `_` to get the values after the underscore
val c_cols = df.columns.filter(_.startsWith("c"))
                       .flatMap{
                           c => Seq(split(col(c), "_").getItem(0).cast("int"), 
                                    split(col(c), "_").getItem(1).cast("int")
                                )
                        } 

// apply greatest func
val c_max = greatest(c_cols: _*)

// add new column
df.withColumn("c_Max", c_max).show()

Gives:

+---+---+---+---+-----+
|  M| c2| c3| d1|c_Max|
+---+---+---+---+-----+
|  1|2_1|3_4|1_2|    4|
|  2|3_4|4_5|1_2|    5|
+---+---+---+---+-----+
blackbishop
  • 30,945
  • 11
  • 55
  • 76
  • Hi blackbishop, Thanks for that. I have edited my question lil bit.Sorry about that i could have done this initially itself. Actually, I have to get the max of the integers on each side of "_" – abc_spark Jan 02 '20 at 09:33
  • Thanks for that.Let me try with this. – abc_spark Jan 02 '20 at 10:24
  • But do you have some way to incorporate float as well? as per Alexandro's comment, there can be an issue when - sorting for an array of ["1", "2", "03"] max will be 2 – abc_spark Jan 02 '20 at 13:00
  • flatMap{ c => Seq(split(col(c), "_" ).getItem(0).cast("Float"), split(col(c), "_").getItem(1).cast("Float")) } This will serve the purpose I guess.. Thanks – abc_spark Jan 02 '20 at 13:10
  • Hi @blackbishop Would you be able to advise on the question - https://stackoverflow.com/questions/62211108/finding-percentile-in-spark-scala-in-dynamic-way – abc_spark Jun 05 '20 at 09:38
1

In spark >= 2.4.0, you can use the array_max function and get some code that would work even with columns containing more than 2 values. The idea is to start by concatenating all the columns (concat column). For that, I use concat_ws on an array of all the columns I want to concat, that I obtain with array(cols.map(col) :_*). Then I split the resulting string to get a big array of strings containing all the values of all the columns. I cast it to an array of ints and I call array_max on it.

val cols = (2 to 50).map("c"+_)

val result = df
    .withColumn("concat", concat_ws("_", array(cols.map(col) :_*)))
    .withColumn("array_of_ints", split('concat, "_").cast(ArrayType(IntegerType)))
    .withColumn("c_max", array_max('array_of_ints))
    .drop("concat", "array_of_ints")

In spark < 2.4, you can define array_max yourself like this:

val array_max = udf((s : Seq[Int]) => s.max)

The previous code does not need to be modified. Note however that UDFs can be slower than predefined spark SQL functions.

Oli
  • 9,766
  • 5
  • 25
  • 46