
I want to do the same thing in Go as asked here.

I'm parsing a huge log file line by line, deserializing each line into a struct. The data may come from any source (file, network, etc.), so my function receives an io.Reader. Since the file is huge, I want to split the work among several goroutines.

I could do this easily with io.Pipe and the like. However, I need to split the input (in half, for example) without cutting any line in the middle, so that each goroutine receives its own io.Reader and works on a different part of the file.

Sometimes I also need to pass an io.MultiReader to my function, and I would want the same splitting there. So the input is not necessarily a single file (but mostly it is).

func scan(r io.Reader, pf ProcessFunc) {
    // need to split `r` here if `r` is:
    // r.(io.ReadSeeker)

    // run goroutine #1 with 50% of the stream
    // uses bufio.Scanner

    // run goroutine #2 with 50% of the stream
    // uses bufio.Scanner

    // another goroutine is receiving the deserialized values
    // and sends them to the ProcessFunc for processing further
    // down the pipeline
}

Let's say the data is like this:

foo1 bar2
foo3 bar4
foo5 bar6
foo7 bar8

Goroutine #1 should get an io.Reader that yields:

foo1 bar2
foo3 bar4

And goroutine #2 should get an io.Reader that yields:

foo5 bar6
foo7 bar8

But not like this:

o5 bar6    -> breaks the line in the second io.Reader
foo7 bar8
  • So, you need to split an io.Reader to many io.Readers without breaking up the lines? – Inanc Gumus Aug 10 '19 at 19:26
  • Exactly. It's because, if I assert an io.ReadSeeker, then I don't know where to jump next in the stream. I want to split the incoming Reader to many Readers without breaking up the lines. I also want to continue using the bufio.Scanner if it's possible. – John Fitzgerald Aug 10 '19 at 19:34
  • Even if you manage to obtain multiple readers into the same file (which I doubt is cheaply possible), you don't want to do this on spinning rust disks (with an SSD you'd probably be fine) – Sergio Tulentsev Aug 10 '19 at 19:36
  • Assuming there is no fixed line length, it is impossible to say up front where each line starts and ends. This means you would either need multiple readers, each processing only every n-th line (starting at a different offset), or a single reader that reads every line and hands the content over to a goroutine that processes it. The first way (multiple readers) has a larger overhead, since the file is read in full multiple times. A single reader that hands the processing of each line over to other goroutines probably scales much better. – Steffen Ullrich Aug 10 '19 at 19:50
  • If you have a seekable source, then it's easy to split the source into approximately equal-size chunks: seek to some position; read data to find a line break; open a new reader from the previous break (or the start of the source) and limit that reader to end at the break. Note that on some devices, multiple readers may not provide a benefit over a single reader. If you have a network connection or some other source that's not seekable, the solution is to read lines from one goroutine and parse those lines in worker goroutines. – Charlie Tumahai Aug 10 '19 at 19:55
  • @SergioTulentsev So, what's the better solution for parsing a huge file? – John Fitzgerald Aug 10 '19 at 19:56
  • @JohnFitzgerald Read from one goroutine, process lines in worker goroutines. – Charlie Tumahai Aug 10 '19 at 19:57
  • @SteffenUllrich If the io.Reader is an *os.File, I can get its length; otherwise it's impossible, yes. Actually, I've tried both ways. In the single-reader case there is a lot of contention, because the single reader produces lines so fast. It's only 2x faster than the sequential version, and only when I send the messages in batches (about 1000 at a time to the other goroutines, keeping them in a buffer). – John Fitzgerald Aug 10 '19 at 19:58
  • @CeriseLimón Yes, that's what I have in mind, and I've tried it as well using *os.Files. It is 2-4x faster on my machine with an SSD. However, there is a lot of channel contention, so I send 1000 lines at a time to the other goroutines, using a slice of deserialized values as a buffer. Is there any other way? – John Fitzgerald Aug 10 '19 at 20:00
  • @JohnFitzgerald My first comment describes a solution where you can open multiple *os.Files on the same file. The solution is to use chunks of approximately equal size, not an equal number of lines. – Charlie Tumahai Aug 10 '19 at 20:02
  • @CeriseLimón Yes, I understand. You jump to an approximate position, find a line end there, and continue from that point. I guess the sequential solution is better, because I find my code becomes very complex due to slice buffering, channel contention, etc. – John Fitzgerald Aug 10 '19 at 20:06
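
The single-reader approach discussed in the comments above (one goroutine reads, worker goroutines parse batches of lines) might look roughly like the sketch below. The Record type, the parse function, the scanString helper, and the workers and batchSize parameters are all hypothetical and exist only for illustration; the question's real struct and ProcessFunc are not shown in the post. Results arrive in no particular order.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
	"sync"
)

// Record is a hypothetical stand-in for the struct each line is parsed into.
type Record struct{ Key, Value string }

// parse is a hypothetical deserializer for "key value" lines.
func parse(line string) (Record, bool) {
	k, v, ok := strings.Cut(line, " ")
	return Record{Key: k, Value: v}, ok
}

// scan reads lines from r in a single goroutine and hands them to the
// workers in batches, which keeps the number of channel operations low.
func scan(r io.Reader, workers, batchSize int) ([]Record, error) {
	batches := make(chan []string, workers)
	results := make(chan []Record, workers)

	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for batch := range batches {
				recs := make([]Record, 0, len(batch))
				for _, line := range batch {
					if rec, ok := parse(line); ok {
						recs = append(recs, rec)
					}
				}
				results <- recs
			}
		}()
	}
	// Close results once every worker has drained the batches channel.
	go func() { wg.Wait(); close(results) }()

	var scanErr error
	go func() {
		defer close(batches) // runs after scanErr is set
		sc := bufio.NewScanner(r)
		batch := make([]string, 0, batchSize)
		for sc.Scan() {
			batch = append(batch, sc.Text())
			if len(batch) == batchSize {
				batches <- batch
				batch = make([]string, 0, batchSize)
			}
		}
		if len(batch) > 0 {
			batches <- batch
		}
		scanErr = sc.Err()
	}()

	var all []Record
	for recs := range results {
		all = append(all, recs...)
	}
	return all, scanErr
}

// scanString is a small convenience wrapper used by the demo.
func scanString(data string, workers, batchSize int) ([]Record, error) {
	return scan(strings.NewReader(data), workers, batchSize)
}

func main() {
	recs, err := scanString("foo1 bar2\nfoo3 bar4\nfoo5 bar6\nfoo7 bar8\n", 2, 3)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(recs), "records parsed")
}
```

This works for any io.Reader, including a network stream or an io.MultiReader, because nothing needs to seek. Whether it beats a sequential scan depends on how expensive parsing is relative to reading.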

1 Answer


You've got a couple of options:

  1. If you have seekable data, you can seek to an approximate position and then scan forward to the next newline, so that you only ever split on line breaks.

  2. Pass lines into the goroutines instead of io.Readers. Basically, each goroutine would have a channel, and the main goroutine would feed each line from the io.Reader into those channels.

  3. Split the file beforehand with something like the Unix split utility.
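
As a rough illustration of option 1, here is a minimal sketch that assumes the source implements io.ReaderAt and that its total size is known. The names chunkOffsets, scanChunks, and scanString are hypothetical helpers made up for this example. Each goroutine gets its own io.SectionReader (which is an io.Reader) and keeps using bufio.Scanner. Chunks are approximately equal in bytes, not in number of lines.

```go
package main

import (
	"bufio"
	"fmt"
	"io"
	"strings"
	"sync"
)

// chunkOffsets returns n+1 byte offsets. Every interior offset sits right
// after a '\n' (or at the start or end of the data), so no line is cut.
func chunkOffsets(src io.ReaderAt, size int64, n int) ([]int64, error) {
	offsets := []int64{0}
	for i := 1; i < n; i++ {
		guess := size * int64(i) / int64(n)
		if prev := offsets[i-1]; guess < prev {
			guess = prev // a long line already pushed us past this guess
		}
		if guess == 0 {
			offsets = append(offsets, 0)
			continue
		}
		// Start one byte early: if that byte is '\n', guess is already a
		// line start; otherwise read on to the end of the current line.
		start := guess - 1
		br := bufio.NewReader(io.NewSectionReader(src, start, size-start))
		line, err := br.ReadBytes('\n')
		if err != nil && err != io.EOF {
			return nil, err
		}
		offsets = append(offsets, start+int64(len(line)))
	}
	return append(offsets, size), nil
}

// scanChunks scans each chunk in its own goroutine and returns the lines
// seen per chunk. Real code would deserialize instead of collecting text.
func scanChunks(src io.ReaderAt, size int64, n int) ([][]string, error) {
	offsets, err := chunkOffsets(src, size, n)
	if err != nil {
		return nil, err
	}
	out := make([][]string, n)
	errs := make([]error, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			r := io.NewSectionReader(src, offsets[i], offsets[i+1]-offsets[i])
			sc := bufio.NewScanner(r)
			for sc.Scan() {
				out[i] = append(out[i], sc.Text())
			}
			errs[i] = sc.Err()
		}(i)
	}
	wg.Wait()
	for _, e := range errs {
		if e != nil {
			return nil, e
		}
	}
	return out, nil
}

// scanString is a small convenience wrapper used by the demo.
func scanString(data string, n int) ([][]string, error) {
	return scanChunks(strings.NewReader(data), int64(len(data)), n)
}

func main() {
	chunks, err := scanString("foo1 bar2\nfoo3 bar4\nfoo5 bar6\nfoo7 bar8\n", 2)
	if err != nil {
		panic(err)
	}
	for i, lines := range chunks {
		fmt.Printf("goroutine #%d got %q\n", i+1, lines)
	}
}
```

For a real file, *os.File satisfies io.ReaderAt and the size can be taken from its Stat() result. A source that cannot seek (a network stream, or an io.MultiReader) does not fit this approach and would need option 2 instead.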

Liyan Chang