
I have a data frame that looks something like this:

val df = sc.parallelize(Seq(
  (3,1,"A"),(3,2,"B"),(3,3,"C"),
  (2,1,"D"),(2,2,"E"),
  (3,1,"F"),(3,2,"G"),(3,3,"G"),
  (2,1,"X"),(2,2,"X")
)).toDF("TotalN", "N", "String")

+------+---+------+
|TotalN|  N|String|
+------+---+------+
|     3|  1|     A|
|     3|  2|     B|
|     3|  3|     C|
|     2|  1|     D|
|     2|  2|     E|
|     3|  1|     F|
|     3|  2|     G|
|     3|  3|     G|
|     2|  1|     X|
|     2|  2|     X|
+------+---+------+

I need to aggregate the strings by concatenating them together based on TotalN and the sequentially increasing ID (N). The problem is that there is no unique ID for each aggregation that I can group by, so I need to do something like: "for each row, look at TotalN, concatenate across the next N rows, then reset".

+------+------+
|TotalN|String|
+------+------+
|     3|   ABC|
|     2|    DE|
|     3|   FGG|
|     2|    XX|
+------+------+

Any pointers much appreciated.

Using Spark 2.3.1 and the Scala Api.

wab
  • I think the issue here is that order matters for your aggregation while the data is distributed! Can you explain a bit more about what you are trying to achieve? – proximator Aug 10 '18 at 09:36
  • from what you say, TotalN doesn't come into it - it's just aggregating based on sequences of N ... as soon as a sequence ends, start a new aggregation – joel Aug 10 '18 at 09:37
  • Hi @proximator Yes, I'm concerned about the data getting distributed across different workers. What I need to do is aggregate (concatenate) a string for further processing downstream, but the only grouping variables I have are TotalN (how many rows String X is split across) and a sequentially increasing row count for that aggregation. The order of the rows in the input file respects the N sequence, in the sense that each message goes 1,2,3 and never 2,3,1 or 3,2,1 – wab Aug 10 '18 at 09:52
  • @JoelBerkeley - Yes, exactly the problem, I'm just unsure of the 'spark way' of doing this and there is too much data to do this on a single machine... I'm new to Spark, so there may be something obvious I'm missing. – wab Aug 10 '18 at 09:54
  • @JoelBerkeley Lots of delimited text files – wab Aug 10 '18 at 10:01
  • 1
    see https://stackoverflow.com/a/20405902/5986907 ie add a column which is (N - rowNumber), then group by that – joel Aug 10 '18 at 10:12
  • @JoelBerkeley - Just given that a read over - think you're right. Thanks! – wab Aug 10 '18 at 10:15

2 Answers


Try this:

import org.apache.spark.sql.functions.collect_list
import spark.implicits._

val df = spark.sparkContext.parallelize(Seq(
  (3, 1, "A"), (3, 2, "B"), (3, 3, "C"),
  (2, 1, "D"), (2, 2, "E"),
  (3, 1, "F"), (3, 2, "G"), (3, 3, "G"),
  (2, 1, "X"), (2, 2, "X")
)).toDF("TotalN", "N", "String")

df.createOrReplaceTempView("data")

// Relies on row order being preserved within equal TotalN values:
// N is consecutive per message, so N - rowNum is constant within a
// message and can serve as a group key.
val sqlDF = spark.sql(
  """
    | SELECT TotalN, N, String, ROW_NUMBER() OVER (ORDER BY TotalN) AS rowNum
    | FROM data
  """.stripMargin)

sqlDF.withColumn("key", $"N" - $"rowNum")
  .groupBy("key").agg(collect_list('String).as("texts")).show()
proximator

The solution is to calculate a grouping variable using the row_number function, which can then be used in a later groupBy.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number
import spark.implicits._

// N - row_number() is constant within each run of consecutive N values,
// so it serves as a generated group ID.
val w = Window.orderBy("TotalN")
df.withColumn("GeneratedID", $"N" - row_number().over(w)).show

+------+---+------+-----------+
|TotalN|  N|String|GeneratedID|
+------+---+------+-----------+
|     2|  1|     D|          0|
|     2|  2|     E|          0|
|     2|  1|     X|         -2|
|     2|  2|     X|         -2|
|     3|  1|     A|         -4|
|     3|  2|     B|         -4|
|     3|  3|     C|         -4|
|     3|  1|     F|         -7|
|     3|  2|     G|         -7|
|     3|  3|     G|         -7|
+------+---+------+-----------+
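
That gives the grouping key; the final aggregation is not shown in the answer. As a sketch under the same setup (reusing w from above), grouping on GeneratedID and concatenating the pieces in N order could look like this:

import org.apache.spark.sql.functions.{collect_list, concat_ws, first, sort_array, struct}

// Sketch only: group on the generated ID, carry TotalN through with first,
// and stitch the strings back together in N order.
df.withColumn("GeneratedID", $"N" - row_number().over(w))
  .groupBy("GeneratedID")
  .agg(
    first("TotalN").as("TotalN"),
    concat_ws("", sort_array(collect_list(struct($"N", $"String"))).getField("String")).as("String")
  )
  .select("TotalN", "String")
  .show()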
wab