6

How can I merge multiple sequence files into one sequence file within Hadoop? Thanks.

cldo
  • possible duplicate of [merge output files after reduce phase](http://stackoverflow.com/questions/5700068/merge-output-files-after-reduce-phase) – Shahryar Aug 12 '14 at 11:00

4 Answers

5

If you want to merge multiple files into a single file, here are two options:

Native language


getmerge

Usage: hadoop fs -getmerge <src> <localdst>

Takes a source directory and a destination file as input and concatenates files in src into the destination local file. Optionally addnl can be set to enable adding a newline character at the end of each file.



Java API


org.apache.hadoop.fs.FileUtil.copyMerge(FileSystem srcFS, Path srcDir, FileSystem dstFS, Path dstFile, boolean deleteSource, Configuration conf, String addString);

Copy all files in a directory to one output file (merge)

Copy to HDFS

put

Usage: hadoop dfs -put <localsrc> ... <dst>

Copy single src, or multiple srcs from local file system to the destination filesystem. Also reads input from stdin and writes to destination filesystem.

copyFromLocal

Usage: hadoop dfs -copyFromLocal <localsrc> URI

Similar to put command, except that the source is restricted to a local file reference.

saurabh shashank
  • Thanks. But I want the output file to end up in HDFS. – cldo Dec 07 '12 at 05:09
  • @cldo Edited my answer. Please have a look. – saurabh shashank Dec 07 '12 at 05:33
  • 3
    I ran getmerge to a local file and then put that file into HDFS. When I run a Hadoop job on it, I get this error: java.io.IOException: File is corrupt! at org.apache.hadoop.io.SequenceFile$Reader.readBlock(SequenceFile.java:1734) – cldo Dec 07 '12 at 07:21
  • 4
    Yes, `getmerge` literally concatenates the bytes of the input files. This works fine for text files, but for sequence files you need intelligent key-value merging. Most importantly, you don't want the header of one file copied into another and then interpreted as a record entry. – Ben Sidhom Sep 26 '13 at 18:26
4

Have you considered forqlift? I wrote it to handle certain SequenceFile chores, including SequenceFile merges.

In your case, you could run:

forqlift seq2seq --file new_combined_file.seq  \
    original_file1.seq  original_file2.seq original_file3.seq ...

Granted, forqlift's seq2seq tool is marked "experimental" ... but it's worked well on my (admittedly limited) internal testing.

qethanm
  • 2
    This is great! Have you considered hosting the source code in such a way that it's easier for others to contribute? – Ben Sidhom Sep 26 '13 at 18:27
3

If you are dealing with a significant number of sequence files, I suggest writing a MapReduce job that uses the base Mapper class as your mapper and the base Reducer class as your reducer. For I/O formats, use SequenceFileInputFormat and SequenceFileOutputFormat. Set the number of reducers to 1. These are all things you set on the Configuration and Job objects in the driver/main code. See how to set the output format, how to set the input format, how to set the mapper, and how to set the reducer.

Note that the default behavior of Mapper and Reducer is to do nothing to the data and just pass it through. That's why you don't write a map function or reduce function here.

What this will do is load your sequence files, do nothing to the data in the mapper, shuffle all of the records to the single reducer, then output them all to one file. This does have the side effect of sorting the keys in the output sequence file.
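
The job described above could be set up roughly as in the following sketch. The class name MergeSequenceFilesJob and the helper buildJob are made up for illustration, and the key/value types (LongWritable and Text) are an assumption; replace them with whatever types your sequence files actually contain.

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

// Hypothetical driver: identity mapper and reducer with a single reduce task.
final class MergeSequenceFilesJob {

    public static Job buildJob(Configuration conf, Path inputDir, Path outputDir) throws IOException {
        Job job = Job.getInstance(conf, "merge-sequence-files");
        job.setJarByClass(MergeSequenceFilesJob.class);

        // Read and write sequence files.
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        // The base Mapper and Reducer classes pass every record through unchanged.
        job.setMapperClass(Mapper.class);
        job.setReducerClass(Reducer.class);

        // ASSUMPTION: keys are LongWritable and values are Text.
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);

        // A single reducer means a single output file.
        job.setNumReduceTasks(1);

        FileInputFormat.addInputPath(job, inputDir);
        FileOutputFormat.setOutputPath(job, outputDir);
        return job;
    }

    public static void main(String[] args) throws Exception {
        Job job = buildJob(new Configuration(), new Path(args[0]), new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```

With one reducer, the merged records end up in a single part-r-00000 file inside the output directory, sorted by key.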

Donald Miner
3

You CANNOT use hadoop getmerge for sequence files, because it merges them as raw binary files, not as sequence files (so the merged file ends up with the header of every input file embedded in it).
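
To illustrate the problem without Hadoop, here is a minimal sketch using a made-up toy container format: a 4-byte magic header followed by length-prefixed records. This is not the real SequenceFile layout, and the class ToyContainerMerge and all of its methods are hypothetical. It only shows why concatenating bytes leaves a second header in the middle of the data, while a record-level merge does not.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy illustration only; this is NOT the SequenceFile on-disk format.
final class ToyContainerMerge {
    // Made-up magic header that every toy file starts with.
    private static final byte[] MAGIC = {'T', 'O', 'Y', 1};

    // Writes the header once, then each record as a 4-byte length plus payload.
    public static byte[] write(List<String> records) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.write(MAGIC);
        for (String record : records) {
            byte[] payload = record.getBytes(StandardCharsets.UTF_8);
            out.writeInt(payload.length);
            out.write(payload);
        }
        out.flush();
        return bytes.toByteArray();
    }

    // Reads the header, then records until the end of the data.
    public static List<String> read(byte[] file) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(file));
        byte[] magic = new byte[MAGIC.length];
        in.readFully(magic);
        if (!Arrays.equals(magic, MAGIC)) {
            throw new IOException("Not a toy container file");
        }
        List<String> records = new ArrayList<>();
        while (in.available() > 0) {
            int length = in.readInt();
            // A stray header read as a length yields a value that cannot fit.
            if (length < 0 || length > in.available()) {
                throw new IOException("File is corrupt!");
            }
            byte[] payload = new byte[length];
            in.readFully(payload);
            records.add(new String(payload, StandardCharsets.UTF_8));
        }
        return records;
    }

    // Byte-level concatenation: the header of b lands in the middle of the result.
    public static byte[] concatenate(byte[] a, byte[] b) {
        byte[] result = Arrays.copyOf(a, a.length + b.length);
        System.arraycopy(b, 0, result, a.length, b.length);
        return result;
    }

    // Record-level merge: decode both inputs, then write one header and all records.
    public static byte[] mergeRecords(byte[] a, byte[] b) throws IOException {
        List<String> all = new ArrayList<>(read(a));
        all.addAll(read(b));
        return write(all);
    }
}
```

Reading the result of concatenate fails in this toy reader because the second header is interpreted as a record length, whereas the result of mergeRecords reads back cleanly with all records in order.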

So you can either write a small Hadoop job with a single reducer, as @Donald-miner suggested, or write a standalone merger using SequenceFile.Reader and SequenceFile.Writer.


I took the second option and here is my code:

package ru.mail.go.webbase.markov.hadoop.utils;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

public class SequenceFilesUtils {
    // HBaseConfiguration comes from the original project; a plain new Configuration() would also work here.
    private static final Configuration conf = HBaseConfiguration.create();

    /** Appends every record of every sequence file in fromDirectory to the single sequence file toFile. */
    public static <K, V> void merge(Path fromDirectory, Path toFile, Class<K> keyClass, Class<V> valueClass) throws IOException {
        // Resolve the file system from the path itself so that file:// and hdfs:// paths both work.
        FileSystem fs = fromDirectory.getFileSystem(conf);

        if (!fs.isDirectory(fromDirectory)) {
            throw new IllegalArgumentException("'" + fromDirectory + "' is not a directory");
        }

        // try-with-resources closes the writer even if a read or an append fails.
        try (SequenceFile.Writer writer = SequenceFile.createWriter(
                conf,
                SequenceFile.Writer.file(toFile),
                SequenceFile.Writer.keyClass(keyClass),
                SequenceFile.Writer.valueClass(valueClass))) {

            for (FileStatus status : fs.listStatus(fromDirectory)) {
                if (status.isDirectory()) {
                    System.out.println("Skip directory '" + status.getPath().getName() + "'");
                    continue;
                }

                Path file = status.getPath();

                // Job output folders contain marker files such as "_SUCCESS"; skip them.
                if (file.getName().startsWith("_")) {
                    System.out.println("Skip \"_\"-file '" + file.getName() + "'");
                    continue;
                }

                System.out.println("Merging '" + file.getName() + "'");

                try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(file))) {
                    // Reusable key/value holders of the types recorded in this file's header.
                    Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
                    Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);

                    // Copy record by record, so no input header ends up in the output.
                    while (reader.next(key, value)) {
                        writer.append(key, value);
                    }
                }
            }
        }
    }
}

Here is my test:

import java.io.File;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.ReflectionUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class SequenceFilesUtilsTest {
    private static final String OUT_PATH = "./UNIVERSE/SequenceFilesUtilsTest/";

    @Before
    public void initEnvironment() throws IOException {
        // TestUtils is a helper from the author's project and is not shown here.
        TestUtils.createDirectory(OUT_PATH);
        TestUtils.createDirectory(OUT_PATH + "/in");
    }

    @Test
    public void test() throws Exception {
        Configuration conf = HBaseConfiguration.create();
        String base = "file://" + new File(OUT_PATH).getAbsolutePath();

        // First input file: four records.
        Path inPath1 = new Path(base + "/in/in1.seq");
        System.out.println("Saving first part to '" + inPath1 + "'");
        SequenceFile.Writer writer1 = SequenceFile.createWriter(
                conf,
                SequenceFile.Writer.file(inPath1),
                SequenceFile.Writer.keyClass(LongWritable.class),
                SequenceFile.Writer.valueClass(Text.class));
        writer1.append(new LongWritable(101), new Text("FIRST1"));
        writer1.append(new LongWritable(102), new Text("FIRST2"));
        writer1.append(new LongWritable(103), new Text("FIRST3"));
        writer1.append(new LongWritable(104), new Text("FIRST4"));
        writer1.close();

        // Second input file: three records.
        Path inPath2 = new Path(base + "/in/in2.seq");
        System.out.println("Saving second part to '" + inPath2 + "'");
        SequenceFile.Writer writer2 = SequenceFile.createWriter(
                conf,
                SequenceFile.Writer.file(inPath2),
                SequenceFile.Writer.keyClass(LongWritable.class),
                SequenceFile.Writer.valueClass(Text.class));
        writer2.append(new LongWritable(201), new Text("SND1"));
        writer2.append(new LongWritable(202), new Text("SND2"));
        writer2.append(new LongWritable(203), new Text("SND3"));
        writer2.close();

        Path mergedPath = new Path(base + "/merged.seq");
        SequenceFilesUtils.merge(new Path(base + "/in"), mergedPath, LongWritable.class, Text.class);

        // The expected order assumes the directory listing returns in1.seq before in2.seq.
        long[] expectedKeys = {101, 102, 103, 104, 201, 202, 203};
        String[] expectedValues = {"FIRST1", "FIRST2", "FIRST3", "FIRST4", "SND1", "SND2", "SND3"};

        SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(mergedPath));
        LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
        Text value = (Text) ReflectionUtils.newInstance(reader.getValueClass(), conf);

        for (int i = 0; i < expectedKeys.length; i++) {
            Assert.assertTrue(reader.next(key, value));
            Assert.assertEquals(expectedKeys[i], key.get());
            Assert.assertEquals(expectedValues[i], value.toString());
        }
        // No records should remain after the seven merged ones.
        Assert.assertFalse(reader.next(key, value));

        reader.close();
    }
}
Filipp Voronov