Under the previous RDD paradigm, I could specify a key and then map an operation over the RDD elements corresponding to each key. I don't see a clear way to do this with the DataFrame API in SparkR as of 1.5.1. What I would like is something like this dplyr operation:

new.df <- old.df %>%
  group_by("column1") %>%
  do(myfunc(.))

I currently have a large SparkR DataFrame of the form:

            timestamp  value  id
2015-09-01 05:00:00.0  1.132  24
2015-09-01 05:10:00.0  null   24
2015-09-01 05:20:00.0  1.129  24
2015-09-01 05:00:00.0  1.131  47
2015-09-01 05:10:00.0  1.132  47
2015-09-01 05:10:00.0  null   47

I've sorted by id and timestamp.

I want to group by id, but I don't want to aggregate. Instead, I want to apply a set of transformations and computations to each group -- for example, interpolating to fill the NAs that appear when I collect the DataFrame and convert value to numeric. I've tried agg: my computations do appear to run, but the results aren't returned, because myfunc doesn't return a single value per group:

library(zoo)

myfunc <- function(df) {
  df.loc <- collect(df)
  df.loc$value <- as.numeric(df.loc$value)
  df.loc$newparam <- na.approx(df.loc$value, na.rm = FALSE)
  return(df.loc)
  # I also tested return(createDataFrame(sqlContext, df.loc)) here
}

df <- read.df(...)  # some stuff

grp <- group_by(df, "id")

test <- agg(grp, "myfunc")

15/11/11 18:45:33 INFO scheduler.DAGScheduler: Job 2 finished: dfToCols at NativeMethodAccessorImpl.java:-2, took 0.463131 s
   id
1  24
2  47

Note that the operations in myfunc all work correctly when I filter the DataFrame down to a single id and run it. Based on the time it takes to run (about 50 seconds per task) and the fact that no exceptions are thrown, I believe myfunc is indeed being run on all of the ids -- but I need the output!
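
For example, the single-id check looks roughly like this (the id value 24 is just taken from the sample data above, and this snippet is illustrative only):

# Illustrative only: filter to one id, then run myfunc on the result
one.id <- filter(df, df$id == 24)
result <- myfunc(one.id)   # a local data.frame with the interpolated newparam column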

Any input would be appreciated.

Ajar
  • Unfortunately what you are trying to achieve is not possible using the DataFrame API. Data is not physically grouped (see [DataFrame groupBy behaviour/optimization](http://stackoverflow.com/q/32902982/1560062)). It is possible to define [UDAFs](https://databricks.com/blog/2015/09/16/spark-1-5-dataframe-api-highlights-datetimestring-handling-time-intervals-and-udafs.html), but these are restricted, and overall performance is far from optimal. – zero323 Nov 11 '15 at 19:36
  • That's disappointing. So I have no choice but to convert to an RDD? And the RDD API functions seem to be gone from SparkR 1.5. – Ajar Nov 11 '15 at 20:01
  • I feel your pain @Ajar. See https://issues.apache.org/jira/browse/SPARK-7230 – piccolbo Nov 12 '15 at 00:40
  • If you want to execute arbitrary R code without interacting with Scala / Python code, then you are out of other options. Either `:::` or nothing. Generally speaking, the DataFrame API is quite expressive (the R version a little less so, but still). One possible approach, especially with a smart source, is to filter by group -> apply transformation -> union-all (sketched after these comments). – zero323 Nov 12 '15 at 02:37
  • Within SparkR, is there a way to ensure the filtering by group is done in parallel? Giving that up defeats the purpose of using Spark for this task to begin with. Otherwise, `:::` or Scala are my fallbacks. – Ajar Nov 12 '15 at 16:11
  • I don't understand the question. Could you elaborate on what you mean by *filtering by group (...) done in parallel*? – zero323 Nov 15 '15 at 14:09
  • In my original question, I was hoping to be able to pass a custom R function to each group after `groupBy` in order to transform the groups in parallel. If I'm going to `filter` the data instead and apply the transform to the filtered data, it's not happening in parallel. The other option is to use an RDD from the start and use `map` after grouping by a key, but it's been difficult to work with the RDD API in R since it isn't exposed and the documentation is... limited. – Ajar Nov 16 '15 at 21:08
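
For what it's worth, here is a rough, untested sketch of the filter -> transform -> union-all idea from the comments above, assuming the df, sqlContext, and id/value columns from the question (ids, pieces, and part are placeholder names). Note that each group is still collected to the driver, so the per-group R work itself is not distributed:

library(zoo)

ids <- collect(distinct(select(df, "id")))$id     # distinct group keys, pulled locally

pieces <- lapply(ids, function(i) {
  part <- collect(filter(df, df$id == i))         # one group as a local data.frame
  part$value <- as.numeric(part$value)            # nulls become NA
  part$newparam <- na.approx(part$value, na.rm = FALSE)
  createDataFrame(sqlContext, part)               # back to a SparkR DataFrame
})

new.df <- Reduce(unionAll, pieces)                # stitch the groups back together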
