
I'm new to MapReduce. My mapper outputs a DBWritable object, but in the reducer I can't get any values from the passed object; maybe it was never passed at all? Here is my code:

DBWritable

public class StWritable implements DBWritable, Writable {
    private String stockName;
    private double q1, q2, q3, q4; // quarters of the year
    ...
    @Override
    public void readFields(DataInput input) throws IOException {
        // empty
    }

    @Override
    public void write(DataOutput output) throws IOException {
        // empty
    }

    @Override
    public void readFields(ResultSet rs) throws SQLException {
        stockName = rs.getString("STOCK_NAME");
        // year   = rs.getInt("RECORDING_YEAR");
        q1 = rs.getDouble("Q1");
        q2 = rs.getDouble("Q2");
        q3 = rs.getDouble("Q3");
        q4 = rs.getDouble("Q4");
    }

    @Override
    public void write(PreparedStatement pstmt) throws SQLException {
        pstmt.setString(1, stockName);
        pstmt.setDouble(2, q1);
        pstmt.setDouble(3, q2);
        pstmt.setDouble(4, q3);
        pstmt.setDouble(5, q4);
    }

    @Override
    public String toString() {
        return q1 + "," + q2 + "," + q3 + "," + q4;
    }
}
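
Note: Hadoop moves map-output values to the reducer through the readFields(DataInput) / write(DataOutput) pair above, which is left empty here. For illustration only, a minimal sketch of what a filled-in version inside StWritable could look like, assuming the same fields (not the original code):

@Override
public void readFields(DataInput input) throws IOException {
    // read fields back in the same order they were written
    stockName = input.readUTF();
    q1 = input.readDouble();
    q2 = input.readDouble();
    q3 = input.readDouble();
    q4 = input.readDouble();
}

@Override
public void write(DataOutput output) throws IOException {
    // write fields in a fixed order
    output.writeUTF(stockName);
    output.writeDouble(q1);
    output.writeDouble(q2);
    output.writeDouble(q3);
    output.writeDouble(q4);
}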

Mapper

public static class StockMapper extends Mapper<NullWritable, StWritable, Text, StWritable> {

    private Text stock = new Text();
    private Text value = new Text();
    private StWritable stockq = new StWritable();
    public static final Log log = LogFactory.getLog(StockMapper.class);

    @Override
    protected void map(NullWritable key, StWritable stockqWritable, Context context) throws IOException, InterruptedException {

        final String stockName = stockqWritable.getStockName();
        final Double q1 = stockqWritable.getQ1();
        final Double q2 = stockqWritable.getQ2();
        final Double q3 = stockqWritable.getQ3();
        final Double q4 = stockqWritable.getQ4();

        stock.set(stockName);
        stockq.setStockName(stockName);
        stockq.setQ1(q1);
        stockq.setQ2(q2);
        stockq.setQ3(q3);
        stockq.setQ4(q4);
        // value.set(stockq.toString());
        log.info("map the stockq value is  " + stockqWritable.toString());
        context.write(stock, stockq);
    }
}

Reducer

public static class StockReducer extends Reducer<Text, StWritable, NullWritable, StWritable> {

    public static final Log log = LogFactory.getLog(StockReducer.class);
    StWritable stock = new StWritable();
    // Double.MIN_VALUE is the smallest positive double (4.9E-324), not zero;
    // these accumulators are instance fields, so they are never reset between reduce() calls
    double q1 = Double.MIN_VALUE, q2 = Double.MIN_VALUE, q3 = Double.MIN_VALUE, q4 = Double.MIN_VALUE;

    @Override
    protected void reduce(Text key, Iterable<StWritable> recordings, Context context) throws IOException, InterruptedException {

        stock.setStockName(key.toString());
        for (StWritable recording : recordings) {
            q1 = q1 + recording.getQ1();
            q2 = q2 + recording.getQ2();
            q3 = q3 + recording.getQ3();
            q4 = q4 + recording.getQ4();
            log.info("reduce  the stockq value is  " + recording.toString());
        }
        stock.setQ1(q1);
        stock.setQ2(q2);
        stock.setQ3(q3);
        stock.setQ4(q4);
        // log.info("reduce  the stockq value is  " + stock.toString());

        context.write(NullWritable.get(), stock);
    }
}

Driver

configuration.set("hbase.zookeeper.quorum", "backup103");
final String selectQuery = "SELECT STOCK_NAME,Q1,Q2,Q3,Q4 FROM STOCKQ"; 
final Job job = Job.getInstance(configuration, "phoenix-mr-job");    
job.setJarByClass(StockMr.class);
PhoenixMapReduceUtil.setInput(job, StWritable.class, "STOCKQ",  selectQuery);
PhoenixMapReduceUtil.setOutput(job, "STOCKQ_SUM", "STOCK_NAME,Q1,Q2,Q3,Q4");
job.setMapperClass(StockMapper.class);
//job.setCombinerClass(StockCombiner.class);
job.setReducerClass(StockReducer.class); 
job.setOutputFormatClass(PhoenixOutputFormat.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(StWritable.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(StWritable.class); 
job.setNumReduceTasks(1);
TableMapReduceUtil.addDependencyJars(job);
job.waitForCompletion(true); 

Some log output

INFO StockMr$StockMapper: map the stockq value is  86.29,86.58,81.9,83.8
INFO StockMr$StockMapper: map the stockq value is  199.27,200.26,192.55,194.84

INFO StockMr$StockReducer: reduce  the stockq value is  4.9E-324,4.9E-324,4.9E-324,4.9E-324
INFO StockMr$StockReducer: reduce  the stockq value is  4.9E-324,4.9E-324,4.9E-324,4.9E-324

Thanks.

References

Output a list from a Hadoop Map Reduce job using custom writable

http://phoenix.apache.org/phoenix_mr.html

Edit

I found two workarounds for the problem: one is for the mapper to send the composite values to the reducer as Text and then split them with Text.toString().split(","); the other is for the mapper to send a MapWritable to the reducer. I don't know which is better, or whether both are bad... (a sketch of the first is below).
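
A minimal sketch of the Text workaround, assuming the mapper and reducer shown above (hypothetical, untested):

// Mapper side: pack the four quarters into the (otherwise unused) Text value
value.set(q1 + "," + q2 + "," + q3 + "," + q4);
context.write(stock, value);

// Driver change needed to match:
// job.setMapOutputValueClass(Text.class);

// Reducer side, now declared as Reducer<Text, Text, NullWritable, StWritable>:
for (Text recording : recordings) {
    String[] parts = recording.toString().split(",");
    q1 += Double.parseDouble(parts[0]);
    q2 += Double.parseDouble(parts[1]);
    q3 += Double.parseDouble(parts[2]);
    q4 += Double.parseDouble(parts[3]);
}

The MapWritable alternative is similar: put the four quarters into a MapWritable keyed by Text and read them back in the reducer. MapWritable handles its own Writable serialization, so it avoids the string parsing, at the cost of more per-record overhead than a plain Text value.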
