
I am writing a Hadoop program that outputs the 100 most-tweeted hashtags in a data set of tweets. I was able to output every hashtag with its count using the WordCount program, so the output looks like this (ignore the quotation marks):

"#USA 2" 

"#Holy 5"

"#SOS 3"

"#Love 66"

However, I ran into trouble when I tried to sort them by frequency (the value) with the code from here.

I noticed that in the input for the program linked above, the keys are integers instead of strings. I tried changing a few parameters in the code to fit my use case, but it didn't work, as I don't fully understand them. Please help me!

Gyanendra Dwivedi
  • You need 2 MapReduce jobs: one that performs the word count and another that sorts its output. One way to sort that I can think of is to interchange the keys and values, so that the output gets sorted automatically. To sort in descending order, though, you may need to implement a comparator. – Amita Mar 14 '18 at 07:39
  • Thank you very much for the quick response Amita. I understand the idea completely. The interchanging and comparator are all done in the link I provided. However, I am just having issues with setting the parameters for the mapper and the reducer because the program in the link takes input like "30284 12" instead of "#SOS 12". Basically, I don't know what to change in the code. – Ragnall_Mac_Somairle Mar 14 '18 at 07:43
  • Possible duplicate of [hadoop map reduce secondary sorting](https://stackoverflow.com/questions/18395998/hadoop-map-reduce-secondary-sorting) – philantrovert Mar 14 '18 at 08:17
  • Use the key as Text type instead of IntWritable. @Ragnall_Mac_Somairle – Amita Mar 14 '18 at 09:10
  • added changed code. – Gyanendra Dwivedi Mar 14 '18 at 19:43

1 Answer


You need a second MapReduce job whose input is the output of your first job.

I have tweaked the code to make it work as per your wish.

For Input

#USA 2

#Holy 5

#SOS 3

#Love 66 

The output should be

66 #Love 

5 #Holy 

3 #SOS 

2 #USA

I have assumed that the hashtag and the count are separated by a tab. If the delimiter is something else, please change it. The code is not tested, so please let me know if it works.
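Before running the job, the intended transformation can be checked locally without Hadoop. The following is only a minimal sketch in plain Java, not part of the job itself: the class name `TopHashtagsSketch`, the method `topByCount`, and the `limit` parameter are hypothetical names made up for illustration. It assumes the same tab delimiter as above, swaps each hashtag with its count, sorts by count in descending order, and keeps at most `limit` entries (100 for the original question).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TopHashtagsSketch {

    // One (count, hashtag) pair, mirroring the swapped key/value emitted by the mapper.
    static final class Entry {
        final int count;
        final String tag;

        Entry(int count, String tag) {
            this.count = count;
            this.tag = tag;
        }
    }

    // Parses "hashtag<TAB>count" lines, sorts by count descending, keeps the first `limit`.
    static List<String> topByCount(List<String> lines, int limit) {
        List<Entry> entries = new ArrayList<>();
        for (String line : lines) {
            String[] tokens = line.trim().split("\t"); // assumed delimiter: a single tab
            if (tokens.length != 2) {
                continue; // skip lines that do not look like "hashtag<TAB>count"
            }
            entries.add(new Entry(Integer.parseInt(tokens[1].trim()), tokens[0]));
        }
        // Arguments swapped so that larger counts come first.
        entries.sort((a, b) -> Integer.compare(b.count, a.count));

        List<String> result = new ArrayList<>();
        for (Entry e : entries.subList(0, Math.min(limit, entries.size()))) {
            result.add(e.count + "\t" + e.tag);
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("#USA\t2", "#Holy\t5", "#SOS\t3", "#Love\t66");
        for (String row : topByCount(lines, 100)) {
            System.out.println(row);
        }
    }
}
```

In the MapReduce version, the sorting is done by the shuffle phase rather than by an explicit `sort` call, and the limit would have to be enforced separately, for example by counting the records written in the single reducer.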

package com.my.cert.example;

import java.nio.ByteBuffer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.IntWritable.Comparator;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class ValueSortExp {
 public static void main(String[] args) throws Exception {

  Path inputPath = new Path("C:\\hadoop\\test\\test.txt");
  Path outputDir = new Path("C:\\hadoop\\test\\test1");

  // Path inputPath = new Path(args[0]);
  // Path outputDir = new Path(args[1]);

  // Create configuration
  Configuration conf = new Configuration(true);

  // Create job
  Job job = Job.getInstance(conf, "Sort hashtags by count");
  job.setJarByClass(ValueSortExp.class);

  // Setup MapReduce
  job.setMapperClass(ValueSortExp.MapTask.class);
  job.setReducerClass(ValueSortExp.ReduceTask.class);
  job.setNumReduceTasks(1);

  // Specify key / value
  job.setMapOutputKeyClass(IntWritable.class);
  job.setMapOutputValueClass(Text.class);
  job.setOutputKeyClass(IntWritable.class);
  job.setOutputValueClass(Text.class);
  job.setSortComparatorClass(IntComparator.class);
  // Input
  FileInputFormat.addInputPath(job, inputPath);
  job.setInputFormatClass(TextInputFormat.class);

  // Output
  FileOutputFormat.setOutputPath(job, outputDir);
  job.setOutputFormatClass(TextOutputFormat.class);

  /*
   * // Delete output if exists FileSystem hdfs = FileSystem.get(conf); if
   * (hdfs.exists(outputDir)) hdfs.delete(outputDir, true);
   * 
   * // Execute job int code = job.waitForCompletion(true) ? 0 : 1;
   * System.exit(code);
   */

  // Execute job
  int code = job.waitForCompletion(true) ? 0 : 1;
  System.exit(code);

 }

 public static class IntComparator extends WritableComparator {

     public IntComparator() {
         super(IntWritable.class);
     }

     @Override
     public int compare(byte[] b1, int s1, int l1,
             byte[] b2, int s2, int l2) {

         Integer v1 = ByteBuffer.wrap(b1, s1, l1).getInt();
         Integer v2 = ByteBuffer.wrap(b2, s2, l2).getInt();

         return v1.compareTo(v2) * (-1);
     }
 }

 public static class MapTask extends
   Mapper<LongWritable, Text, IntWritable, Text> {
  @Override
  public void map(LongWritable key, Text value, Context context)
    throws java.io.IOException, InterruptedException {
   String line = value.toString();
   String[] tokens = line.split("\t"); // This is the delimiter between Key and Value
   int valuePart = Integer.parseInt(tokens[1].trim());
   // Swap key and value so that the framework sorts by count
   context.write(new IntWritable(valuePart), new Text(tokens[0]));
  }
 }

 public static class ReduceTask extends
   Reducer<IntWritable, Text, IntWritable, Text> {
  @Override
  public void reduce(IntWritable key, Iterable<Text> list, Context context)
    throws java.io.IOException, InterruptedException {
   // Emit the count first, then the hashtag, matching the job's output key/value classes
   for (Text value : list) {
    context.write(key, value);
   }
  }
 }

}
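The raw-byte comparator above relies on an `IntWritable` key being serialized as four big-endian bytes, which is also the default byte order `ByteBuffer.getInt()` reads. The sketch below reproduces just that comparison in plain Java so it can be tried without a cluster. The class name `DescendingIntBytesSketch` and the helper `encode` are hypothetical and only stand in for the serialized keys Hadoop would pass to the comparator.

```java
import java.nio.ByteBuffer;

public class DescendingIntBytesSketch {

    // Encodes an int as 4 big-endian bytes, the layout assumed for a serialized IntWritable.
    static byte[] encode(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    // Same parameter shape as the raw compare method: decode both ints, then reverse the order.
    static int compareDescending(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        int v1 = ByteBuffer.wrap(b1, s1, l1).getInt();
        int v2 = ByteBuffer.wrap(b2, s2, l2).getInt();
        return Integer.compare(v2, v1); // arguments swapped, so larger values sort first
    }

    public static void main(String[] args) {
        byte[] a = encode(66);
        byte[] b = encode(5);
        // A negative result means the first key (66) is ordered before the second (5).
        System.out.println(compareDescending(a, 0, a.length, b, 0, b.length));
    }
}
```

Swapping the arguments to `Integer.compare` has the same effect as multiplying the result of `compareTo` by -1, without boxing the values into `Integer` objects.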
Gyanendra Dwivedi
  • Thank you so much! I really appreciated it! Unfortunately I'm new to stackoverflow so they won't let me thumbs up for your answer publicly. – Ragnall_Mac_Somairle Mar 21 '18 at 17:15
  • @Ragnall_Mac_Somairle You may accept the answer, if it helped. – Gyanendra Dwivedi Mar 21 '18 at 17:28
  • @GyanendraDwivedi Thank you for your help! I am getting an error and not sure why. Could you please assist me with that. Thank you! error: incompatible types: Text cannot be converted to IntWritable context.write(new IntWritable(valuePart), new Text(tokens[0])); – Ameen Apr 04 '20 at 06:31