
I need to split an RDD by first letter (A-Z) and write each split into its own directory. The simple solution is to filter the RDD once per letter, but that requires 26 passes over the data. There is a response to a similar question for writing to text files here, but I cannot figure out how to do the same for Avro files.
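
For reference, this is roughly the 26-pass version I am trying to avoid (just a sketch; rdd, outputBase and writeAvro are placeholders, and writeAvro stands in for however the Avro output actually gets written):

    ('A' to 'Z').foreach { letter =>
      // One full pass over the RDD per letter.
      val subset = rdd.filter(_.startsWith(letter.toString))
      writeAvro(subset, s"$outputBase/$letter")  // hypothetical helper, not a real API
    }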

Has anyone been able to do this?


2 Answers


You can use MultipleOutputFormat to do this.

It is a two-step task:

  1. First you need a MultipleOutputFormat for Avro. Below is the code for that:

    package avro
    
    import org.apache.avro.mapred.{AvroOutputFormat, AvroWrapper}
    import org.apache.hadoop.fs.FileSystem
    import org.apache.hadoop.io.NullWritable
    import org.apache.hadoop.mapred.lib.MultipleOutputFormat
    import org.apache.hadoop.mapred.{JobConf, RecordWriter}
    import org.apache.hadoop.util.Progressable
    
    class MultipleAvroFileOutputFormat[K] extends MultipleOutputFormat[AvroWrapper[K], NullWritable] {
    
      val outputFormat = new AvroOutputFormat[K]
    
      // Route each record to a per-letter subdirectory: a key starting with "a"
      // is written under "a/".
      override def generateFileNameForKeyValue(key: AvroWrapper[K], value: NullWritable, name: String) = {
        val letter = key.datum().asInstanceOf[String].substring(0, 1)
        letter + "/" + letter
      }
    
      // Delegate the actual Avro writing to AvroOutputFormat.
      override def getBaseRecordWriter(fs: FileSystem,
                                       job: JobConf,
                                       name: String,
                                       arg3: Progressable) = {
        outputFormat.getRecordWriter(fs, job, name, arg3)
          .asInstanceOf[RecordWriter[AvroWrapper[K], NullWritable]]
      }
    }
    
    
  2. In your driver code, specify that you want to use the output format given above, and set the output schema for the Avro data. Below is sample driver code that stores an RDD of strings in Avro format with schema {"type":"string"}:

    package avro
    
    import org.apache.avro.mapred.{AvroJob, AvroWrapper}
    import org.apache.hadoop.io.NullWritable
    import org.apache.hadoop.mapred.JobConf
    import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
    import org.apache.spark.SparkContext._
    
    object AvroDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf
        conf.setAppName(args(0))
        conf.setMaster("local[2]")
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        conf.registerKryoClasses(Array(classOf[AvroWrapper[String]]))
        val sc = new SparkContext(conf)
    
        val input = sc.parallelize(Seq("one", "two", "three", "four"), 1)
        // The value is ignored by AvroOutputFormat, so it can simply be null.
        val pairRDD = input.map(x => (new AvroWrapper(x), null))
    
        val job = new JobConf(sc.hadoopConfiguration)
        job.set(AvroJob.OUTPUT_SCHEMA, "{\"type\":\"string\"}") // schema for the Avro output
    
        // Spread the keys over 26 partitions; MultipleAvroFileOutputFormat then
        // places each record under the directory named after its first letter.
        pairRDD.partitionBy(new HashPartitioner(26)).saveAsHadoopFile(args(1),
          classOf[AvroWrapper[String]], classOf[NullWritable],
          classOf[MultipleAvroFileOutputFormat[String]], job, None)
    
        sc.stop()
      }
    }
    
    
mrnakumar

I hope you get a better answer than mine...

I've been in a similar situation myself, except with "ORC" instead of Avro. I basically threw up my hands and ended up calling the ORC file classes directly to write the files myself.

In your case, my approach would entail partitioning the data via "partitionBy" into 26 partitions, one for each first letter A-Z. Then call "mapPartitionsWithIndex", passing a function that writes the i-th partition to an Avro file at the appropriate path. Finally, to convince Spark to actually do something, have that function return, say, a List containing the single boolean value "true", and then call "count" on the RDD returned by "mapPartitionsWithIndex" to get Spark to start the show.

I found an example of writing an Avro file here: http://www.myhadoopexamples.com/2015/06/19/merging-small-files-into-avro-file-2/
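
Putting those pieces together, here is roughly what I have in mind, assuming an RDD[String] called input; the LetterPartitioner, the file names, and the outputBase argument are placeholders, every record is assumed to start with a letter A-Z, and I have only tried this sort of thing on small data in local mode:

    import org.apache.avro.Schema
    import org.apache.avro.file.DataFileWriter
    import org.apache.avro.generic.GenericDatumWriter
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.spark.Partitioner
    import org.apache.spark.rdd.RDD
    
    // Hypothetical partitioner: 'A' -> partition 0 ... 'Z' -> partition 25.
    class LetterPartitioner extends Partitioner {
      override def numPartitions: Int = 26
      override def getPartition(key: Any): Int = key.toString.charAt(0).toUpper - 'A'
    }
    
    def writeByFirstLetter(input: RDD[String], outputBase: String): Unit = {
      input
        .map(s => (s.substring(0, 1).toUpperCase, s))
        .partitionBy(new LetterPartitioner)
        .mapPartitionsWithIndex { (idx, records) =>
          // Build the schema, filesystem and writer inside the closure so nothing
          // non-serializable is captured from the driver.
          val letter = ('A' + idx).toChar
          val schema = Schema.create(Schema.Type.STRING)
          val fs = FileSystem.get(new Configuration())
          val out = fs.create(new Path(s"$outputBase/$letter/part-00000.avro"))
          val writer = new DataFileWriter(new GenericDatumWriter[String](schema))
          writer.create(schema, out)
          records.foreach { case (_, value) => writer.append(value) }
          writer.close()
          // Return something trivial so the count() below forces execution.
          Iterator(true)
        }
        .count()
    }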

E.F.Walker
  • Thanks! Writing by partitions seems to be working, at least on a small dataset in local mode. – david_joseph May 22 '16 at 21:08
  • Follow-up question: with this approach, will Spark create only 26 partitions, and would each directory then have a single file? I am concerned that the files will get very large, and that if, say, the majority of the data falls under letter A, that executor will fail with an out-of-memory error. – david_joseph May 23 '16 at 14:14
  • The function passed into "mapPartitionsWithIndex" can limit the output file size. This function loops over every element in the partition, so it can keep track of the number of records it has written to the current output file, and close that file and open a new one once it reaches some maximum (roughly as in the sketch after these comments). – E.F.Walker May 23 '16 at 21:52
  • There still could be an "unfair division of labor" problem when, for example, 99% of the records are A's. That's bad for performance since 99% of the work isn't getting parallelized. What I've done in this situation is get fancy with the partitioning scheme. I have one job that first measures the key distribution, and comes up with a fair partitioning strategy. That makes the code a lot more complicated, but it's effective. – E.F.Walker May 23 '16 at 22:01
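
A rough sketch of that rolling-writer idea, for one partition's iterator of strings; maxRecords, the file naming scheme, and the dir argument are all placeholders:

    import org.apache.avro.Schema
    import org.apache.avro.file.DataFileWriter
    import org.apache.avro.generic.GenericDatumWriter
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}
    
    // Hypothetical per-partition writer: close the current Avro file and open a
    // new one every maxRecords records, so no single output file grows unbounded.
    def writePartitionRolling(records: Iterator[String],
                              dir: String,
                              maxRecords: Long = 1000000L): Unit = {
      val schema = Schema.create(Schema.Type.STRING)
      val fs = FileSystem.get(new Configuration())
    
      var writer: DataFileWriter[String] = null
      var written = 0L
      var fileIndex = 0
    
      def rollToNextFile(): Unit = {
        if (writer != null) writer.close()
        val out = fs.create(new Path(f"$dir/part-$fileIndex%05d.avro"))
        writer = new DataFileWriter(new GenericDatumWriter[String](schema)).create(schema, out)
        fileIndex += 1
        written = 0L
      }
    
      rollToNextFile()
      records.foreach { record =>
        if (written >= maxRecords) rollToNextFile()
        writer.append(record)
        written += 1
      }
      writer.close()
    }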