
I'm using Spark SQL to pull rows from a table. Some of this data is recurring, and I'm trying to count the number of occurrences. In essence, I'm trying to perform the basic "word count" example, but instead of my data being of the form (Word : String, Count : Int), a row of data replaces the word/string.

More specifically, my data looks like RDD[((row), count)], where row is pulled from the SQL table and contains strings, doubles, ints, etc.

It is in RDD form because I want to use reduceByKey (see Avoid groupByKey). It is a (Key, Value) pair with a very long key (some row from a SQL database) and its value being the "word count".

My app is doing this:

myDataframe
    // Append a 1 to each row
    .map(row => (row, 1))
    // Convert to RDD so we can use the reduceByKey method
    .rdd
    // Add up the 1's corresponding to matching keys
    .reduceByKey(_ + _)
    // Filter for rows that show up more than 10 times
    .filter(_._2 > 10)

    ...

Now let's say my row data contains (string, double, int). This is where I want to unpack my data from RDD[((string, double, int), count)] to RDD[(string, double, int, count)] so that I can eventually save this data to another SQL table.

Is there some method that allows me to unpack the contents of this ... nested tuple ... sort of thing?

My solution has been to "unpack" the elements of the RDD like so: .map(row => (row._1._1, row._1._2, row._1._3, row._2))

But there must be a better way! If I decide to grab more elements from the row, I'd have to modify this .map() call.
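For what it's worth, the same unpacking can also be written as a pattern match appended to the same chain (just a sketch; the element names are illustrative). It reads a bit more clearly, but it still has to change whenever the row's arity does:

// Equivalent to the .map above, destructured via pattern matching
.map { case ((str, dbl, num), count) => (str, dbl, num, count) }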

Thanks!

R. Gosman

2 Answers


You can make use of Row's toSeq and fromSeq as in the following example:

import spark.implicits._  // for toDF; pre-imported in spark-shell

val df = Seq(
  ("a", 10.0, 1),
  ("a", 10.0, 1),
  ("b", 20.0, 2),
  ("c", 30.0, 3),
  ("c", 30.0, 3)
).toDF("c1", "c2", "c3")

import org.apache.spark.sql.Row

df.rdd.
  map((_, 1)).
  reduceByKey(_ + _).
  filter(_._2 > 1).
  map {
    case (row: Row, count: Int) => Row.fromSeq(row.toSeq :+ count)
  }.
  collect
// res1: Array[org.apache.spark.sql.Row] = Array([a,10.0,1,2], [c,30.0,3,2])
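
If you then need to get this back into a table (as the question mentions), one option is to rebuild a DataFrame from the resulting RDD[Row], using the original schema plus an extra count field. A minimal sketch, assuming a SparkSession named spark:

import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Original schema plus an integer column for the count
val outSchema = StructType(df.schema.fields :+ StructField("count", IntegerType, nullable = false))

val countedRdd = df.rdd.
  map((_, 1)).
  reduceByKey(_ + _).
  filter(_._2 > 1).
  map { case (row: Row, count: Int) => Row.fromSeq(row.toSeq :+ count) }

// Back to a DataFrame, ready to be written out (e.g. via .write)
val countedDf = spark.createDataFrame(countedRdd, outSchema)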
Leo C
  • Thanks for the advice! I came across a somewhat similar suggestion, but found it funny to create a sequence... simply to extract things from that sequence in the next step. Seemed like more of a patch than a solution. I also didn't want to re-write the code "e.g.: _._2 to _._3" if my columns ever changed. – R. Gosman May 30 '18 at 21:57
  • `Row` is like `Tuple` in which elements can be of various types, and adding (or removing) an element to a `Tuple` is not as trivial as doing that to a `Seq`. That's why `Row` is equipped with the `to/fromSeq` methods to address such needs. In this use case, they will take whatever the row is when the transformation takes place, hence no code change is needed if the row has been changed. These methods are also commonly used in other use cases (e.g. [creating contiguous indexes](https://stackoverflow.com/a/49702677/6316508) in a dataframe). – Leo C May 30 '18 at 22:44

You DO NOT have to revert to using RDDs; the article you referenced correctly warns against using RDD.groupByKey, but that warning does not apply to DataFrame's groupBy. It's safe (and performant) to use groupBy on a DataFrame! See more here.

So, to group by all of the DataFrame's columns, count the occurrences of each group, and filter for groups with count > 10, you can simply use:

import org.apache.spark.sql.functions.col

df.groupBy(df.columns.map(col): _*) // alternatively: df.groupBy(df.columns.head, df.columns.tail: _*)
  .count()
  .filter($"count" > 10)

The result has a schema similar to the input's, with an additional count column of type long.
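
To then save this result to another SQL table, as the question describes, one option is the DataFrameWriter's jdbc sink. A sketch only; the JDBC URL, table name and credentials below are placeholders:

import java.util.Properties
import org.apache.spark.sql.functions.col

val counted = df
  .groupBy(df.columns.map(col): _*)
  .count()
  .filter(col("count") > 10)

// Placeholder connection details -- replace with your own URL, table and credentials
val props = new Properties()
props.setProperty("user", "username")
props.setProperty("password", "password")

counted.write
  .mode("append")
  .jdbc("jdbc:postgresql://localhost:5432/mydb", "row_counts", props)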

Tzach Zohar
  • Thank you for pointing this out! The posts on the performance differences were super interesting to read. I see that in your example you count the number of elements in a group, and filter for "popular" ones, but I'm not sure why you're doing a map on 'col'. I don't think that's a placeholder for a column name... but I'm getting a syntax error. – R. Gosman May 30 '18 at 21:53
  • there are two `groupBy` methods: one expects an argument of type `cols: Column*` - meaning a series of `Column` objects, which is the one I'm using, and another one that expects *two* arguments: `(col1: String, cols: String*)` - meaning the _name_ of the first column, and then the rest of the names; you can use either one, but I find the former more convenient as it doesn't require separately passing the first column (see the sketch after these comments); simply calling `groupBy(df.columns: _*)` does not match either of these signatures (common mistake - see the similar issue with `select`: https://stackoverflow.com/a/36131805/5344058). – Tzach Zohar May 30 '18 at 22:34
  • ... Turns out I just had to `import org.apache.spark.sql.functions._`. Makes a lot more sense; I thought there was some voodoo going on with `col` as a placeholder. Turns out it's just a good ole' function. Thanks for the clarification and further reading! – R. Gosman May 30 '18 at 23:01
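
For reference, a minimal sketch of the two groupBy overloads discussed in the comments above, assuming the same df and the functions.col import:

import org.apache.spark.sql.functions.col

// Overload taking Column objects: groupBy(cols: Column*)
df.groupBy(df.columns.map(col): _*).count()

// Overload taking column names: groupBy(col1: String, cols: String*)
df.groupBy(df.columns.head, df.columns.tail: _*).count()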