1

I am doing a project that involves using HDFS for storage and Apache Spark for computation. I have a directory in HDFS which have several text files in it at same depth.I want to process all these files using Spark and store back their corresponding results back to HDFS with 1 output file for each input file.

For example - Suppose I have a directory with 1000 text files in it at same depth. I am reading all these files using wildcards

sc.wholeTextFiles(hdfs://localhost:9000/home/akshat/files/*.txt)

Then I process them using Spark and get a corresponding RDD and save that by using

result.saveAsTextFile("hdfs://localhost:9000/home/akshat/final")

But it gives me the result of all the input files in one single file and I want to get each file, process them individually and store the output of each of them individually.

What should be my next approach to achieve this ?
Thanks in advance!

Akshat Kumar
  • 69
  • 3
  • 13
  • I have written the code for processing each txt file individually but when I tried to read all these files using wildcards , spark takes them as one single file and processes it as one big txt file. I want my processing to be done for each input txt files and give me the corresponding result in output files for each one of them. – Akshat Kumar Jun 04 '15 at 10:12

1 Answers1

2

You can do this by using wholeTextFiles() , Note: the below approach process files one by one.

val data = sc.wholeTextFiles("hdfs://master:port/vijay/mywordcount/")

val files = data.map { case (filename, content) => filename}


def doSomething(file: String) = { 

 println (file);

 // your logic of processing a single file comes here

 val logData = sc.textFile(file);
 val numAs = logData.filter(line => line.contains("a")).count();
 println("Lines with a: %s".format(numAs));

 // save rdd of single file processed data to hdfs  comes here

}

files.collect.foreach( filename => {
    doSomething(filename)

}) 

where:

  • hdfs://master:port/vijay/mywordcount/ --- your hdfs dir
  • data - org.apache.spark.rdd.RDD[(String, String)]
  • files - org.apache.spark.rdd.RDD[String]- filenames
  • doSomething(filename) - your logic

Update: multiple output files

/* SimpleApp.scala */
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

/* hadoop */

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

/* java */
import java.io.Serializable;

import org.apache.log4j.Logger
import org.apache.log4j.Level

/* Custom TextOutput Format */
class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  override def generateActualKey(key: Any, value: Any): Any =
    NullWritable.get()

  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String =
    return key.asInstanceOf[String] +"-"+ name;   // for output hdfs://Ouptut_dir/inputFilename-part-****
  //return key.asInstanceOf[String] +"/"+ name;   // for output hdfs://Ouptut_dir/inputFilename/part-**** [inputFilename - as directory of its partFiles ]
}

/* Spark Context */
object Spark {
  val sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
}

/* WordCount Processing */

object Process extends Serializable{
  def apply(filename: String): org.apache.spark.rdd.RDD[(String, String)]= {
    println("i am called.....")
    val simple_path = filename.split('/').last;
    val lines = Spark.sc.textFile(filename);
    val counts     = lines.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _); //(word,count)
    val fname_word_counts = counts.map( x => (simple_path,x._1+"\t"+ x._2));   // (filename,word\tcount)
    fname_word_counts
  }
}

object SimpleApp  {

        def main(args: Array[String]) {

            //Logger.getLogger("org").setLevel(Level.OFF)
            //Logger.getLogger("akka").setLevel(Level.OFF)

            // input ans output paths
            val INPUT_PATH = "hdfs://master:8020/vijay/mywordcount/"
            val OUTPUT_PATH = "hdfs://master:8020/vijay/mywordcount/output/"

            // context
            val context = Spark.sc
            val data = context.wholeTextFiles(INPUT_PATH)

            // final output RDD
            var output : org.apache.spark.rdd.RDD[(String, String)] = context.emptyRDD

            // files to process
            val files = data.map { case (filename, content) => filename}


            // Apply wordcount Processing on each File received in wholeTextFiles.
            files.collect.foreach( filename => {
                            output = output.union(Process(filename));
            })

           //output.saveAsTextFile(OUTPUT_PATH);   // this will save output as (filename,word\tcount)
           output.saveAsHadoopFile(OUTPUT_PATH, classOf[String], classOf[String],classOf[RDDMultipleTextOutputFormat])  // custom output Format.

           //close context
           context.stop();

         }
}

environment:

  • Scala compiler version 2.10.2
  • spark-1.2.0-bin-hadoop2.3
  • Hadoop 2.3.0-cdh5.0.3

sample output:

[ramisetty@node-1 stack]$ hadoop fs -ls /vijay/mywordcount/output
Found 5 items
-rw-r--r--   3 ramisetty supergroup          0 2015-06-09 03:49 /vijay/mywordcount/output/_SUCCESS
-rw-r--r--   3 ramisetty supergroup         40 2015-06-09 03:49 /vijay/mywordcount/output/file1.txt-part-00000
-rw-r--r--   3 ramisetty supergroup          8 2015-06-09 03:49 /vijay/mywordcount/output/file1.txt-part-00001
-rw-r--r--   3 ramisetty supergroup         44 2015-06-09 03:49 /vijay/mywordcount/output/file2.txt-part-00002
-rw-r--r--   3 ramisetty supergroup          8 2015-06-09 03:49 /vijay/mywordcount/output/file2.txt-part-00003
vijay kumar
  • 2,049
  • 1
  • 15
  • 18
  • Thanks! Worked like a charm ! – Akshat Kumar Jun 04 '15 at 19:18
  • One thing I am stuck at by using your method is that how should I set the output directory of my final RDD because when I give the path of the final RDD output directory using saveAsTextFile , then it processes all the files one by one perfectly but when it writes the output of all files, it writes the output of first file and after that gives error - Output directory already exists – Akshat Kumar Jun 05 '15 at 06:38
  • Please guide me to solve the above problem @ramisetty – Akshat Kumar Jun 05 '15 at 08:13
  • hope this helps - https://groups.google.com/forum/#!topic/spark-users/AjDTPYFLLmQ – vijay kumar Jun 05 '15 at 09:06
  • you have to write hdfs fileSytem code to create a file and save data in scala as per above link :( which is tidious task. else case load each output file to different directory as staging dirs and later move all of them to one dir – vijay kumar Jun 05 '15 at 11:07
  • Please suggest an alternative solution for the above problem @maasg – Akshat Kumar Jun 05 '15 at 11:34
  • Create CustomOutput Format , make use of generateFileNameForKeyValue - refer this link. and use rdd.SaveAsHadoopFile([],[],[]) instead of rdd.saveAsTextFile() http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job – vijay kumar Jun 08 '15 at 11:44
  • Can you suggest me anything if I had the option of saving the output of all the input files in one single file like all the input files are processed one by one and all their outputs are stored in one single file. I can even manage with that kind of result.@ramisetty – Akshat Kumar Jun 08 '15 at 19:30
  • updated the answer (multiple output files)which solves your problem.modify wordcount processing to your processing logic. tested and working.. hope this helps :) – vijay kumar Jun 09 '15 at 04:35
  • Thanks @ramisetty. It worked. I wanted to ask on more thing , in the previous code before the updation,can you tell me what changes should I need to do so that I can map my input files RDD according to doSomething and output of all the files stored in a single file using saveAsTextFile. – Akshat Kumar Jun 09 '15 at 07:09
  • use my update code with output.saveAsTextFile(OUTPUT_PATH); this will work – vijay kumar Jun 09 '15 at 08:46