I'm new to Hadoop, and I'm trying to write a MapReduce program that counts the two most frequent letters per date, grouped by month. My input looks like this:

2017-06-01 , A, B, A, C, B, E, F 
2017-06-02 , Q, B, Q, F, K, E, F
2017-06-03 , A, B, A, R, T, E, E 
2017-07-01 , A, B, A, C, B, E, F
2017-07-05 , A, B, A, G, B, G, G

So I expect the output of this MapReduce program to be something like:

2017-06,  A:4, E:4
2017-07,  A:4, B:4

public class ArrayGiulioTest {

    public static Logger logger = Logger.getLogger(ArrayGiulioTest.class);

    public static class CustomMap extends Mapper<LongWritable, Text, Text, TextWritable> {
        private Text word = new Text();

        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            TextWritable array = new TextWritable();
            String line = value.toString();
            StringTokenizer tokenizer = new StringTokenizer(line, ",");
            String dataAttuale = tokenizer.nextToken().substring(0,
                    line.lastIndexOf("-"));

            Text tmp = null;
            Text[] tmpArray = new Text[tokenizer.countTokens()];
            int i = 0;
            while (tokenizer.hasMoreTokens()) {
                String prod = tokenizer.nextToken(",");

                word.set(dataAttuale);
                tmp = new Text(prod);
                tmpArray[i] = tmp;

                i++;
            }

            array.set(tmpArray);

            context.write(word, array);

        }
    }

    public static class CustomReduce extends Reducer<Text, TextWritable, Text, Text> {


        public void reduce(Text key, Iterator<TextWritable> values,
                Context context) throws IOException, InterruptedException {

            MapWritable map = new MapWritable();
            Text txt = new Text();

            while (values.hasNext()) {
                TextWritable array = values.next();
                Text[] tmpArray = (Text[]) array.toArray();
                for(Text t : tmpArray) {
                    if(map.get(t)!= null) {
                        IntWritable val = (IntWritable) map.get(t);
                        map.put(t, new IntWritable(val.get()+1));
                    } else {
                        map.put(t, new IntWritable(1));
                    }
                }

            }

            Set<Writable> set = map.keySet();
            StringBuffer str = new StringBuffer();
            for(Writable k : set) {

                str.append("key: " + k.toString() + " value: " + map.get(k) + "**");
            }
            txt.set(str.toString());


            context.write(key, txt);
        }
    }

    public static void main(String[] args) throws Exception {
        long inizio = System.currentTimeMillis();
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "countProduct");
        job.setJarByClass(ArrayGiulioTest.class);

        job.setMapperClass(CustomMap.class);
        //job.setCombinerClass(CustomReduce.class);
        job.setReducerClass(CustomReduce.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(TextWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
        long fine = System.currentTimeMillis();
        logger.info("**************************************End" + (fine - inizio));
        System.exit(1);
    }

}

I implemented my custom TextWritable like this:

public class TextWritable extends ArrayWritable {


    public TextWritable() {
        super(Text.class);
    }
}

When I run my MapReduce program, I get output like this:

2017-6    wordcount.TextWritable@3e960865
2017-6    wordcount.TextWritable@3e960865

Obviously my reducer doesn't work: this looks like the output of my Mapper.

Any ideas? And can someone tell me whether I'm on the right path to a solution?

Here is the console log (for information, my input file has 6 rows instead of 5). I get the same result whether I run the MapReduce job under Eclipse (single JVM) or on Hadoop with HDFS.

File System Counters
    FILE: Number of bytes read=1216
    FILE: Number of bytes written=431465
    FILE: Number of read operations=0
    FILE: Number of large read operations=0
    FILE: Number of write operations=0
Map-Reduce Framework
    Map input records=6
    Map output records=6
    Map output bytes=214
    Map output materialized bytes=232
    Input split bytes=97
    Combine input records=0
    Combine output records=0
    Reduce input groups=3
    Reduce shuffle bytes=232
    Reduce input records=6
    Reduce output records=6
    Spilled Records=12
    Shuffled Maps =1
    Failed Shuffles=0
    Merged Map outputs=1
    GC time elapsed (ms)=0
    Total committed heap usage (bytes)=394264576
Shuffle Errors
    BAD_ID=0
    CONNECTION=0
    IO_ERROR=0
    WRONG_LENGTH=0
    WRONG_MAP=0
    WRONG_REDUCE=0
File Input Format Counters 
    Bytes Read=208
File Output Format Counters 
    Bytes Written=1813
GIULIO
  • Sorry for the silly question, but are you using Java 8? Have you considered the built-in map-reduce methods? Maybe this can help: https://www.sitepoint.com/java-8-streams-filter-map-reduce/ – catastrophic error Jun 06 '17 at 20:27
  • @guicl That is a silly question because that's obviously not Hadoop – OneCricketeer Jun 06 '17 at 20:28
  • Possible duplicate of [How do I print my Java object without getting "SomeType@2f92e0f4"?](https://stackoverflow.com/questions/29140402/how-do-i-print-my-java-object-without-getting-sometype2f92e0f4) – OneCricketeer Jun 06 '17 at 20:29
  • I'm using Java 7, @guicl, and I have to solve this without the built-in map-reduce methods. – GIULIO Jun 06 '17 at 20:31
  • @cricket_007 Mmm, no, because I want to get a Text as output from the reducer... or a MapWritable, if I can print it well. But it seems that my reducer doesn't work. – GIULIO Jun 06 '17 at 20:34
  • May I ask the purpose of using anything other than `ArrayWritable`? 1) `Text` already exists and it is a Writable class, so you should name your classes better. 2) Your custom implementation doesn't add anything. – OneCricketeer Jun 06 '17 at 20:34
  • And are you sure you are reading the file output by the reducer? Where is the `key:` or `value:` output? It seems you are reading the mapper output – OneCricketeer Jun 06 '17 at 20:40
  • @cricket_007 That's the point: I was expecting output with key 2016-6 and a value built as in the reduce method: "key: " + k.toString() + " value: " + map.get(k) + "**". Instead I get the toString() of my custom TextWritable object, i.e. the map output value set by "job.setMapOutputValueClass(TextWritable.class);", while I'm expecting a Text as the value. My plan was to use a MapWritable in the reducer to hold all the letters and their counts, then work out the two with the most occurrences and print them. – GIULIO Jun 06 '17 at 20:49
  • @cricket_007 I used my custom TextWritable, which extends ArrayWritable, because I read somewhere that to use ArrayWritable in the reducer I have to extend the ArrayWritable class with my own class. Obviously I'm a beginner, so if you know a simpler way to do it better... – GIULIO Jun 06 '17 at 20:54
  • Well, first, `ArrayWritable` has no `toString` method, but you are welcome to implement one in your `TextWritable` (which should be renamed to `TextArrayWritable` in my opinion). Second, `ArrayWritable(String[] strings)` is the only constructor you called, so I don't think you need a custom class, that's all. Was the file that you opened named `part-r-0000`, for example? – OneCricketeer Jun 06 '17 at 21:39
  • @cricket_007 Yes, it's named part-r-00000 – GIULIO Jun 06 '17 at 21:45
  • @cricket_007 But I still get the Mapper output instead of the reducer output... – GIULIO Jun 06 '17 at 21:52
  • Well, I think you are on the correct path, but at least write an actual `toString()` method so that you can see what input your reducer is trying to read. – OneCricketeer Jun 06 '17 at 22:19
  • By the way, I've written the toString method of TextArrayWritable and, as expected, it contains the collection of letters. But the job still writes the Mapper output as my output. Have you tried my code? Did it give you the same result? @cricket_007 Thanks in advance. – GIULIO Jun 07 '17 at 05:38
  • Remove `job.setCombinerClass(CustomReduce.class);` You can't use your reducer as a combiner, it emits different key/value types to the input types it receives. – Binary Nerd Jun 07 '17 at 08:40
  • @BinaryNerd Yes, that's true, but it still doesn't change anything. It continues to print my Mapper output instead of my Reducer output. I've edited my post with the console log. – GIULIO Jun 07 '17 at 20:10

2 Answers

I think you're trying to do too much work in the Mapper. You only need to group by date (and, judging by your expected output, you aren't formatting the dates correctly anyway).

The following approach is going to turn these lines, for example

2017-07-01 , A, B, A, C, B, E, F
2017-07-05 , A, B, A, G, B, G, G

Into this pair for the reducer

2017-07 , ("A,B,A,C,B,E,F", "A,B,A,G,B,G,G")

In other words, you gain no real benefit by using an ArrayWritable, just keep it as text.


So, the Mapper would look like this

class CustomMap extends Mapper<LongWritable, Text, Text, Text> {

    private final Text key = new Text();
    private final Text output = new Text();

    @Override
    protected void map(LongWritable offset, Text value, Context context) throws IOException, InterruptedException {

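        final String valueStr = value.toString();
        // Use the String index here: Text.find() returns a byte offset,
        // which can differ from the char index for non-ASCII input.
        int separatorIndex = valueStr.indexOf(',');

        if (separatorIndex < 0) {
            System.err.printf("mapper: not enough records for %s%n", valueStr);
            return;
        }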
        String dateKey = valueStr.substring(0, separatorIndex).trim();
        String tokens = valueStr.substring(1 + separatorIndex).trim().replaceAll("\\p{Space}", "");

        SimpleDateFormat fmtFrom = new SimpleDateFormat("yyyy-MM-dd");
        SimpleDateFormat fmtTo = new SimpleDateFormat("yyyy-MM");

        try {
            dateKey = fmtTo.format(fmtFrom.parse(dateKey));
            key.set(dateKey);
        } catch (ParseException ex) {
            System.err.printf("mapper: invalid key format %s%n", dateKey);
            return;
        }

        output.set(tokens);
        context.write(key, output);
    }
}

And then the reducer can build a Map that collects and counts the values from the value strings. Again, writing out only Text.

class CustomReduce extends Reducer<Text, Text, Text, Text> {

    private final Text output = new Text();

    @Override
    protected void reduce(Text date, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        Map<String, Integer> keyMap = new TreeMap<>();
        for (Text v : values) {
            String[] keys = v.toString().trim().split(",");

            for (String key : keys) {
                if (!keyMap.containsKey(key)) {
                    keyMap.put(key, 0);
                }
                keyMap.put(key, 1 + keyMap.get(key));
            }
        }

        output.set(mapToString(keyMap));
        context.write(date, output);
    }

    private String mapToString(Map<String, Integer> map) {
        StringBuilder sb = new StringBuilder();
        String delimiter = ", ";
        for (Map.Entry<String, Integer> entry : map.entrySet()) {
            sb.append(
                    String.format("%s:%d", entry.getKey(), entry.getValue())
            ).append(delimiter);
        }
        sb.setLength(sb.length()-delimiter.length());
        return sb.toString();
    }
}

Given your input, I got this

2017-06 A:4, B:4, C:1, E:4, F:3, K:1, Q:2, R:1, T:1
2017-07 A:4, B:4, C:1, E:1, F:1, G:3
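
The output above lists every letter, while the question asks for only the two most frequent per month. A minimal sketch of that last step follows, written in plain Java 7 style since that is what the asker is using. `TopTwoLetters` and `topTwo` are hypothetical names, not part of the code above; the idea is that the reducer would pass its count map to such a helper instead of `mapToString`. Ties are broken alphabetically here, which is an assumption: the question does not say how ties should be handled, so for June this sketch yields A and B rather than the A and E shown in the question (A, B and E all occur 4 times).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of the answer's original code.
final class TopTwoLetters {

    // Returns up to two "letter:count" pairs with the highest counts.
    // Ties are broken alphabetically (an assumption) so the result is deterministic.
    static String topTwo(Map<String, Integer> counts) {
        List<Map.Entry<String, Integer>> entries =
                new ArrayList<Map.Entry<String, Integer>>(counts.entrySet());

        Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() {
            @Override
            public int compare(Map.Entry<String, Integer> a, Map.Entry<String, Integer> b) {
                int byCount = b.getValue().compareTo(a.getValue()); // descending by count
                return byCount != 0 ? byCount : a.getKey().compareTo(b.getKey());
            }
        });

        StringBuilder sb = new StringBuilder();
        int limit = Math.min(2, entries.size());
        for (int i = 0; i < limit; i++) {
            if (i > 0) {
                sb.append(", ");
            }
            sb.append(entries.get(i).getKey()).append(':').append(entries.get(i).getValue());
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Counts taken from the July line of the output above.
        Map<String, Integer> july = new TreeMap<String, Integer>();
        july.put("A", 4);
        july.put("B", 4);
        july.put("C", 1);
        july.put("E", 1);
        july.put("F", 1);
        july.put("G", 3);
        System.out.println("2017-07 " + topTwo(july));
    }
}
```

Sorting the whole map is fine at this scale; with many distinct keys per month, a bounded priority queue of size two would avoid the full sort.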
OneCricketeer
  • what is segment in mapper? – GIULIO Jun 11 '17 at 17:17
  • OK, it works, and thanks to your code I also solved my previous issue. I was declaring my reduce method with an Iterator values parameter, when it should be Iterable. I lost many days because I wrote Iterator instead of Iterable in the method signature :D – GIULIO Jun 11 '17 at 18:04

The main problem was the signature of the reduce method.

I was writing: public void reduce(Text key, Iterator<TextWritable> values, Context context)

instead of

    public void reduce(Text key, Iterable<ArrayTextWritable> values,

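This is why I got my Map output instead of my Reduce output: with Iterator the method only overloads reduce rather than overriding it, so Hadoop runs the default Reducer.reduce, which writes each input pair through unchanged.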
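
To see the effect without a cluster, here is a small sketch in plain Java. `BaseReducer` is a simplified, hypothetical stand-in for Hadoop's `Reducer` (it is not the real class), whose default `reduce` passes every value through, and `run` mimics the framework, which only ever calls the `Iterable` version. It also suggests why `@Override` is worth adding: on the `Iterator` variant the compiler would reject it and expose the mistake immediately.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Simplified stand-in for Hadoop's Reducer (an assumption for illustration):
// the default reduce() passes every value through unchanged.
class BaseReducer {
    final List<String> written = new ArrayList<String>();

    protected void reduce(String key, Iterable<String> values) {
        for (String v : values) {
            written.add(key + "\t" + v); // identity behaviour
        }
    }

    // Mimics the framework, which only ever calls the Iterable version.
    final void run(String key, List<String> values) {
        reduce(key, values);
    }
}

// Iterator parameter: this is a new overload, so run() never reaches it.
// Adding @Override here would be a compile error, which reveals the bug.
class OverloadingReducer extends BaseReducer {
    public void reduce(String key, Iterator<String> values) {
        written.add(key + "\tcustom");
    }
}

// Iterable parameter plus @Override: this replaces the default behaviour.
class OverridingReducer extends BaseReducer {
    @Override
    protected void reduce(String key, Iterable<String> values) {
        int n = 0;
        for (String ignored : values) {
            n++;
        }
        written.add(key + "\tcount=" + n);
    }
}

final class OverrideDemo {
    public static void main(String[] args) {
        BaseReducer wrong = new OverloadingReducer();
        wrong.run("2017-06", Arrays.asList("x", "y"));
        System.out.println(wrong.written); // values passed through, custom code skipped

        BaseReducer right = new OverridingReducer();
        right.run("2017-06", Arrays.asList("x", "y"));
        System.out.println(right.written); // custom code ran
    }
}
```

This matches the counters in the question's log, where Reduce input records and Reduce output records are both 6: one record out per record in is what a pass-through reduce produces.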

GIULIO