I am attempting to compress files on HDFS using BZip2. Doing this with MapReduce streaming seems like a good approach, as per an answer on the following post:
The relevant code sample from that post is:
hadoop jar contrib/streaming/hadoop-streaming-1.0.3.jar \
-Dmapred.reduce.tasks=0 \
-Dmapred.output.compress=true \
-Dmapred.compress.map.output=true \
-Dmapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec \
-input filename \
-output /filename \
-mapper /bin/cat \
-inputformat org.apache.hadoop.mapred.TextInputFormat \
-outputformat org.apache.hadoop.mapred.TextOutputFormat
hadoop fs -cat /path/part* | hadoop fs -put - /path/compressed.gz
In practice, I have done the equivalent of the above for BZip2, using the Java Hadoop Streaming API and ToolRunner to invoke the streaming job on the cluster from a non-cluster machine.
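For reference, the command-line form of what I am running via ToolRunner would look something like the following; only the codec differs from the quoted example, and the jar version and HDFS paths are illustrative rather than my actual ones:

```shell
# Hypothetical command-line equivalent of my ToolRunner-invoked job:
# same as the quoted example but with BZip2Codec instead of GzipCodec.
# Jar path/version and input/output paths are placeholders.
hadoop jar contrib/streaming/hadoop-streaming-1.0.3.jar \
-Dmapred.reduce.tasks=0 \
-Dmapred.output.compress=true \
-Dmapred.compress.map.output=true \
-Dmapred.output.compression.codec=org.apache.hadoop.io.compress.BZip2Codec \
-input filename \
-output /filename \
-mapper /bin/cat \
-inputformat org.apache.hadoop.mapred.TextInputFormat \
-outputformat org.apache.hadoop.mapred.TextOutputFormat
```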
Important features of this approach are:
Because this is a streaming job where the input format is text, only the value (the contents of each line of the file) is output from the map task, not the key (the byte offset).
It is a map-only job. I believe this means that no sorting happens at the end of the map phase, which makes it efficient.
The separate zipped up parts are joined together to produce a single zipped file.
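The join step relies on the fact that concatenating bzip2 streams produces a valid bzip2 file that decompresses to the concatenated contents. This can be checked locally, assuming the bzip2 CLI is installed (filenames here are throwaway examples):

```shell
# Local check of the property the join step relies on: the concatenation
# of two bzip2 streams decompresses as a single file.
printf 'alpha\n' | bzip2 > part-00000.bz2
printf 'beta\n'  | bzip2 > part-00001.bz2

# Decompresses both streams in sequence: prints "alpha" then "beta".
cat part-00000.bz2 part-00001.bz2 | bunzip2
```

This is why the approach works at all, and also why the only remaining problem is the ordering of the parts: each part is a self-contained stream, so order is the one thing the concatenation does not fix.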
Having tried this approach, I found that it seemed to work efficiently, but then discovered that there is no guarantee that the part* files are in the correct order before they are joined together. That is, when I decompressed the result, each chunk was internally correct, but the chunks were sometimes in the wrong order.
Possible solutions I have been considering basically fall into two categories - either introducing reducers or not.
If introducing reducers, then the following post seems relevant:
MapReduce job output sort order
That question has a different requirement, but the idea of a global sort across all part* files seems relevant, and can be achieved by overriding the default partitioner, among other approaches. However, I don't think this serves my purpose: if I introduce reducers, the map phase will sort the data (alphabetically on the line contents, because the key is not written, as per point 1 above), which I don't want to happen. Even if I could prevent that sort, using reducers does not feel like the right way to solve this problem.
So I am left with trying to find a way to make this work with zero reducers. What I'm really after is a way to somehow track/label/order the input splits so that when the part* files come out of MapReduce, I know how to re-order them such that, when decompressed, the file is the same as the original. However I suspect that MapReduce is not designed to work this way - i.e. as a user I have no control over the splits themselves, and the map tasks run independently of each other and may finish in any order.
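One half-idea along these lines: as far as I know, Hadoop Streaming exports the current input file and the split's byte offset into each map task's environment (as map_input_file and map_input_start in older releases, mapreduce_map_input_file and mapreduce_map_input_start in newer ones), so a mapper could in principle label each line with its split's position. A local simulation of what one task might see (the offset value is made up, and the labels would of course pollute the compressed data, so they would only be useful for reordering and would then have to be stripped):

```shell
# Sketch of a labelling mapper. In a real streaming task, map_input_start
# would be set by Hadoop to the split's byte offset; here it is simulated.
labelling_mapper() {
  while IFS= read -r line; do
    printf '%s\t%s\n' "${map_input_start:-0}" "$line"
  done
}

# Simulate one map task over a fake split starting at offset 1024:
# prints each input line prefixed with "1024" and a tab.
export map_input_start=1024
printf 'first\nsecond\n' | labelling_mapper
```

I am not sure this leads anywhere useful for compression specifically, since the prefix changes the bytes being compressed, but it is the closest thing I have found to "tracking" a split from inside a streaming job.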
If anyone has any ideas on how this might be solved, I'd be keen to hear from you, thanks.