
I need to read a 10GB fixed width file to a dataframe. How can I do it using Spark in R?

Suppose my text data is the following:

text <- c("0001BRAjonh   ",
"0002USAmarina ",
"0003GBPcharles")

I want the first 4 characters to be associated with the column "ID" of a data frame; characters 5-7 with a column "Country"; and characters 8-14 with a column "Name".

I would use the function read.fwf if the dataset were small, but that is not the case.
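
For reference, with a small file I could do something like the following ("data.txt" here is just a stand-in for the real file):

# the small-file approach with base R, for comparison ("data.txt" is a placeholder)
df <- read.fwf(
  "data.txt",
  widths     = c(4, 3, 7),
  col.names  = c("ID", "Country", "Name"),
  colClasses = "character"
)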

I can read the file as a text file using the sparklyr::spark_read_text function, but I don't know how to map the values in the file onto the columns of a data frame properly.

Charles Santana
  • I built a class to do this for me in Scala, based on substring and selectExpr. To start, I had all my schemas in text files set up under an external Hive table with five columns: table name, column number, column name, column start, column end. Each column was transformed into a respective array, and a while loop built each column's parsing statement. Even though it will be in Scala, do you want me to draft up a proxy Spark answer for it within the answer section? – afeldman Mar 25 '19 at 21:55
  • Definitely yes, @afeldman. It will certainly help me to figure out how to do it in R. Thanks for this. – Charles Santana Mar 26 '19 at 14:58

2 Answers

0

I think this is a great question. I have similar files I need to read, so this was a useful exercise for me.

Here is some code that does what you ask. First I save out the example text you gave to a file, so that I can read it in via sparklyr::spark_read_text as you suggest.

Then I use dplyr::mutate and substr (which will be translated to Spark SQL's substr in this case) to split the data into the columns you requested.

# because we'll want to use pipes later
library(magrittr)

text <- c("0001BRAjonh   ",
          "0002USAmarina ",
          "0003GBPcharles")

# let's save out this text to a file for the purposes of this example
save_path <- normalizePath("~/testing_fwf.dat")

readr::write_lines(text, file = save_path)

# connect to Spark first (a local connection here; adjust master/config for your cluster)
sc <- sparklyr::spark_connect(master = "local")

# use spark_read_text to load it into spark
test_fwf <- sparklyr::spark_read_text(
  sc,
  name = "test_fwf",
  path = paste0("file://",save_path)
)

# just checking the column names
# it creates a single column called "line"
colnames(test_fwf)
# [1] "line"

# split that column into the ones you requested
split_fwf <- test_fwf %>%
  dplyr::mutate(
    ID = substr(line, 1, 4), # Column, start, stop
    Country = substr(line, 5, 7),
    Name = substr(line, 8, 14)
  ) %>%
  dplyr::select(-line) # we can get rid of the original unsplit column now

# for ease of printing the results, let's collect this
split_fwf_collected <- split_fwf %>%
  dplyr::collect()

# take a look at what we've created
split_fwf_collected

# # A tibble: 3 x 3
#   ID    Country Name     
#   <chr> <chr>   <chr>    
# 1 0001  BRA     "jonh   "
# 2 0002  USA     "marina "
# 3 0003  GBP     "charles"
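
Of course, with the real 10GB file you probably wouldn't collect() the whole thing into R; you can keep working with split_fwf inside Spark and, for example, write it back out as parquet (the output path below is just a placeholder):

# persist the split result from Spark instead of pulling 10GB into R
sparklyr::spark_write_parquet(split_fwf, path = "file:///tmp/test_fwf_parquet")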

-1

EDIT: Forgot to say that substring positions start at 1 while array indices start at 0, because reasons.

Going through and adding the code I talked about in the comment above.

The process is dynamic and is driven by a Hive table called Input_Table. The table has 5 columns: Table_Name, Column_Name, Column_Ordinal_Position, Column_Start_Position, and Column_Length. It is external, so any user can change it or drop and add files in the folder location. I quickly rebuilt this from scratch rather than taking the actual code; does everything make sense?

// Read the input DataFrame and the Hive schema table. For the Hive table we make sure
// to take only this table's rows, sorted into the correct column order.
val inputDF       = spark.read.format(recordFormat).option("header","false").load(folderLocation + "/" + tableName + "." + tableFormat).rdd.toDF("Odd_Long_Name")
val inputSchemaDF = spark.sql("select * from Input_Table where Table_Name = '" + tableName + "'").sort($"Column_Ordinal_Position")

// Build an array from each schema column; rdd.map(...).collect turns a one-column
// DataFrame into an Array[String] so we can iterate over the column.
val columnNameArray    = inputSchemaDF.selectExpr("Column_Name").rdd.map(x=>x.mkString).collect
val columnStartArray   = inputSchemaDF.selectExpr("Column_Start_Position").rdd.map(x=>x.mkString).collect
val columnLengthArray  = inputSchemaDF.selectExpr("Column_Length").rdd.map(x=>x.mkString).collect

// Set up the iterator as well as the other variables that are meant to be overwritten
var columnAllocationIterator = 1
var localCommand             = ""
var commandArray             = Array("")

// Loop once for every column listed in the input table
while (columnAllocationIterator <= columnNameArray.length) {
  // Overwrite the command string with this column's parsing expression
  // (I thought "Odd_Long_Name" was too accurate a name not to place into the code)
  localCommand = "substring(Odd_Long_Name, " + columnStartArray(columnAllocationIterator-1) + ", " + columnLengthArray(columnAllocationIterator-1) + ") as " + columnNameArray(columnAllocationIterator-1)

  // On the first pass this overwrites the command array, afterwards it just appends
  if (columnAllocationIterator==1) {
    commandArray = Array(localCommand)
  } else {
    commandArray = commandArray ++ Array(localCommand)
  }

  // I really like iterating my iterators like this
  columnAllocationIterator = columnAllocationIterator + 1
}

// Run all elements of the string array together against the table in a single selectExpr
val finalDF = inputDF.selectExpr(commandArray:_*)
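
For the R side (per the comments above), the same idea could be sketched roughly like this with sparklyr. This is an untested sketch, not the original code: it assumes the raw file has already been loaded as the single-column table test_fwf (column "line") from the other answer, that the Input_Table schema table described above exists in Hive, and that table_name is just a placeholder.

# rough sparklyr translation of the dynamic approach above (untested sketch)
library(DBI)
library(dplyr)

table_name <- "my_fixed_width_table"   # placeholder

# pull the (small) schema definition for this table into R via the DBI interface
schema_df <- dbGetQuery(sc, paste0(
  "SELECT Column_Name, Column_Start_Position, Column_Length ",
  "FROM Input_Table WHERE Table_Name = '", table_name, "' ",
  "ORDER BY Column_Ordinal_Position"
))

# build one "substr(line, start, length) AS name" expression per column
exprs <- sprintf(
  "substr(line, %s, %s) AS %s",
  schema_df$Column_Start_Position,
  schema_df$Column_Length,
  schema_df$Column_Name
)

# run them all in one SELECT against the raw table, staying inside Spark
final_df <- tbl(sc, sql(paste0(
  "SELECT ", paste(exprs, collapse = ", "), " FROM test_fwf"
)))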
afeldman