Alright, there is no built-in way to unzip files in the Hadoop file system, but after a lot of research I figured out how to unzip them directly in HDFS with a MapReduce job. The prerequisite is that you copy the zip file to a location in HDFS and then run the job against it. Hadoop obviously does not understand a zip file as an input format, so we customise the Mapper as well as the Reducer to control what the mapper emits and what the reducer consumes. Note that the job runs on a single mapper, because our customised RecordReader setup disables splitting (isSplitable returns false). The mapper therefore receives the file name as the key and the uncompressed contents of the file as the value. On the reduce side I set the output key to null so that only the unzipped content remains, and the number of reducers is set to one so everything ends up in a single part file.
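For the prerequisite step you can simply run hdfs dfs -put, or do it programmatically through the FileSystem API. Here is a minimal sketch of the programmatic route; the class name CopyZipToHdfs and both paths are just placeholders for illustration, not part of the job itself.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CopyZipToHdfs {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // Copy a local zip archive into the HDFS directory the job will read from
        // (both paths below are illustrative placeholders)
        fs.copyFromLocalFile(new Path("/tmp/archive.zip"),
                new Path("/user/demo/zip-input/archive.zip"));
        fs.close();
    }
}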
We all know that Hadoop cannot handle zip files on its own, but Java can, with the help of its java.util.zip classes: ZipInputStream reads the compressed stream and ZipEntry represents each file inside the archive. So we write a customised ZipFileInputFormat class that extends FileInputFormat.
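To see that JDK machinery on its own, here is a tiny stand-alone sketch (not part of the job) that just lists the entries of a local zip file using nothing but ZipInputStream and ZipEntry; the class name ListZipEntries is made up for this example.
import java.io.FileInputStream;
import java.io.IOException;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

public class ListZipEntries {
    public static void main(String[] args) throws IOException {
        // args[0] is the path to a local zip file
        try (ZipInputStream zip = new ZipInputStream(new FileInputStream(args[0]))) {
            ZipEntry entry;
            while ((entry = zip.getNextEntry()) != null) {
                // Each ZipEntry carries the name of a file inside the archive
                System.out.println(entry.getName());
                zip.closeEntry();
            }
        }
    }
}
With that in mind, here is the ZipFileInputFormat class: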
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class ZipFileInputFormat extends FileInputFormat<Text, BytesWritable> {

    /** See the comments on the setLenient() method */
    private static boolean isLenient = false;

    /**
     * ZIP files are not splittable, so this always returns false and the
     * whole archive is handled by a single mapper.
     */
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        return false;
    }

    /**
     * Create the ZipFileRecordReader that parses the archive.
     */
    @Override
    public RecordReader<Text, BytesWritable> createRecordReader(
            InputSplit split, TaskAttemptContext context) throws IOException,
            InterruptedException {
        return new ZipFileRecordReader();
    }

    /**
     * When lenient mode is enabled, corrupt ZIP entries are skipped instead
     * of failing the whole job. The default is strict (false).
     *
     * @param lenient whether to ignore ZIP parsing errors
     */
    public static void setLenient(boolean lenient) {
        isLenient = lenient;
    }

    public static boolean getLenient() {
        return isLenient;
    }
}
Note that createRecordReader() returns ZipFileRecordReader, the customised version of Hadoop's RecordReader class we were talking about. Let's now slightly simplify the RecordReader class.
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.zip.ZipEntry;
import java.util.zip.ZipException;
import java.util.zip.ZipInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class ZipFileRecordReader extends RecordReader<Text, BytesWritable> {

    /** InputStream used to read the ZIP file from the FileSystem */
    private FSDataInputStream fsin;

    /** ZIP file parser/decompressor */
    private ZipInputStream zip;

    /** Uncompressed file name */
    private Text currentKey;

    /** Uncompressed file contents */
    private BytesWritable currentValue;

    /** Used to indicate progress */
    private boolean isFinished = false;

    /**
     * Initialise and open the ZIP file from the FileSystem.
     */
    @Override
    public void initialize(InputSplit inputSplit,
            TaskAttemptContext taskAttemptContext) throws IOException,
            InterruptedException {
        FileSplit split = (FileSplit) inputSplit;
        Configuration conf = taskAttemptContext.getConfiguration();
        Path path = split.getPath();
        FileSystem fs = path.getFileSystem(conf);
        // Open the stream
        fsin = fs.open(path);
        zip = new ZipInputStream(fsin);
    }

    /**
     * Each ZipEntry is decompressed and readied for the Mapper. The contents
     * of each file are held *in memory* in a BytesWritable object.
     *
     * If the ZipFileInputFormat has been set to lenient (not the default),
     * certain exceptions will be gracefully ignored to prevent a larger job
     * from failing.
     */
    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        ZipEntry entry = null;
        try {
            entry = zip.getNextEntry();
        } catch (ZipException e) {
            if (ZipFileInputFormat.getLenient() == false)
                throw e;
        }
        // No more entries: we are done
        if (entry == null) {
            isFinished = true;
            return false;
        }
        // The file name of the entry becomes the key
        currentKey = new Text(entry.getName());
        // Read the entry contents into memory through a buffer; nested .zip
        // entries are read the same way as any other file
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        byte[] temp = new byte[8192];
        while (true) {
            int bytesRead = 0;
            try {
                bytesRead = zip.read(temp, 0, 8192);
            } catch (EOFException e) {
                if (ZipFileInputFormat.getLenient() == false)
                    throw e;
                return false;
            }
            if (bytesRead > 0)
                bos.write(temp, 0, bytesRead);
            else
                break;
        }
        zip.closeEntry();
        // Uncompressed contents become the value
        currentValue = new BytesWritable(bos.toByteArray());
        return true;
    }

    /**
     * Rather than calculating progress, we just keep it simple.
     */
    @Override
    public float getProgress() throws IOException, InterruptedException {
        return isFinished ? 1 : 0;
    }

    /**
     * Returns the current key (name of the zipped file).
     */
    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return currentKey;
    }

    /**
     * Returns the current value (contents of the zipped file).
     */
    @Override
    public BytesWritable getCurrentValue() throws IOException,
            InterruptedException {
        return currentValue;
    }

    /**
     * Close quietly, ignoring any exceptions.
     */
    @Override
    public void close() throws IOException {
        try {
            zip.close();
        } catch (Exception ignore) {
        }
        try {
            fsin.close();
        } catch (Exception ignore) {
        }
    }
}
For convenience I have given some comments in the source code so you can easily understand how the files are read and written through an in-memory buffer. Now let's write the Mapper class for the above two classes.
import java.io.IOException;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<Text, BytesWritable, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);

    @Override
    public void map(Text key, BytesWritable value, Context context)
            throws IOException, InterruptedException {
        String filename = key.toString();
        // We only want to process .txt files
        if (!filename.endsWith(".txt"))
            return;
        // Prepare the content; use getLength() because the backing array of a
        // BytesWritable can be larger than the actual data
        String content = new String(value.getBytes(), 0, value.getLength(), "UTF-8");
        context.write(new Text(content), one);
    }
}
Let's quickly write the Reducer for the same:
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        // For a classic count you would also emit the sum:
        // context.write(key, new IntWritable(sum));
        // Here we only want the unzipped content, so the value is left null
        context.write(key, null);
    }
}
Let's quickly configure the Job for the Mapper and Reducer:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import com.saama.CustomisedMapperReducer.MyMapper;
import com.saama.CustomisedMapperReducer.MyReducer;
import com.saama.CustomisedMapperReducer.ZipFileInputFormat;

public class MyJob {

    public static void main(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(MyJob.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setInputFormatClass(ZipFileInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        ZipFileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setNumReduceTasks(1);
        job.waitForCompletion(true);
    }
}
Note that in the job class we have configured the InputFormatClass to be our ZipFileInputFormat class and the OutputFormatClass to be TextOutputFormat.
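One optional knob worth mentioning: the record reader checks ZipFileInputFormat.getLenient() when it hits a corrupt entry, so if you would rather skip broken archives than fail the whole job, you can flip it on in the driver before submitting, for example:
// Optional: ignore corrupt ZIP entries instead of failing the job (default is strict)
ZipFileInputFormat.setLenient(true);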
Mavenize the project and keep the dependencies as they are to run the code. Export the jar file and deploy it on your Hadoop cluster; this was tested and deployed on CDH 5.5 with YARN. The contents of the POM file are as follows:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.mithun</groupId>
    <artifactId>CustomisedMapperReducer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>CustomisedMapperReducer</name>
    <url>http://maven.apache.org</url>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-mapper-asl</artifactId>
            <version>1.9.13</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>