
I am taking this course.

It says that the reduce operation on an RDD is done one machine at a time. That means that if your data is split across 2 computers, the function below will work on the data on the first computer, find the result for that data, then take a single value from the second machine, run the function, and continue that way until it finishes with all the values from machine 2. Is this correct?

I thought that the function would start operating on both machines at the same time, and then once it had the results from the 2 machines, it would run the function one last time.

rdd1 = rdd.reduce(lambda x, y: x + y)

Update 1--------------------------------------------

Will the steps below give a faster answer compared to the reduce function?

collData = sc.parallelize([3, 5, 4, 7, 4])
seqOp = (lambda x, y: x + y)
combOp = (lambda x, y: x + y)
collData.aggregate(0, seqOp, combOp)

Update 2-----------------------------------

Should both sets of code below execute in the same amount of time? I checked, and it seems that both take the same time.

import datetime

data = range(1, 1000000000)
distData = sc.parallelize(data, 4)

print(datetime.datetime.now())
a = distData.reduce(lambda x, y: x + y)
print(a)
print(datetime.datetime.now())

seqOp = (lambda x, y: x + y)
combOp = (lambda x, y: x + y)

print(datetime.datetime.now())
b = distData.aggregate(0, seqOp, combOp)
print(b)
print(datetime.datetime.now())
user2543622

1 Answer


reduce behavior differs a little bit between native (Scala) and guest languages (Python), but simplifying things a little:

  • each partition is processed sequentially, element by element
  • multiple partitions can be processed at the same time, either by a single worker (multiple executor threads) or by different workers
  • partial results are fetched to the driver, where the final reduction is applied (this is the part that is implemented differently in PySpark and Scala) - see the sketch after this list
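
To make the first two points concrete, here is a rough mental model in plain Python (not Spark code, just an illustration): assume an RDD [1, 2, 3, 4, 5, 6] split into two partitions, [1, 2, 3] on one machine and [4, 5, 6] on another.

    from functools import reduce as py_reduce

    f = lambda x, y: x + y

    # Two partitions, conceptually living on two different machines:
    partitions = [[1, 2, 3], [4, 5, 6]]

    # Each partition is reduced independently; in Spark these
    # partition-level reductions can run at the same time:
    partials = [py_reduce(f, part) for part in partitions]  # [6, 15]

    # Only the partial results travel to the driver, which merges them:
    result = py_reduce(f, partials)  # 21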

Since it looks like you're using Python, let's take a look at the code:

  1. reduce creates a simple wrapper for the user-provided function:

    def func(iterator):
        ...
    
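    Filled in for reference, the body of that wrapper looks roughly like this (paraphrased from the PySpark source; details may vary between versions):

        def func(iterator):
            # Reduce this partition locally, yielding at most one value.
            iterator = iter(iterator)
            try:
                initial = next(iterator)
            except StopIteration:
                return  # an empty partition contributes nothing
            yield reduce(f, iterator, initial)
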
  2. This wrapper is then passed to mapPartitions:

    vals = self.mapPartitions(func).collect()
    

    It should be obvious that this code is embarrassingly parallel and doesn't care how the results are utilized.

  3. Collected vals are reduced sequentially on the driver using standard Python reduce:

    reduce(f, vals)
    

    where f is the function passed to RDD.reduce.

In comparison, Scala merges partial results asynchronously as they arrive from the workers.

In the case of treeReduce, step 3 can be performed in a distributed manner as well. See Understanding treeReduce() in Spark.
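
As a rough sketch of what that looks like in PySpark (treeReduce is a real RDD method; the depth value here is just an illustration):

    # Same user function as before; only the merge strategy changes.
    f = lambda x, y: x + y

    a = distData.reduce(f)               # partial results merged on the driver
    b = distData.treeReduce(f, depth=2)  # partial results merged in extra
                                         # distributed stages before the driver
    assert a == b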

To summarize: reduce, excluding driver-side processing, uses exactly the same mechanism (mapPartitions) as basic transformations like map or filter, and provides the same level of parallelism (once again excluding driver code). If you have a large number of partitions or f is expensive, you can parallelize / distribute the final merging using the tree* family of methods.

zero323
  • I read your answer. I am having a hard time understanding your inputs and also figuring out whether the statement made in the course is correct or not. It seems that the statement is incorrect, based upon "multiple partitions can be processed at the same time either by a single worker (multiple executor threads) or different workers". Please provide a direct answer. Please illustrate what you are saying with an example - for example, the RDD is [1,2,3,4,5,6], and [1,2,3] are on one machine and the rest of the elements on the other machine. How do Spark and Scala handle these separately? Thanks for your work – user2543622 Mar 24 '16 at 17:48
  • I haven't watched the course, so I cannot refer to that, but if they really tell you that it is done one machine at a time, you've wasted $200. `reduce`, excluding the driver part, uses the same mechanism as standard Spark transformations, hence exhibits the same parallelism. – zero323 Mar 24 '16 at 17:54
  • Please illustrate what you are saying with an example - for example, the RDD is [1,2,3,4,5,6], and [1,2,3] are on one machine and the rest of the elements on the other machine. How do Spark and Scala handle these differently? Also, would it be possible to answer my updated question? – user2543622 Mar 24 '16 at 18:56
  • a) There should be no significant performance difference between `aggregate` and `reduce`. b) I cannot use an example because in general the order is not deterministic. You can see a very crude visualization [here](http://stackoverflow.com/q/35190049/1560062), but fundamentally the operations are not synchronized. c) About Scala - like I already said, Scala fetches task results asynchronously, not by collect. – zero323 Mar 24 '16 at 19:18
  • @zero323 From your answer, I understand that `reduce` is also finally executed in the driver, am I correct? Thanks – jack Aug 10 '20 at 23:05