I am very new to Hadoop. Can anyone give me a simple program on how to skip bad records in Hadoop MapReduce?
Thanks in advance
Since you are filtering records based on missing fields, this is logic that belongs in your Mapper implementation. A Mapper written against the Java API could look something like this:
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;

public class FilteringMapper extends Mapper<LongWritable, Text, LongWritable, Text> {
    private static final Logger _logger = Logger.getLogger(FilteringMapper.class);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        if (recordIsBad(value))
            _logger.info("Skipping bad record: " + value); // log whatever record data you care about
        else
            context.write(key, value);
    }

    private boolean recordIsBad(Text record) {
        // return true if the record is bad by your standards
        return false; // placeholder until your validation logic goes here
    }
}
This Mapper only filters based on your standards; if you need further transformations of the data in the Mapper, they are easily added.
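For completeness, here is a minimal driver that wires this Mapper into a runnable job. This is a sketch of my own, not part of the answer above: the map-only setup (zero reducers, so good records pass straight through to the output) and the argument handling are assumptions.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FilteringJob {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "record filtering");
        job.setJarByClass(FilteringJob.class);
        job.setMapperClass(FilteringMapper.class);
        job.setNumReduceTasks(0); // map-only job: filtered output is written directly
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}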
The best way to handle corrupt records is in your mapper or reducer code. You can detect the bad record and ignore it, or you can abort the job by throwing an exception. You can also count the total number of bad records in the job using counters to see how widespread the problem is.
In rare cases, though, you can’t handle the problem because there is a bug in a third party library that you can’t work around in your mapper or reducer. In these cases, you can use Hadoop’s optional skipping mode for automatically skipping bad records. When skipping mode is enabled, tasks report the records being processed back to the tasktracker. When the task fails, the tasktracker retries the task, skipping the records that caused the failure. Because of the extra network traffic and bookkeeping to maintain the failed record ranges, skipping mode is turned on for a task only after it has failed twice.
Thus, for a task consistently failing on a bad record, the tasktracker runs the following task attempts with these outcomes:
1. Task fails.
2. Task fails.
3. Skipping mode is enabled. Task fails, but the failed record is stored by the tasktracker.
4. Skipping mode is still enabled. Task succeeds by skipping the bad record that failed in the previous attempt.
Skipping mode is off by default; you enable it independently for map and reduce tasks using the SkipBadRecords class. It’s important to note that skipping mode can detect only one bad record per task attempt, so this mechanism is appropriate only for detecting occasional bad records (a few per task, say). You may need to increase the maximum number of task attempts (via mapred.map.max.attempts and mapred.reduce.max.attempts) to give skipping mode enough attempts to detect and skip all the bad records in an input split. Bad records that have been detected by Hadoop are saved as sequence files in the job’s output directory under the _logs/skip subdirectory. These can be inspected for diagnostic purposes after the job has completed (using hadoop fs -text, for example).
Text from "Hadoop: The Definitive Guide" by Tom White
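To make the excerpt concrete, here is a minimal sketch of enabling skipping mode through the SkipBadRecords class (part of the org.apache.hadoop.mapred API). The specific values are illustrative assumptions, not recommendations from the book.

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SkipBadRecords;

public class SkippingModeSetup {
    public static JobConf configure() {
        JobConf conf = new JobConf();
        // A value > 0 enables skipping mode; it is the acceptable number of
        // records to skip around each bad record (including the bad record itself).
        SkipBadRecords.setMapperMaxSkipRecords(conf, 1);
        SkipBadRecords.setReducerMaxSkipGroups(conf, 1);
        // Raise the attempt limits (illustrative value) so skipping mode has
        // enough retries to locate and skip every bad record in a split.
        conf.setInt("mapred.map.max.attempts", 10);
        conf.setInt("mapred.reduce.max.attempts", 10);
        return conf;
    }
}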
You can skip records in your MapReduce job by setting the properties mapreduce.map.skip.maxrecords and/or mapreduce.reduce.skip.maxgroups to a value > 0 (by default they're 0).
Here's a simple command that you can run in the shell using Hadoop Streaming to launch a MapReduce job that skips bad records in the map phase (up to 1000 records may be skipped around each bad record):
mapred streaming \
-D mapreduce.map.skip.maxrecords=1000 \
-file map.sh \
-file reduce.sh \
-input myInputDir \
-output myOutputDir \
-mapper map.sh \
-reducer reduce.sh
(where map.sh and reduce.sh are executable bash scripts).
From the documentation:
Hadoop provides an option where a certain set of bad input records can be skipped when processing map inputs. Applications can control this feature through the SkipBadRecords class.
This feature can be used when map tasks crash deterministically on certain input. This usually happens due to bugs in the map function. Usually, the user would have to fix these bugs. This is, however, not possible sometimes. The bug may be in third party libraries, for example, for which the source code is not available. In such cases, the task never completes successfully even after multiple attempts, and the job fails. With this feature, only a small portion of data surrounding the bad records is lost, which may be acceptable for some applications (those performing statistical analysis on very large data, for example).
You can see all properties related to skipping records in mapred-default.xml.
Another relevant property is mapreduce.task.skip.start.attempts: the number of failed attempts after which the MR task will start skipping records (the default is 2).