
I have the following problem:

I have written a simple unzipper that decompresses .gz files; its output is then processed line by line (\n) by another program. So, in a shell, I can type:

unzipper file | program

Both programs are C/C++ coded.

Does anybody know if and how I can implement this 'pipe' (|) within one C/C++ program, so that I can make something like this multi-threaded for instance...

In my particular case, it is important to keep the new-line structure intact, that's why I'm using a pipe. The gz files are too big to keep in memory as a whole.

Niels
  • you don't need pipes to process input line by line. just read it line by line. that said, you might simplify the control structure by using threads or coroutines (e.g. IIRC that was done for the Algol compiler in the 1960's). – Cheers and hth. - Alf Jan 18 '14 at 16:50
  • The answer depends on your operating system. I would suggest adding a "linux" or "osx" or "windows" tag (or "solaris" or maybe "posix" or whatever), depending on what you mean. – Nemo Jan 18 '14 at 16:50
  • So basically you want to combine the 2 programs into 1? You use the `pipe` system call to create the pipe. One thread unzips the file and writes the pipe; the other thread reads the pipe and does whatever it does. Whether you gain anything by this is an open question. – Duck Jan 18 '14 at 16:53
  • The problem is that unzipping can only be done per set number of bytes, not per line. The unzipped lines are not the same in length as well, so I need the unzipped new-line characters... It's done in Linux, but I think the C/C++ code should work in other OS's as well.. – Niels Jan 18 '14 at 17:00
  • Moreover, the lines are to be processed in groups of 4... – Niels Jan 18 '14 at 17:03
  • @Niels, I guess I don't see the problem. You have to buffer in the 2nd program/thread until you arrive at the condition that allows you to process a full unit. This must effectively be what you are doing now but the shell is redirecting the pipe to your stdin. – Duck Jan 18 '14 at 17:13
  • Yes. To be more precise, the problem is this: I can indeed wait until a complete block is available, but what do I do at the end of an unzipped block, which does not necessarily coincide with the end of a usable block of four lines? – Niels Jan 18 '14 at 17:21
  • You mean any excess characters beyond the 4 lines? Simplest method, you process your 4 lines and move the remainder up to the beginning of the buffer(s) and continue. – Duck Jan 18 '14 at 17:25
  • yeah that was indeed one earlier suggested solution. so then I guess I have to use memcpy and physically point to each new-line and strcat the rest or wait for new unzipped block and strcat that to the last readable block – Niels Jan 18 '14 at 17:28
  • There are a dozen variations on this depending on how sophisticated you need to be but that's the bottom line. – Duck Jan 18 '14 at 17:58
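
The buffering approach discussed in the comments above could be sketched roughly as follows. This is only an illustration, not code from either program: `RecordBuffer`, `feed`, and `drain` are hypothetical names, and the sketch assumes that a record is exactly four newline-terminated lines. Decompressed chunks of any size are appended, complete records are handed out, and whatever is left over stays at the front of the buffer for the next chunk.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical helper: collects raw chunks and hands out complete
// records of four lines each; an unfinished tail stays buffered.
class RecordBuffer {
public:
    // Append one decompressed chunk (any size, may end mid-line).
    void feed(char const* data, std::size_t size) {
        pending.append(data, size);
    }

    // Extract every complete four-line record currently available.
    std::vector<std::string> drain() {
        std::vector<std::string> records;
        std::size_t start = 0;     // first byte of the record being scanned
        std::size_t newlines = 0;  // newlines seen in that record so far

        for (std::size_t i = 0; i < pending.size(); ++i) {
            if (pending[i] != '\n') { continue; }
            if (++newlines < LinesPerRecord) { continue; }
            records.push_back(pending.substr(start, i + 1 - start));
            start = i + 1;
            newlines = 0;
        }

        // Drop what was handed out; the remainder moves to the front
        // of the buffer (the "move the remainder up" step).
        pending.erase(0, start);
        return records;
    }

private:
    static std::size_t const LinesPerRecord = 4;
    std::string pending;
};
```

Calling `feed` after every decompressed block and `drain` right after it means no explicit `memcpy`/`strcat` bookkeeping is needed; `std::string::erase` does the shifting.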

1 Answer


In programming in general, there is a concept called generators; in C++ we tend to think of them as input iterators, but the concern is identical: much like a pipe, it is about pull-driven production.

As such, you can restructure your program around the idea of a Producer (preferably with the interface of an input iterator) and a Consumer. The Consumer asks for input one line at a time, and the Producer lazily comes up with it.

For a good guide to the required interface, I recommend the venerable SGI STL website, specifically its page on the InputIterator concept.

For a simpler example, let's suppose we don't have to deal with unzipping and just read a file on a line-by-line basis:

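#include <algorithm>  // std::find
#include <cassert>
#include <cstddef>    // size_t
#include <fstream>
#include <iostream>
#include <iterator>
#include <string>

// Note: std::iterator is deprecated since C++17; it only supplies the
// nested typedefs (pointer, reference, ...) used below.
class LineIterator: public std::iterator<std::input_iterator_tag,
                                         std::string const>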
{
public:
    // Default Constructible
    LineIterator(): stream(nullptr) {}

    explicit LineIterator(std::istream& is): stream(&is) { this->advance(); }

    // Equality Comparable
    friend bool operator==(LineIterator const& left, LineIterator const& right) {
        return left.stream == right.stream
           and left.buffer == right.buffer
           and left.currentLine == right.currentLine;
    }

    friend bool operator!=(LineIterator const& left, LineIterator const& right) {
        return not (left == right);
    }

    // Trivial Iterator (non mutable)
    pointer operator->() const { return &currentLine; }

    reference operator*() const { return currentLine; }

    // Input Iterator
    LineIterator& operator++() {
        this->advance();
        return *this;
    } // operator++

    LineIterator operator++(int) {
        LineIterator tmp(*this);
        ++*this;
        return tmp;
    } // operator++

private:
    void advance() {
        // Advance a valid iterator to fetch the next line from the source stream.
        static LineIterator const SingularValue;

        assert(*this != SingularValue and "Cannot advance singular iterator");
        // Note: in real life, I would use std::getline...
        // ... but it would not showcase the double-buffering model
        // required to solve the OP problem (because of decoding)

        // We use double-buffering, so clear current and swap buffers
        currentLine.clear();
        swap(buffer, currentLine);

        // Check if we found some new line or not
        size_t const nl = currentLine.find('\n');

        // If we found one already, preserve what's after in the buffer
        // as we only want to expose one line worth of material.
        if (nl != std::string::npos) {
            if (nl + 1 == currentLine.size()) { return; } // nothing to preserve

            buffer.assign(currentLine.begin() + nl + 1, currentLine.end());
            currentLine.erase(currentLine.begin() + nl + 1, currentLine.end());
            return;
        }

        // If we did not, then we need to pump more data into the buffer.
        if (not stream) { return; } // Nothing to pump...

        static size_t const ReadBufferSize = 256;
        char input[ReadBufferSize];

        while (stream->read(input, ReadBufferSize)) {
            if (this->splitBuffer(input, ReadBufferSize)) { break; }
        }

        // We end up here either if we found a new line or if some read failed.
        // If the stream is still good, we successfully found a new line!
        if (*stream) { return; }

        // Otherwise, the stream is no good any longer (it dried up!)
        // but we may still have read some little things from it.
        this->splitBuffer(input, stream->gcount());

        stream = SingularValue.stream; // stream dried up,
                                       // so reset it to match singular value.
    } // advance

    bool splitBuffer(char const* input, size_t const size) {
        // Split input at the newline character, the first chunk ends
        // up in currentLine, the second chunk in buffer.
        // Returns true if a newline character was found, false otherwise.

        // Check if we finally found a new line
        char const* const newLine = std::find(input, input + size, '\n');

        // If we did not, copy everything into currentLine and signal it.
        if (newLine == input + size) {
            currentLine.append(input, size);
            return false;
        }

        // If we did, copy everything up to it (including it) into currentLine
        // and then bufferize the rest for the next iteration.
        currentLine.append(input, newLine + 1);
        buffer.assign(newLine + 1, input + size);
        return true;
    } // splitBuffer

    std::istream* stream;
    std::string buffer;

    std::string currentLine;
}; // class LineIterator

It's a bit of a mouthful (and is probably buggy...). Still, it has the interface we need to compose it with STL algorithms, such as:

std::ifstream file("someFile.txt");
std::copy(LineIterator(file), LineIterator(), std::ostream_iterator<std::string>(std::cout));

which will echo the file on the terminal one line at a time (demo here).

Now, all you have to do is replace the fetch portion (stream->read) with a block-by-block read & unzip :)
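
To make that last step a little more concrete, here is a minimal sketch of a line producer whose fetch step is pluggable. It is a simplified variant rather than the iterator above: `LineProducer` and `ChunkSource` are hypothetical names, and the chunk source is assumed to be a callback that fills a buffer and returns the number of bytes written (0 when exhausted). A wrapper around the decompressor's block read would be one possible source.

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <utility>

// Hypothetical chunk source: writes up to `capacity` bytes into `out` and
// returns how many were written; 0 means the input is exhausted.
using ChunkSource = std::function<std::size_t(char* out, std::size_t capacity)>;

class LineProducer {
public:
    explicit LineProducer(ChunkSource src): source(std::move(src)) {}

    // Stores the next line (including its '\n', if present) in `line`.
    // Returns false once no data is left.
    bool next(std::string& line) {
        std::size_t nl = buffer.find('\n');

        // Pull chunks until a newline shows up or the source dries up.
        while (nl == std::string::npos && !exhausted) {
            char chunk[256];
            std::size_t const n = source(chunk, sizeof chunk);
            if (n == 0) { exhausted = true; break; }

            std::size_t const old = buffer.size();
            buffer.append(chunk, n);
            nl = buffer.find('\n', old);  // only scan the new bytes
        }

        if (nl == std::string::npos) {
            // No newline left: hand out the unterminated tail, if any.
            if (buffer.empty()) { return false; }
            line.swap(buffer);
            buffer.clear();
            return true;
        }

        // Expose exactly one line, keep the rest for the next call.
        line.assign(buffer, 0, nl + 1);
        buffer.erase(0, nl + 1);
        return true;
    }

private:
    ChunkSource source;
    std::string buffer;
    bool exhausted = false;
};
```

The consumer then simply loops with `while (producer.next(line)) { ... }`, and the chunk size of the source never leaks into the line structure.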

Matthieu M.
  • OK, thanks a lot for the response. I do have a few questions, as I'm not that advanced a C programmer. Could anyone explain the following constructions: – Niels Jan 19 '14 at 10:36
  • `class LineIterator: public std::iterator` and `explicit LineIterator(std::istream& is): stream(&is) { this->advance(); }` – Niels Jan 19 '14 at 10:37
  • I don't understand the `:` constructions; what's happening there? And then, what is `friend`? – Niels Jan 19 '14 at 10:38
  • and what does pointer operator->() and reference operator *() do? – Niels Jan 19 '14 at 10:39
  • @Niels: Hum, I am not too sure I understood all the questions... let me try. 1. `std::iterator` is a simple class with no method/state that just declares a bunch of inner types (such as `reference`) that are expected from an iterator; 2. In `Foo(int i): attribute(i) {}` the part between `:` and `{` is the *initializer list* which is used to initialize the class attributes before the beginning of the constructor body; 3. `friend` is a way to declare that a class or function is allowed to access the `protected` and `private` parts of your current class, which is normally not the case... – Matthieu M. Jan 19 '14 at 12:05
  • @Niels: ... 4. `operator->()` is what is called when you write `someclass->` and `operator*()` (no argument) is what is called when you write `*someclass`, they are used to present an interface similar to that of a **pointer**. I advise you take a peek at [The Definitive C++ Book Guide and List](http://stackoverflow.com/questions/388242/the-definitive-c-book-guide-and-list) for a way of improving your knowledge about C++ because those are very basic questions and you are likely to come upon those concepts over and over. – Matthieu M. Jan 19 '14 at 12:08
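
The four points above can be seen together in one small class. This is only an illustrative sketch; `NameHandle` is a hypothetical name and not part of the answer's code.

```cpp
#include <string>

// Hypothetical pointer-like wrapper, for illustration only.
class NameHandle {
public:
    // `: name(n)` is the initializer list: `name` is initialized
    // before the constructor body `{}` runs.
    explicit NameHandle(std::string const& n): name(n) {}

    // Called when you write `*handle`.
    std::string const& operator*() const { return name; }

    // Called when you write `handle->member`.
    std::string const* operator->() const { return &name; }

    // `friend` lets this non-member function read the private `name`.
    friend bool operator==(NameHandle const& left, NameHandle const& right) {
        return left.name == right.name;
    }

private:
    std::string name;
};
```

With `NameHandle h("abc");`, the expression `*h` yields the stored string and `h->size()` calls `size()` on it, just as it would through a real pointer.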