-1

I'm writing a program that reads a list of order numbers in a file called orders.csv and compares it with the other csv files that are present in the folder.

The problem is that it deadlocks even though I'm using a WaitGroup, and I don't know why.

For some reason stackoverflow says that my post is mostly code, so I have to add this line, because the whole code is necessary if someone wants to help me debug this problem I'm having.

package main

import (
    "bufio"
    "fmt"
    "log"
    "os"
    "path/filepath"
    "strings"
    "sync"
)

// Files holds the names of the CSV files that are checked against
// the order list. main fills it from getCSVsFromCurrentDir.
type Files struct {
    filenames []string
}

// Orders holds the order IDs read from the orders file,
// one entry per line of that file.
type Orders struct {
    ID []string
}

// ordersFilename is the file the order IDs are read from. It is
// excluded from the list of CSV files that get scanned.
var ordersFilename string = "orders.csv"

func main() {
    var (
        ordersFile *os.File
        files       Files
        orders     Orders
        err        error
    )

    mu := new(sync.Mutex)
    wg := &sync.WaitGroup{}
    wg.Add(1)

    if ordersFile, err = os.Open(ordersFilename); err != nil {
        log.Fatalln("Could not open file: " + ordersFilename)
    }

    orders = getOrderIDs(ordersFile)

    files.filenames = getCSVsFromCurrentDir()

    var filenamesSize = len(files.filenames)
    var ch = make(chan map[string][]string, filenamesSize)
    var done = make(chan bool)

    for i, filename := range files.filenames {
        go func(currentFilename string, ch chan<- map[string][]string, i int, orders Orders, wg *sync.WaitGroup, filenamesSize *int, mu *sync.Mutex, done chan<- bool) {
            wg.Add(1)
            defer wg.Done()
            checkFile(currentFilename, orders, ch)
            mu.Lock()
            *filenamesSize--
            mu.Unlock()
            if i == *filenamesSize {
                done <- true
                close(done)
            }
        }(filename, ch, i, orders, wg, &filenamesSize, mu, done)
    }

    select {
    case str := <-ch:
        fmt.Printf("%+v\n", str)
    case <-done:
        wg.Done()
        break
    }

    wg.Wait()
    close(ch)
}

// getCSVsFromCurrentDir returns a string slice with the paths of the
// csv files found while walking the current directory, excluding
// "orders.csv". Directories are never returned, even if their name
// ends in ".csv". The program is terminated if the walk reports an error.
func getCSVsFromCurrentDir() []string {
    var filenames []string

    err := filepath.Walk(".", func(path string, info os.FileInfo, err error) error {
        // Propagate walk errors; otherwise the check below could never fire.
        if err != nil {
            return err
        }

        // Only regular entries qualify; this also skips the root ".".
        if info.IsDir() || !strings.HasSuffix(path, ".csv") || path == ordersFilename {
            return nil
        }

        filenames = append(filenames, path)
        return nil
    })

    if err != nil {
        log.Fatalln("Could not read file names in current dir")
    }

    return filenames
}

// getOrderIDs reads the given file line by line and returns an
// Orders struct whose ID slice holds every line that was read.
// The program is terminated if the very first line cannot be read;
// any later read error simply ends the collection.
func getOrderIDs(file *os.File) Orders {
    r := bufio.NewReader(file)

    line, readErr := readLine(r)
    if readErr != nil {
        log.Fatalln("Could not read file: " + ordersFilename)
    }

    var result Orders
    for {
        result.ID = append(result.ID, line)

        if line, readErr = readLine(r); readErr != nil {
            break
        }
    }

    return result
}

// checkFile scans filename line by line and collects, in order of first
// appearance and without duplicates, every line that is also present in
// orders.ID. The result is sent on ch as a map keyed by filename; the map
// is empty when nothing matched. The program is terminated if the file
// cannot be opened or its first line cannot be read.
func checkFile(filename string, orders Orders, ch chan<- map[string][]string) {
    file, err := os.Open(filename)
    if err != nil {
        log.Fatalln("Could not read file: " + filename)
    }
    // Release the descriptor when done; one file is opened per goroutine,
    // so leaking them could exhaust the process limit on large folders.
    defer file.Close()

    reader := bufio.NewReader(file)

    fileContent, err := readLine(reader)
    if err != nil {
        log.Fatalln("Could not read file: " + filename)
    }

    orderFilesMap := make(map[string][]string)

    for err == nil {
        if containedInSlice(fileContent, orders.ID) && !containedInSlice(fileContent, orderFilesMap[filename]) {
            orderFilesMap[filename] = append(orderFilesMap[filename], fileContent)
        }
        fileContent, err = readLine(reader)
    }

    ch <- orderFilesMap
}

// containedInSlice reports whether str is equal to
// at least one element of slice. A nil or empty
// slice never contains anything.
func containedInSlice(str string, slice []string) bool {
    found := false
    for i := 0; i < len(slice) && !found; i++ {
        found = slice[i] == str
    }

    return found
}

// readLine reads one complete line from r, joining the fragments
// that bufio.Reader.ReadLine hands back when a line is longer than
// the reader's buffer. It returns the bytes gathered so far together
// with the first error encountered, if any.
func readLine(r *bufio.Reader) (string, error) {
    var collected []byte

    for {
        chunk, more, err := r.ReadLine()
        collected = append(collected, chunk...)

        if err != nil || !more {
            return string(collected), err
        }
    }
}
Jonathan Hall
  • 75,165
  • 16
  • 143
  • 189
Dboi
  • 11
  • 2

3 Answers3

1
  1. The first issue is that wg.Add must always be called outside of the goroutine(s) it stands for. If it isn't, wg.Wait might be called before the goroutine(s) have actually started running (and called wg.Add) and will therefore "think" that there is nothing to wait for.

  2. The second issue with the code is that there are multiple ways it waits for the routines to be done. There is the WaitGroup and there is the done channel. Use only one of them. Which one depends also on how the results of the goroutines are used. Here we come to the next problem.

  3. The third issue is with gathering the results. Currently the code only prints / uses a single result from the goroutines. Put a for { ... } loop around the select and use return to break out of the loop if the done channel is closed. (Note that you don't need to send anything on the done channel, closing it is enough.)

Improved Version 0.0.1

So here is the first version (including some other "code cleanup") with a done channel used for signalling completion and the WaitGroup removed:

// main reads the order IDs, scans every other CSV file concurrently and
// prints the matches per file. Completion is signalled solely by closing
// the done channel; no WaitGroup is involved.
func main() {
    ordersFile, err := os.Open(ordersFilename)
    if err != nil {
        log.Fatalln("Could not open file: " + ordersFilename)
    }

    orders := getOrderIDs(ordersFile)

    files := Files{
        filenames: getCSVsFromCurrentDir(),
    }

    var (
        mu        = new(sync.Mutex)
        remaining = len(files.filenames)
        // The buffer has room for every result, so no worker ever blocks on send.
        ch   = make(chan map[string][]string, remaining)
        done = make(chan struct{})
    )

    // With no files there is no worker that could close done.
    if remaining == 0 {
        return
    }

    for _, filename := range files.filenames {
        go func(currentFilename string) {
            checkFile(currentFilename, orders, ch)

            // Decrement and test under the same lock so that exactly one
            // goroutine, whichever finishes last, observes zero.
            mu.Lock()
            remaining--
            last := remaining == 0
            mu.Unlock()

            if last {
                // Closing is enough; nothing needs to be sent on done.
                close(done)
            }
        }(filename)
    }

    for {
        select {
        case str := <-ch:
            fmt.Printf("%+v\n", str)
        case <-done:
            // select picks randomly among ready cases, so results may still
            // sit in the buffer. Every send happened before done was closed,
            // so a non-blocking drain sees all of them.
            for {
                select {
                case str := <-ch:
                    fmt.Printf("%+v\n", str)
                default:
                    return
                }
            }
        }
    }
}

Improved Version 0.0.2

  1. In your case, however, we have an advantage: we know exactly how many goroutines we started and therefore how many results to expect (provided each goroutine sends exactly one result, which this code currently does). That gives us another option: we can collect the results with a second for loop that has the same number of iterations:
// main loads the order IDs, starts one goroutine per CSV file and then
// receives exactly as many results as goroutines were started, printing
// each one as it arrives.
func main() {
    var (
        ordersFile *os.File
        err        error
    )
    if ordersFile, err = os.Open(ordersFilename); err != nil {
        log.Fatalln("Could not open file: " + ordersFilename)
    }

    orders := getOrderIDs(ordersFile)

    var files Files
    files.filenames = getCSVsFromCurrentDir()

    // A small fixed buffer is sufficient; it does not have to match the
    // number of results that will pass through the channel.
    results := make(chan map[string][]string, 5)

    for idx := range files.filenames {
        // The name is handed over as an argument so that every goroutine
        // works on its own copy.
        go func(name string) {
            checkFile(name, orders, results)
        }(files.filenames[idx])
    }

    // One result is expected per started goroutine.
    for pending := len(files.filenames); pending > 0; pending-- {
        fmt.Printf("%+v\n", <-results)
    }
}

A lot simpler, isn't it? Hope that helps!

Peter
  • 29,454
  • 5
  • 48
  • 60
TehSphinX
  • 6,536
  • 1
  • 24
  • 34
  • Note that Peter's answer shows you option 3, using the WaitGroup to wait for the routines to finish. That pattern is also helpful in cases where the number of answers is not known beforehand. The channel gets closed once all routines are done, thereby finishing the for loop over the channel once all items in the channel have been consumed. – TehSphinX Dec 09 '20 at 11:29
  • 1
    A select statement with a single case is pointless. I took the liberty to remove it from the last version. – Peter Dec 09 '20 at 12:29
1

There is a lot wrong with this code.

  1. You're using the WaitGroup wrong. Add has to be called in the main goroutine, else there is a chance that Wait is called before all Add calls complete.
  2. There's an extraneous Add(1) call right after initializing the WaitGroup that isn't matched by a Done() call, so Wait will never return (assuming the point above is fixed).
  3. You're using both a WaitGroup and a done channel to signal completion. This is redundant at best.
  4. You're reading filenamesSize while not holding the lock (in the if i == *filenamesSize statement). This is a race condition.
  5. The i == *filenamesSize condition makes no sense in the first place. Goroutines execute in an arbitrary order, so you can't be sure that the goroutine with i == 0 is the last one to decrement filenamesSize

This can all be simplified by getting rid of most of the synchronization primitives and simply closing the ch channel when all goroutines are done:

func main() { 
    ch := make(chan map[string][]string)
    var wg WaitGroup

    for _, filename := range getCSVsFromCurrentDir() { 
        filename := filename // capture loop var
        wg.Add(1)
        go func() { 
            checkFile(filename, orders, ch)
            wg.Done()
        }()
    } 

    go func() { 
        wg.Wait() // after all goroutines are done...
        close(ch) // let range loop below exit
    }()

    for str := range ch { 
        // ...
    } 
}
Peter
  • 29,454
  • 5
  • 48
  • 60
  • Thank you very much for the answer, I'm accepting the other one just because it seems more complete to me, but yours also was very useful – Dboi Dec 09 '20 at 11:24
0

Not an answer, but some comments that do not fit in the comment box.

In this part of the code

func main() {
    var (
        ordersFile *os.File
        files       Files
        orders     Orders
        err        error
    )

    mu := new(sync.Mutex)
    wg := &sync.WaitGroup{}
    wg.Add(1)

The last statement is a call to wg.Add that appears to be dangling. By that I mean it is hard to see what will trigger the required wg.Done counterpart. It is a mistake to call wg.Add without a matching wg.Done, and it is error-prone to write them in such a way that the pair cannot be found immediately.

In that part of the code, it is clearly wrong

    go func(currentFilename string, ch chan<- map[string][]string, i int, orders Orders, wg *sync.WaitGroup, filenamesSize *int, mu *sync.Mutex, done chan<- bool) {
        wg.Add(1)
        defer wg.Done()

Consider that while the goroutine is starting up, and before it has added 1 to the WaitGroup, the parent goroutine continues to execute. See this example: https://play.golang.org/p/N9Chaqkv4bd The main goroutine does not wait on the WaitGroup because the counter has not had time to be incremented.

There is more to say, but I find it hard to understand the purpose of your code, so I am not sure how to help you further without basically rewriting it.