
I have a piece of code that works well but uses pandas DataFrame groupby processing. However, because the file is large (> 70 million groups), I need to convert the code to use a PySpark DataFrame. Here is the original code using a pandas DataFrame, with small example data:

import pandas as pd
import numpy as np
from scipy.optimize import minimize

df = pd.DataFrame({
'y0': np.random.randn(20),
'y1': np.random.randn(20),
'x0': np.random.randn(20), 
'x1': np.random.randn(20),
'grpVar': ['a', 'b'] * 10})

# Starting values
startVal = np.ones(2)*(1/2)

# Constraint: sum of coefficients = 1
cons = ({'type':'eq', 'fun': lambda x: 1 - sum(x)})

# Bounds on coefficients
bnds = tuple([0,1] for x in startVal)

# Define a function to calculate sum of squared differences
def SumSqDif(a, df):
    return np.sum((df['y0'] - a[0]*df['x0'])**2 + (df['y1'] - a[1]*df['x1'])**2)

# Define a function to call minimize function 
def RunMinimize(data, startVal, bnds, cons):
    ResultByGrp = minimize(SumSqDif, startVal, method='SLSQP',
    bounds=bnds, constraints = cons, args=(data))
    return ResultByGrp.x

# Do the calculation by applying the function by group:
# Create GroupBy object
grp_grpVar = df.groupby('grpVar')

Results = grp_grpVar.apply(RunMinimize, startVal=startVal, bnds=bnds, cons=cons)
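For reference, `Results` here is a pandas Series indexed by `grpVar` whose values are the fitted coefficient arrays; a quick way to inspect it is sketched below (the column names 'a0' and 'a1' are just illustrative):

# Results is a Series of length-2 numpy arrays, one per group
print(Results)

# Optional: unpack into a regular DataFrame, one column per coefficient
coefs = pd.DataFrame(Results.tolist(), index=Results.index, columns=['a0', 'a1'])
print(coefs)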

Now I am trying to use a PySpark DataFrame. I convert the pandas DataFrame to a PySpark DataFrame for the purpose of testing the code:

sdf = sqlContext.createDataFrame(df)
type(sdf)
#  <class 'pyspark.sql.dataframe.DataFrame'>

# Create GroupBy object
Sgrp_grpVar = sdf.groupby('grpVar')

# Redefine functions
def sSumSqDif(a, sdf):
    return np.sum((sdf['y0'] - a[0]*sdf['x0'])**2 + (sdf['y1'] - a[1]*sdf['x1'])**2)

def sRunMinimize(data=sdf, startVal=startVal, bnds=bnds, cons=cons):
    ResultByGrp = minimize(sSumSqDif, startVal, method='SLSQP',
                       bounds=bnds, constraints = cons, args=(data))
    return ResultByGrp.x

from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import DoubleType
from pyspark.sql.types import StringType

udf = UserDefinedFunction(sRunMinimize , StringType())

Results = Sgrp_grpVar.agg(sRunMinimize()) 

However, after I tried to define the user-defined function udf I got the errors below. Any help correcting my errors or suggesting an alternative approach is highly appreciated.

udf = UserDefinedFunction(sRunMinimize, StringType())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/functions.py", line 1760, in __init__
    self._judf = self._create_judf(name).......

    My first observation is that you can't send an entire Spark dataframe as an argument to a udf, only columns of a dataframe. – femibyte Sep 16 '17 at 07:51
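To make the comment above concrete: a plain pyspark UDF receives one row's column values at a time, not a whole DataFrame or group, so on its own it can only express row-wise logic like the sketch below (the function and the constant 0.5 are made up purely for illustration):

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# Row-wise UDF: sees only the values of y0 and x0 for a single row
row_sq_diff = udf(lambda y0, x0: float((y0 - 0.5*x0)**2), DoubleType())

sdf.withColumn("sqDiff", row_sq_diff("y0", "x0")).show(5)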

1 Answer


You're trying to write a user-defined aggregate function (UDAF), which can't be done in pyspark; see https://stackoverflow.com/a/40030740.

What you can write instead is a UDF on the data within each group collected as a list:

First for the set-up:

import pandas as pd 
import numpy as np 
from scipy.optimize import minimize
import pyspark.sql.functions as psf
from pyspark.sql.types import *

df = pd.DataFrame({
    'y0': np.random.randn(20),
    'y1': np.random.randn(20),
    'x0': np.random.randn(20), 
    'x1': np.random.randn(20),
    'grpVar': ['a', 'b'] * 10})
sdf = sqlContext.createDataFrame(df)

# Starting values
startVal = np.ones(2)*(1/2)
# Constraint: sum of coefficients = 1
cons = ({'type':'eq', 'fun': lambda x: 1 - sum(x)})
# Bounds on coefficients
bnds = tuple([0,1] for x in startVal)

We'll broadcast these variables since they are needed for every row of the aggregated dataframe; broadcasting copies the values to every node so the executors don't have to fetch them from the driver:

sc.broadcast(startVal)
sc.broadcast(bnds)
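Note that sc.broadcast returns a Broadcast handle; as written above the handles aren't kept, and the closure will simply ship the plain Python objects, which is fine for values this small. If you want the executors to read the broadcast copies explicitly, a minimal sketch (the handle names are my own):

# Keep the Broadcast handles returned by sc.broadcast
startVal_bc = sc.broadcast(startVal)
bnds_bc = sc.broadcast(bnds)

# A function running on the executors would then read
# startVal_bc.value and bnds_bc.value instead of the plain variables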

Let's aggregate the data using collect_list. We'll change the structure of the data so that we only have one column (you could instead collect each column into a distinct column, but then you'd have to modify the way you pass data to the function; that alternative is sketched after the schema below):

Sgrp_grpVar = sdf\
    .groupby('grpVar')\
    .agg(psf.collect_list(psf.struct("y0", "y1", "x0", "x1")).alias("data"))
Sgrp_grpVar.printSchema()

    root
     |-- grpVar: string (nullable = true)
     |-- data: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- y0: double (nullable = true)
     |    |    |-- y1: double (nullable = true)
     |    |    |-- x0: double (nullable = true)
     |    |    |-- x1: double (nullable = true)
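For completeness, the alternative mentioned above, one collect_list per column, would look roughly like this (the UDF would then need to accept four list arguments instead of one list of structs):

# Alternative aggregation: one list per column
Sgrp_cols = sdf\
    .groupby('grpVar')\
    .agg(psf.collect_list("y0").alias("y0"),
         psf.collect_list("y1").alias("y1"),
         psf.collect_list("x0").alias("x0"),
         psf.collect_list("x1").alias("x1"))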

We can now create our UDF. The returned data type is too complex for pyspark: numpy arrays are not supported by pyspark, so we'll need to change it a bit:

def sSumSqDif(a, data):
    return np.sum(
        (data['y0'] - a[0]*data['x0'])**2 \
        + (data['y1'] - a[1]*data['x1'])**2)

def sRunMinimize(data, startVal=startVal, bnds=bnds, cons=cons):
    # Transpose the collected rows (a list of structs) into columns before building the DataFrame
    data = pd.DataFrame({k: v for k, v in zip(["y0", "y1", "x0", "x1"], zip(*data))})
    ResultByGrp = minimize(sSumSqDif, startVal, method='SLSQP',
                       bounds=bnds, constraints = cons, args=(data))
    return ResultByGrp.x.tolist()

sRunMinimize_udf = lambda startVal, bnds, cons: psf.udf(
    lambda data: sRunMinimize(data, startVal, bnds, cons), 
    ArrayType(DoubleType())
)
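Because the optimisation itself is plain Python, it can be worth sanity-checking sRunMinimize locally on a hand-built list of (y0, y1, x0, x1) tuples before wrapping it in the UDF; the values below are arbitrary:

# Each tuple plays the role of one collected struct (y0, y1, x0, x1)
sample = [(1.0, 2.0, 0.5, 1.5), (0.3, -0.2, 1.0, 0.4), (-1.1, 0.7, 0.2, -0.9)]
print(sRunMinimize(sample))  # two coefficients that respect the bounds and sum to 1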

We can now apply this function to the collected data in each group:

Results = Sgrp_grpVar.select(
    "grpVar", 
    sRunMinimize_udf(startVal, bnds, cons)("data").alias("res")
)
Results.show(truncate=False)

    +------+-----------------------------------------+
    |grpVar|res                                      |
    +------+-----------------------------------------+
    |b     |[0.4073139282953772, 0.5926860717046227] |
    |a     |[0.8275186444565927, 0.17248135554340727]|
    +------+-----------------------------------------+
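If you then need the two coefficients as separate numeric columns rather than one array column, indexing into the array works (the column names are my own):

Results.select(
    "grpVar",
    Results.res[0].alias("coef0"),
    Results.res[1].alias("coef1")
).show()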

But I don't think pyspark is the right tool for this.

  • Marie, Thank you very much for your solution. I will try it with my real data. It is large: about 600 million records / 70 million groups and 12 variables (x0..x6 and y0..y6 in my example). In your opinion, if pyspark is not appropriate for this problem, what is appropriate? – Paul Sep 16 '17 at 13:44
  • Marie, Before running your code on a large dataset I tried the code as it is with the small example data. It gave me an error at the last step: Traceback (most recent call last): File "<stdin>", line 1, in <module> NameError: name 'sdf_agg' is not defined – Paul Sep 18 '17 at 19:06
  • You need to have scikit-learn installed on all the nodes, or zip anaconda and use --archives. The functions you call for every row need to load these modules. The best solution is to install anaconda on every node. – MaFF Sep 18 '17 at 19:18
  • Thank you so much for the answer. For anyone who may encounter the NameError for `sdf_agg`, try changing `Results = sdf_agg.select(` in the last block to `Results = Sgrp_grpVar.select(` – Molly Zhou Sep 16 '21 at 09:23
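Regarding the dependency comment above: one way to avoid installing anaconda by hand on every node is to ship a packed environment via Spark's archive mechanism. The configuration keys below are standard Spark/YARN settings, but the archive name and paths are placeholders and the exact setup depends on your cluster:

from pyspark import SparkConf

# Hypothetical example: distribute a zipped conda env containing numpy/scipy/pandas
# and point the Python workers at its interpreter (YARN deployments)
conf = (SparkConf()
        .set("spark.yarn.dist.archives", "hdfs:///envs/scipy_env.zip#ENV")
        .set("spark.yarn.appMasterEnv.PYSPARK_PYTHON", "./ENV/bin/python")
        .set("spark.executorEnv.PYSPARK_PYTHON", "./ENV/bin/python"))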