You can use two map functions: each one processes its own data set, which lets you apply different processing to each input. You need to register each mapper with your job configuration. For example:
public static class FullOuterJoinStdDetMapper extends MapReduceBase implements Mapper <LongWritable ,Text ,Text, Text>
{
private String person_name, book_title,file_tag="person_book#";
private String emit_value = new String();
//emit_value = "";
public void map(LongWritable key, Text values, OutputCollector<Text,Text>output, Reporter reporter)
throws IOException
{
String line = values.toString();
try
{
String[] person_detail = line.split(",");
person_name = person_detail[0].trim();
book_title = person_detail[1].trim();
}
catch (ArrayIndexOutOfBoundsException e)
{
person_name = "student name missing";
}
emit_value = file_tag + person_name;
output.collect(new Text(book_title), new Text(emit_value));
}
}
public static class FullOuterJoinResultDetMapper extends MapReduceBase implements Mapper <LongWritable ,Text ,Text, Text>
{
private String author_name, book_title,file_tag="auth_book#";
private String emit_value = new String();
// emit_value = "";
public void map(LongWritable key, Text values, OutputCollectoroutput, Reporter reporter)
throws IOException
{
String line = values.toString();
try
{
String[] author_detail = line.split(",");
author_name = author_detail[1].trim();
book_title = author_detail[0].trim();
}
catch (ArrayIndexOutOfBoundsException e)
{
author_name = "Not Appeared in Exam";
}
emit_value = file_tag + author_name;
output.collect(new Text(book_title), new Text(emit_value));
}
}
/**
 * Configures and runs the join job.
 *
 * <p>After generic Hadoop options are removed, exactly three arguments are expected:
 * the person/book input path, the author/book input path, and the output path. Each
 * input path is bound to its own mapper through {@code MultipleInputs}.
 *
 * @param args command-line arguments, optionally preceded by generic Hadoop options
 * @throws Exception if option parsing or job execution fails
 */
public static void main(String args[])
throws Exception
{
    Configuration conf = new Configuration();
    // Strip generic options (-D, -files, ...) before counting: only the remainder are our paths.
    String[] argum = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (argum.length != 3)
    {
        System.err.println("Input/output path missing. Usage: <person_book input> <auth_book input> <output>");
        System.exit(-1);
    }
    conf.set("mapred.textoutputformat.separator", ",");

    // Build the job from conf; a bare JobConf would silently drop the separator
    // setting and every generic option parsed above.
    JobConf mrjob = new JobConf(conf);
    // NOTE(review): the job name says "Inner_Join" while the classes are named
    // FullOuterJoin* - confirm which is intended before renaming.
    mrjob.setJobName("Inner_Join");
    mrjob.setJarByClass(FullOuterJoin.class);

    // One mapper per data set; both emit the book title as the join key.
    MultipleInputs.addInputPath(mrjob, new Path(argum[0]), TextInputFormat.class, FullOuterJoinStdDetMapper.class);
    MultipleInputs.addInputPath(mrjob, new Path(argum[1]), TextInputFormat.class, FullOuterJoinResultDetMapper.class);
    // Use the parsed arguments here too; raw args may still contain generic options.
    FileOutputFormat.setOutputPath(mrjob, new Path(argum[2]));

    mrjob.setReducerClass(FullOuterJoinReducer.class);
    mrjob.setOutputKeyClass(Text.class);
    mrjob.setOutputValueClass(Text.class);

    JobClient.runJob(mrjob);
}