2

I have many very large files that I need to convert to CSV by replacing certain characters.

I am looking for a reliable approach that, given an InputStream, writes to an OutputStream with every occurrence of character c1 replaced by c2.

The trick is to read and write at the same time, because I can't fit a whole file in memory.

Do I need to run it in a separate thread if I want to read and write at the same time?

Thanks a lot for your advice.

Wild Goat

3 Answers

4

To copy data from an input stream to an output stream, you write the data as you read it, either a byte (or character) at a time or a line at a time.

Here is an example that reads in a file converting all 'x' characters to 'y'.

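// Requires: import java.io.*;
// Byte-level replacement, so 'x' and 'y' must be single-byte (e.g. ASCII) characters.
try (InputStream in = new BufferedInputStream(new FileInputStream("input.dat"));
     OutputStream out = new BufferedOutputStream(new FileOutputStream("output.dat"))) {
    int ch;
    while ((ch = in.read()) != -1) {
        if (ch == 'x') ch = 'y';
        out.write(ch);
    }
} // both streams are flushed and closed here, even if an exception is thrown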

Or, if you can use a Reader and process a line at a time, you can use this approach:

// Requires: import java.io.*;
// FileReader and PrintWriter use the platform default charset here.
try (BufferedReader reader = new BufferedReader(new FileReader("input.dat"));
     PrintWriter writer = new PrintWriter(
             new BufferedOutputStream(new FileOutputStream("output.dat")))) {
    String str;
    while ((str = reader.readLine()) != null) {
        str = str.replace('x', 'y');     // replace a single character
        str = str.replace("abc", "ABC"); // replace a string sequence
        writer.println(str);             // writes the platform line separator
    }
} // reader and writer are closed here, even if an exception is thrown

BufferedInputStream and BufferedReader read ahead and keep a small buffer (8K bytes or characters by default) for performance. Very large files can therefore be processed while holding only about one buffer of data in memory at a time (plus the current line, when reading line by line).
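
As a rough sketch of the same idea with the InputStream/OutputStream signature the question asks for, the method below copies in fixed-size chunks, so memory use stays at one buffer regardless of file size. The class name ByteReplaceDemo and the method replaceByte are made up for illustration, and the code assumes c1 and c2 are single-byte (ASCII) characters in an ASCII-compatible encoding such as UTF-8 or ISO-8859-1.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

public class ByteReplaceDemo {

    // Hypothetical helper: copies 'in' to 'out', replacing every byte equal to c1 with c2.
    // Assumption: c1 and c2 are ASCII characters, so each is exactly one byte.
    public static void replaceByte(InputStream in, OutputStream out, char c1, char c2)
            throws IOException {
        byte from = (byte) c1;
        byte to = (byte) c2;
        byte[] buffer = new byte[8192]; // the only part of the input held in memory
        int n;
        while ((n = in.read(buffer)) != -1) {
            for (int i = 0; i < n; i++) {
                if (buffer[i] == from) {
                    buffer[i] = to;
                }
            }
            out.write(buffer, 0, n); // write the chunk before reading the next one
        }
        out.flush(); // the caller owns the streams and is responsible for closing them
    }

    public static void main(String[] args) throws IOException {
        InputStream in = new ByteArrayInputStream("a;b;c\n".getBytes(StandardCharsets.US_ASCII));
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        replaceByte(in, out, ';', ',');
        System.out.print(out.toString("US-ASCII")); // prints a,b,c
    }
}
```

Reading and writing alternate on a single thread here; no second thread is needed to keep memory use bounded.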

CodeMonkey
  • Ok, great, thanks! But how do I read and write in parallel? I can't fit that whole file in memory. – Wild Goat Dec 23 '16 at 13:48
  • If you process the file a byte or a line at a time, Java won't load the whole file into memory. The BufferedInputStream and BufferedReader above keep a small in-memory buffer while reading, so only about 8K of the file is held at any time. You don't need to parallelize the approach unless the file is many terabytes in size and you want to break it into chunks. – CodeMonkey Dec 23 '16 at 13:56
  • You can create a reader/writer job class that handles a particular file and start n threads, where each thread processes one file at a time and repeats until all files are done. – CodeMonkey Dec 23 '16 at 14:01
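
One possible shape for the "n threads, one file per thread" idea from the last comment is a fixed-size thread pool where each task streams a single file. This is only a sketch: the names ParallelConvertDemo, convert and convertAll, and the choice to write each result to "<name>.csv" next to its source, are assumptions made for illustration, and the replacement is byte-level, so c1 and c2 must be ASCII characters.

```java
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelConvertDemo {

    // Streams one file to another, replacing byte c1 with c2 (ASCII characters only).
    public static void convert(Path source, Path target, char c1, char c2) throws IOException {
        try (InputStream in = new BufferedInputStream(Files.newInputStream(source));
             OutputStream out = new BufferedOutputStream(Files.newOutputStream(target))) {
            int ch;
            while ((ch = in.read()) != -1) {
                out.write(ch == c1 ? c2 : ch);
            }
        }
    }

    // Converts every source file to "<name>.csv" in the same directory, using 'threads' workers.
    public static void convertAll(List<Path> sources, int threads, char c1, char c2)
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<?>> results = new ArrayList<>();
            for (Path source : sources) {
                Path target = source.resolveSibling(source.getFileName() + ".csv");
                results.add(pool.submit(() -> {
                    convert(source, target, c1, c2);
                    return null; // returning a value makes this a Callable, which may throw IOException
                }));
            }
            for (Future<?> result : results) {
                result.get(); // waits for the task and rethrows any failure from the worker
            }
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        Path dir = Files.createTempDirectory("convert-demo");
        Path a = Files.write(dir.resolve("a.dat"), "1;2;3\n".getBytes(StandardCharsets.US_ASCII));
        Path b = Files.write(dir.resolve("b.dat"), "x;y\n".getBytes(StandardCharsets.US_ASCII));
        convertAll(Arrays.asList(a, b), 2, ';', ',');
        System.out.print(new String(Files.readAllBytes(dir.resolve("a.dat.csv")), StandardCharsets.US_ASCII));
    }
}
```

Each worker still holds only its own small buffer, so total memory grows with the number of threads, not with file size. Whether this is faster than a single thread depends on the disks involved.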
1
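            // YOURSOURCE is a placeholder for your InputStream; c1 and c2 are the chars to swap.
            // Requires java.io.* and java.nio.charset.StandardCharsets.
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(YOURSOURCE, StandardCharsets.UTF_8));
                 FileWriter writer = new FileWriter("Report.csv")) {
                String line;
                while ((line = reader.readLine()) != null) {
                    line = line.replace(c1, c2); // Strings are immutable, so keep the returned value
                    writer.append(line);
                    writer.append('\n');
                }
            } // reader and writer are flushed and closed here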
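
A variation on the reader-based approach, sketched under a few assumptions: it copies characters in chunks instead of lines, so original line endings are preserved and a very long line never has to fit in memory, and it uses the same explicit charset for input and output. The names CharReplaceDemo and replaceChar are hypothetical, and c1 and c2 are assumed to be characters from the Basic Multilingual Plane (a single Java char each).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Reader;
import java.io.Writer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class CharReplaceDemo {

    // Hypothetical helper: decodes 'in' with the given charset, replaces c1 with c2,
    // and encodes the result to 'out' with the same charset.
    public static void replaceChar(InputStream in, OutputStream out, Charset charset,
                                   char c1, char c2) throws IOException {
        Reader reader = new InputStreamReader(in, charset);
        Writer writer = new OutputStreamWriter(out, charset);
        char[] buffer = new char[8192]; // only one chunk of text is in memory at a time
        int n;
        while ((n = reader.read(buffer)) != -1) {
            for (int i = 0; i < n; i++) {
                if (buffer[i] == c1) {
                    buffer[i] = c2;
                }
            }
            writer.write(buffer, 0, n);
        }
        writer.flush(); // the caller owns the underlying streams and closes them
    }

    public static void main(String[] args) throws IOException {
        // \u00A6 is the broken-bar character, which takes two bytes in UTF-8.
        byte[] input = "a\u00A6b\r\nc\u00A6d".getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        replaceChar(new ByteArrayInputStream(input), out, StandardCharsets.UTF_8, '\u00A6', ',');
        System.out.print(out.toString("UTF-8"));
    }
}
```

Working on decoded characters avoids corrupting multi-byte sequences, at the cost of a decode and encode step.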
Tchopane
0

You can find a related answer here: Filter (search and replace) array of bytes in an InputStream

I took @aioobe's answer in that thread and built a replacing input stream module in Java, which you can find in my GitHub gist: https://gist.github.com/lhr0909/e6ac2d6dd6752871eb57c4b083799947

Putting the source code here as well:

import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;

/**
 * Created by simon on 8/29/17.
 */
public class ReplacingInputStream extends FilterInputStream {

    private Queue<Integer> inQueue, outQueue;
    private final byte[] search, replacement;

    public ReplacingInputStream(InputStream in, String search, String replacement) {
        super(in);

        this.inQueue = new LinkedList<>();
        this.outQueue = new LinkedList<>();

        this.search = search.getBytes();
        this.replacement = replacement.getBytes();
    }

    private boolean isMatchFound() {
        Iterator<Integer> iterator = inQueue.iterator();

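        for (byte b : search) {
            // Queue entries are unsigned (0-255, or -1 at EOF), so mask the signed byte before comparing.
            if (!iterator.hasNext() || (b & 0xFF) != iterator.next()) {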
                return false;
            }
        }

        return true;
    }

    private void readAhead() throws IOException {
        // Work up some look-ahead.
        while (inQueue.size() < search.length) {
            int next = super.read();
            inQueue.offer(next);

            if (next == -1) {
                break;
            }
        }
    }

    @Override
    public int read() throws IOException {
        // Fill outQueue until the next byte to return has been determined.

        while (outQueue.isEmpty()) {
            readAhead();

            if (isMatchFound()) {
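                // Drop the matched bytes from the look-ahead queue.
                for (int i = 0; i < search.length; i++) {
                    inQueue.remove();
                }

                for (byte b : replacement) {
                    // Keep values in 0-255 so that a 0xFF byte is not mistaken for EOF (-1).
                    outQueue.offer(b & 0xFF);
                }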
            } else {
                outQueue.add(inQueue.remove());
            }
        }

        return outQueue.remove();
    }

    @Override
    public int read(byte[] b) throws IOException {
        return read(b, 0, b.length);
    }

    // Copied straight from the InputStream implementation; it just needs to use `read()` from this class.
    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        if (b == null) {
            throw new NullPointerException();
        } else if (off < 0 || len < 0 || len > b.length - off) {
            throw new IndexOutOfBoundsException();
        } else if (len == 0) {
            return 0;
        }

        int c = read();
        if (c == -1) {
            return -1;
        }
        b[off] = (byte)c;

        int i = 1;
        try {
            for (; i < len ; i++) {
                c = read();
                if (c == -1) {
                    break;
                }
                b[off + i] = (byte)c;
            }
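        } catch (IOException ee) {
            // Deliberately ignored, as in the InputStream code this was copied from: return the bytes read so far.
        }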
        return i;
    }
}
lhr0909