
I'm writing an M/R job that processes large time-series data files written in a binary format that looks something like this (newlines added here for readability; the actual data is continuous, obviously):

TIMESTAMP_1---------------------TIMESTAMP_1
TIMESTAMP_2**********TIMESTAMP_2 
TIMESTAMP_3%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%TIMESTAMP_3
.. etc

where each timestamp is simply an 8-byte struct, identifiable as such by its first 2 bytes. The actual data is bounded between duplicate timestamp values, as displayed above, and contains one or more predefined structs. I would like to write a custom InputFormat that will emit these key/value pairs to the mappers:

< TIMESTAMP_1, --------------------- >
< TIMESTAMP_2, ********** >
< TIMESTAMP_3, %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% >

Logically, I'd like to keep track of the current TIMESTAMP, aggregate all the data until that TIMESTAMP is detected again, and then send out my <TIMESTAMP, DATA> pair as a record. My problem is syncing between splits inside the RecordReader: what happens if a reader receives a split like the following?

# a split occurs inside my data
reader X: TIMESTAMP_1--------------
reader Y: -------TIMESTAMP_1 TIMESTAMP_2****..

# or inside the timestamp
or even: @@@@@@@TIMES
         TAMP_1-------------- ..

What's a good way to approach this? Is there an easy way to access the file offsets so that my CustomRecordReader can sync between splits and not lose data? I feel I have some conceptual gaps in how splits are handled, so perhaps an explanation of these may help. Thanks.
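
To make the record-assembly idea concrete, here is a rough sketch of the scanning logic I have in mind, ignoring the split problem entirely (the class and helper names are made up):

import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.Arrays;

// Rough sketch only: reads one <TIMESTAMP, DATA> record from a stream,
// assuming the 8-byte timestamp is repeated verbatim right after the data.
public class RecordScanner {

  static final int TIMESTAMP_LEN = 8;

  // Fills ts with the opening timestamp and returns the data bytes found
  // between the two copies of that timestamp.
  static byte[] readRecord(DataInputStream in, byte[] ts) throws IOException {
    in.readFully(ts, 0, TIMESTAMP_LEN);           // opening TIMESTAMP_n
    ByteArrayOutputStream data = new ByteArrayOutputStream();
    byte[] window = new byte[TIMESTAMP_LEN];      // sliding window of the last 8 bytes
    int seen = 0;
    while (true) {
      int b = in.read();
      if (b < 0) throw new EOFException("record not terminated");
      System.arraycopy(window, 1, window, 0, TIMESTAMP_LEN - 1);
      window[TIMESTAMP_LEN - 1] = (byte) b;
      seen++;
      if (seen >= TIMESTAMP_LEN && Arrays.equals(window, ts)) {
        // drop the first 7 bytes of the closing timestamp already buffered
        byte[] all = data.toByteArray();
        return Arrays.copyOf(all, all.length - (TIMESTAMP_LEN - 1));
      }
      data.write(b);
    }
  }
}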

sa125

2 Answers


In general it is not simple to create an input format that supports splits, since you need to be able to figure out where to move from the split boundary to get consistent records. XmlInputFormat is a good example of a format that does this.
I would suggest first considering whether you really need splittable input. You can define your input format as not splittable and avoid all these issues.
If your files are generally not much larger than the block size, you lose nothing. If they are, you will lose part of the data locality.
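
If you do decide to support splits, the usual convention looks roughly like the sketch below (syncToNextTimestamp() and readOneRecord() are hypothetical helpers you would implement against your own binary layout; splitStart and splitEnd come from FileSplit.getStart() and getStart() + getLength()):

import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;

// Sketch of the convention splittable readers (e.g. XmlInputFormat) follow.
public abstract class SplitSyncSketch {

  protected long splitStart;  // byte offset where this split begins
  protected long splitEnd;    // byte offset where this split ends
  protected long pos;         // current position in the file

  public void initialize(FSDataInputStream in) throws IOException {
    in.seek(splitStart);
    if (splitStart == 0) {
      pos = 0;
    } else {
      // Rule 1: skip the partial record we landed in; the reader of the
      // previous split owns it and will read past its own splitEnd.
      pos = syncToNextTimestamp(in);
    }
  }

  public boolean nextKeyValue(FSDataInputStream in) throws IOException {
    // Rule 2: only start records while inside the split, but let the last
    // record run past splitEnd until its closing timestamp.
    if (pos >= splitEnd) {
      return false;
    }
    pos = readOneRecord(in);
    return true;
  }

  protected abstract long syncToNextTimestamp(FSDataInputStream in) throws IOException;
  protected abstract long readOneRecord(FSDataInputStream in) throws IOException;
}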

David Gruzman
  • My files are on the order of hundreds of MB to tens of GB, so they're definitely splittable. Won't I take a major performance hit if I prevent the files from being split? – sa125 May 10 '12 at 12:54
  • Your performance hit would depend on the ratio of processing speed to network speed. Processing a big file in one mapper means most of it comes over the network rather than from local drives. Now, if you process 10 MB/sec you will not feel it. If you process 400 MB/sec you will (assuming a 1 Gbit network). You can also increase the block size for these files, thereby reducing the impact. – David Gruzman May 10 '12 at 13:33
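
For example, a sketch of writing the file with a larger per-file block size (the 512 MB figure and the path are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class WriteWithBigBlocks {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // per-file block size of 512 MB instead of the cluster default
    long blockSize = 512L * 1024 * 1024;
    FSDataOutputStream out = fs.create(new Path(args[0]),
        true,        // overwrite
        4096,        // io buffer size
        (short) 3,   // replication factor
        blockSize);  // per-file block size
    // ... write the time-series data ...
    out.close();
  }
}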

You can subclass a concrete subclass of FileInputFormat, for example SequenceFileAsBinaryInputFormat, and override the isSplitable() method to return false:

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileAsBinaryInputFormat;

public class NonSplitableBinaryFile extends SequenceFileAsBinaryInputFormat {

  @Override
  protected boolean isSplitable(FileSystem fs, Path file) {
    // never split: each mapper gets a whole file
    return false;
  }

  @Override
  public RecordReader getRecordReader(InputSplit split, JobConf job,
      Reporter reporter) throws IOException {
    // return your customized record reader here
    return super.getRecordReader(split, job, reporter); // placeholder so it compiles
  }
}
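
And a minimal sketch of wiring it into a job with the old mapred API (driver details omitted):

import org.apache.hadoop.mapred.JobConf;

public class Driver {
  public static void main(String[] args) {
    JobConf job = new JobConf();
    // use the non-splittable format defined above
    job.setInputFormat(NonSplitableBinaryFile.class);
    // ... set mapper, reducer, input/output paths, then submit ...
  }
}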
yjshen
  • How about keeping track of your file's size while writing it to HDFS? When the size nears the block-size threshold, close the file and open a new one to write, so that, as David said, you don't lose locality. – yjshen May 10 '12 at 13:10
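
A rough sketch of that rolling-writer idea (the class name, output path, and threshold handling are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class RollingWriter {
  private final FileSystem fs;
  private final long threshold;       // e.g. the HDFS block size
  private FSDataOutputStream out;
  private int part = 0;

  public RollingWriter(Configuration conf, long threshold) throws Exception {
    this.fs = FileSystem.get(conf);
    this.threshold = threshold;
    roll();
  }

  public void write(byte[] record) throws Exception {
    if (out.getPos() + record.length > threshold) {
      roll();                         // close the current part, start a new one
    }
    out.write(record);
  }

  private void roll() throws Exception {
    if (out != null) out.close();
    out = fs.create(new Path("/data/timeseries-part-" + (part++) + ".bin"));
  }
}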