0

Problem: I have a data set A {filed1, field2, field3...}, and I would like to first group A by say, field1, then in each of the resulting groups, I would like to do bunch of subqueries, for example, count the number of rows that have field2 == true, or count the number of distinct field3 that have field4 == "some_value" and field5 == false, etc.

Some alternatives I can think of: I can write a customized user defined aggregate function that takes a function that computes the condition for filtering, but this way I have to create an instance of it for every query condition. I've also looked at the countDistinct function can achieve some of the operations, but I can't figure out how to use it to implement the filter-distinct-count semantic.

In Pig, I can do:

FOREACH (GROUP A by field1) {
        field_a = FILTER A by field2 == TRUE;
        field_b = FILTER A by field4 == 'some_value' AND field5 == FALSE;
        field_c = DISTINCT field_b.field3;

        GENERATE  FLATTEN(group),
                  COUNT(field_a) as fa,
                  COUNT(field_b) as fb,
                  COUNT(field_c) as fc,

Is there a way to do this in Spark SQL?

elgoog
  • 509
  • 6
  • 14

1 Answers1

1

Excluding distinct count this is can solved by simple sum over condition:

import org.apache.spark.sql.functions.sum

val df = sc.parallelize(Seq(
  (1L, true, "x", "foo", true), (1L, true, "y", "bar", false), 
  (1L, true, "z", "foo", true), (2L, false, "y", "bar", false), 
  (2L, true, "x", "foo", false)
)).toDF("field1", "field2", "field3", "field4", "field5")

val left = df.groupBy($"field1").agg(
  sum($"field2".cast("int")).alias("fa"),
  sum(($"field4" === "foo" && ! $"field5").cast("int")).alias("fb")
)
left.show

// +------+---+---+
// |field1| fa| fb|
// +------+---+---+
// |     1|  3|  0|
// |     2|  1|  1|
// +------+---+---+

Unfortunately is much more tricky. GROUP BY clause in Spark SQL doesn't physically group data. Not to mention that finding distinct elements is quite expensive. Probably the best thing you can do is to compute distinct counts separately and simply join the results:

val right = df.where($"field4" === "foo" && ! $"field5")
  .select($"field1".alias("field1_"), $"field3")
  .distinct
  .groupBy($"field1_")
  .agg(count("*").alias("fc"))

val joined = left
  .join(right, $"field1" === $"field1_", "leftouter")
  .na.fill(0)

Using UDAF to count distinct values per condition is definitely an option but efficient implementation will be rather tricky. Converting from internal representation is rather expensive, and implementing fast UDAF with a collection storage is not cheap either. If you can accept approximate solution you can use bloom filter there.

Community
  • 1
  • 1
zero323
  • 322,348
  • 103
  • 959
  • 935
  • 2
    After some googling I found that you can avoid doing join using `case when then`, so in your example, `SELECT field1, SUM(CAST(field2 AS INT)) AS fa, SUM(CAST(field4 = "foo" AND NOT field5 AS INT)) AS fb, COUNT(DISTINCT(CASE WHEN field4 = "foo" AND NOT field5 THEN field3 END)) AS fc FROM df GROUP BY field1` produces the same final result, and it should also be more efficient – elgoog Dec 21 '15 at 22:28