16

I have an RDD of (String, Int) which is sorted by key:

val data = Array(("c1", 6), ("c2", 3), ("c3", 4))
val rdd = sc.parallelize(data).sortByKey()

Now I want the value for the first key to start at zero and each subsequent key to be the sum of the previous keys' values.

E.g. c1 = 0, c2 = c1's value, c3 = (c1's value + c2's value), c4 = (c1's + .. + c3's values). Expected output:

(c1,0), (c2,6), (c3,9)...

Is it possible to achieve this? I tried it with map, but the sum is not preserved inside the map:

var sum = 0
val t = rdd.map { x => { val temp = sum; sum = sum + x._2; (x._1, temp) } }
Knight71
  • It is sequential and hence not parallelizable so this is seriously not the right way to use Spark. I would suggest that you read up on what Spark is. – Johan S Feb 02 '16 at 13:21
  • I'm sorry, but even sequentially I don't understand the relation between your input data and your output? How do you compute it? – eliasah Feb 02 '16 at 14:30
  • @JohanS Yes this is sequential and I also believe this is not the right way to use spark. In our use case we ended up in this way. Let me try another way to achieve the functionality – Knight71 Feb 02 '16 at 17:17
  • @eliasah I have corrected my expected output. – Knight71 Feb 02 '16 at 17:18
  • 1
    You need to group by key and sum and then coalesce into one partition to calculate the cumulative sum. For your (strange?) variant of cumulative sum then all you have to do is subtract the value of the first key from all the others... There is a way to parallelize this using a total order partitioner if your key/sum pairs won't fit into memory but to be honest you're probably better off just using MapReduce at that stage. – A.R.Ferguson Feb 02 '16 at 22:00
  • I removed Fibonacci from the title because it is not even remotely the same category of problem and it was rather misleading. – zero323 Feb 02 '16 at 22:27
  • none the less interesting – thebluephantom Feb 11 '18 at 09:53
  • @eliasah Why would you vote to close it, the best solution is rather interesting and in fact tells us something useful – thebluephantom Feb 13 '18 at 20:28
  • @thebluephantom the comment is old and I should remove it. It was written before the OP updated his question. – eliasah Feb 13 '18 at 20:31

5 Answers

18
  1. Compute partial results for each partition:

    val partials = rdd.mapPartitionsWithIndex((i, iter) => {
      val (keys, values) = iter.toSeq.unzip
      val sums  = values.scanLeft(0)(_ + _)
      Iterator((keys.zip(sums.tail), sums.last))
    })
    
  2. Collect partial sums:

    val partialSums = partials.values.collect
    
  3. Compute cumulative sum over partitions and broadcast it:

    val sumMap = sc.broadcast(
      (0 until rdd.partitions.size)
        .zip(partialSums.scanLeft(0)(_ + _))
        .toMap
    )
    
  4. Compute final results (a quick check on the question's sample data follows below):

    val result = partials.keys.mapPartitionsWithIndex((i, iter) => {
      val offset = sumMap.value(i)
      if (iter.isEmpty) Iterator()
      else iter.next.map{case (k, v) => (k, v + offset)}.toIterator
    })
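
A minimal sanity check, assuming the sample RDD from the question, `Array(("c1",6), ("c2",3), ("c3",4))`, sorted by key:

    result.collect()
    // should print something like: Array((c1,6), (c2,9), (c3,13)) -- an inclusive running total

Note that zipping the keys with `sums.init` instead of `sums.tail` in step 1 should yield the exclusive variant asked for in the question, i.e. (c1,0), (c2,6), (c3,9).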
    
zero323
  • Wow, highly educational, still working thru it. But I get on an example res12: Array[(Seq[(String, Int)], Int)] = Array((Stream((c01,1), ?),10), (Stream((c05,5), ?),18), (Stream((c08,8), ?),27)). What is the ? – thebluephantom Feb 13 '18 at 19:58
  • @thebluephantom It seems this code needs `rdd` to be sorted, because it actually computes the cumulative sum within each partition, so the next partition needs no repeated computation? – calvin Apr 24 '20 at 01:56
2

Spark has built-in support for Hive ANALYTICS/WINDOWING functions, and the cumulative sum can be achieved easily using them.

See the Hive wiki for the ANALYTICS/WINDOWING functions.

Example:

Assuming you have a sqlContext object:

val datardd = sqlContext.sparkContext.parallelize(
  Seq(("a", 1), ("b", 2), ("c", 3), ("d", 4), ("d", 5), ("d", 6)))
import sqlContext.implicits._

// Register as a test table
datardd.toDF("id", "val").createOrReplaceTempView("test")

// Calculate the cumulative sum
sqlContext.sql("select id, val, " +
  "SUM(val) over ( order by id rows between unbounded preceding and current row ) cumulative_Sum " +
  "from test").show()

This approach causes the warning below. If an executor runs out of memory on a huge dataset, tune the job's memory parameters accordingly.

WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation
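
The exclusive variant from the question (first row 0) should also be expressible with the same window syntax, by ending the frame one row earlier and coalescing the resulting NULL to 0; a sketch against the same test view (the running_sum_excl alias is just an illustrative name):

sqlContext.sql("select id, val, " +
  "COALESCE(SUM(val) over ( order by id rows between unbounded preceding and 1 preceding ), 0) running_sum_excl " +
  "from test").show()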

I hope this helps.

Rahul Sharma
1

Here is a solution in PySpark. Internally it's essentially the same as @zero323's Scala solution, but it provides a general-purpose function with a Spark-like API.

import numpy as np
def cumsum(rdd, get_summand):
    """Given an ordered rdd of items, computes cumulative sum of
    get_summand(row), where row is an item in the RDD.
    """
    def cumsum_in_partition(iter_rows):
        total = 0
        for row in iter_rows:
            total += get_summand(row)
            yield (total, row)
    rdd = rdd.mapPartitions(cumsum_in_partition)

    def last_partition_value(iter_rows):
        final = None
        for cumsum, row in iter_rows:
            final = cumsum
        return (final,)

    partition_sums = rdd.mapPartitions(last_partition_value).collect()
    partition_cumsums = list(np.cumsum(partition_sums))
    partition_cumsums = [0] + partition_cumsums
    partition_cumsums = sc.broadcast(partition_cumsums)

    def add_sums_of_previous_partitions(idx, iter_rows):
        return ((cumsum + partition_cumsums.value[idx], row)
            for cumsum, row in iter_rows)
    rdd = rdd.mapPartitionsWithIndex(add_sums_of_previous_partitions)
    return rdd

# test for correctness by summing numbers, with and without Spark
rdd = sc.range(10000,numSlices=10).sortBy(lambda x: x)
cumsums, values = zip(*cumsum(rdd,lambda x: x).collect())
assert all(cumsums == np.cumsum(values))
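
A possible usage sketch for the (key, value) pairs from the question, assuming an existing SparkContext `sc` and using the value as the summand (so the running total includes the current row):

pairs = sc.parallelize([("c1", 6), ("c2", 3), ("c3", 4)], 2).sortByKey()
for running_total, (key, value) in cumsum(pairs, lambda kv: kv[1]).collect():
    print(key, running_total)   # should print c1 6, c2 9, c3 13
# note: if sortByKey leaves some partitions empty, the None issue addressed
# in the next answer shows up here as well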
Paul
1

I came across a similar problem and implemented @Paul's solution. I wanted to do a cumsum on an integer frequency table sorted by key (the integer), but there was a minor problem with np.cumsum(partition_sums): the error was `unsupported operand type(s) for +=: 'int' and 'NoneType'`.

If the key range is big enough, each partition is likely to contain something (no None values). However, if the range is much smaller than the count and the number of partitions stays the same, some partitions end up empty and produce None. Here is the modified solution:

import numpy as np

def cumsum(rdd, get_summand):
    """Given an ordered rdd of items, computes cumulative sum of
    get_summand(row), where row is an item in the RDD.
    """
    def cumsum_in_partition(iter_rows):
        total = 0
        for row in iter_rows:
            total += get_summand(row)
            yield (total, row)
    rdd = rdd.mapPartitions(cumsum_in_partition)
    def last_partition_value(iter_rows):
        final = None
        for cumsum, row in iter_rows:
            final = cumsum
        return (final,)
    partition_sums = rdd.mapPartitions(last_partition_value).collect()
    # partition_cumsums = list(np.cumsum(partition_sums))

    #----from here are the changed lines
    partition_sums = [x if x is not None else 0 for x in partition_sums]
    temp = np.cumsum(partition_sums)
    partition_cumsums = list(temp)
    #----

    partition_cumsums = [0] + partition_cumsums   
    partition_cumsums = sc.broadcast(partition_cumsums)
    def add_sums_of_previous_partitions(idx, iter_rows):
        return ((cumsum + partition_cumsums.value[idx], row)
            for cumsum, row in iter_rows)
    rdd = rdd.mapPartitionsWithIndex(add_sums_of_previous_partitions)
    return rdd

# test on a random integer frequency table
import pandas as pd

x = np.random.randint(10, size=1000)
D = sqlCtx.createDataFrame(pd.DataFrame(x.tolist(), columns=['D']))
c = D.groupBy('D').count().orderBy('D')
c_rdd = c.rdd.map(lambda row: row['count'])
cumsums, values = zip(*cumsum(c_rdd, lambda x: x).collect())
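
A minimal sketch of the empty-partition case that motivates the change, assuming an existing SparkContext `sc` (spreading three values over ten slices leaves most partitions empty, so their "last value" is None):

rdd = sc.parallelize([1, 2, 3], numSlices=10)
cumsums, values = zip(*cumsum(rdd, lambda x: x).collect())
# with the None -> 0 substitution this should give cumsums == (1, 3, 6);
# the original np.cumsum(partition_sums) would raise the TypeError above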
AbdealiLoKo
  • 1
    Seems like this will introduce errors because the length of partition_sums will not be the number of partitions anymore after this. I think the correct one is `[x if x is not None else 0 for x in partition_sums]` – AbdealiLoKo Jul 21 '17 at 18:34
-1

You may want to try using a window with rowsBetween. Hope it is still helpful.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window
import spark.implicits._   // needed for toDF and the $ column syntax (sqlContext.implicits._ on older versions)

val data = Array(("c1", 6), ("c2", 3), ("c3", 4))
val df = sc.parallelize(data).sortByKey().toDF("c", "v")
val w = Window.orderBy("c")
// frame over all preceding rows, so the running sum works for any number of rows
// (a fixed window such as rowsBetween(-2, -1) only looks two rows back)
val r = df.select($"c", sum($"v").over(w.rowsBetween(Window.unboundedPreceding, -1)).alias("cs"))
r.show()   // display(r) in a Databricks notebook
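
To reproduce the exact output from the question, the null that this frame produces for the first row can be coalesced to 0; a sketch reusing `df` and `w` from above (keeping the `cs` alias):

val r2 = df.select($"c",
  coalesce(sum($"v").over(w.rowsBetween(Window.unboundedPreceding, -1)), lit(0)).alias("cs"))
r2.show()   // should print (c1,0), (c2,6), (c3,9)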
keepscoding