
I have three columns in a DataFrame df:

col1,col2,col3
X,x1,x2
Z,z1,z2
Y,
X,x3,x4
P,p1,p2
Q,q1,q2
Y,

I want to do the following: when col1 = X, store the values of col2 and col3, and assign those values to the next row where col1 = Y. Expected output:

X,x1,x2
Z,z1,z2
Y,x1,x2
X,x3,x4
P,p1,p2
Q,q1,q2
Y,x3,x4

Any help would be appreciated. Note: Spark 1.6.

Aryan Singh

2 Answers


Here's one approach using a Window function, with the following steps:

  1. Add a row-identifying column (not needed if one already exists) and combine the non-key columns (presumably many of them) into a single struct column
  2. Create tmp1 with conditional nulls, and tmp2 using the last/rowsBetween Window function to carry the last non-null value forward
  3. Create newcols conditionally from cols and tmp2
  4. Expand newcols back into individual columns using foldLeft

Note that this solution uses a Window function without partitioning, so it may not scale to large datasets.

val df = Seq(
  ("X", "x1", "x2"),
  ("Z", "z1", "z2"),
  ("Y", "", ""),
  ("X", "x3", "x4"),
  ("P", "p1", "p2"),
  ("Q", "q1", "q2"),
  ("Y", "", "")
).toDF("col1", "col2", "col3")

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val colList = df.columns.filter(_ != "col1")

// Step 1: add a row-identifying column and combine the non-key columns into a struct
val df2 = df.select($"col1", monotonically_increasing_id.as("id"),
  struct(colList.map(col): _*).as("cols")
)

// Step 2: tmp1 keeps cols only where col1 == "X"; tmp2 carries the last non-null tmp1 forward
val df3 = df2.
  withColumn( "tmp1", when($"col1" === "X", $"cols") ).
  withColumn( "tmp2", last("tmp1", ignoreNulls = true).over(
    Window.orderBy("id").rowsBetween(Window.unboundedPreceding, 0)
  ) )

df3.show
// +----+---+-------+-------+-------+
// |col1| id|   cols|   tmp1|   tmp2|
// +----+---+-------+-------+-------+
// |   X|  0|[x1,x2]|[x1,x2]|[x1,x2]|
// |   Z|  1|[z1,z2]|   null|[x1,x2]|
// |   Y|  2|    [,]|   null|[x1,x2]|
// |   X|  3|[x3,x4]|[x3,x4]|[x3,x4]|
// |   P|  4|[p1,p2]|   null|[x3,x4]|
// |   Q|  5|[q1,q2]|   null|[x3,x4]|
// |   Y|  6|    [,]|   null|[x3,x4]|
// +----+---+-------+-------+-------+

// Step 3: for Y rows take the carried-forward values, otherwise keep the original ones
val df4 = df3.withColumn( "newcols",
  when($"col1" === "Y", $"tmp2").otherwise($"cols")
).select($"col1", $"newcols")

df4.show
// +----+-------+
// |col1|newcols|
// +----+-------+
// |   X|[x1,x2]|
// |   Z|[z1,z2]|
// |   Y|[x1,x2]|
// |   X|[x3,x4]|
// |   P|[p1,p2]|
// |   Q|[q1,q2]|
// |   Y|[x3,x4]|
// +----+-------+

// Step 4: expand the struct back into the original individual columns
val dfResult = colList.foldLeft( df4 )(
  (accDF, c) => accDF.withColumn(c, df4(s"newcols.$c"))
).drop($"newcols")

dfResult.show
// +----+----+----+
// |col1|col2|col3|
// +----+----+----+
// |   X|  x1|  x2|
// |   Z|  z1|  z2|
// |   Y|  x1|  x2|
// |   X|  x3|  x4|
// |   P|  p1|  p2|
// |   Q|  q1|  q2|
// |   Y|  x3|  x4|
// +----+----+----+
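
Equivalently, the struct could be expanded with a single select instead of the foldLeft; a quick sketch using the same colList and df4 as above:

val dfResult2 = df4.select(
  $"col1" +: colList.map(c => col(s"newcols.$c").as(c)): _*
)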

[UPDATE]

For Spark 1.x, last(colName, ignoreNulls) isn't available in the DataFrame API. A work-around is to fall back to Spark SQL, which supports ignoring nulls in its last() method.
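
Note that window functions in Spark 1.x SQL generally require a HiveContext; with a plain SQLContext you may get "Couldn't find window function". A minimal setup sketch, assuming a Spark 1.6 shell where sc is the SparkContext:

import org.apache.spark.sql.hive.HiveContext

// window functions in Spark 1.x SQL require a HiveContext
val sqlContext = new HiveContext(sc)
import sqlContext.implicits._  // brings $ and toDF into scope

With that in place, the fill can be expressed in SQL: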

df2.
  withColumn( "tmp1", when($"col1" === "X", $"cols") ).
  registerTempTable("df2table")
  // in Spark 2.x, use createOrReplaceTempView("df2table") instead

val df3 = sqlContext.sql("""
  select col1, id, cols, tmp1, last(tmp1, true) over (
    order by id rows between unbounded preceding and current row
    ) as tmp2
  from df2table
""")
Leo C
  • I am getting the exception "error: overloaded method value last with alternatives:", and if I remove the second parameter from last("tmp1", ignoreNulls = true) I then get "38: error: value unboundedPreceding is not a member of object org.apache.spark.sql.expressions.Window" – Aryan Singh Mar 04 '18 at 18:50
  • I am using Spark 1.6 – Aryan Singh Mar 04 '18 at 18:56
  • You can replace `Window.unboundedPreceding` with `Long.MinValue`. Unfortunately `last(colName, ignoreNulls)` isn't available prior to Spark 2.0. Please see my updated answer for a work-around. – Leo C Mar 04 '18 at 20:30
  • I am still getting org.apache.spark.sql.AnalysisException: Couldn't find window function last; let me know if I am doing something wrong – Aryan Singh Mar 05 '18 at 15:34
  • I don't have a Spark 1.6 environment hence can't reproduce the said problem. To revert to using Spark SQL make sure you `import sqlContext.implicits._`. Here's a relevant [SO link](https://stackoverflow.com/questions/32905443/spark-couldnt-find-window-function) for your reference. – Leo C Mar 05 '18 at 17:41
  • Thanks @Leo. It solved my requirement in Spark 2.0, but we are still using 1.6 in production. Can we do it using pure Scala code running in distributed mode... is that a good idea? – Aryan Singh Mar 06 '18 at 18:00
  • Glad that it helps, though I'm a little surprised that the Spark SQL work-around doesn't work in Spark 1.6. Not sure I understand what you mean by pure Scala code in distributed mode. If you mean using the RDD API with Scala, here's a relevant [SO link](https://stackoverflow.com/questions/33621319/spark-scala-forward-fill-with-last-observation) which might be of interest. – Leo C Mar 06 '18 at 19:21

Yes, there is a lag window function that requires an ordering:

import org.apache.spark.sql.expressions.Window.orderBy
import org.apache.spark.sql.functions.{coalesce, lag}

case class Temp(a: String, b: Option[String], c: Option[String])

// ss is assumed to be the SparkSession; the $"..." syntax below also needs import ss.implicits._
val input = ss.createDataFrame(
  Seq(
    Temp("A", Some("a1"), Some("a2")),
    Temp("D", Some("d1"), Some("d2")),
    Temp("B", Some("b1"), Some("b2")),
    Temp("E", None, None),
    Temp("C", None, None)
  ))

+---+----+----+
|  a|   b|   c|
+---+----+----+
|  A|  a1|  a2|
|  D|  d1|  d2|
|  B|  b1|  b2|
|  E|null|null|
|  C|null|null|
+---+----+----+

val order = orderBy($"a")
input
  .withColumn("b", coalesce($"b", lag($"b", 1).over(order)))
  .withColumn("c", coalesce($"c", lag($"c", 1).over(order)))
  .show()

+---+---+---+
|  a|  b|  c|
+---+---+---+
|  A| a1| a2|
|  B| b1| b2|
|  C| b1| b2|
|  D| d1| d2|
|  E| d1| d2|
+---+---+---+
Nazarii Bardiuk