
I have a wide Spark data frame of a few thousand columns by about a million rows, for which I would like to calculate the row totals. My solution so far is below. I drew on dplyr - sum of multiple columns using regular expressions and https://github.com/tidyverse/rlang/issues/116

library(sparklyr)
library(DBI)
library(dplyr)
library(rlang)

sc1 <- spark_connect(master = "local")
# example data: 10 rows x 200 columns of random integers from 1 to 20
wide_df = as.data.frame(matrix(ceiling(runif(2000, 0, 20)), 10, 200))
wide_sdf = sdf_copy_to(sc1, wide_df, overwrite = TRUE, name = "wide_sdf")

# build the expression string "V1+V2+...+V200"
col_eqn = paste0(colnames(wide_df), collapse = "+")

# build up the SQL query and send to spark with DBI
query = paste0("SELECT (",
               col_eqn,
               ") as total FROM wide_sdf")

dbGetQuery(sc1, query)

# Equivalent approach using dplyr instead
col_eqn2 = quo(!! parse_expr(col_eqn))

wide_sdf %>% 
    transmute("total" := !!col_eqn2) %>%
    collect() %>%
    as.data.frame()

The problems come when the number of columns is increased. In Spark SQL the sum appears to be built up one element at a time, i.e. (((V1 + V2) + V3) + V4)...), which leads to errors from very deep recursion once there are thousands of columns.
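For illustration, here is a minimal sketch of how the failure shows up once the column count is scaled up. The 3,000-column size is an assumption chosen for demonstration; the exact error and the point at which it appears depend on the Spark version and JVM stack size.

wide_df2 = as.data.frame(matrix(ceiling(runif(30000, 0, 20)), 10, 3000))
wide_sdf2 = sdf_copy_to(sc1, wide_df2, overwrite = TRUE, name = "wide_sdf2")

query2 = paste0("SELECT (",
                paste0(colnames(wide_df2), collapse = "+"),
                ") as total FROM wide_sdf2")

# dbGetQuery(sc1, query2)  # may fail with e.g. java.lang.StackOverflowError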

Does anyone have an alternative, more efficient approach? Any help would be much appreciated.

swany

1 Answer


You're out of luck here. One way or another you're going to hit some recursion limits (even if you go around the SQL parser, a sufficiently large sum of expressions will crash the query planner). There are some slow solutions available:

  • Use spark_apply (at the cost of conversion to and from R):

    # f receives each partition as an R data frame; rowSums runs in R
    wide_sdf %>% spark_apply(function(df) { data.frame(total = rowSums(df)) })
    
  • Convert to long format and aggregate (at the cost of explode and shuffle; a single-SQL-query version of the same idea is sketched after this list):

    key_expr <- "monotonically_increasing_id() AS key"
    
    value_expr <- paste(
      "explode(array(", paste(colnames(wide_sdf), collapse = ","), ")) AS value"
    )
    
    wide_sdf %>% 
      spark_dataframe() %>% 
      # Add id and explode. We need a separate invoke so id is applied
      # before "lateral view"
      sparklyr::invoke("selectExpr", list(key_expr, "*")) %>% 
      sparklyr::invoke("selectExpr", list("key", value_expr)) %>% 
      sdf_register() %>% 
      # Aggregate by id
      group_by(key) %>% 
      summarize(total = sum(value)) %>% 
      arrange(key)
    
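The two selectExpr steps above can also be written as a single SQL statement sent through DBI. This is a sketch of the same idea rather than code from the answer, and it assumes the subquery-alias and LATERAL VIEW syntax below are accepted by your Spark version:

long_query <- paste0(
  "SELECT key, SUM(value) AS total FROM (",
  "SELECT monotonically_increasing_id() AS key, * FROM wide_sdf",
  ") src LATERAL VIEW explode(array(",
  paste0(colnames(wide_sdf), collapse = ","),
  ")) t AS value GROUP BY key ORDER BY key"
)

dbGetQuery(sc1, long_query)

As with the invoke version, the id is assigned in the subquery so it exists before the lateral view runs.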

To get something more efficient you should consider writing a Scala extension and applying the sum directly on a Row object, without exploding:

package com.example.sparklyr.rowsum

import org.apache.spark.sql.{DataFrame, Encoders}

object RowSum {
  def apply(df: DataFrame, cols: Seq[String]) = df.map {
    row => cols.map(c => row.getAs[Double](c)).sum
  }(Encoders.scalaDouble)
}

and

invoke_static(
  sc, "com.example.sparklyr.rowsum.RowSum", "apply",
  wide_sdf %>% spark_dataframe(),
  # column names for the Seq[String] parameter of RowSum.apply
  as.list(colnames(wide_sdf))
) %>% sdf_register()
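For completeness, a sketch of how the compiled class can be made visible to the connection; the jar path is hypothetical and assumes the RowSum object above was packaged with sbt or similar:

config <- spark_config()
# hypothetical path to the jar built from the RowSum object above
config$sparklyr.jars.default <- "/path/to/rowsum.jar"
sc <- spark_connect(master = "local", config = config)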
zero323
  • Thanks very much for your response. I think I'll see if collecting the data frame to R will work for my program. If not I'll look into scala extensions. – swany Dec 15 '17 at 12:10