15

A help for the implementation best practice is needed. The operating environment is as follows:

  • Log data file arrives irregularly.
  • The size of a log data file is from 3.9KB to 8.5MB. The average is about 1MB.
  • The number of records of a data file is from 13 lines to 22000 lines. The average is about 2700 lines.
  • Data file must be post-processed before aggregation.
  • Post-processing algorithm can be changed.
  • Post-processed file is managed separately with original data file, since the post-processing algorithm might be changed.
  • Daily aggregation is performed. All post-processed data file must be filtered record-by-record and aggregation(average, max min…) is calculated.
  • Since aggregation is fine-grained, the number of records after the aggregation is not so small. It can be about half of the number of the original records.
  • At a point, the number of the post-processed file can be about 200,000.
  • A data file should be able to be deleted individually.

In a test, I tried to process 160,000 post-processed files by Spark starting with sc.textFile() with glob path, it failed with OutOfMemory exception on the driver process.

What is the best practice to handle this kind of data? Should I use HBase instead of plain files to save post-processed data?

zeodtr
  • 10,645
  • 14
  • 43
  • 60

3 Answers3

9

I wrote own loader. It solved our problem with small files in HDFS. It uses Hadoop CombineFileInputFormat. In our case it reduced the number of mappers from 100000 to approx 3000 and made job significantly faster.

https://github.com/RetailRocket/SparkMultiTool

Example:

import ru.retailrocket.spark.multitool.Loaders 
val sessions = Loaders.combineTextFile(sc, "file:///test/*") 
// or val sessions = Loaders.combineTextFile(sc, conf.weblogs(), size = 256, delim = "\n") 
// where size is split size in Megabytes, delim - line break character 
println(sessions.count())
Roman Zykov
  • 273
  • 3
  • 7
  • Thank you for sharing this. I think the size argument is especially valuable, since it cannot be specified on coalesce(). – zeodtr Oct 02 '14 at 00:04
  • This solution is better than coalesce because it works at map stage, but coalesce after. – Roman Zykov Oct 06 '14 at 08:56
  • 2
    Since now hadoop supports CombineTextInputFormat(at least from 2.2), combining small input files can be done with sc.newAPIHadoopFile(), without implementing a custom class. – zeodtr Mar 30 '15 at 11:33
3

I'm pretty sure the reason your getting OOM is because of handling so many small files. What you want is to combine the input files so you don't get so many partitions. I try to limit my jobs to about 10k partitions.

After textFile, you can use .coalesce(10000, false) ... not 100% sure that will work though because it's been a while since I've done it, please let me know. So try

sc.textFile(path).coalesce(10000, false)
samthebest
  • 30,803
  • 25
  • 102
  • 142
  • It worked! Actually I used coalesce factor 1227, which is the number of partitions when Spark process the big single file that contains the whole records. But the job runs slower(as expected), and still it seems the information of all files is still transferred to the driver process, which can cause OOM when too many files are involved. But 1.68GB for the driver process for 168016 files are not so bad. – zeodtr Jul 08 '14 at 09:41
  • Well, we have a separate simple job specifically for reducing the number of files down as it's such an important thing. Once I had to run it in 5 goes on 5 subsets – samthebest Jul 08 '14 at 09:42
  • Hey could you do the universe a favour please :) Could you put "Combine multiple input files in spark" either into your question - or even in the header ... it will make it much easier for people to Google this problem. – samthebest Jul 08 '14 at 09:52
  • It is slow, I used 8000 on 8TB – Murtaza Kanchwala Jul 10 '15 at 06:49
1

You can use this

First You can get a Buffer/List of S3 Paths / Same for HDFS or Local Path

If you're trying with Amazon S3 then :

import scala.collection.JavaConverters._
import java.util.ArrayList
import com.amazonaws.services.s3.AmazonS3Client
import com.amazonaws.services.s3.model.ObjectListing
import com.amazonaws.services.s3.model.S3ObjectSummary
import com.amazonaws.services.s3.model.ListObjectsRequest

def listFiles(s3_bucket:String, base_prefix : String) = {
    var files = new ArrayList[String]

    //S3 Client and List Object Request
    var s3Client = new AmazonS3Client();
    var objectListing: ObjectListing = null;
    var listObjectsRequest = new ListObjectsRequest();

    //Your S3 Bucket
    listObjectsRequest.setBucketName(s3_bucket)

    //Your Folder path or Prefix
    listObjectsRequest.setPrefix(base_prefix)

    //Adding s3:// to the paths and adding to a list
    do {
      objectListing = s3Client.listObjects(listObjectsRequest);
      for (objectSummary <- objectListing.getObjectSummaries().asScala) {
        files.add("s3://" + s3_bucket + "/" + objectSummary.getKey());
      }
      listObjectsRequest.setMarker(objectListing.getNextMarker());
    } while (objectListing.isTruncated());

    //Removing Base Directory Name
    files.remove(0)

    //Creating a Scala List for same
    files.asScala
  }

Now Pass this List object to the following piece of code, note : sc is an object of SQLContext

var df: DataFrame = null;
  for (file <- files) {
    val fileDf= sc.textFile(file)
    if (df!= null) {
      df= df.unionAll(fileDf)
    } else {
      df= fileDf
    }
  }

Now you got a final Unified RDD i.e. df

Optional, And You can also repartition it in a single BigRDD

val files = sc.textFile(filename, 1).repartition(1)

Repartitioning always works :D

Murtaza Kanchwala
  • 2,425
  • 25
  • 33