
I am new to the RDD API of Spark. Thanks to Spark migrate sql window function to RDD for better performance, I managed to generate the following table:

+-----------------+---+
|               _1| _2|
+-----------------+---+
|  [col3TooMany,C]|  0|
|         [col1,A]|  0|
|         [col2,B]|  0|
|  [col3TooMany,C]|  1|
|         [col1,A]|  1|
|         [col2,B]|  1|
|[col3TooMany,jkl]|  0|
|         [col1,d]|  0|
|         [col2,a]|  0|
|  [col3TooMany,C]|  0|
|         [col1,d]|  0|
|         [col2,g]|  0|
|  [col3TooMany,t]|  1|
|         [col1,A]|  1|
|         [col2,d]|  1|
|  [col3TooMany,C]|  1|
|         [col1,d]|  1|
|         [col2,c]|  1|
|  [col3TooMany,C]|  1|
|         [col1,c]|  1|
+-----------------+---+

with an initial input of

val df = Seq(
    (0, "A", "B", "C", "D"),
    (1, "A", "B", "C", "D"),
    (0, "d", "a", "jkl", "d"),
    (0, "d", "g", "C", "D"),
    (1, "A", "d", "t", "k"),
    (1, "d", "c", "C", "D"),
    (1, "c", "B", "C", "D")
  ).toDF("TARGET", "col1", "col2", "col3TooMany", "col4")

  val columnsToDrop = Seq("col3TooMany")
  val columnsToCode = Seq("col1", "col2")
  val target = "TARGET"

import org.apache.spark.sql.functions._

  val exploded = explode(array(
    (columnsToDrop ++ columnsToCode).map(c =>
      struct(lit(c).alias("k"), col(c).alias("v"))): _*
  )).alias("level")

  val long = df.select(exploded, $"TARGET")

  import org.apache.spark.util.StatCounter

then

long.as[((String, String), Int)].rdd.aggregateByKey(StatCounter())(_ merge _, _ merge _).collect.head
res71: ((String, String), org.apache.spark.util.StatCounter) = ((col2,B),(count: 3, mean: 0,666667, stdev: 0,471405, max: 1,000000, min: 0,000000))

which aggregates statistics over all the unique values of each column.

How can I add to the count (which is 3 for B in col2) a second count (maybe as a tuple) representing the number of B in col2 where TARGET == 1? In this case, it should be 2.
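For reference, one direct way I considered is to aggregate a (totalCount, target1Count) pair instead of a StatCounter. A rough sketch of the two aggregation functions (names are my own; the Spark call is shown in a comment and untested):

```scala
// seqOp: fold one target value (0 or 1) into a (total, target1) pair
val seqOp: ((Long, Long), Int) => (Long, Long) =
  { case ((n, n1), target) => (n + 1L, n1 + target) }

// combOp: merge two partial (total, target1) pairs from different partitions
val combOp: ((Long, Long), (Long, Long)) => (Long, Long) =
  { case ((n, n1), (m, m1)) => (n + m, n1 + m1) }

// With Spark this would plug into aggregateByKey on the same RDD as above:
//   long.as[((String, String), Int)].rdd.aggregateByKey((0L, 0L))(seqOp, combOp)

// Checking the logic locally on the three rows for ((col2, B)):
val (total, ones) = Seq(0, 1, 1).foldLeft((0L, 0L))(seqOp)
// total == 3, ones == 2
```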

Georg Heiler
  • Sorry but I think that you should think about reformulating your question, I didn't understand what you wanted. I recommend you to include the input and its expected output. – Alberto Bonsanto Jan 04 '17 at 18:36
  • @AlbertoBonsanto in case of the `long.as[((String, String), Int)].rdd.aggregateByKey(StatCounter())(_ merge _, _ merge _).collect.head` for the example of col2 and value B the expected output would be `((col2,B),(count: 3, countTarget1: 2, mean: 0,666667, stdev: 0,471405, max: 1,000000, min: 0,000000))` – Georg Heiler Jan 04 '17 at 18:38

1 Answer


You shouldn't need an additional aggregate here. With a binary target column, the mean is just the empirical probability of the target being equal to 1:

  • number of 1s: count * mean
  • number of 0s: count * (1 - mean)
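To make the arithmetic concrete, here is a minimal sketch using the numbers from the question's ((col2,B)) StatCounter (plain Scala, no Spark required; the variable names are illustrative):

```scala
// StatCounter for ((col2, B)) reports count = 3 and mean = 2/3.
val count = 3L
val mean  = 2.0 / 3.0
val ones  = math.round(count * mean)   // rows with TARGET == 1  -> 2
val zeros = count - ones               // rows with TARGET == 0  -> 1
```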
zero323