
My question is similar to the one here, but I'm having problems implementing the answer, and I can't comment on that thread.

So, I have a big CSV file that contains nested data: 2 columns separated by whitespace (say the first column is Y, the second column is X). Column X is itself a comma-separated list of values.

21.66 2.643227,1.2698358,2.6338573,1.8812188,3.8708665,...
35.15 3.422151,-0.59515584,2.4994135,-0.19701914,4.0771823,...
15.22 2.8302398,1.9080592,-0.68780196,3.1878228,4.6600842,...
...

I want to read this CSV into 2 different Spark tables using sparklyr.

So far this is what I've been doing:

  1. Use spark_read_csv to import all of the CSV contents into a Spark data table

    df = spark_read_csv(sc, path = "path", name = "simData", delimiter = " ", header = "false", infer_schema = "false")

    The result is a Spark table named simData with 2 columns: C0 and C1

  2. Use dplyr to select the first and second columns, and then register them as new tables named Y and X respectively

    simY <- df %>% select(C0) %>% sdf_register("simY")

    simX <- df %>% select(C1) %>% sdf_register("simX")

  3. Split the values in simX using the ft_regex_tokenizer function, following the answer here.

    ft_regex_tokenizer(simX, input.col = "C1", output.col = "ResultCols", pattern = ',')

But when I try to head it using dplyr, I get:

Source:   query [6 x 1]
Database: spark connection master=yarn-client app=sparklyr local=FALSE

        Result
        <list>
1 <list [789]>
2 <list [789]>
3 <list [789]>
4 <list [789]>
5 <list [789]>
6 <list [789]>

I want to turn this into a new Spark table and convert the type to double. Is there any way to do this? I've considered collecting the data into R (using dplyr), converting it to a matrix, and then running strsplit on each row, but I don't think that's a viable solution because the CSV can be up to 40 GB.

EDIT: Spark version is 1.6.0

1 Answer


Let's say your data look like this

library(dplyr)
library(sparklyr)

df <- data.frame(text = c("1.0,2.0,3.0", "4.0,5.0,6.0"))
sdf <- copy_to(sc, df, "df", overwrite = TRUE)

and that you've already created a spark_connection sc. You can do the following:

n <- 3

# There is no function syntax for array access in Hive
# so we have to build [] expressions
# CAST(... AS double) could be handled in sparklyr / dplyr with as.numeric
exprs <- lapply(
  0:(n - 1), 
  function(i) paste("CAST(bits[", i, "] AS double) AS x", i, sep=""))
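
For n = 3 the generated expressions are plain Hive SQL snippets; you can inspect them before using them (output shown as comments):

# Inspect the generated CAST expressions
unlist(exprs)
# [1] "CAST(bits[0] AS double) AS x0" "CAST(bits[1] AS double) AS x1"
# [3] "CAST(bits[2] AS double) AS x2"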

sdf %>%
  # Convert to Spark DataFrame
  spark_dataframe() %>% 
  # Use expression with split and explode
  invoke("selectExpr", list("split(text, ',') AS  bits")) %>%
  # Select individual columns
  invoke("selectExpr", exprs) %>%
  # Register table in the metastore ("registerTempTable" in Spark 1.x)
  invoke("createOrReplaceTempView", "exploded_df")

Then use dplyr::tbl to get back a sparklyr object:

tbl(sc, "exploded_df")
Source:   query [2 x 3]
Database: spark connection master=local[8] app=sparklyr local=TRUE

     x0    x1    x2
  <dbl> <dbl> <dbl>
1     1     2     3
2     4     5     6
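
Since the question mentions Spark 1.6.0: createOrReplaceTempView only exists in Spark 2.x, so on 1.6 the registration step uses registerTempTable instead (as the comment in the snippet above notes). The rest of the pipeline is unchanged; a minimal sketch reusing sdf and exprs from above:

sdf %>%
  spark_dataframe() %>%
  invoke("selectExpr", list("split(text, ',') AS bits")) %>%
  invoke("selectExpr", exprs) %>%
  # Spark 1.x equivalent of createOrReplaceTempView
  invoke("registerTempTable", "exploded_df")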

In the latest versions you can also use sdf_separate_column:

sdf %>% 
  mutate(text=split(text, ",")) %>% 
  sdf_separate_column("text", paste0("x", 0:2))
# Source:   table<sparklyr_tmp_87125f13b89> [?? x 4]
# Database: spark_connection
  text       x0    x1    x2   
  <list>     <chr> <chr> <chr>
1 <list [3]> 1.0   2.0   3.0  
2 <list [3]> 4.0   5.0   6.0  
Wow, this works like magic! Actually I skipped that error part and went directly to using `sdf_register`, and it's working. After that I checked this question again and just found out about `registerTempTable`. Ah yes, I'm using Spark 1.6.0. I also modified your code a bit and got it working for the Y part. Thanks a lot! – Benny Suryajaya Apr 24 '17 at 15:45