
This is proving difficult to find for some reason. I can easily find the repartition function in PySpark and in SparkR, but no such function seems to exist in sparklyr.

Does anyone know how to repartition a Spark dataframe in sparklyr?

– Dave Kincaid

2 Answers


Now you can use `sdf_repartition()`, e.g.:

iris_tbl %>%
  sdf_repartition(5L, columns = c("Species", "Petal_Width")) %>%
  spark_dataframe() %>%
  invoke("queryExecution") %>%
  invoke("optimizedPlan") 
# <jobj[139]>
#   class org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression
# RepartitionByExpression [Species#14, Petal_Width#13], 5
#                          +- InMemoryRelation [Sepal_Length#10, Sepal_Width#11, Petal_Length#12, Petal_Width#13, Species#14], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `iris`
#                                               +- *FileScan csv [Sepal_Length#10,Sepal_Width#11,Petal_Length#12,Petal_Width#13,Species#14] Batched: false, Format: CSV, Location: InMemoryFileIndex[file:/var/folders/ry/_l__tbl57d940bk2kgj8q2nj3s_d9b/T/Rtmpjgtnl6/spark_serializ..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Sepal_Length:double,Sepal_Width:double,Petal_Length:double,Petal_Width:double,Species:string>
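
If you only need a fixed number of partitions, with no partitioning columns, `sdf_repartition()` also accepts the count alone. A minimal sketch, not part of the original answer (it assumes the `iris_tbl` from above; the count is read back through the underlying RDD, and newer sparklyr releases also expose `sdf_num_partitions()` for that last step):

iris_tbl %>%
  sdf_repartition(8L) %>%
  spark_dataframe() %>%   # drop down to the Java-side Dataset
  invoke("rdd") %>%       # its underlying RDD
  invoke("getNumPartitions")
# should print 8
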
– kevinykuo
  • I must be missing a step. I'm getting an error trying to use this: `Error: java.lang.Exception: No matched method found for class sparklyr.Repartition.repartition at sparklyr.Invoke$.invoke(invoke.scala:99) at ` – Dave Kincaid Jun 07 '17 at 00:23
  • @DaveKincaid could you paste your function call? If you don't pass an integer to `partitions` it'll fail, but I've opened a PR relaxing that requirement. – kevinykuo Jun 07 '17 at 22:21
  • You must have fixed it since it is working now. Either that or I was misusing it before. In any event many thanks for adding this! – Dave Kincaid Jun 19 '17 at 20:14

You can try something like this:

library(sparklyr)
library(dplyr)
library(stringi)

#' @param df tbl_spark
#' @param numPartitions numeric number of partitions
#' @param ... character column names
repartition <- function(df, numPartitions, ...) {
  # Create output name
  alias <- paste(deparse(substitute(df)), stri_rand_strings(1, 10), sep="_")

  # Convert to Spark DataFrame
  sdf <- df %>% spark_dataframe()

  # Convert names to Columns
  exprs <- lapply(
    list(...),
    function(x) invoke(sdf, "apply", x)
  )

  sdf %>% 
    invoke("repartition", as.integer(numPartitions), exprs) %>%
    # Use "registerTempTable" with Spark 1.x
    invoke("createOrReplaceTempView", alias)

  # Look up the registered view through the input's own connection,
  # rather than relying on a global `sc`
  tbl(spark_connection(df), alias)
}
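
Note the design here: the repartitioned Java-side DataFrame is registered as a temp view under a random alias (via `stri_rand_strings`, which avoids name collisions on repeated calls), and then wrapped back into a dplyr `tbl` so the rest of the pipeline can keep working with it.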

Example usage:

df <- copy_to(sc, iris)

repartition(df, 3, "Species") %>% optimizedPlan

## <jobj[182]>
##   class org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression
##   RepartitionByExpression [Species#775], 3
## +- InMemoryRelation [Sepal_Length#771, Sepal_Width#772, Petal_Length#773, Petal_Width#774, Species#775], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `iris`
##    :  +- *Scan csv [Sepal_Length#771,Sepal_Width#772,Petal_Length#773,Petal_Width#774,Species#775] Format: CSV, InputPaths: file:/tmp/Rtmpp150bt/spark_serialize_9fd0bf1861994b0d294634211269ec9e591b014b83a5683f179dd18e7e70..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Sepal_Length:double,Sepal_Width:double,Petal_Length:double,Petal_Width:double,Species:string>

repartition(df, 7) %>% optimizedPlan
## <jobj[69]>
##   class org.apache.spark.sql.catalyst.plans.logical.RepartitionByExpression
##   RepartitionByExpression 7
## +- InMemoryRelation [Sepal_Length#19, Sepal_Width#20, Petal_Length#21, Petal_Width#22, Species#23], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `iris`
##    :  +- *Scan csv [Sepal_Length#19,Sepal_Width#20,Petal_Length#21,Petal_Width#22,Species#23] Format: CSV, InputPaths: file:/tmp/RtmpSw6aPg/spark_serialize_9fd0bf1861994b0d294634211269ec9e591b014b83a5683f179dd18e7e70..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Sepal_Length:double,Sepal_Width:double,Petal_Length:double,Petal_Width:double,Species:string>

The `optimizedPlan` helper used above is defined in Sparklyr: how to center a Spark table based on column?
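
For reference, a sketch of that helper, reconstructed from the same invoke chain the first answer pipes through (the linked question has the original definition):

optimizedPlan <- function(df) {
  df %>%
    spark_dataframe() %>%        # underlying Java-side Dataset
    invoke("queryExecution") %>% # its query execution
    invoke("optimizedPlan")      # the optimized logical plan
}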

– zero323