+----+----+--------+
| Id | M1 |  trx   |
+----+----+--------+
| 1  | M1 | 11.35  |
| 2  | M1 | 3.4    |
| 3  | M1 | 10.45  |
| 2  | M1 | 3.95   |
| 3  | M1 | 20.95  |
| 2  | M2 | 25.55  |
| 1  | M2 |  9.95  |
| 2  | M2 | 11.95  |
| 1  | M2 |  9.65  |
| 1  | M2 | 14.54  |
+----+----+--------+

Given the above dataframe, I can generate a histogram with the code below.

val (ranges, counts) = df
  .select(col("trx"))
  .rdd.map(r => r.getDouble(0))
  .histogram(10)
// ranges: Array[Double] = Array(3.4, 5.615, 7.83, 10.045, 12.26, 14.475, 16.69, 18.905, 21.12, 23.335, 25.55)
// counts: Array[Long] = Array(2, 0, 2, 3, 0, 1, 0, 1, 0, 1)

The issue is: how can I build these histograms in parallel, grouped by column 'M1'? That is, I need two histogram outputs, one for the value M1 and one for M2.

abc_spark

3 Answers


First, you need to know that histogram triggers two separate, sequential jobs: one to detect the minimum and maximum of your data, and one to compute the actual histogram. You can check this in the Spark UI.

We can follow the same scheme to build histograms on as many columns as you wish, still with only two jobs. Yet we cannot use the histogram function, since it is only meant to handle one collection of doubles, so we need to implement it ourselves. The first job is dead simple.

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{min, max, udf}
import spark.implicits._ // assuming `spark` is your SparkSession; needed for the 'trx symbol syntax

val Row(min_trx : Double, max_trx : Double) = df.select(min('trx), max('trx)).head

Then we compute the histogram ranges locally. Note that I use the same ranges for all the column values; this makes it easy to compare the results between them (by plotting them on the same figure). Having different ranges per column value would only require a small modification of this code, though.

val hist_size = 10
val hist_step = (max_trx - min_trx) / hist_size
val hist_ranges = (1 until hist_size)
    .scanLeft(min_trx)((a, _) => a + hist_step) :+ max_trx
// I add max_trx manually to avoid rounding errors that would exclude the value
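
For the sample data, this should reproduce (up to floating-point rounding) the same edges as histogram(10) in the question, since the step is (25.55 - 3.4) / 10 = 2.215:

println(hist_ranges)
// Vector(3.4, 5.615, 7.83, 10.045, 12.26, 14.475, 16.69, 18.905, 21.12, 23.335, 25.55)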

That was the first part. Then we can use a UDF to determine which range each value falls into, and compute all the histograms in parallel with Spark.

// clamp to hist_size - 1 so that the max value lands in the last bucket
val range_index = udf((x : Double) =>
    math.min(hist_ranges.lastIndexWhere(x >= _), hist_size - 1))
val hist_df = df
    .withColumn("rangeIndex", range_index('trx))
    .groupBy("M1", "rangeIndex")
    .count()
// And voilà, all the data you need is there.
hist_df.show()
+---+----------+-----+
| M1|rangeIndex|count|
+---+----------+-----+
| M2|         2|    2|
| M1|         0|    2|
| M2|         5|    1|
| M1|         3|    2|
| M2|         3|    1|
| M1|         7|    1|
| M2|         9|    1|
+---+----------+-----+

As a bonus, you can shape the data for local use (within the driver), either with the RDD API or by collecting the dataframe and transforming it in Scala.

Here is one way to do it with Spark, since this is a question about Spark ;-)

val hist_map = hist_df.rdd
    .map(row => row.getAs[String]("M1") ->
             (row.getAs[Int]("rangeIndex"), row.getAs[Long]("count")))
    .groupByKey
    .mapValues(_.toMap)
    .mapValues(hists => (0 until hist_size)
                    .map(i => hists.getOrElse(i, 0L)).toArray)
    .collectAsMap
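
For example, printing each histogram should give something like this (the counts follow from the sample data above):

hist_map.foreach { case (key, counts) =>
    println(s"$key -> ${counts.mkString(", ")}")
}
// M1 -> 2, 0, 0, 2, 0, 0, 0, 1, 0, 0
// M2 -> 0, 0, 2, 1, 0, 1, 0, 0, 0, 1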

EDIT: how to build one range per column value:

Instead of computing the min and max of trx over the whole dataframe, we compute them for each value of M1 with groupBy.

val min_max_map = df.groupBy("M1")
    .agg(min('trx), max('trx))
    .rdd.map(row => row.getAs[String]("M1") ->
      (row.getAs[Double]("min(trx)"), row.getAs[Double]("max(trx)")))
    .collectAsMap // maps each column value to a tuple (min, max)
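
For the sample data, this should yield something like:

// min_max_map: Map(M1 -> (3.4, 20.95), M2 -> (9.65, 25.55))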

Then we adapt the UDF so that it uses this map and we are done.

// for clarity, let's define a function that generates histogram ranges
def generate_ranges(min_trx : Double, max_trx : Double, hist_size : Int) = {
    val hist_step = (max_trx - min_trx) / hist_size
    (1 until hist_size).scanLeft(min_trx)((a, _) => a + hist_step) :+ max_trx
}
// and use it to generate one range per column value
val range_map = min_max_map.keys
    .map(key => key ->
        generate_ranges(min_max_map(key)._1, min_max_map(key)._2, hist_size))
    .toMap

val range_index = udf((x : Double, m1 : String) =>
    math.min(range_map(m1).lastIndexWhere(x >= _), hist_size - 1))

Finally, just replace range_index('trx) with range_index('trx, 'M1) and you will have one range per column value.
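
Putting it together, the grouping step becomes the following (the hist_map post-processing stays the same):

val hist_df = df
    .withColumn("rangeIndex", range_index('trx, 'M1))
    .groupBy("M1", "rangeIndex")
    .count()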

Oli
  • This one is awesome. But is there any way to take the max and min per value of 'M1', instead of over the entire column, and then create the histogram? Probably some tweaking here -- val Row(min_trx : Double, max_trx : Double) = df.select(min('trx), max('trx)).head – abc_spark Dec 09 '19 at 12:13
  • This means the range index differs for each group of M1 – abc_spark Dec 09 '19 at 12:26
  • That's possible with something like `df.groupBy("M1").agg(min('trx), max('trx))`. I'll add the possibility to the answer. – Oli Dec 09 '19 at 12:58
  • Thanks for that. But it seems there is some error in my code then. – abc_spark Dec 09 '19 at 13:12
  • I edited the answer; there are a few other things that need to be adapted. – Oli Dec 09 '19 at 13:14
  • hist_df -- this DF will omit some rangeIndex values if no trx belongs to that range. Is there any way to include all rangeIndex values in this DF? This would help in creating the full histogram across all histogram indexes. – abc_spark Dec 10 '19 at 10:54
  • That's what I am doing just after that. `hist_map` contains all the indices. This line `.mapValues(hists => (0 until hist_size).map(i => hists.getOrElse(i, 0L)).toArray)` in particular adds zeros if the index is not there. – Oli Dec 10 '19 at 13:45
  • Exactly. But when it is applied in the UDF, if the matching ranges are not there, it won't populate the rangeIndex in the DF. But this is fine, I got the solution for that... – abc_spark Dec 10 '19 at 14:03
  • Glad I could help! – Oli Dec 10 '19 at 14:54

The way I do histograms with Spark is as follows:

val binEdges = 0.0 to 25.0 by 5.0
val bins = binEdges.init.zip(binEdges.tail).toDF("bin_from", "bin_to")

df
  .join(bins, $"trx" >= $"bin_from" and $"trx" < $"bin_to", "right")
  .groupBy($"bin_from", $"bin_to")
  .agg(
    count($"trx").as("count")
    // add more aggregates here, e.g. sum($"trx")
  )
  .orderBy($"bin_from", $"bin_to")
  .show()

gives:

+--------+------+-----+
|bin_from|bin_to|count|
+--------+------+-----+
|     0.0|   5.0|    2|
|     5.0|  10.0|    2|
|    10.0|  15.0|    4|
|    15.0|  20.0|    0|
|    20.0|  25.0|    1|
+--------+------+-----+

Now if you have more dimensions, just add them to the groupBy clause:

df
  .join(bins, $"trx" >= $"bin_from" and $"trx" < $"bin_to", "right")
  .groupBy($"M1", $"bin_from", $"bin_to")
  .agg(
    count($"trx").as("count")
  )
  .orderBy($"M1", $"bin_from", $"bin_to")
  .show()

gives:

+----+--------+------+-----+
|  M1|bin_from|bin_to|count|
+----+--------+------+-----+
|null|    15.0|  20.0|    0|
|  M1|     0.0|   5.0|    2|
|  M1|    10.0|  15.0|    2|
|  M1|    20.0|  25.0|    1|
|  M2|     5.0|  10.0|    2|
|  M2|    10.0|  15.0|    2|
+----+--------+------+-----+
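
If the null group (bins that match no value of M1) is unwanted, one variant is to cross-join the bins with the distinct keys first, so that every (M1, bin) pair appears with an explicit zero count. A sketch:

val keys = df.select($"M1".as("key")).distinct
val binsPerKey = keys.crossJoin(bins)

df
  .join(binsPerKey, $"M1" === $"key" and $"trx" >= $"bin_from" and $"trx" < $"bin_to", "right")
  .groupBy($"key", $"bin_from", $"bin_to")
  .agg(count($"trx").as("count"))
  .orderBy($"key", $"bin_from", $"bin_to")
  .show()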

You may tweak the code a bit to get the output you want, but this should get you started. You could also do the UDAF approach I posted here: Spark custom aggregation : collect_list+UDF vs UDAF
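
For reference, here is a minimal sketch of such an aggregation as a typed Aggregator. It assumes Spark 3 (for functions.udaf); the bucket edges are passed in, and the max value is clamped into the last bucket:

import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.functions.udaf

// Bins doubles into the buckets delimited by `edges`: one count per bucket.
class FixedHistogram(edges: Seq[Double])
    extends Aggregator[Double, Array[Long], Seq[Long]] {
  def zero: Array[Long] = Array.fill(edges.length - 1)(0L)
  def reduce(buf: Array[Long], x: Double): Array[Long] = {
    val i = math.min(edges.lastIndexWhere(x >= _), edges.length - 2)
    if (i >= 0) buf(i) += 1 // values below the first edge are ignored
    buf
  }
  def merge(b1: Array[Long], b2: Array[Long]): Array[Long] = {
    for (i <- b1.indices) b1(i) += b2(i)
    b1
  }
  def finish(buf: Array[Long]): Seq[Long] = buf.toSeq
  def bufferEncoder: Encoder[Array[Long]] = ExpressionEncoder()
  def outputEncoder: Encoder[Seq[Long]] = ExpressionEncoder()
}

val histogram = udaf(new FixedHistogram(0.0 to 25.0 by 5.0))
df.groupBy($"M1").agg(histogram($"trx").as("hist")).show(false)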

Raphael Roth
  • Thanks for the approach. This is a good one indeed! But removal of nulls could be difficult in my case. I am still trying to figure out how I can use the rdd.histogram() function for multiple values of M1. I am not able to do a group by on M1 and then compute a histogram over each group. That is the real challenge! – abc_spark Dec 07 '19 at 17:58

I think it's not easily possible using RDDs, because histogram is only available on DoubleRDD, i.e. RDDs of Double. If you really need to use the RDD API, you can still do it in parallel by firing concurrent jobs, using Scala's parallel collections:

import scala.collection.parallel.immutable.ParSeq

val List((rangeM1, histM1), (rangeM2, histM2)) = ParSeq("M1", "M2")
  .map(c => df.where($"M1" === c)
    .select(col("trx"))
    .rdd.map(r => r.getDouble(0))
    .histogram(10))
  .toList

println(rangeM1.toSeq,histM1.toSeq)
println(rangeM2.toSeq,histM2.toSeq)

gives:

(WrappedArray(3.4, 5.155, 6.91, 8.665000000000001, 10.42, 12.175, 13.930000000000001, 15.685, 17.44, 19.195, 20.95),WrappedArray(2, 0, 0, 0, 2, 0, 0, 0, 0, 1))
(WrappedArray(9.65, 11.24, 12.83, 14.420000000000002, 16.01, 17.6, 19.19, 20.78, 22.37, 23.96, 25.55),WrappedArray(2, 1, 0, 1, 0, 0, 0, 0, 0, 1))

Note that the bins differ here for M1 and M2, since each histogram computes its own min and max.
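
If you would rather have the counts as a dataframe (dropping the ranges), a quick sketch, assuming spark.implicits._ is in scope:

val histDF = Seq("M1" -> histM1, "M2" -> histM2).toDF("key", "counts")
histDF.show(false)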

Raphael Roth
  • That was a good approach. But is there any way to get a dataframe from this in the format M1, 2, 0, 0, 0, 2, 0, 0, 0, 0, 1 and M2, 2, 1, 0, 1, 0, 0, 0, 0, 0, 1? I do not want the ranges here. This is again a challenge for me... Any help is much appreciated. – abc_spark Dec 07 '19 at 19:54