
Let's say I want to execute something as follows:

library(SparkR)
...
df <- read.parquet(<some_address>)
gapply(
    df,
    df$column1,
    function(key, x) {
        return(data.frame(x, newcol1 = f1(x), newcol2 = f2(x)))
    }
)

where the return of the function has multiple rows. To be clear, the examples in the documentation (which, like much of the Spark documentation, sadly stick to trivially simple cases) don't help me determine whether this will be handled as I expect.

I would expect that, for k groups created in the DataFrame with n_k output rows per group, the result of the gapply() call would have n_1 + n_2 + ... + n_k rows, with the key value replicated across the n_k rows of group k. However, the schema field suggests to me that this is not how it will be handled - in fact, it suggests that it wants the result pushed into a single row.
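
To make that concrete (with purely hypothetical column names and types, since I can't share my actual code), I assume I would have to declare something like the schema below, and I can't tell from the documentation whether it permits more than one output row per key:

# hypothetical schema sketch: column1 is the grouping key, and f1/f2 are
# assumed to return doubles; one structField per retained column of df
# would also be needed
schema <- structType(
    structField("column1", "string"),
    # ... structField(...) for each remaining column of df ...
    structField("newcol1", "double"),
    structField("newcol2", "double"))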

Hopefully this is clear, albeit theoretical (I'm sorry I can't share my actual code example). Can someone verify or explain how such a function will actually be treated?

MichaelChirico
Matt Anthony

1 Answer

Exact expectations regarding input and output are clearly stated in the official documentation:

Apply a function to each group of a SparkDataFrame. The function is to be applied to each group of the SparkDataFrame and should have only two parameters: grouping key and R data.frame corresponding to that key. The groups are chosen from SparkDataFrames column(s). The output of function should be a data.frame.

Schema specifies the row format of the resulting SparkDataFrame. It must represent R function’s output schema on the basis of Spark data types. The column names of the returned data.frame are set by user. Below is the data type mapping between R and Spark.

In other words, your function should take a key and a data.frame of rows corresponding to that key, and return a data.frame that can be represented using Spark SQL types with the schema provided as the schema argument. There are no restrictions regarding the number of rows. You could, for example, apply an identity transformation as follows:

df <- as.DataFrame(iris)

gapply(df, "Species", function(k, x) x, schema(df))

and in the same way you can apply aggregations:

gapply(df, "Species",
  function(k, x) {
    dplyr::summarize(dplyr::group_by(x, Species), max(Sepal_Width))
  },
  structType(
    structField("species", "string"),
    structField("max_s_width", "double"))
)

although in practice you should prefer aggregations directly on the DataFrame (groupBy %>% agg).
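
For example, a minimal sketch of that direct approach (assuming magrittr is attached to provide %>%, and using the string form of agg):

library(magrittr)  # for %>%

df %>%
  groupBy("Species") %>%
  agg(Sepal_Width = "max")

# or equivalently, without the pipe:
agg(groupBy(df, "Species"), Sepal_Width = "max")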

zero323
  • So per posting rules I certainly already read the documentation, and the documentation's clarity is in the eye of the beholder (it wasn't clear to me at all that a multi-row data frame is allowed, nor is there an example showing it). The identity transform would be an obvious example, so if that works, it should be in the documentation - and it's not. Also, your second example function would only return a single row, since you're grouping by Species on the outside and then again on the inside, meaning the inner dplyr::group_by is redundant ... so that doesn't necessarily illustrate it either. – Matt Anthony Sep 08 '16 at 17:46
  • The second example is just to show that the number of rows can be different from the number of rows on input. In practice you can think of `gapply` as an equivalent of `dplyr::group_by %>% dplyr::summarize` or `split %>% lapply`. – zero323 Sep 08 '16 at 17:49
  • Nonetheless, I've run the identity and several other examples and confirmed for myself that returning a multi-row data frame is valid, thus accepting this answer, the caveats about the SparkR documentation notwithstanding. – Matt Anthony Sep 08 '16 at 17:50
  • Don't mean to sound rude, but I have a growing frustration with a lot of the documentation in the Spark-verse using pointlessly trivial examples (usually only one or two) that don't adequately address even a modest number of the possible use cases for the various functions. For example, using 'df = sc.parallelize([1,2,3,4,5,6])' as the seed data to demonstrate an operation ... – Matt Anthony Sep 08 '16 at 17:53
  • No offense taken, but if you think there is something missing just open [a JIRA](https://issues.apache.org/jira/browse/SPARK) and make a PR. Documentation improvements are usually welcome and better documentation is always a good thing. – zero323 Sep 08 '16 at 17:58