Here's an interesting situation I ran into. I need to read lines from a file and, after some data manipulation in goroutines, populate a map from them. Here's the simplified problem statement and example:

Generate the required data by running gen_data.sh:

#!/bin/bash 

rm some.dat || : 
for i in `seq 1 10000`; do 
    echo "$i `date` tx: $RANDOM rx:$RANDOM" >> some.dat
done

If I read the lines of some.dat into a map[int]string without goroutines, using loadtoDict.go, the keys stay aligned with the lines (that is, the 1st and 2nd words of each output line are the same; see the output below).

In real life I need to process the lines (expensive) before they are loaded into the map. Using goroutines speeds up my dictionary creation, and this is an important requirement for the real problem.

loadtoDict.go

package main

import (
    "bufio"
    "fmt"
    "log"
    "os"
)

var (
    fileName = "some.dat"
)

func checkerr(err error) {
    if err != nil {
        fmt.Println(err)
        log.Fatal(err)
    }
}

func main() {
    ourDict := make(map[int]string)
    f, err := os.Open(fileName)
    checkerr(err)
    defer f.Close()

    fscanner := bufio.NewScanner(f)

    indexPos := 1

    for fscanner.Scan() {
        text := fscanner.Text()
        //fmt.Println("text", text)
        ourDict[indexPos] = text
        indexPos++

    }

    for i, v := range ourDict {
        fmt.Printf("%d: %s\n", i, v)
    }

}

Running:

$ ./loadtoDict
...
8676: 8676 Mon Dec 23 15:52:24 PST 2019 tx: 17718 rx:1133
2234: 2234 Mon Dec 23 15:52:20 PST 2019 tx: 13170 rx:15962
3436: 3436 Mon Dec 23 15:52:21 PST 2019 tx: 17519 rx:5419
6177: 6177 Mon Dec 23 15:52:23 PST 2019 tx: 5731 rx:5449

Notice how the 1st and 2nd words align. However, if I use goroutines to load my map, this goes awry:

async_loadtoDict.go

package main

import (
    "bufio"
    "fmt"
    "log"
    "os"
    "sync"
)

var (
    fileName = "some.dat"
    mu       = &sync.RWMutex{}
    MAX = 9000
)

func checkerr(err error) {
    if err != nil {
        fmt.Println(err)
        log.Fatal(err)
    }
}

func main() {
    ourDict := make(map[int]string)
    f, err := os.Open(fileName)
    checkerr(err)
    defer f.Close()

    fscanner := bufio.NewScanner(f)

    indexPos := 1
    var wg sync.WaitGroup
    sem := make(chan int, MAX)
    defer close(sem)

    for fscanner.Scan() {
        text := fscanner.Text()
        wg.Add(1)
        sem <- 1
        go func() {
            mu.Lock()
            defer mu.Unlock()
            ourDict[indexPos] = text
            indexPos++
            <- sem
            wg.Done()
        }()

    }

    wg.Wait()

    for i, v := range ourDict {
        fmt.Printf("%d: %s\n", i, v)
    }

}

Output:

$ ./async_loadtoDict 
...
11: 22 Mon Dec 23 15:52:19 PST 2019 tx: 25688 rx:7602
5716: 6294 Mon Dec 23 15:52:23 PST 2019 tx: 28488 rx:3572
6133: 4303 Mon Dec 23 15:52:21 PST 2019 tx: 24286 rx:1565
7878: 9069 Mon Dec 23 15:52:25 PST 2019 tx: 16863 rx:24234
8398: 7308 Mon Dec 23 15:52:23 PST 2019 tx: 4321 rx:20642
9566: 3489 Mon Dec 23 15:52:21 PST 2019 tx: 14447 rx:12630
2085: 2372 Mon Dec 23 15:52:20 PST 2019 tx: 14375 rx:24151

This is despite guarding the write to ourDict[indexPos] with a mutex. I'd like my map index to align with the order in which the lines were read.

Thanks!

struggling_learner
  • How unnecessarily complex... The index doesn't match because, even though you create the goroutines in order and guard against concurrent access, you have up to `MAX` (9000) goroutines waiting and you don't control the order in which they resume. The index reflects the order of execution, not creation. – Luiz Ferraz Dec 24 '19 at 01:00
  • By the way, your code is fully sequential, just not deterministic. – Luiz Ferraz Dec 24 '19 at 01:01
  • Unless I keep `MAX = 1`, I observe what I reported above, and that defeats the purpose of having goroutines prep and populate my `map`. I do need to process the lines before they are loaded into the map, using goroutines speeds up my dictionary creation, and this is an important requirement for the real problem. – struggling_learner Dec 24 '19 at 01:06
  • Put the access to the map in a single goroutine and let the others just prep the data (I suppose there is some data manipulation, because if it is just a passthrough like the example, goroutines will actually make it slower with the overhead). I will add an example as an answer. – Luiz Ferraz Dec 24 '19 at 01:11
  • As I said in my answer, your semaphore `sem` doesn't work because you made it deeply buffered. When you set `MAX = 1` you make it one entry deep and then it works: it forces each of your spun-off goroutines to wait until the previous one finishes before it can start. – torek Dec 24 '19 at 01:55
  • the concurrent code is plain wrong, run it with the `-race` flag.... –  Dec 24 '19 at 09:40

3 Answers

Your semaphore sem is not working because you made it deeply buffered.
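To make the point about buffer depth concrete, here is a minimal sketch (not the OP's program) that measures how many goroutines hold a semaphore slot at once. The helper name peakConcurrency and the 1 ms sleep standing in for real work are assumptions made for illustration only.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// peakConcurrency starts `jobs` goroutines gated by a buffered-channel
// semaphore of the given capacity and returns the largest number of
// goroutines that held a slot at the same time.
func peakConcurrency(capacity, jobs int) int {
	sem := make(chan struct{}, capacity)
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		running int
		peak    int
	)
	for i := 0; i < jobs; i++ {
		wg.Add(1)
		sem <- struct{}{} // blocks only once `capacity` slots are taken
		go func() {
			defer wg.Done()
			defer func() { <-sem }() // deferred last, so it runs first: free the slot

			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			time.Sleep(time.Millisecond) // stand-in for real work

			mu.Lock()
			running--
			mu.Unlock()
		}()
	}
	wg.Wait()
	return peak
}

func main() {
	fmt.Println("capacity 1:   peak =", peakConcurrency(1, 20))
	fmt.Println("capacity 100: peak =", peakConcurrency(100, 200))
}
```

With a capacity of 1 the peak is always 1, so the goroutines run strictly one after another. With a deep buffer many goroutines are in flight at once, and nothing dictates which of them reaches a shared counter first.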

In general, this is the wrong way to set up a map for this kind of task, because reading the file will be the slow part. If you have a more complex task—e.g., read a line, think a lot about it, set up something—you'll want this as your pseudocode structure:

type workType struct {
    index int
    line  string
}

var wg sync.WaitGroup
wg.Add(nWorkers)
// I made this buffered originally but there's no real point, so
// fixing that in an edit
work := make(chan workType)
for i := 0; i < nWorkers; i++ {
    go readAndDoWork(work, &wg)
}

for i := 1; fscanner.Scan(); i++ {
    work <- workType{index: i, line: fscanner.Text()}
}
close(work)
wg.Wait()

... now your dictionary is ready ...

with the workers doing this:

func readAndDoWork(ch chan workType, wg *sync.WaitGroup) {
    for item := range ch {
        ... do computation ...
        insertIntoDict(item.index, result)
    }
    wg.Done()
}

with insertIntoDict grabbing the mutex (to protect the map from index to result) and writing into the dictionary. (You can just inline this if you prefer.)

The idea here is to set up some number of workers—probably based on the number of CPUs available—each of which grabs the next work item and handles it. The main goroutine just parcels out work, then closes the work channel—this will cause all the workers to see end of input—and then waits for them to signal that they're done computing.

(If you like, you can create one more goroutine that reads worker-computed results and puts them into the map. That way you don't need a mutex for the map itself.)

torek
  • @mh-cbon: I'm not sure myself but I think the OP didn't like the very short description of why `sem` wasn't serializing the counter. – torek Dec 24 '19 at 10:10

As I mentioned in the comments, you cannot control the order in which the goroutines execute, so the index should not be changed from inside them.

Here is an example where the interaction with the map happens in a single goroutine and your processing in the others:

package main

import (
    "bufio"
    "fmt"
    "log"
    "os"
    "sync"
)

var (
    fileName = "some.dat"
    MAX      = 9000
)

func checkerr(err error) {
    if err != nil {
        fmt.Println(err)
        log.Fatal(err)
    }
}

type result struct {
    index int
    data string
}

func main() {
    ourDict := make(map[int]string)
    f, err := os.Open(fileName)
    checkerr(err)
    defer f.Close()

    fscanner := bufio.NewScanner(f)

    var wg sync.WaitGroup
    sem := make(chan struct{}, MAX) // Empty structs carry no data, which suits a semaphore
    out := make(chan result)
    done := make(chan struct{})

    // This goroutine is the only one that writes to the dict, so no race condition.
    // It must start before the producers: out is unbuffered, so otherwise every
    // producer blocks on its send and the loop below stalls once sem is full.
    go func() {
        defer close(done)
        for entry := range out { // The loop ends when out is closed
            ourDict[entry.index] = entry.data
        }
    }()

    indexPos := 1

    for fscanner.Scan() {
        text := fscanner.Text()
        wg.Add(1)
        sem <- struct{}{}

        go func(index int, data string) {
            // Defer the release of your resources, otherwise an early return
            // in your goroutine leaves main waiting forever
            defer func() {
                wg.Done()
                <-sem
            }()
            // Process your data
            out <- result{index, data}
        }(indexPos, text) // Pass the values that change on each iteration, so every goroutine gets its own copy

        indexPos++
    }

    wg.Wait()  // All producers have sent their results
    close(out) // Lets the writer goroutine finish
    <-done     // The last entry is now in the map

    for i, v := range ourDict {
        fmt.Printf("%d: %s\n", i, v)
    }

}
Luiz Ferraz
  • in the last go func, simply range over the channel, the loop will exit as soon as the channel is closed. –  Dec 24 '19 at 09:42
  • The wait conditions are incorrect: you should be waiting on the output processing rather than on the input processing. `defer close(out)` should happen when `wg.Wait` releases. The range over out can be inlined into main, and if you are only printing results, there is no need to save them to memory up front. This code is doing a lot of wg.Add/wg.Done, which is inefficient; declaring workers up front helps mitigate contention. –  Dec 24 '19 at 09:47

OK, I've figured this out. Giving each goroutine its own copy of the index to hang on to seems to work.

Changed:

for fscanner.Scan() {
    text := fscanner.Text()
    wg.Add(1)
    sem <- 1
    go func() {
        mu.Lock()
        defer mu.Unlock()
        ourDict[indexPos] = text
        indexPos++
        <- sem
        wg.Done()
    }()

}

to

for fscanner.Scan() {
        text := fscanner.Text()
        wg.Add(1)
        sem <- 1
        go func(mypos int) {
                mu.Lock()
                defer mu.Unlock()
                ourDict[mypos] = text
                <-sem
                wg.Done()
        }(indexPos)
        indexPos++
}

Full code: https://play.golang.org/p/dkHaisPHyHz
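Here is a small self-contained check of that change, as a sketch rather than the linked playground code. The helper name fill and the synthetic "N payload" lines are made up for illustration. Because both the index and the text are passed as arguments, they are evaluated in the loop at the moment the goroutine is created, not whenever the goroutine happens to run.

```go
package main

import (
	"fmt"
	"sync"
)

// fill stores lines[i] under key i+1 using one goroutine per line.
func fill(lines []string) map[int]string {
	var (
		mu   sync.Mutex
		wg   sync.WaitGroup
		dict = make(map[int]string, len(lines))
	)
	indexPos := 1
	for _, text := range lines {
		wg.Add(1)
		// The arguments are evaluated here, in reading order, so each
		// goroutine carries its own (pos, line) pair.
		go func(pos int, line string) {
			defer wg.Done()
			mu.Lock()
			defer mu.Unlock()
			dict[pos] = line
		}(indexPos, text)
		indexPos++ // only the loop touches the counter now
	}
	wg.Wait()
	return dict
}

func main() {
	lines := make([]string, 10000)
	for i := range lines {
		lines[i] = fmt.Sprintf("%d payload", i+1)
	}
	dict := fill(lines)

	mismatches := 0
	for k, v := range dict {
		if v != fmt.Sprintf("%d payload", k) {
			mismatches++
		}
	}
	fmt.Println("entries:", len(dict), "mismatches:", mismatches)
}
```

The mutex is still needed because the goroutines write to the same map, but it no longer has anything to do with which key a line gets.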

Using a pool of workers:

package main

import (
    "bufio"
    "fmt"
    "log"
    "os"
    "sync"
)

const (
    MAX      = 10
    fileName = "some.dat"
)

type gunk struct {
    line string
    id   int
}

func main() {
    ourDict := make(map[int]string)
    wg := sync.WaitGroup{}
    mu := sync.RWMutex{}

    cha := make(chan gunk)

    for i := 0; i < MAX; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            // The loop ends once cha is closed and drained
            for textin := range cha {
                mu.Lock()
                ourDict[textin.id] = textin.line
                mu.Unlock()
            }
        }()
    }

    f, err := os.Open(fileName)
    checkerr(err)
    defer f.Close()
    fscanner := bufio.NewScanner(f)
    indexPos := 1

    for fscanner.Scan() {
        text := fscanner.Text()
        thisgunk := gunk{line: text, id: indexPos}
        cha <- thisgunk
        indexPos++
    }

    close(cha)
    wg.Wait()
    for i, v := range ourDict {
        fmt.Printf("%d: %s\n", i, v)
    }

}

func checkerr(err error) {
    if err != nil {
        fmt.Println(err)
        log.Fatal(err)
    }
}
struggling_learner
  • Passing the index will help, yes, because you used to protect it with the mutex (for the duration of the use-and-increment only, meaning, other goroutines incremented it before yours) but now you have a per-goroutine index. But what do you think the variable `sem` is doing for you? – torek Dec 24 '19 at 01:55
  • sem is to limit concurrency to MAX goroutines. https://stackoverflow.com/a/25306439/9488865 – struggling_learner Dec 24 '19 at 02:07
  • Right - so when you have it set to 9000, you spin off up to 9000 parallel goroutines. That's not really productive: the number of CPUs you have available is going to limit the amount of real work you can do. When you set it to 1, you limit yourself to 1 goroutine, so even though `indexPos` itself is shared between *all* the goroutines, there's only 1 goroutine using it at a time. With the updated code, you make a copy of `indexPos` in each goroutine. – torek Dec 24 '19 at 04:03
  • Note that there is a cost (smallish) to creating a new goroutine, and a cost (smallish) to sending and receiving on a channel. It's generally better to spin up your n-cpus-available workers once, then feed each one work over a channel, than it is to spin up a one-time worker for each job, then make it speak over a semaphore channel to limit how many can actually run at a time. – torek Dec 24 '19 at 04:05
  • refer to torek answer, it will give you better results than this hard and complex code. –  Dec 24 '19 at 09:48