0

I am trying to read a fixed-width file with sparklyr.

When I use spark_read_text(), the file is read into a single column. Is there a way to split it into separate columns?

Thank you

  • [Relevant - Maybe duplicate](https://stackoverflow.com/questions/41810015/sparklyr-separate-one-spark-dataframe-column-into-two-columns) – Sotos Sep 27 '18 at 09:02
  • `substring(str:Column,pos:Int,len:Int)` - https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$@substring(str:org.apache.spark.sql.Column,pos:Int,len:Int):org.apache.spark.sql.Column (where positions are 1-based, not 0-based). – zero323 Sep 28 '18 at 10:39

1 Answer

0

If you only have a few columns and can specify them all by name up front, I gave a simpler answer to that more constrained problem (https://stackoverflow.com/a/76864852/4630129).

But you didn't say that you know all of that up front, so here is a more general solution.

I often have situations where I'm given a table of metadata with the variable positions, names, types, and number formats, and I need to parse the variables dynamically out of a single column of data in the Spark table. That's a lot harder than the constrained case. I've tried it a few ways, and using Spark SQL to parse the columns after the data has been loaded with sparklyr::spark_read_text() seems to be the best way to go. This is especially true for a large number of columns: with many columns, the approach I linked to above breaks.

Here is an example with a large number of columns (108), along with metadata giving each column's width and type (plus a scale parameter for numbers in decimal format).

library(magrittr)
library(sparklyr)
library(dplyr)

# When strict_decimal_types is TRUE, decimal fields are cast to
# DECIMAL(width, scale). For some cases the exact width and scale aren't
# important; set this to FALSE to read them in as DOUBLE instead.
strict_decimal_types <- TRUE

# This example needs an existing Spark connection named `sc`, e.g.
#   sc <- sparklyr::spark_connect(master = "local")
# Fail early with a clear message instead of an opaque
# "object 'sc' not found" error from inside copy_to().
if (!exists("sc")) {
  stop(
    "A Spark connection named `sc` is required, e.g. ",
    "sc <- sparklyr::spark_connect(master = \"local\")",
    call. = FALSE
  )
}

## creating a temp table name where we will read in the data
# NOTE(review): "" is not a valid identifier, so spark_table_name() presumably
# falls back to a random temporary name here -- confirm for your sparklyr.
tempname <- sparklyr::spark_table_name("")

# Each record arrives as one big string that has to be split into fields.
# Creating some placeholder text:
line_text <- "1a2b3c4d5e6f7g8h9i10jj11kk12ll13mm14nn15oo16pp17qq18rr19ss20tt21uu22vv23ww24xx25yy26zz27AA28BB29CC30DD31EE32FF33GG34HH35II36JJ37KK38LL39MM40NN41OO42PP43QQ44RR45SS46TT47UU48VV49WW50XX51YY52ZZ5000.01aaaaaaa50.0001bbbbbbb"

# repeating it a couple of times with a little (case) variation
lines_text <- c(
  line_text,
  toupper(line_text),
  tolower(line_text)
)

# The single string column is called `line` because the generated SQL below
# reads SUBSTR(line ...). With a real file you would presumably read it with
# sparklyr::spark_read_text(), which also yields one string column (named
# `line` in the sparklyr versions I know of -- confirm), e.g.
#   initial_fwf <- sparklyr::spark_read_text(sc, name = tempname, path = "x.txt")
my_table <- tibble::tibble(
  line = lines_text
)

# uploading table to spark
initial_fwf <- dplyr::copy_to(
  sc,
  my_table,
  name = tempname,
  overwrite = TRUE
)

# Placeholder field metadata, one row per field, in field order:
#   id    - field position (1-based)
#   name  - output column name
#   width - field width in characters
#   scale - digits after the decimal point (decimal fields only, NA otherwise)
#   type  - "decimal" for odd ids, "string" for even ids
this_metadata_tbl <- tibble::tibble(
  id = seq_len(108),
  name = glue::glue("var_{seq_len(108)}"),
  width = c(rep(1, 18), rep(2, 86), rep(7, 4)),
  scale = c(rep(c(0, NA), 52), 2, NA, 4, NA),
  type = dplyr::if_else(id %% 2 == 0, "string", "decimal")
)

print(this_metadata_tbl)
# # A tibble: 108 x 5
# id name   width scale type   
#    <int> <glue> <dbl> <dbl> <chr>  
# 1     1 var_1      1     0 decimal
# 2     2 var_2      1    NA string 
# 3     3 var_3      1     0 decimal
# 4     4 var_4      1    NA string 
# 5     5 var_5      1     0 decimal
# 6     6 var_6      1    NA string 
# 7     7 var_7      1     0 decimal
# 8     8 var_8      1    NA string 
# 9     9 var_9      1     0 decimal
# 10    10 var_10     1    NA string 
# # ... with 98 more rows


# Compute the 1-based start character of each field: a field starts one
# character after the total width of all fields before it, i.e.
# start = cumsum(width) - width + 1. This replaces the lag/case_when/cumsum
# detour and the `stop` column, which was computed but never kept.
# NOTE(review): assumes the metadata rows are already in field order -- if your
# real metadata is not, sort it (e.g. by id) first.
stopifnot(
  !anyNA(this_metadata_tbl$width),
  all(this_metadata_tbl$width > 0)
)

this_split_tbl <- this_metadata_tbl %>%
  dplyr::mutate(start = cumsum(width) - width + 1) %>%
  dplyr::select(
    name, start, width, type, scale
  )

print(this_split_tbl)
# # A tibble: 108 x 5
# name   start width type    scale
#   <glue> <dbl> <dbl> <chr>   <dbl>
# 1 var_1      1     1 decimal     0
# 2 var_2      2     1 string     NA
# 3 var_3      3     1 decimal     0
# 4 var_4      4     1 string     NA
# 5 var_5      5     1 decimal     0
# 6 var_6      6     1 string     NA
# 7 var_7      7     1 decimal     0
# 8 var_8      8     1 string     NA
# 9 var_9      9     1 decimal     0
# 10 var_10    10     1 string     NA
# # ... with 98 more rows

# Adds a row-number column `id` to the uploaded table.
# NOTE(review): working_fwf is not referenced anywhere below -- the SELECT
# built later reads from `tempname` directly -- so this step can likely be
# dropped. The tbl_spark pipeline is presumably lazy, so nothing runs here
# unless the result is used. If you do use it, row_number() is called without
# an explicit ordering, so the ids have no guaranteed row order on Spark (and
# Spark may reject ROW_NUMBER() without ORDER BY) -- verify before relying on it.
working_fwf <- initial_fwf %>%
  dplyr::mutate(id = dplyr::row_number())

# Build one Spark SQL select-expression per field described in this_split_tbl.
# - Arguments are matched to columns by NAME, so the column order of
#   this_split_tbl does not matter; `...` absorbs any extra metadata columns.
# - "decimal" fields are cast to DECIMAL(width, scale) when
#   strict_decimal_types is TRUE (a missing scale falls back to 0), otherwise
#   to DOUBLE. Every other field is kept as the raw substring.
# - Positions and widths are formatted as integers so that large offsets are
#   written as e.g. 100000 rather than 1e+05, which is not valid in SQL.
code_split_fwf <- this_split_tbl %>%
  purrr::pmap(function(name, start, width, type, scale, ...) {
    start <- as.integer(start)
    width <- as.integer(width)
    # a backtick inside a Spark SQL identifier is escaped by doubling it
    quoted_name <- gsub("`", "``", name, fixed = TRUE)
    substr_sql <- glue::glue("SUBSTR(line FROM {start} FOR {width})")

    if (!isTRUE(type == "decimal")) {
      return(glue::glue("{substr_sql} AS `{quoted_name}`"))
    }

    if (isTRUE(strict_decimal_types)) {
      scale <- if (is.na(scale)) 0L else as.integer(scale)
      glue::glue(
        "CAST({substr_sql} AS DECIMAL({width},{scale})) AS `{quoted_name}`"
      )
    } else {
      glue::glue("CAST({substr_sql} AS DOUBLE) AS `{quoted_name}`")
    }
  })

print(head(code_split_fwf))
# [[1]]
# CAST(SUBSTR(line FROM 1 FOR 1) AS DECIMAL(1,0)) AS `var_1`
# 
# [[2]]
# SUBSTR(line FROM 2 FOR 1) AS `var_2`
# 
# [[3]]
# CAST(SUBSTR(line FROM 3 FOR 1) AS DECIMAL(1,0)) AS `var_3`
# 
# [[4]]
# SUBSTR(line FROM 4 FOR 1) AS `var_4`
# 
# [[5]]
# CAST(SUBSTR(line FROM 5 FOR 1) AS DECIMAL(1,0)) AS `var_5`
# 
# [[6]]
# SUBSTR(line FROM 6 FOR 1) AS `var_6`

print(tail(code_split_fwf))

# [[1]]
# CAST(SUBSTR(line FROM 187 FOR 2) AS DECIMAL(2,0)) AS `var_103`
# 
# [[2]]
# SUBSTR(line FROM 189 FOR 2) AS `var_104`
# 
# [[3]]
# CAST(SUBSTR(line FROM 191 FOR 7) AS DECIMAL(7,2)) AS `var_105`
# 
# [[4]]
# SUBSTR(line FROM 198 FOR 7) AS `var_106`
# 
# [[5]]
# CAST(SUBSTR(line FROM 205 FOR 7) AS DECIMAL(7,4)) AS `var_107`
# 
# [[6]]
# SUBSTR(line FROM 212 FOR 7) AS `var_108`

# Assemble one SELECT over the uploaded table: the per-field expressions,
# comma-separated one per line, followed by FROM <temp table name>.
code_expression <- paste(
  "SELECT",
  paste(code_split_fwf, collapse = ",\n "),
  "FROM",
  tempname
)

# Run the query in Spark and materialize the parsed columns with compute(),
# so later steps reuse the result instead of re-parsing every line.
extended_table <- sc %>%
  dplyr::tbl(dplyr::sql(code_expression)) %>%
  dplyr::compute()

extended_table %>% dplyr::glimpse()

# Rows: ??
#   Columns: 108
# Database: spark_connection
# $ var_1   <dbl> 1, 1, 1
# $ var_2   <chr> "a", "A", "a"
# $ var_3   <dbl> 2, 2, 2
# $ var_4   <chr> "b", "B", "b"
# $ var_5   <dbl> 3, 3, 3
# $ var_6   <chr> "c", "C", "c"
# $ var_7   <dbl> 4, 4, 4
# $ var_8   <chr> "d", "D", "d"
# $ var_9   <dbl> 5, 5, 5
# $ var_10  <chr> "e", "E", "e"
# $ var_11  <dbl> 6, 6, 6
# $ var_12  <chr> "f", "F", "f"
# $ var_13  <dbl> 7, 7, 7
# $ var_14  <chr> "g", "G", "g"
# $ var_15  <dbl> 8, 8, 8
# $ var_16  <chr> "h", "H", "h"
# $ var_17  <dbl> 9, 9, 9
# $ var_18  <chr> "i", "I", "i"
# $ var_19  <dbl> 10, 10, 10
# $ var_20  <chr> "jj", "JJ", "jj"
# $ var_21  <dbl> 11, 11, 11
# $ var_22  <chr> "kk", "KK", "kk"
# $ var_23  <dbl> 12, 12, 12
# $ var_24  <chr> "ll", "LL", "ll"
# $ var_25  <dbl> 13, 13, 13
# $ var_26  <chr> "mm", "MM", "mm"
# $ var_27  <dbl> 14, 14, 14
# $ var_28  <chr> "nn", "NN", "nn"
# $ var_29  <dbl> 15, 15, 15
# $ var_30  <chr> "oo", "OO", "oo"
# $ var_31  <dbl> 16, 16, 16
# $ var_32  <chr> "pp", "PP", "pp"
# $ var_33  <dbl> 17, 17, 17
# $ var_34  <chr> "qq", "QQ", "qq"
# $ var_35  <dbl> 18, 18, 18
# $ var_36  <chr> "rr", "RR", "rr"
# $ var_37  <dbl> 19, 19, 19
# $ var_38  <chr> "ss", "SS", "ss"
# $ var_39  <dbl> 20, 20, 20
# $ var_40  <chr> "tt", "TT", "tt"
# $ var_41  <dbl> 21, 21, 21
# $ var_42  <chr> "uu", "UU", "uu"
# $ var_43  <dbl> 22, 22, 22
# $ var_44  <chr> "vv", "VV", "vv"
# $ var_45  <dbl> 23, 23, 23
# $ var_46  <chr> "ww", "WW", "ww"
# $ var_47  <dbl> 24, 24, 24
# $ var_48  <chr> "xx", "XX", "xx"
# $ var_49  <dbl> 25, 25, 25
# $ var_50  <chr> "yy", "YY", "yy"
# $ var_51  <dbl> 26, 26, 26
# $ var_52  <chr> "zz", "ZZ", "zz"
# $ var_53  <dbl> 27, 27, 27
# $ var_54  <chr> "AA", "AA", "aa"
# $ var_55  <dbl> 28, 28, 28
# $ var_56  <chr> "BB", "BB", "bb"
# $ var_57  <dbl> 29, 29, 29
# $ var_58  <chr> "CC", "CC", "cc"
# $ var_59  <dbl> 30, 30, 30
# $ var_60  <chr> "DD", "DD", "dd"
# $ var_61  <dbl> 31, 31, 31
# $ var_62  <chr> "EE", "EE", "ee"
# $ var_63  <dbl> 32, 32, 32
# $ var_64  <chr> "FF", "FF", "ff"
# $ var_65  <dbl> 33, 33, 33
# $ var_66  <chr> "GG", "GG", "gg"
# $ var_67  <dbl> 34, 34, 34
# $ var_68  <chr> "HH", "HH", "hh"
# $ var_69  <dbl> 35, 35, 35
# $ var_70  <chr> "II", "II", "ii"
# $ var_71  <dbl> 36, 36, 36
# $ var_72  <chr> "JJ", "JJ", "jj"
# $ var_73  <dbl> 37, 37, 37
# $ var_74  <chr> "KK", "KK", "kk"
# $ var_75  <dbl> 38, 38, 38
# $ var_76  <chr> "LL", "LL", "ll"
# $ var_77  <dbl> 39, 39, 39
# $ var_78  <chr> "MM", "MM", "mm"
# $ var_79  <dbl> 40, 40, 40
# $ var_80  <chr> "NN", "NN", "nn"
# $ var_81  <dbl> 41, 41, 41
# $ var_82  <chr> "OO", "OO", "oo"
# $ var_83  <dbl> 42, 42, 42
# $ var_84  <chr> "PP", "PP", "pp"
# $ var_85  <dbl> 43, 43, 43
# $ var_86  <chr> "QQ", "QQ", "qq"
# $ var_87  <dbl> 44, 44, 44
# $ var_88  <chr> "RR", "RR", "rr"
# $ var_89  <dbl> 45, 45, 45
# $ var_90  <chr> "SS", "SS", "ss"
# $ var_91  <dbl> 46, 46, 46
# $ var_92  <chr> "TT", "TT", "tt"
# $ var_93  <dbl> 47, 47, 47
# $ var_94  <chr> "UU", "UU", "uu"
# $ var_95  <dbl> 48, 48, 48
# $ var_96  <chr> "VV", "VV", "vv"
# $ var_97  <dbl> 49, 49, 49
# $ var_98  <chr> "WW", "WW", "ww"
# $ var_99  <dbl> 50, 50, 50
# $ var_100 <chr> "XX", "XX", "xx"
# $ var_101 <dbl> 51, 51, 51
# $ var_102 <chr> "YY", "YY", "yy"
# $ var_103 <dbl> 52, 52, 52
# $ var_104 <chr> "ZZ", "ZZ", "zz"
# $ var_105 <dbl> 5000.01, 5000.01, 5000.01
# $ var_106 <chr> "aaaaaaa", "AAAAAAA", "aaaaaaa"
# $ var_107 <dbl> 50.0001, 50.0001, 50.0001
# $ var_108 <chr> "bbbbbbb", "BBBBBBB", "bbbbbbb"