I was wondering if sparkR makes it easier to merge large data sets as opposed to "regular R"? I have 12 csv files that are approximately 500,000 rows by 40 columns. These files are monthly data for the year 2014. I want to make one file for the year 2014. The files all have the same column labels and I want to merge by the first column (year). However, some files have more rows than others.

When I ran the following code:

setwd("C:\\Users\\Anonymous\\Desktop\\Data 2014")

file_list <- list.files()

for (file in file_list){

# if the merged dataset doesn't exist, create it
if (!exists("dataset")){
dataset <- read.table(file, header=TRUE, sep="\t")
}

# if the merged dataset does exist, append to it
if (exists("dataset")){
temp_dataset <-read.table(file, header=TRUE, sep="\t")
dataset<-rbind(dataset, temp_dataset)
rm(temp_dataset)
}

}

R crashed.

When I ran this code:

library(SparkR)
library(magrittr)
# setwd("C:\\Users\\Anonymous\\Desktop\\Data 2014\\Jan2014.csv")
sc <- sparkR.init(master = "local")
sqlContext <- sparkRSQL.init(sc)

Jan2014_file_path <- file.path( 'Jan2014.csv')

system.time(
housing_a_df <- read.df(sqlContext, 
                      "C:\\Users\\Anonymous\\Desktop\\Data 2014\\Jan2014.csv", 
                      header='true',  
                      inferSchema='false')
)

I got the following errors:

   Error in invokeJava(isStatic = TRUE, className, methodName, ...) : 
   org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost):

So what would be an easy way to merge these files in sparkR?

user21478
  • Have you read [this](http://stackoverflow.com/questions/23169645/r-3-0-3-rbind-multiple-csv-files) answer? In the first section, are all the files in `file_list ` csv files? – R. Schifini Jan 12 '16 at 04:22
  • You say you want to "merge by the first column", but in your example code you concatenate the rows from different files. The answers below (at the time of this writing) are about merging=joining, not concatenating. – kasterma Jan 12 '16 at 05:37
  • Do any of the answers below answer your question? If so, kindly accept that answer. This might help other developers – sag Feb 04 '16 at 08:37

2 Answers

You should read the CSV files as shown below (reference: https://gist.github.com/shivaram/d0cd4aa5c4381edd6f85):

# Launch SparkR using 
# ./bin/sparkR --packages com.databricks:spark-csv_2.10:1.0.3

# The SparkSQL context should already be created for you as sqlContext
sqlContext
# Java ref type org.apache.spark.sql.SQLContext id 1

# Load the local CSV file using `read.df`. Note that we use the CSV reader Spark package here.
Jan2014 <- read.df(sqlContext, "C:/Users/Anonymous/Desktop/Data 2014/Jan2014.csv", "com.databricks.spark.csv", header="true")

Feb2014 <- read.df(sqlContext, "C:/Users/Anonymous/Desktop/Data 2014/Feb2014.csv", "com.databricks.spark.csv", header="true")

# To merge / join by year:

# First rename the year column of Feb2014 to year1, because it would otherwise be duplicated in the join result.
Feb2014 <- withColumnRenamed(Feb2014, "year", "year1")

# Join
jan_feb_2014 <- join(Jan2014, Feb2014, joinExpr = Jan2014$year == Feb2014$year1, joinType = "left_outer")
# I used "left_outer" because I want all rows of Jan2014 and only the matching rows of Feb2014; change the join type to suit your requirement.
# After joining, you can remove the extra column with: jan_feb_2014$year1 <- NULL

This is how you can join the files one by one.

Arun Gunalan
  • Does join add columns to a dataframe from another dataframe? Since he wants to merge the two csv files, I think join may not suit him – sag Jan 12 '16 at 05:19
  • He wanted to merge by the first column 'year', so I used join. Maybe he wants all the months to be in columns. @SamuelAlexander – Arun Gunalan Jan 12 '16 at 05:58

Once you have read the files as DataFrames, you can use unionAll from SparkR to append the rows of one DataFrame to another, producing a single DataFrame. Then you can write it out as a single CSV output.

Sample code

    df1 <- read.df(sqlContext, "/home/user/tmp/test1.csv", source = "com.databricks.spark.csv")
    df2 <- read.df(sqlContext, "/home/user/tmp/test2.csv", source = "com.databricks.spark.csv")
    mergedDF <- unionAll(df1, df2)
    write.df(mergedDF, "merged.csv", "com.databricks.spark.csv", "overwrite")

I've tested and used this, though not against data of your size. I hope it helps.
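To extend the two-file example to all 12 monthly files, one option is to read every file into a list and fold the list together pairwise. The following is only a sketch: `union_all_files` is a hypothetical helper (not part of SparkR), and the SparkR usage in the comments assumes SparkR 1.x with the spark-csv package loaded, an existing `sqlContext`, and that all files share the same columns.

```r
# Hypothetical helper (not a SparkR function): read each path with `read_one`,
# then combine the results pairwise with `union_two`.
union_all_files <- function(paths, read_one, union_two) {
  stopifnot(length(paths) > 0)
  Reduce(union_two, lapply(paths, read_one))
}

# Assumed SparkR 1.x usage (requires `sqlContext` and spark-csv):
# paths <- list.files("C:/Users/Anonymous/Desktop/Data 2014",
#                     pattern = "\\.csv$", full.names = TRUE)
# year2014 <- union_all_files(
#   paths,
#   function(p) read.df(sqlContext, p,
#                       source = "com.databricks.spark.csv", header = "true"),
#   unionAll
# )

# The same helper in plain R, for data that fits in memory:
# year2014 <- union_all_files(paths, read.csv, rbind)
```

Because the helper reads each file exactly once, it also avoids appending the first file twice, which the `exists("dataset")` loop in the question does.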

sag