I am trying to join fields from two data sets in a Hadoop MapReduce job, without success so far, and would appreciate help. Samples of the two input files and the code I have tried are below.
movie-metadata
975900 /m/03vyhn Ghosts of Mars 2001-08-24 14010832 98.0 {"/m/02h40lc": "English Language"} {"/m/09c7w0": "United States of America"} {"/m/01jfsb": "Thriller", "/m/06n90": "Science Fiction", "/m/03npn": "Horror", "/m/03k9fj": "Adventure", "/m/0fdjb": "Supernatural", "/m/02kdv5l": "Action", "/m/09zvmj": "Space western"}
3196793 /m/08yl5d Getting Away with Murder: The JonBenét Ramsey Mystery 2000-02-16 95.0 {"/m/02h40lc": "English Language"} {"/m/09c7w0": "United States of America"} {"/m/02n4kr": "Mystery", "/m/03bxz7": "Biographical film", "/m/07s9rl0": "Drama", "/m/0hj3n01": "Crime Drama"}
28463795 /m/0crgdbh Brun bitter 1988 83.0 {"/m/05f_3": "Norwegian Language"} {"/m/05b4w": "Norway"} {"/m/0lsxr": "Crime Fiction", "/m/07s9rl0": "Drama"}
9363483 /m/0285_cd White Of The Eye 1987 110.0 {"/m/02h40lc": "English Language"} {"/m/07ssc": "United Kingdom"} {"/m/01jfsb": "Thriller", "/m/0glj9q": "Erotic thriller", "/m/09blyk": "Psychological thriller"}
character-metadata
975900 /m/03vyhn 2001-08-24 Akooshay 1958-08-26 F 1.62 Wanda De Jesus 42 /m/0bgchxw /m/0bgcj3x /m/03wcfv7
975900 /m/03vyhn 2001-08-24 Lieutenant Melanie Ballard 1974-08-15 F 1.78 /m/044038p Natasha Henstridge 27 /m/0jys3m /m/0bgchn4 /m/0346l4
975900 /m/03vyhn 2001-08-24 Desolation Williams 1969-06-15 M 1.727 /m/0x67 Ice Cube 32 /m/0jys3g /m/0bgchn_ /m/01vw26l
975900 /m/03vyhn 2001-08-24 Sgt Jericho Butler 1967-09-12 M 1.75 Jason Statham 33 /m/02vchl6 /m/0bgchnq /m/034hyc
In the first file, I am interested in the first field, which is the movie id, and the third field, which is the movie name. In the second file, the first field is the movie id and the 9th field is the actor name. A single movie id can have multiple actor names, as shown in the character-metadata sample above. The output I am trying to achieve has the following format:
movieId movieName, actorName1, actorName2, actorName3....etc.
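The field positions described above can be checked in isolation before running the full job. The following is a minimal sketch in plain Java (no Hadoop dependency). The class name `FieldSketch`, its two helper methods, and the sample lines are hypothetical; in particular, the character line assumes the 8th column is present but empty, so that the actor name lands at index 8 after splitting on tabs.

```java
public class FieldSketch {
    // Returns {movieId, movieName} from one tab-separated movie-metadata line.
    public static String[] movieFields(String line) {
        // A limit of -1 keeps trailing empty columns instead of dropping them.
        String[] f = line.split("\t", -1);
        return new String[] { f[0].trim(), f[2].trim() };
    }

    // Returns {movieId, actorName} from one tab-separated character-metadata line.
    public static String[] actorFields(String line) {
        String[] f = line.split("\t", -1);
        return new String[] { f[0].trim(), f[8].trim() };
    }

    public static void main(String[] args) {
        // Hypothetical, shortened sample lines with explicit tab separators.
        String movie = "975900\t/m/03vyhn\tGhosts of Mars\t2001-08-24\t14010832\t98.0";
        // Assumption: the 8th column is empty here, so two tabs appear in a row.
        String character = "975900\t/m/03vyhn\t2001-08-24\tAkooshay\t1958-08-26"
                + "\tF\t1.62\t\tWanda De Jesus\t42";

        System.out.println(String.join(" | ", movieFields(movie)));
        System.out.println(String.join(" | ", actorFields(character)));
    }
}
```

Empty columns in the middle of a line are kept by `String.split`, so the index of the actor name stays fixed as long as the file really is tab-separated.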
The two mapper classes extract the fields correctly. The reducer, however, does not produce the format above. Instead, I get:
movieId movieName, actorName1
The remaining actor names are missing. Please have a look at my code and tell me what I am doing wrong.
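import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Join {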
public static void main(String[] args) throws Exception {
if (args.length != 3) {
System.err.println("Usage: Join <movie metadata path> <character metadata path> <output path>");
System.exit(-1);
}
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJobName("Join");
job.setJarByClass(Join.class);
job.setReducerClass(JoinReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
MultipleInputs.addInputPath(job, new Path(args[0]),
TextInputFormat.class, JoinMap1.class);
MultipleInputs.addInputPath(job, new Path(args[1]),
TextInputFormat.class, JoinMap2.class);
FileOutputFormat.setOutputPath(job, new Path(args[2]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
public static class JoinMap1 extends
Mapper<LongWritable, Text, Text, Text> {
private String movieId, movieName, fileTag = "A~ ";
@Override
public void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
String values[] = value.toString().split("\t");
movieId = values[0].trim();
movieName = values[2].trim(); // no tabs can remain after split("\t")
context.write(new Text(movieId), new Text (fileTag + movieName));
}
}
public static class JoinMap2 extends Mapper<LongWritable, Text, Text, Text>{
private String movieId, actorName, fileTag = "B~ ";
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String values[] = line.toString().split("\t");
movieId = values[0].trim();
actorName = values[8].trim(); // no tabs can remain after split("\t")
context.write(new Text (movieId), new Text (fileTag + actorName));
}
}
public static class JoinReduce extends
Reducer<Text, Text, Text, Text> {
private String movieName, actorName;
@Override
public void reduce(Text key, Iterable<Text> values, Context context)
throws IOException, InterruptedException
{
for (Text value : values){
String currValue = value.toString();
String splitVals[] = currValue.split("~");
if(splitVals[0].equals("A")){
movieName = splitVals[1] != null ? splitVals[1].trim() : "movieName";
} else if (splitVals[0].equals("B")){
actorName= splitVals[1] != null ? splitVals[1].trim() : "actorName";
}
}
context.write(key, new Text (movieName + ", " + actorName));
}
}
}
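One possible direction, as far as I can tell: in the reducer above, `actorName` is reassigned on every "B" value, so only the last actor seen survives until `context.write`. The sketch below shows the accumulation idea in plain Java, outside Hadoop, so it can be run on its own. The class `ReduceSketch` and the method `joinValues` are hypothetical names; the tag format ("A~ ..." and "B~ ...") is taken from the mappers above. It also uses local variables instead of instance fields, so that no state leaks from one movie id to the next.

```java
import java.util.Arrays;
import java.util.List;

public class ReduceSketch {
    // Builds "movieName, actor1, actor2, ..." from the tagged values of one key.
    // Values look like "A~ <movieName>" or "B~ <actorName>", as the mappers emit them.
    public static String joinValues(Iterable<String> values) {
        String movieName = "";
        StringBuilder actors = new StringBuilder();
        for (String value : values) {
            String[] parts = value.split("~", 2);
            if (parts.length < 2) {
                continue; // skip values that carry no tag
            }
            String tag = parts[0].trim();
            String payload = parts[1].trim();
            if (tag.equals("A")) {
                movieName = payload;
            } else if (tag.equals("B")) {
                // Append instead of overwriting, so every actor is kept.
                actors.append(", ").append(payload);
            }
        }
        return movieName + actors;
    }

    public static void main(String[] args) {
        // Values may arrive in any order within a key.
        List<String> values = Arrays.asList(
                "B~ Ice Cube", "A~ Ghosts of Mars", "B~ Jason Statham");
        System.out.println(joinValues(values));
        // prints: Ghosts of Mars, Ice Cube, Jason Statham
    }
}
```

In the actual reducer, the same loop would run over the `Iterable<Text>`, calling `toString()` on each value, and the result would be written once per key with `context.write(key, new Text(...))`.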
Please suggest what I should change to get the output shown above. Any help would be greatly appreciated. Brickbats are welcome.