1

I have spent two days on this issue. Thanks in advance if anyone can help! Here is the description:

First mapper and reduce work well, and the output with SequenceFileOutputFormat can be found in the output path.

First mapper:

public static class TextToRecordMapper 
    extends Mapper<Object, Text, Text, IntArrayWritable>{   

    public void map(Object key, Text value, Context context
                ) throws IOException, InterruptedException {    
    }
}

First reducer:

public static class MacOneSensorSigCntReducer 
      extends Reducer<Text,IntArrayWritable,Text,IntArrayWritable> {

    public void reduce(Text key, Iterable<IntArrayWritable> values, 
                  Context context
                  ) throws IOException, InterruptedException {  
    }
}

The Job part:

Job job = new Job(conf, "word count");
job.setJarByClass(RawInputText.class);

job.setMapperClass(TextToRecordMapper.class);

job.setReducerClass(MacOneSensorSigCntReducer.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntArrayWritable.class);

job.setOutputFormatClass(SequenceFileOutputFormat.class);

FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

job.waitForCompletion(true);

This works well, and then I add my second mapper and reducer to deal with the output of the first part.

Second mapper:

public static class MacSensorsTimeLocMapper 
  extends Mapper<Text,IntArrayWritable,Text,IntWritable> {

    private Text macInfo = new Text();
    public void map(Text key, Iterable<IntArrayWritable> values, 
                  Context context
                  ) throws IOException, InterruptedException {
    }
}

Second reducer:

public static class MacInfoTestReducer 
    extends Reducer<Text,IntWritable,Text,Text> {
        public void reduce(Text key, Iterable<IntWritable> values, 
                      Context context
                      ) throws IOException, InterruptedException {
    }
}

The Job part:

Job secondJob = new Job(conf, "word count 2");

secondJob.setJarByClass(RawInputText.class);

FileInputFormat.addInputPath(secondJob, new Path(otherArgs[1]));
secondJob.setInputFormatClass(SequenceFileInputFormat.class);


secondJob.setMapperClass(MacSensorsTimeLocMapper.class);    

secondJob.setMapOutputKeyClass(Text.class);    
secondJob.setMapOutputValueClass(IntArrayWritable.class); 

//do not use test reducer to make things simple    
//secondJob.setReducerClass(MacInfoTestReducer.class);       

FileOutputFormat.setOutputPath(secondJob, new Path(otherArgs[2]));

System.exit(secondJob.waitForCompletion(true) ? 0 : 1);

The second mapper function is not called when I run the code, and the output is generated with text like the following:

00:08:CA:6C:A2:81   com.hicapt.xike.IntArrayWritable@234265

Seems like the framework calls IdentityMapper instead of mine. But how do I change that to make my mapper be called with SequenceFileInputFormat as the input format.

all the code added below:

import java.io.IOException;
import java.util.Collection;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.TreeMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class RawInputText {

  public static class TextToRecordMapper 
       extends Mapper<Object, Text, Text, IntArrayWritable>{

    private Text word = new Text();    
    private IntArrayWritable mapv = new IntArrayWritable();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {        

      String        line     = value.toString();
      String[]      valArray = line.split(",");

      if(valArray.length == 6){
          IntWritable[] valInts  = new IntWritable[2]; 

          word.set(valArray[0]+"-"+valArray[1]);

          valInts[0] = new IntWritable(Integer.parseInt(valArray[2]));
          valInts[1] = new IntWritable(Integer.parseInt(valArray[4]));

          mapv.set(valInts);      

          context.write(word, mapv);  
      }
    }
  }

  public static class MacOneSensorSigCntReducer 
      extends Reducer<Text,IntArrayWritable,Text,IntArrayWritable> {    
    private Text macKey   = new Text(); 
    private IntArrayWritable macInfo = new IntArrayWritable();

    public void reduce(Text key, Iterable<IntArrayWritable> values, 
                      Context context
                      ) throws IOException, InterruptedException {      
        String[]      keyArray = key.toString().split("-");

        if(keyArray.length < 2){
            int a = 10;

            a= 20;
        }

        String mac = keyArray[1];
        String sen = keyArray[0];

        Hashtable<Integer, MinuteSignalInfo> rssiTime = new Hashtable<Integer, MinuteSignalInfo>();
        MinuteSignalInfo minSig;
        int rssi = 0;
        int ts   = 0;
        int i    = 0;

        for (IntArrayWritable val : values) {           
            i = 0;
            for(Writable element : val.get()) {
                  IntWritable eleVal = (IntWritable)element;
                  if(i%2 == 0)
                      rssi = eleVal.get();
                  else
                      ts   = eleVal.get()/60;
                  i++;
            }

            minSig = (MinuteSignalInfo)rssiTime.get(ts);
            if(minSig == null){
                minSig = new MinuteSignalInfo();
                minSig.rssi = rssi;
                minSig.count = 1;
            }else{
                minSig.rssi  += rssi;
                minSig.count += 1;
            }           
            rssiTime.put(ts, minSig);
        }

        TreeMap<Integer, MinuteSignalInfo> treeMap = new TreeMap<Integer, MinuteSignalInfo>();
        treeMap.putAll(rssiTime);

        macKey.set(mac);        

        i = 0;
        IntWritable[] valInts  = new IntWritable[1+treeMap.size()*3]; 

        valInts[i++] = new IntWritable(Integer.parseInt(sen));

        Collection<Integer> macs = treeMap.keySet();
        Iterator<Integer> it = macs.iterator();
        while(it.hasNext()) {
            int tsKey = it.next();
            valInts[i++] = new IntWritable(tsKey);
            valInts[i++] = new IntWritable(treeMap.get(tsKey).rssi);
            valInts[i++] = new IntWritable(treeMap.get(tsKey).count);
        }       

        macInfo.set(valInts);

        context.write(macKey, macInfo);
    }
}

  public static class MacSensorsTimeLocMapper 
      extends Mapper<Text,IntArrayWritable,Text,IntWritable> {

    private Text macInfo = new Text();
    public void map(Text key, Iterable<IntArrayWritable> values, 
                      Context context
                      ) throws IOException, InterruptedException {

        int i      = 0;
        int sensor = 0;
        int ts     = 0;
        int rssi   = 0;
        int count  = 0;  

        Hashtable<Integer, MinuteSignalInfo> rssiTime = new Hashtable<Integer, MinuteSignalInfo>();
        MinuteSignalInfo minSig;

        for (IntArrayWritable val : values) {
            i = 0;          

            for(Writable element : val.get()) {
                IntWritable eleVal = (IntWritable)element;                
                int valval   = eleVal.get();
                if(i == 0) {
                    sensor = valval;
                }else if(i%3 == 1){
                    ts = valval;
                }else if(i%3 == 2){
                    rssi = valval;
                }else if(i%3 == 0){
                    count = valval;             

                    minSig = (MinuteSignalInfo)rssiTime.get(ts);
                    if(minSig == null){
                        minSig = new MinuteSignalInfo();
                        minSig.rssi = rssi;
                        minSig.count = count;
                        minSig.sensor = sensor;

                        rssiTime.put(ts, minSig);
                    }else{
                        if((rssi/count) < (minSig.rssi/minSig.count)){
                            minSig.rssi = rssi;
                            minSig.count = count;
                            minSig.sensor = sensor;

                            rssiTime.put(ts, minSig);
                        }
                    }
                }

                i++;                
            }           
        } 

        TreeMap<Integer, MinuteSignalInfo> treeMap = new TreeMap<Integer, MinuteSignalInfo>();
        treeMap.putAll(rssiTime);

        String macLocs = "";
        Collection<Integer> tss = treeMap.keySet();
        Iterator<Integer> it = tss.iterator();
        while(it.hasNext()) {
            int tsKey = it.next();
            macLocs += String.valueOf(tsKey) + ",";
            macLocs += String.valueOf(treeMap.get(tsKey).sensor) + ";";
        }   

       macInfo.set(macLocs);

       context.write(key, new IntWritable(10));
       //context.write(key, macInfo);
    }
  }

  public static class MacSensorsTimeLocReducer 
      extends Reducer<Text,IntArrayWritable,Text,Text> {

    private Text macInfo = new Text();
    public void reduce(Text key, Iterable<IntArrayWritable> values, 
                      Context context
                      ) throws IOException, InterruptedException {

        int i      = 0;
        int sensor = 0;
        int ts     = 0;
        int rssi   = 0;
        int count  = 0;  

        Hashtable<Integer, MinuteSignalInfo> rssiTime = new Hashtable<Integer, MinuteSignalInfo>();
        MinuteSignalInfo minSig;

        for (IntArrayWritable val : values) {
            i = 0;          

            for(Writable element : val.get()) {
                IntWritable eleVal = (IntWritable)element;                
                int valval   = eleVal.get();
                if(i == 0) {
                    sensor = valval;
                }else if(i%3 == 1){
                    ts = valval;
                }else if(i%3 == 2){
                    rssi = valval;
                }else if(i%3 == 0){
                    count = valval;             

                    minSig = (MinuteSignalInfo)rssiTime.get(ts);
                    if(minSig == null){
                        minSig = new MinuteSignalInfo();
                        minSig.rssi = rssi;
                        minSig.count = count;
                        minSig.sensor = sensor;

                        rssiTime.put(ts, minSig);
                    }else{
                        if((rssi/count) < (minSig.rssi/minSig.count)){
                            minSig.rssi = rssi;
                            minSig.count = count;
                            minSig.sensor = sensor;

                            rssiTime.put(ts, minSig);
                        }
                    }
                }

                i++;                
            }           
        } 

        TreeMap<Integer, MinuteSignalInfo> treeMap = new TreeMap<Integer, MinuteSignalInfo>();
        treeMap.putAll(rssiTime);

        String macLocs = "";
        Collection<Integer> tss = treeMap.keySet();
       Iterator<Integer> it = tss.iterator();
       while(it.hasNext()) {
        int tsKey = it.next();
        macLocs += String.valueOf(tsKey) + ",";
        macLocs += String.valueOf(treeMap.get(tsKey).sensor) + ";";
       }    

       macInfo.set(macLocs);

       context.write(key, macInfo);
    }
  }

  public static class MacInfoTestReducer 
    extends Reducer<Text,IntArrayWritable,Text,Text> {

        private Text macInfo = new Text();
        public void reduce(Text key, Iterable<IntArrayWritable> values, 
                          Context context
                          ) throws IOException, InterruptedException {
            String tmp = ""; 

            for (IntArrayWritable val : values) {


                for(Writable element : val.get()) {
                    IntWritable eleVal = (IntWritable)element;                
                    int valval   = eleVal.get();

                    tmp += String.valueOf(valval) + " ";
                }
            }


            macInfo.set(tmp);
            context.write(key, macInfo);
        }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 3) {
      System.err.println("Usage: wordcount <in> <out>");
      System.exit(2);
    }    

    /* 

    Job job = new Job(conf, "word count");
    job.setJarByClass(RawInputText.class);

    job.setMapperClass(TextToRecordMapper.class);

    job.setReducerClass(MacOneSensorSigCntReducer.class);

    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntArrayWritable.class);

    job.setOutputFormatClass(SequenceFileOutputFormat.class);

    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

    job.waitForCompletion(true);

    */    


    Job secondJob = new Job(conf, "word count 2");

    secondJob.setJarByClass(RawInputText.class);

    FileInputFormat.addInputPath(secondJob, new Path(otherArgs[1]));
    secondJob.setInputFormatClass(SequenceFileInputFormat.class);


    secondJob.setMapperClass(MacSensorsTimeLocMapper.class);   
    //secondJob.setMapperClass(Mapper.class);

    secondJob.setMapOutputKeyClass(Text.class);    
    secondJob.setMapOutputValueClass(IntArrayWritable.class); 

    secondJob.setReducerClass(MacInfoTestReducer.class);

    //secondJob.setOutputKeyClass(Text.class);
    //secondJob.setOutputValueClass(IntArrayWritable.class);     

    FileOutputFormat.setOutputPath(secondJob, new Path(otherArgs[2]));

    System.exit(secondJob.waitForCompletion(true) ? 0 : 1);

  }
}


package com.hicapt.xike;

public class MinuteSignalInfo {
    public int sensor;
    public int rssi;
    public int count;

    public MinuteSignalInfo() {
        rssi   = 0;
        count  = 0;
        sensor = 0;
    }
}



package com.hicapt.xike;

import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;

public class IntArrayWritable extends ArrayWritable {

    public IntArrayWritable() {
        super(IntWritable.class);
    }
    /*
    public void readFields(DataInput in) throws IOException{        
        super.readFields(in);
    }

    public void write(DataOutput out) throws IOException{       
        super.write(out);
    }*/
}
Raymond
  • 11
  • 3
  • Thanks a lot for your edit, seaotternerd! From the link http://stackoverflow.com/questions/19376074/save-and-read-complicated-writable-value-in-hadoop-job/19395539#19395539 I wonder if I need to add write, readFields for the custom definition IntArrayWritable, it only has a construction function -- public IntArrayWritable() { super(IntWritable.class); } -- for now. – Raymond Dec 26 '13 at 07:23
  • 1
    `public void map(Text key, Iterable values, ` is not a valid mapper function, where did you get this from? – Thomas Jungblut Dec 26 '13 at 09:46
  • Thomas, thank you so much! You get me out of the fire finally! There is no valid mapper function, then hadoop uses default Mapper instead of mine. Here is the story: I wrote the code like map->combine->reduce at first, then I found the combine part will be called N times, N can be 0, 1, or many (the book state like this also). I began to use two MRs to finish the work. So origin combiner changes to first reducer, original reducer change to second mapper, and the I did not change the parameter. I am changing my code to make it work now. Thanks again, Wish you a happy Christmas holiday! – Raymond Dec 27 '13 at 02:41

0 Answers0