0

I have a Pyspark DataFrame which I want to aggregate using a function that does row-by-row operations.

I have 4 columns, and for each unique value in column A I have to do the row-by-row aggregation in columns B,C,D

I am using this method :

  1. get unique values in A using

    A_uniques = df.select('A').distinct()
    
  2. def func(x):
        y  = df.filter(df.A==x)
        y  = np.array(y.toPandas())
        for i in y.shape[0]:
            y[i,1] = y[i-1,0]
            y[i,0] = (y[i,0]+y[i,2])/y[i,3] 
        agg = sum(y[:,1])
        return agg
    
  3. A_uniques.rdd.map(lambda x: (x['A'], func(x['A'])))
    

I am getting this error :

PicklingError: Could not serialize object: Py4JError: An error occurred while calling o64.getnewargs. Trace: py4j.Py4JException: Method getnewargs([]) does not exist at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318) at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326) at py4j.Gateway.invoke(Gateway.java:272) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:214) at java.lang.Thread.run(Thread.java:748)

Is there a solution to saving numpy arrays in RDDs ? Or Can I do this entire operation in some other way ?

  • can you post sample input and output so we can try some different approach. – Rakesh Kumar Dec 19 '17 at 08:42
  • 1
    i think you are looking for `groupby('col').agg(sum(col2))` – Rakesh Kumar Dec 19 '17 at 08:43
  • The issue you have is that you are referencing and rdd from within an rdd transformation. If your aggregation uses builtin pyspark functions then you can use DataFrame `groupby(...).agg(...)`. If not, then you may need to use rdd `groupby` and a bespoke aggregation. – ags29 Dec 19 '17 at 08:51
  • @ags29 Once I groupby() can I send the data into a function, convert it into a numpy array an do the transformations ? – Ipsit Pradhan Dec 19 '17 at 11:23
  • Unfortunately not, as far as I know, PySpark does not allow you to create UDAF (user defined aggregation functions) as this stage. – ags29 Dec 19 '17 at 12:23
  • if you provide some idea of the actual aggregation you want to perform, then maybe we can find another way to do it without using base Python. – ags29 Dec 19 '17 at 12:24
  • @ags29 You can take a look now – Ipsit Pradhan Dec 21 '17 at 06:41

1 Answers1

0

In Pyspark, use groupBy() (in my case I grouped by 2 cols) function to obtain a GroupedDataFrame and pipe the agg() function. See the example below...

sqlContext.sql("select * from retail_db.orders").groupBy("order_status", "order_date").agg({"order_customer_id": "sum", "order_id": "count"}).show()

+---------------+--------------------+----------------------+---------------+
|   order_status|          order_date|sum(order_customer_id)|count(order_id)|
+---------------+--------------------+----------------------+---------------+
|PENDING_PAYMENT|2013-07-28 00:00:...|                237876|             37|
|       COMPLETE|2013-08-22 00:00:...|                415843|             64|
|PENDING_PAYMENT|2013-10-20 00:00:...|                168223|             28|
|SUSPECTED_FRAUD|2013-11-22 00:00:...|                 36354|              6|
|PENDING_PAYMENT|2013-12-19 00:00:...|                131972|             22|
|PENDING_PAYMENT|2014-03-12 00:00:...|                352832|             52|
|        ON_HOLD|2014-03-28 00:00:...|                 74970|             13|
|SUSPECTED_FRAUD|2014-04-14 00:00:...|                 18145|              2|
|        PENDING|2014-04-21 00:00:...|                174419|             26|
|         CLOSED|2014-06-04 00:00:...|                 66677|             10|
|PENDING_PAYMENT|2014-06-26 00:00:...|                249542|             45|
|PENDING_PAYMENT|2013-08-17 00:00:...|                405980|             56|
|         CLOSED|2013-09-10 00:00:...|                164670|             23|
|SUSPECTED_FRAUD|2013-09-19 00:00:...|                 26613|              4|
|        PENDING|2013-09-26 00:00:...|                176547|             28|
|       COMPLETE|2013-10-20 00:00:...|                314462|             54|
|       CANCELED|2013-10-31 00:00:...|                 36881|              6|
|     PROCESSING|2013-11-09 00:00:...|                149164|             23|
| PAYMENT_REVIEW|2013-11-29 00:00:...|                 17368|              3|
|SUSPECTED_FRAUD|2013-12-11 00:00:...|                 45085|              7|
+---------------+--------------------+----------------------+---------------+
only showing top 20 rows

You can also use grouped_Series_Owner = x_gb["Owner"].apply(list) .apply() function for GroupedDataFrame, in this example I converted the aggregated data into a list and worked with them.

CarloV
  • 132
  • 1
  • 12
  • The aggregation function is much more complicated in my case and requires numpy array operations. – Ipsit Pradhan Dec 19 '17 at 13:07
  • `def func(x): y = df.filter(df.A==x) y = np.array(y.toPandas()) #Aggregation using a numpy array happens here return agg` Is this one your agg function? For more complicated aggregations consider using RDD [aggregateByKey](https://spark.apache.org/docs/2.2.1/api/python/pyspark.html?highlight=aggregatebykey#pyspark.RDD.aggregateByKey) – CarloV Dec 19 '17 at 13:11
  • To use agg with custom functions [see this](https://stackoverflow.com/questions/35989558/pyspark-custom-function-in-aggregation-on-grouped-data). – CarloV Dec 19 '17 at 13:18