+----+----+--------+
| Id | M1 |  trx   |
+----+----+--------+
| 1  | M1 | 11.35  |
| 2  | M1 | 3.4    |
| 3  | M1 | 10.45  |
| 2  | M1 | 3.95   |
| 3  | M1 | 20.95  |
| 2  | M1 | 25.55  |
| 1  | M1 |  9.95  |
| 2  | M1 | 11.95  |
| 1  | M1 |  9.65  |
| 1  | M1 | 14.54  |
+----+----+--------+
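For reference, a minimal sketch of how the above dataframe could be created (illustrative only: the SparkSession name spark, the app name, and the local master are assumptions, and the middle column is omitted because only trx is used below):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("histogram-example").master("local[*]").getOrCreate()
import spark.implicits._

// build the sample data; only Id and trx are needed for the histogram
val df = Seq(
  (1, 11.35), (2, 3.4), (3, 10.45), (2, 3.95), (3, 20.95),
  (2, 25.55), (1, 9.95), (2, 11.95), (1, 9.65), (1, 14.54)
).toDF("Id", "trx")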

With the above dataframe, I can generate a histogram with the code below.

import org.apache.spark.sql.functions.col

val (ranges, counts) = df
    .select(col("trx"))
    .rdd.map(r => r.getDouble(0))
    .histogram(10)
// ranges: Array[Double] = Array(3.4, 5.615, 7.83, 10.045, 12.26, 14.475, 16.69, 18.905, 21.12, 23.335, 25.55)
// counts: Array[Long] = Array(2, 0, 2, 3, 0, 1, 0, 1, 0, 1)

counts contains the number of elements in each range.
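For example (just a sketch reusing the ranges and counts arrays above), the 11 boundaries in ranges delimit the 10 buckets, so each count can be paired with its interval like this:

// pair each [low, high) interval with the number of trx values it contains
// (the last bucket also includes its upper bound)
ranges.init.zip(ranges.tail).zip(counts).foreach {
  case ((low, high), count) => println(f"[$low%.3f, $high%.3f) -> $count")
}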

But how do I get the sum of the elements, sum(trx), in each range, like this:

sumOfTrx: Array[Double] = Array(7.35, 0, 19.6, xx, xx, xx, xx, xx, xx, 25.55)

abc_spark
1 Answer


So you built a histogram over the trx column and you want to get the sum of the values within each range.

We can define a UDF that returns the index of the range a given trx value falls into. Then, we can use groupBy to compute the sum per range, and a few final manipulations give us the array you want.

import org.apache.spark.sql.functions.{sum, udf}
import spark.implicits._ // for the 'trx column syntax

// getting the histogram
val (ranges, counts) = df
    .select("trx").rdd.map(_.getDouble(0))
    .histogram(10)

// ranges holds 11 bucket boundaries; drop the last one so that
// each index corresponds to exactly one of the 10 buckets
val buckets = ranges.dropRight(1)

// the UDF I was referring to: maps a trx value to the index of its bucket
val rangeIndex = udf((x: Double) => buckets.lastIndexWhere(x >= _))

// Summing the elements and building a map that associates bucket indices to sums
val sumMap = df
    .withColumn("rangeIndex", rangeIndex('trx))
    .groupBy("rangeIndex")
    .agg(sum('trx))
    .rdd.map(x => x.getAs[Int]("rangeIndex") -> x.getAs[Double]("sum(trx)"))
    .collectAsMap

// Building the array (one sum per bucket, same length as counts)
buckets.indices.map(i => sumMap.getOrElse(i, 0d)).toArray
// res: Array[Double] = Array(7.35, 0.0, 19.6, 33.75, 0.0, 14.54, 0.0, 20.95, 0.0, 25.55)
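If it helps, here is a small follow-up sketch (reusing ranges, counts, buckets and sumMap from above; sums is just a name for the array built in the last step) that prints the count and the sum side by side for each bucket:

val sums = buckets.indices.map(i => sumMap.getOrElse(i, 0d)).toArray

// one line per bucket: [low, high) -> count, sum(trx)
buckets.indices.foreach { i =>
  println(f"[${ranges(i)}%.3f, ${ranges(i + 1)}%.3f) -> count=${counts(i)}, sum=${sums(i)}%.2f")
}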
Oli
  • Hi Oli, this is 99% working, thanks for that. The count of ranges.indices.map(i => sumMap.getOrElse(i, 0d)).toArray is 11. Is there any way to make it 10 (the same as the counts array)? – abc_spark Dec 06 '19 at 18:01
  • This worked! Thanks a lot Oli, this is a gem! – abc_spark Dec 07 '19 at 08:21
  • Yes. I used ranges.dropRight(1) and then val rangeIndex = udf((x : Double) => rangeArray.lastIndexWhere(x >= _)) – abc_spark Dec 09 '19 at 10:02
  • But your approach was awesome. Thanks for that. Do you have any possible solution for https://stackoverflow.com/questions/59224388/histogram-doing-it-in-a-parallel-way – abc_spark Dec 09 '19 at 10:03
  • Thanks, I fixed the answer with your comment. I have an answer for your other question, let me write it down ;) – Oli Dec 09 '19 at 11:15
  • Hi @Oli, would you be able to advise on the pivot question here – https://stackoverflow.com/questions/60509684/pivot-in-spark-scala – abc_spark Mar 03 '20 at 15:07
  • I see that someone answered. Do you still need help? – Oli Mar 04 '20 at 16:30
  • Hi @Oli, would you be able to advise on the issue here? Thanks for all your previous support :) https://stackoverflow.com/questions/62211108/finding-percentile-in-spark-scala-in-dynamic-way – abc_spark Jun 05 '20 at 09:12