0

I have a question similar to the below link in stackoverflow

R+Hadoop: How to read CSV file from HDFS and execute mapreduce?

I am tring to read a file from location "/somnath/logreg_data/ds1.10.csv" in HDFS, reduce its number of columns from 10 to 5 and then write to another location "/somnath/logreg_data/reduced/ds1.10.reduced.csv" in HDFS using the below transfer.csvfile.hdfs.to.hdfs.reduced function.

transfer.csvfile.hdfs.to.hdfs.reduced("hdfs://10.5.5.82:8020/somnath/logreg_data/ds1.10.csv", "hdfs://10.5.5.82:8020/somnath/logreg_data/reduced/ds1.10.reduced.csv", 5)

The function definition is

transfer.csvfile.hdfs.to.hdfs.reduced =
                function(hdfsFilePath, hdfsWritePath, reducedCols=1) {
                        #local.df = data.frame()
                        #hdfs.get(hdfsFilePath, local.df)
                        #to.dfs(local.df)
                        #r.file <- hdfs.file(hdfsFilePath,"r")
                        transfer.reduced.map =
                                        function(.,M) {
                                                label <- M[,dim(M)[2]]
                                                reduced.predictors <- M[,1:reducedCols]
                                                reduced.M <- cbind(reduced.predictors, label)
                                                keyval(
                                                     1,
                                                     as.numeric(reduced.M))
                                        }
                        reduced.values =
                             values(
                                     from.dfs(
                                        mapreduce(
                                          input = from.dfs(hdfsFilePath),
                                          input.format = "native",
                                          map = function(.,M) {
                                                label <- M[,dim(M)[2]]
                                                print(label)
                                                reduced.predictors <- M[,1:reducedCols]
                                                reduced.M <- cbind(reduced.predictors, label)
                                                keyval(
                                                     1,
                                                     as.numeric(reduced.M))}
                        )))
                        write.table(reduced.values, file="/root/somnath/reduced.values.csv")
                        w.file <- hdfs.file(hdfsWritePath,"w")
                        hdfs.write(reduced.values,w.file)
                        #to.dfs(reduced.values)
                }

But I am receiving an error

Error in file(fname, paste(if (is.read) "r" else "w", if (format$mode ==  :
  cannot open the connection
Calls: transfer.csvfile.hdfs.to.hdfs.reduced ... make.keyval.reader -> do.call -> <Anonymous> -> file
In addition: Warning message:
In file(fname, paste(if (is.read) "r" else "w", if (format$mode ==  :
  cannot open file 'hdfs://10.5.5.82:8020/somnath/logreg_data/ds1.10.csv': No such file or directory
Execution halted

OR

When I am trying to load a file from hdfs using the below commands, I am getting the below error:

> x <- hdfs.file(path="hdfs://10.5.5.82:8020/somnath/logreg_data/ds1.10.csv",mode="r")
Error in hdfs.file(path = "hdfs://10.5.5.82:8020/somnath/logreg_data/ds1.10.csv",  :
  attempt to apply non-function

Any help will be highly appreciated

Thanks

Community
  • 1
  • 1
somnathchakrabarti
  • 3,026
  • 10
  • 69
  • 92

1 Answers1

1

Basically found a solution to the problem that I stated above.

r.file <- hdfs.file(hdfsFilePath,"r")
from.dfs(
    mapreduce(
         input = as.matrix(hdfs.read.text.file(r.file)),
         input.format = "csv",
         map = ...
))

Below is the entire modified function:

transfer.csvfile.hdfs.to.hdfs.reduced =
                function(hdfsFilePath, hdfsWritePath, reducedCols=1) {
                        hdfs.init()
                        #local.df = data.frame()
                        #hdfs.get(hdfsFilePath, local.df)
                        #to.dfs(local.df)
                        r.file <- hdfs.file(hdfsFilePath,"r")
                        transfer.reduced.map =
                                        function(.,M) {
                                                numRows <- length(M)
                                                M.vec.elems <-unlist(lapply(M,
                                                                                function(x) strsplit(x, ",")))
                                                M.matrix <- matrix(M.vec.elems, nrow=numRows, byrow=TRUE)
                                                label <- M.matrix[,dim(M.matrix)[2]]
                                                reduced.predictors <- M.matrix[,1:reducedCols]
                                                reduced.M <- cbind(reduced.predictors, label)
                                                keyval(
                                                     1,
                                                     as.numeric(reduced.M))
                                        }
                        reduced.values =
                             values(
                                     from.dfs(
                                        mapreduce(
                                          input = as.matrix(hdfs.read.text.file(r.file)),
                                          input.format = "csv",
                                          map = function(.,M) {
                                                numRows <- length(M)
                                                M.vec.elems <-unlist(lapply(M,
                                                       function(x) strsplit(x, ",")))
                                                M.matrix <- matrix(M.vec.elems, nrow=numRows, byrow=TRUE)
                                                label <- M.matrix[,dim(M.matrix)[2]]
                                                reduced.predictors <- M.matrix[,1:reducedCols]
                                                reduced.M <- cbind(reduced.predictors, label)
                                                keyval(
                                                     1,
                                                     as.numeric(reduced.M)) }
                        )))
                        write.table(reduced.values, file="/root/somnath/reduced.values.csv")
                        w.file <- hdfs.file(hdfsWritePath,"w")
                        hdfs.write(reduced.values,w.file)
                        hdfs.close(r.file)
                        hdfs.close(w.file)
                        #to.dfs(reduced.values)
                }

Hope this helps and don't forget to give points if you find it useful. Thanks ahead

somnathchakrabarti
  • 3,026
  • 10
  • 69
  • 92