
I have the following Spark DataFrame:

agent_product_sale=data.frame(agent=c('a','b','c','d','e','f','a','b','c','a','b'),
                         product=c('P1','P2','P3','P4','P1','p1','p2','p2','P2','P3','P3'),
                         sale_amount=c(1000,2000,3000,4000,1000,1000,2000,2000,2000,3000,3000))

RDD_aps=createDataFrame(sqlContext,agent_product_sale)

   agent product sale_amount
1      a      P1        1000
2      b      P1        1000
3      c      P3        3000
4      d      P4        4000
5      d      P1        1000
6      c      P1        1000
7      a      P2        2000
8      b      P2        2000
9      c      P2        2000
10     a      P4        4000
11     b      P3        3000

I need to group the Spark DataFrame by agent and, for each agent, find the product with the highest sale_amount:

      agent  most_expensive
      a           P4        
      b           P3                
      c           P3        
      d           P4        

I use the following code, but it returns the maximum sale_amount for each agent instead of the corresponding product:

schema <- structType(structField("agent", "string"),
                     structField("max_sale_amount", "double"))

result <- gapply(
  RDD_aps,
  c("agent"),
  function(key, x) {
    y <- data.frame(key, max(x$sale_amount), stringsAsFactors = FALSE)
  },
  schema)
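A minimal sketch of how the same gapply pattern could return the product rather than the amount, using which.max as suggested in the comments below (the most_expensive field name is illustrative, not part of the original code):

schema <- structType(structField("agent", "string"),
                     structField("most_expensive", "string"))

result <- gapply(
  RDD_aps,
  c("agent"),
  function(key, x) {
    # pick the product from the row with the largest sale_amount in this group
    data.frame(key, x$product[which.max(x$sale_amount)], stringsAsFactors = FALSE)
  },
  schema)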
chessosapiens
  • Try with `which.max` – akrun Sep 06 '16 at 06:28
  • Or maybe `gD <- agg(groupBy(RDD_aps, RDD_aps$agent); agg(arrange(gD, desc(gD$sale_amount)), most_expensive = first(gD$product))` (not tested) – akrun Sep 06 '16 at 06:37
  • I may be wrong, but you could call the `groupBy` again after the `arrange` – akrun Sep 06 '16 at 06:44
  • Try with the second one. i.e. `ar1 <- arrange(gD, desc(gD$sale_amount)); gD2 <- groupBy(ar1, ar1$agent); agg(gD2, most_expensive = first(gD2$product))` – akrun Sep 06 '16 at 06:45
  • @akrun Warning message: 'createDataFrame(sqlContext...)' is deprecated. Use 'createDataFrame(data, schema = NULL, samplingRatio = 1.0)' instead. See help("Deprecated") Error in first(gD2$product) : error in evaluating the argument 'x' in selecting a method for function 'first': Error in gD2$product : $ operator not defined for this S4 class – chessosapiens Sep 06 '16 at 06:52
  • okay, I can't test your code as I don't have sparkR – akrun Sep 06 '16 at 06:54
  • Which version of SparkR are you using? – eliasah Sep 06 '16 at 07:05
  • @akrun Problem solved: ar1 <- arrange(RDD_aps, desc(RDD_aps$sale_amount));collect(summarize(groupBy(ar1,ar1$agent),most_expensive=first(ar1$product))) – chessosapiens Sep 06 '16 at 07:27
  • @sanaz You could post that as a solution and close this. – akrun Sep 06 '16 at 07:28
  • The only issue with this solution is that it doesn't actually scale, since it needs to pull all the data to the driver. The reason I asked for the Spark version is that with Spark 2+ you can use a WindowSpec; with Spark < 2 you would need to use a join. – eliasah Sep 06 '16 at 08:03
  • @eliasah Yes, I am using Spark 2+. 1. What if I don't use collect()? Then it is scalable, isn't it? 2. Can you tell me how a WindowSpec can solve the problem? – chessosapiens Sep 06 '16 at 08:53
  • If you use collect, all the data will be pulled to the driver, which can overwhelm it and result in an out-of-memory exception if your data doesn't fit in the driver's memory. A WindowSpec, on the other hand, lets you partition by agent and order by sale_amount, then select the first (max) row. The solution provided here http://stackoverflow.com/questions/33878370/spark-dataframe-select-the-first-row-of-each-group is in Scala, but it's the same principle. – eliasah Sep 06 '16 at 09:06

3 Answers

ar1 <- arrange(RDD_aps, desc(RDD_aps$sale_amount))
collect(summarize(groupBy(ar1, ar1$agent), most_expensive = first(ar1$product)))
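As the comments point out, collect() pulls the entire result to the driver. A sketch of the same approach that keeps the result as a distributed SparkDataFrame (variable names are illustrative, and it assumes the same first-after-arrange trick):

ar1 <- arrange(RDD_aps, desc(RDD_aps$sale_amount))
res <- summarize(groupBy(ar1, ar1$agent), most_expensive = first(ar1$product))
# inspect a few rows instead of collecting everything to the driver
head(res)
# or: showDF(res)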
chessosapiens

With tapply() or aggregate() you can find the maximum value within each group:

agent_product_sale=data.frame(agent=c('a','b','c','d','e','f','a','b','c','a','b'),
                              product=c('P1','P2','P3','P4','P1','p1','p2','p2','P2','P3','P3'),
                              sale_amount=c(1000,2000,3000,4000,1000,1000,2000,2000,2000,3000,3000))


tapply(agent_product_sale$sale_amount,agent_product_sale$agent, max)
               a    b    c    d    e    f 
            3000 3000 3000 4000 1000 1000 



aggregate(agent_product_sale$sale_amount,by=list(agent_product_sale$agent), max)
          Group.1    x
        1       a 3000
        2       b 3000
        3       c 3000
        4       d 4000
        5       e 1000
        6       f 1000

aggregate returns a data.frame and tapply an array; which one you prefer depends on how you want to keep working with the results.
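Since the question asks for the product rather than the amount, a small base-R sketch (operating on the local data.frame, not the Spark one) could pick the product on the row with the largest sale_amount per agent; the most_expensive name is just illustrative:

# split by agent, then take the product on the row with the largest sale_amount
most_expensive <- sapply(split(agent_product_sale, agent_product_sale$agent),
                         function(d) as.character(d$product[which.max(d$sale_amount)]))
most_expensive

This returns a named character vector with one product per agent.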

Benjamin Mohn

A window partition in SparkR can produce the expected outcome. The code below partitions the data on the agent column and adds a new column, row_number_desc, which is essentially the rank of each row by sale_amount within its partition (highest first). Finally, filtering on the top rank and selecting the desired columns yields the result.

df <- as.DataFrame(agent_product_sale)   
ws <- orderBy(windowPartitionBy("agent"),desc(df$sale_amount))  
df_new <-  select(df,c(alias(over(row_number(), ws), "row_number_desc"), names(df)))  
df_new <- withColumnRenamed(df_new, "product", "most_expensive")  
showDF(select(filter(df_new, df_new$row_number_desc ==1), c("agent","most_expensive")))
agent  most_expensive
a      P4
b      P3
c      P3
d      P4
e      P1
f      p1

The intermediate DataFrame df_new (after the withColumnRenamed step) looks like this:

row_number_desc  agent  most_expensive  sale_amount
1                a      P4              4000.0
2                a      P3              3000.0
3                a      p2              2000.0
4                a      P1              1000.0
1                b      P3              3000.0
2                b      P2              2000.0
3                b      p2              2000.0
1                c      P3              3000.0
2                c      P2              2000.0
1                d      P4              4000.0
1                e      P1              1000.0
1                f      p1              1000.0

I think this window function support was added to SparkR fairly recently; it avoids the use of any apply functions and collect statements.
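If you prefer SQL, roughly the same window logic can be expressed through SparkR's sql() interface; this is an untested sketch that assumes the DataFrame has been registered as a temporary view named aps (the view name is arbitrary):

createOrReplaceTempView(df, "aps")
result <- sql("
  SELECT agent, product AS most_expensive
  FROM (
    SELECT agent, product,
           row_number() OVER (PARTITION BY agent ORDER BY sale_amount DESC) AS rn
    FROM aps
  ) t
  WHERE rn = 1")
showDF(result)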

Ash