
I am reading a CSV file into a DataFrame as below:

val df  = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("D:/ModelData.csv")

Then I group by three columns as below, which returns a RelationalGroupedDataset:

df.groupBy("col1", "col2","col3")

And I want each grouped DataFrame to be sent through the function below:

def ModelFunction(daf: DataFrame) = {
  // do some calculation
}

For example, if col1 has 2 unique values (0, 1), col2 has 2 unique values (1, 2), and col3 has 3 unique values (1, 2, 3), then I would like to pass each combination's grouping to the model function. For col1=0, col2=1, col3=1 I will have a DataFrame, and I want to pass that to ModelFunction, and so on for each combination of the three columns.

I tried

df.groupBy("col1", "col2","col3").ModelFunction();

But it throws an error.


Any help is appreciated.

Ricky

1 Answer


The short answer is that you cannot do that. You can only apply aggregate functions to a RelationalGroupedDataset (either ones you write as a UDAF or built-in ones in org.apache.spark.sql.functions).
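For clarity, this is the kind of thing a RelationalGroupedDataset does support (col4 here is just an assumed numeric column for illustration):

import org.apache.spark.sql.functions.{avg, count, lit}

// Built-in aggregations per (col1, col2, col3) group are fine;
// arbitrary per-group functions like ModelFunction are not.
val aggregated = df.groupBy("col1", "col2", "col3")
  .agg(count(lit(1)).as("n"), avg("col4").as("meanCol4"))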

The way I see it, you have several options:

Option 1: The amount of data for each unique combination is small enough and not skewed too much compared to other combinations.

In this case you can do:

val grouped = df.groupBy("col1", "col2", "col3").agg(collect_list(struct(<all other columns>)))
grouped.as[<a case class representing the data, including the combination>].map(<your own logistic regression function>)
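As a more concrete sketch, assuming the remaining columns are feature1 and feature2 and have already been cast to Double (those names and the case classes are hypothetical, for illustration only):

import org.apache.spark.sql.functions.{col, collect_list, struct}
import sqlContext.implicits._

// Hypothetical case classes; adjust to your actual columns and types.
// Note: CSV columns load as String by default, hence col1/col2/col3 as String here.
case class Features(feature1: Double, feature2: Double)
case class Group(col1: String, col2: String, col3: String, data: Seq[Features])

// collect_list needs a HiveContext on Spark 1.x (or a SparkSession on 2.x).
val grouped = df
  .groupBy("col1", "col2", "col3")
  .agg(collect_list(struct(col("feature1"), col("feature2"))).as("data"))

// Each Group carries the full data for one (col1, col2, col3) combination,
// so the model logic can run per group inside the map.
val results = grouped.as[Group].map(g => (g.col1, g.col2, g.col3, g.data.size)) // replace g.data.size with your model's output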

Option 2: If the total number of combinations is small enough you can do:

val values = df.select("col1", "col2", "col3").distinct().collect()

and then loop over them, creating a new DataFrame for each combination with a filter, as sketched below.
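A minimal sketch of that loop (the values are read as strings because spark-csv loads every column as String by default):

import org.apache.spark.sql.functions.col

df.cache() // each filter below scans df again, so caching helps

values.foreach { row =>
  val (c1, c2, c3) = (row.getString(0), row.getString(1), row.getString(2))
  val subset = df.filter(col("col1") === c1 && col("col2") === c2 && col("col3") === c3)
  ModelFunction(subset) // one DataFrame per (col1, col2, col3) combination
}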

Option 3: Write your own UDAF

This would probably not be good enough, as the data comes in as a stream without the ability to iterate; however, if you have an implementation of logistic regression that fits this model, you can try to write a UDAF to do it (a skeleton is sketched below). See for example: How to define and use a User-Defined Aggregate Function in Spark SQL?
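For reference, a bare skeleton of the UserDefinedAggregateFunction API; the buffer layout and the update logic here are placeholders (a running sum and count) that you would replace with your model's math:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class ModelUDAF extends UserDefinedAggregateFunction {
  // One input column for the example; add one entry per feature column.
  def inputSchema: StructType = StructType(StructField("feature", DoubleType) :: Nil)

  // Whatever running state your algorithm needs (placeholder: a sum and a count).
  def bufferSchema: StructType =
    StructType(StructField("sum", DoubleType) :: StructField("count", LongType) :: Nil)

  def dataType: DataType = DoubleType
  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0.0
    buffer(1) = 0L
  }

  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getDouble(0) + input.getDouble(0)
    buffer(1) = buffer.getLong(1) + 1L
  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }

  // Placeholder result (a mean); the real version would emit your model's output.
  def evaluate(buffer: Row): Double = buffer.getDouble(0) / buffer.getLong(1)
}

// Usage: df.groupBy("col1", "col2", "col3").agg(new ModelUDAF()(col("feature")))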

Assaf Mendelson