
Suppose I have the following code, which reads lines, multiplies each line by 2, and prints the results one by one.

I'd like to use N workers. Each worker takes M lines at a time and processes them. More importantly, I'd like the output to be printed in the same order as the input. But the worker-pool example here does not guarantee that the output is printed in the same order as the input.

https://gobyexample.com/worker-pools

The following question also shows some examples, but I don't think they fit my requirement. The problem is that the input can be arbitrarily long, so there is no way to hold everything in memory before it is printed. There must be a way to collect output from the workers, determine whether a worker's output is ready to be printed, and print it as soon as it is. It sounds like there should be a master goroutine to do this, but I am not sure how to implement it efficiently, as this master goroutine could easily become a bottleneck when N is big.

How to collect values from N goroutines executed in a specific order?

Could anybody show an example program that collects the results from the workers in order and prints each result as early as it can be printed?

$ cat main.go
#!/usr/bin/env gorun
// vim: set noexpandtab tabstop=2:

package main

import (
    "bufio"
    "fmt"
    "io"
    "log"
    "os"
    "strconv"
)

func main() {
    stdin := bufio.NewReader(os.Stdin)

    for {
        line, err := stdin.ReadString('\n')

        if err == io.EOF {
            if len(line) != 0 {
                i, _ := strconv.Atoi(line)
                fmt.Println(i*2)
            }
            break
        } else if err != nil {
            log.Fatal(err)
        }

        i, _ := strconv.Atoi(line[:(len(line)-1)])
        fmt.Println(i*2)
    }
}
user1424739

1 Answer


If the workers know the initial order (for example, they are told each line's number), let them preserve that information (just the line numbers) and feed it back on your results channel. The code that receives from the results channel can then order the results by their original position before further processing (e.g. printing).

Below is a quick modification of one of the examples you linked.

package main

import (
    "fmt"
    "time"
)

type Result struct {
    Data, Seq int
}

type Job struct {
    Data string
    Seq  int
}

func worker(id int, jobs <-chan Job, results chan<- Result) {
    for j := range jobs {
        fmt.Println("worker", id, "started  job", j)
        time.Sleep(time.Second)
        fmt.Println("worker", id, "finished job", j)
        results <- Result{len(j.Data), j.Seq}
    }
}

func main() {
    workload := 5

    jobs := make(chan Job, 100)
    results := make(chan Result, 100)

    output := make([]Result, workload)

    for w := 1; w <= 3; w++ {
        go worker(w, jobs, results)
    }

    for j := 0; j < workload; j++ {
        jobs <- Job{ // explicit to make it clear
            Data: fmt.Sprintf("blah blah blah %d", j),
            Seq:  j,
        }
    }
    close(jobs)

    // receive results
    for a := 1; a <= workload; a++ {
        res := <-results
        output[res.Seq] = res

        // uncomment to see the results arrive unordered
        // fmt.Printf("received: %#v\n", res)
    }

    for _, out := range output {
        fmt.Printf("output %#v\n", out)
    }
}

BTW: this does not work well if you do not know your workload in advance. In that case, the code that receives results needs to be a little smarter and process the part that has already been received in order (homework) :). Essentially: wait for line 0, then either wait for the next line or print whatever has already been received in sequence.
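A minimal sketch of that "homework", assuming a simplified stand-in `Result` type and a hypothetical `printInOrder` helper: results are parked in a map keyed by sequence number, and everything contiguous with the last emitted result is flushed as soon as it arrives, so the total workload never needs to be known.

```go
package main

import (
	"fmt"
	"sync"
)

// Result pairs a value with the sequence number of the job that produced it.
type Result struct {
	Seq  int
	Data int
}

// printInOrder receives results in any order and calls emit strictly in
// Seq order, starting at 0. A result waits in pending only until every
// result before it has been emitted.
func printInOrder(results <-chan Result, emit func(Result)) {
	next := 0
	pending := make(map[int]Result)
	for r := range results {
		pending[r.Seq] = r
		// Flush every result that is now contiguous with what was already emitted.
		for {
			p, ok := pending[next]
			if !ok {
				break
			}
			emit(p)
			delete(pending, next)
			next++
		}
	}
}

func main() {
	jobs := make(chan int)
	results := make(chan Result)

	var wg sync.WaitGroup
	for w := 0; w < 3; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for seq := range jobs {
				results <- Result{Seq: seq, Data: seq * 2}
			}
		}()
	}
	// Close results once every worker is done so printInOrder returns.
	go func() {
		wg.Wait()
		close(results)
	}()

	// The producer does not need to announce how many jobs there will be.
	go func() {
		for seq := 0; seq < 10; seq++ {
			jobs <- seq
		}
		close(jobs)
	}()

	printInOrder(results, func(r Result) { fmt.Println(r.Seq, r.Data) })
}
```

One caveat of this design: `pending` is not strictly bounded. If one job is very slow, the other workers keep finishing later jobs and their results pile up in the map until the gap is filled.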

Have fun!

k1m190r
  • I think OP stated that **There is no way to hold everything in memory before they are printed**. So this does not work for the question. – leaf bebop Feb 06 '18 at 09:29
  • I guess that depends on how workers handle what is passed in to them. I'm also assuming that `Result` is a tiny subset of the processing of the larger `Job`. I'm addressing the `Order` part of the question, which I thought was primary. – k1m190r Feb 06 '18 at 09:35
  • The order part has been answered, and the OP even linked to that answer. The problem the OP is addressing is having a large number of workers, where an in-memory, wait-until-all solution would not be good enough or even possible. – leaf bebop Feb 06 '18 at 09:40