
Hi, there is a topic about writing text data to multiple output directories in a single Spark job using MultipleTextOutputFormat:

Write to multiple outputs by key Spark - one Spark job

Is there a similar way to write Avro data to multiple directories?

What I want is to write the records of an Avro file to different directories based on the timestamp field, so that all records whose timestamps fall on the same day go to the same directory.

Tom

2 Answers


The AvroMultipleOutputs class simplifies writing Avro output data to multiple outputs.

  • Case one: writing to additional outputs other than the job default output. Each additional output, or named output, may be configured with its own Schema and OutputFormat.

  • Case two: writing data to different files whose paths are provided by the user.

AvroMultipleOutputs supports counters, which are disabled by default. The counter group is the AvroMultipleOutputs class name, and each counter is named after its output. Each counter holds the number of records written to that output name.

Also have a look at

Ram Ghadiyaram

Here is what we implemented for our use case in Java: writing to different files, with a prefix that depends on the content of the Avro record, using AvroMultipleOutputs.

Here is a wrapper on top of OutputFormat that produces multiple outputs using AvroMultipleOutputs, similar to what @Ram mentioned: https://github.com/architch/MultipleAvroOutputsFormat/blob/master/MultipleAvroOutputsFormat.java

It can be used to write Avro records to multiple paths in Spark as follows:

// Register one named output per record type; both use the same schema.
Job job = Job.getInstance(hadoopConf);
AvroJob.setOutputKeySchema(job, schema);
AvroMultipleOutputs.addNamedOutput(job, "type1", AvroKeyOutputFormat.class, schema);
AvroMultipleOutputs.addNamedOutput(job, "type2", AvroKeyOutputFormat.class, schema);

// Key each record as (named output, AvroKey<record>); the value is unused.
rdd.mapToPair(event -> {
        String namedOutput = event.isType1() ? "type1" : "type2";
        return new Tuple2<>(new Tuple2<>(namedOutput, new AvroKey<>(event.getRecord())), NullWritable.get());
    })
    .saveAsNewAPIHadoopFile(
        outputBasePath,
        GenericData.Record.class,
        NullWritable.class,
        MultipleAvroOutputsFormat.class,
        job.getConfiguration()
    );

Here, getRecord() returns a GenericRecord. The output at outputBasePath looks like this:

17359 May 28 15:23 type1-r-00000.avro
28029 May 28 15:24 type1-r-00001.avro
16473 May 28 15:24 type1-r-00003.avro
17124 May 28 15:23 type2-r-00000.avro
30962 May 28 15:24 type2-r-00001.avro
16229 May 28 15:24 type2-r-00003.avro
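
The file names in the listing appear to follow the usual Hadoop pattern: the named output, a task-type marker (-r- here), a five-digit zero-padded partition number, and the .avro extension. A minimal sketch of that naming in plain Java, for illustration only; the class and method names are hypothetical and this is not the code Hadoop itself runs:

```java
import java.util.Locale;

final class PartFileNames {
    // Builds a name shaped like the ones in the listing, e.g. type1-r-00000.avro.
    // Assumption: the "-r-" marker and ".avro" extension are fixed, as seen above.
    static String fileName(String namedOutput, int partition) {
        return String.format(Locale.ROOT, "%s-r-%05d.avro", namedOutput, partition);
    }

    public static void main(String[] args) {
        System.out.println(fileName("type1", 0));
        System.out.println(fileName("type2", 3));
    }
}
```

Each partition writes its own file per named output, which is why both type1 and type2 appear once per partition number.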

The same approach can also write to entirely different directories by passing the baseOutputPath directly, as described here: write to multiple directory
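
For the per-day layout asked about in the question, the baseOutputPath could be derived from each record's timestamp. The following is a minimal sketch under assumptions the thread does not confirm: the timestamp is taken to be epoch milliseconds, days are cut in UTC, and the helper name and the "/part" file prefix are hypothetical.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

final class DayDirectories {
    // Formats an instant as its UTC calendar day, e.g. 1970-01-01.
    private static final DateTimeFormatter DAY =
            DateTimeFormatter.ofPattern("uuuu-MM-dd").withZone(ZoneOffset.UTC);

    // Returns a relative base path such as "1970-01-01/part".
    // Assumption: epochMillis is the record's timestamp field in milliseconds.
    static String baseOutputPathFor(long epochMillis) {
        return DAY.format(Instant.ofEpochMilli(epochMillis)) + "/part";
    }

    public static void main(String[] args) {
        System.out.println(baseOutputPathFor(0L));
        System.out.println(baseOutputPathFor(86_400_000L));
    }
}
```

Records from the same day map to the same string, so passing it as the baseOutputPath should place them under one day-named subdirectory of the job's output path. If the timestamps are in seconds or in a local time zone, the conversion would need to change accordingly.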

Archit Agarwal