0

I have a Python program that analyses data, and I want to run it with Spark. I distribute the data among the workers and apply some transformations to it. Finally, I need to collect the results on the master node and run another function on them.

In the driver program I have this code:

sc = SparkContext(conf=spark_conf)
sc.parallelize(group_list, 4) \
    .map(function1, preservesPartitioning=True) \
    .map(function2, preservesPartitioning=True) \
    .map(function3, preservesPartitioning=True) \
    .map(function4, preservesPartitioning=True) \
    .map(function5, preservesPartitioning=True) \
    .map(function6, preservesPartitioning=True) \
    .map(function7, preservesPartitioning=True) \
    .map(function8, preservesPartitioning=True) \
    .map(function9, preservesPartitioning=True)

The last RDD, produced by function9, is a table with several rows and a unique key. When the master node collects this last RDD from all the workers, the combined table contains repeated keys. I have to group the final table by key and aggregate some columns, so I have a final function that takes the last table and does the group-by and aggregation. But I do not know how to pass the last RDD to that final function.

For example on worker1, I have this data:

    key    count   average
     B       3       0.2
     x       2       0.1
     y       5       1.2

On worker2, I have this data:

    key    count    average
     B       2         0.1
     c       1         0.01
     x       3         0.34

When master node receives all data from workers, it has:

    key    count    average
     B       3       0.2
     x       2       0.1
     y       5       1.2
     B       2       0.1
     c       1       0.01
     x       3       0.34

You can see that the data has two B keys and two x keys. I have to run another function on the master node to group by the key column and calculate a new average for the average column. I tried reduce with my final function, but it gives me an error, since reduce calls the function with two arguments. Could you please tell me which Spark action I can use to run my function on the last RDD?

Any guidance would be really appreciated.

M_Gh

2 Answers

1

I suggest converting to a DataFrame (it's simpler to use) and then applying this:

df.groupBy('key').agg(f.sum('count'), f.avg('average'))

If you want to keep the RDD format, you should do something like this, but applying an average instead of a list.
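As a rough sketch of the RDD route, and not something taken from the question's code: `reduce` and `reduceByKey` both expect a function of two arguments that merges two partial results into one. The helper names `merge_stats` and `reduce_by_key` below are hypothetical, and the example assumes the rows can be reshaped into (key, (count, average)) pairs and that the new average should be weighted by count. The local `reduce_by_key` only imitates what `reduceByKey` does so the logic can be checked without a cluster.

```python
def merge_stats(a, b):
    """Merge two (count, average) pairs, weighting each average by its count."""
    count_a, avg_a = a
    count_b, avg_b = b
    total = count_a + count_b
    return (total, (count_a * avg_a + count_b * avg_b) / total)


def reduce_by_key(pairs, func):
    """Local stand-in for RDD.reduceByKey: fold values that share a key with func."""
    grouped = {}
    for key, value in pairs:
        grouped[key] = func(grouped[key], value) if key in grouped else value
    return grouped


# Rows from worker1 and worker2 in the question, as (key, (count, average)) pairs.
rows = [
    ("B", (3, 0.2)), ("x", (2, 0.1)), ("y", (5, 1.2)),
    ("B", (2, 0.1)), ("c", (1, 0.01)), ("x", (3, 0.34)),
]

result = reduce_by_key(rows, merge_stats)

# With Spark, assuming `rdd` holds the same (key, (count, average)) pairs:
#     result = dict(rdd.reduceByKey(merge_stats).collect())
```

Because the merge happens per key on the workers, only one row per key would reach the driver when `collect()` is called.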

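From what you wrote, this should work:

from pyspark.sql import SQLContext
from pyspark.sql import functions as f

# Build a SQLContext from the existing SparkContext `sc`
sqlContext = SQLContext(sc)

(sqlContext.createDataFrame(
    [['B', 3, 0.2],
     ['x', 2, 0.1],
     ['y', 5, 1.2],
     ['B', 2, 0.1],
     ['c', 1, 0.01],
     ['x', 3, 0.34]], ['key', 'count', 'average'])
 .groupBy('key')
 # Sum the counts and average the per-worker averages for each key
 .agg(f.sum('count').alias('count'), f.avg('average').alias('avg'))
 .show()
)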

You can (and probably should) also start from the initial RDD, sc.parallelize(group_list, 4); in that case f.sum() should be f.count(). Hope this helps.
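One thing to check, depending on what "new average" should mean: f.avg('average') takes the plain mean of the per-worker averages and ignores how many rows each worker counted. If the per-worker averages should instead be weighted by their counts, the two results differ. The arithmetic below is only an illustration for key B from the question; the variable names are hypothetical.

```python
# (count, average) pairs reported for key B by worker1 and worker2 in the question.
parts = [(3, 0.2), (2, 0.1)]

# Plain mean of the averages, which is what f.avg('average') computes per key.
plain_mean = sum(avg for _, avg in parts) / len(parts)

# Count-weighted mean: each average contributes in proportion to its count.
total_count = sum(count for count, _ in parts)
weighted_mean = sum(count * avg for count, avg in parts) / total_count

# Assumed DataFrame equivalent of the weighted mean (not run here):
#     f.sum(f.col('count') * f.col('average')) / f.sum('count')
```

For key B the plain mean comes out at about 0.15 and the weighted mean at about 0.16, so the choice matters whenever the workers saw different numbers of rows.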

ggagliano
  • Dear @ggagliano, thank you for your feedback; however, I knew about **df.groupby**. The problem is how to get all the worker RDDs onto the master so that I can run **df.groupby** on the master node. – M_Gh May 05 '20 at 09:10
  • If I understand correctly, I don't think this would be the most efficient way of using Spark. You should bring only the final results of your aggregations back to the master. What I'd do is groupBy..agg and then collect to the master node. But if you are confident that you have few rows, you can collect before the final aggregation and, let's say, convert to a pandas DataFrame and do your aggregations on a single node. Hope this helps – ggagliano May 05 '20 at 09:23
  • You mean, if I do the **collect()** action in the program and convert the result to a **pandas DataFrame**, I can do **groupby** on the master node? Many thanks for your guidance. – M_Gh May 05 '20 at 09:29
  • You can use df.toPandas() directly to collect the data as a pandas DataFrame. It won't run exactly on the Spark master node, since you are going outside Spark. It will run in your current Python interpreter; for this reason I don't suggest doing so if you are not confident about the number of rows. In that case, it's better to do something like df.limit(10000).toPandas() to be sure you are not collecting a billion rows – ggagliano May 05 '20 at 09:34
  • In fact, I did **df.groupby** on each worker before, but since I could not send all the billion rows to a single worker, I had to divide them among the workers. Therefore, when I collect all the data from the workers I have repeated rows on the master node, and as a result I must do **df.groupby** on the master node again to remove them. Would you please write your answer in a new post so that I can choose it as the accepted answer? – M_Gh May 05 '20 at 09:49
  • I updated the answer; I hope this helps you understand what I meant – ggagliano May 05 '20 at 10:30
  • There are some transformations in Spark that may help you achieve what you want: `reduceByKey` and `groupByKey`. Look in the [spark official doc](https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions) – Henrique Branco May 05 '20 at 10:40
  • Dear @ggagliano, when I run the above code, I receive this error: **AttributeError: 'RDD' object has no attribute 'toDF'** – M_Gh May 06 '20 at 05:24
  • you probably missed the sqlContext import (https://stackoverflow.com/questions/47341048/converting-rdd-to-dataframe-attributeerror-rdd-object-has-no-attribute-todf), anyway I updated the answer, there was a logic error (count of the counts instead of sum of counts) and I added sqlcontext import – ggagliano May 06 '20 at 09:10
  • **toDF()** is for Spark Dataframe, even though I have pandas Dataframe. – M_Gh May 09 '20 at 07:34
-1

For example I have a pandas Dataframe like this:

'a'     'b'
 1       3
 1       4
 2       5

I wrote a function to use in groupby:

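def process_json(x):
    # x is the Series of 'b' values belonging to one group
    print(x)
    temp = 0
    for item in x.items():
        # item is an (index, value) pair; add up the values
        temp += item[1]
    print('-------', temp)
    # return the group total so that agg has a value for each group
    return temp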
Therefore,

a.groupby(['a'])['b'].agg(process_json)

Output is:

0    3
1    4
Name: b, dtype: int64
------- 7
2    5
Name: b, dtype: int64
------- 5
M_Gh