1

I'm attempting to make a concurrent version of grep. The program walks directories/subdirectories and returns back any matching strings to a provided pattern.

I am attempting to run the file searching concurrently, once I have all the files to search (see searchPaths function). Originally I was getting:

fatal error: all goroutines are asleep - deadlock

Until I added the close(out) at the end of searchPaths, to which it now returns:

Panic: Send on a closed channel when running goroutine in for loop

I am attempting to implement something similar to:

https://go.dev/blog/pipelines#fan-out-fan-in

Is it the case that I am closing the channel at the wrong point?

package main

import (
    "fmt"
    "io/fs"
    "io/ioutil"
    "log"
    "os"
    "path/filepath"
    "strings"
    "sync"
)

// SearchResult describes a single line that matched the search pattern.
type SearchResult struct {
    // line is the full text of the matching line.
    line       string
    // lineNumber is the number stored by searchLine, which adds one to the
    // index it is given.
    lineNumber int
}

// Display pairs a SearchResult with the path of the file it was found in.
type Display struct {
    // filePath is the path of the file that was searched.
    filePath string
    // SearchResult is embedded, so line and lineNumber are accessible
    // directly on a Display value.
    SearchResult
}

// wg counts the search goroutines started by searchPaths; main calls
// wg.Wait() on it.
var wg sync.WaitGroup

// PrettyPrint writes the line number, file path and matching line of d to
// standard output, one field per line, followed by a blank line.
func (d Display) PrettyPrint() {
    report := fmt.Sprintf("Line Number: %v\nFilePath: %v\nLine: %v\n\n", d.lineNumber, d.filePath, d.line)
    fmt.Print(report)
}

// searchLine reports whether line contains pattern as a substring. On a match
// it returns a SearchResult holding the line and lineNumber+1 (callers pass a
// zero-based index) together with true; otherwise it returns the zero
// SearchResult and false.
func searchLine(pattern string, line string, lineNumber int) (SearchResult, bool) {
    if !strings.Contains(line, pattern) {
        return SearchResult{}, false
    }
    match := SearchResult{line: line, lineNumber: lineNumber + 1}
    return match, true
}

// splitIntoLines breaks file into the pieces separated by "\n". A trailing
// newline therefore yields a final empty element, and an empty input yields a
// slice holding one empty string.
func splitIntoLines(file string) []string {
    return strings.Split(file, "\n")
}

// fileFromPath reads the whole file at path and returns its contents as a
// string. If the file cannot be read, the error is logged and the program
// exits via log.Fatal.
func fileFromPath(path string) string {
    data, readErr := ioutil.ReadFile(path)
    if readErr != nil {
        log.Fatal(readErr)
    }
    return string(data)
}

// getRecursiveFilePaths walks inputDir with filepath.Walk and returns the
// path of every entry that is not a directory. An error reported for an entry
// is printed and returned to Walk, which stops the walk; the final walk error
// is printed as well, and whatever paths were collected so far are returned.
func getRecursiveFilePaths(inputDir string) []string {
    var files []string

    visit := func(current string, entry fs.FileInfo, walkErr error) error {
        switch {
        case walkErr != nil:
            fmt.Printf("prevent panic by handling failure accessing a path %q: %v\n", current, walkErr)
            return walkErr
        case entry.IsDir():
            // Directories are only descended into, never collected.
            return nil
        }
        files = append(files, current)
        return nil
    }

    if walkErr := filepath.Walk(inputDir, visit); walkErr != nil {
        fmt.Printf("Error walking the path %q: %v\n", inputDir, walkErr)
    }
    return files
}

// searchPaths starts one goroutine per entry in paths; each goroutine calls
// searchFile and sends every returned Display on an unbuffered channel, which
// is handed back to the caller as receive-only.
//
// NOTE(review): close(out) runs as soon as the loop has started the
// goroutines, not after they have finished, so any goroutine that reaches
// `out <- display` afterwards sends on a closed channel and panics.
// NOTE(review): the closure reads the loop variable path directly; with Go
// versions before 1.22 every iteration shares that variable, so a goroutine
// may observe a later path than the one it was started for.
func searchPaths(paths []string, pattern string) <-chan Display {
    out := make(chan Display)

    for _, path := range paths {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for _, display := range searchFile(path, pattern) {
                out <- display
            }
        }()
    }
    close(out)
    return out
}

// searchFile reads the file at path, splits it into lines and returns one
// Display per line that contains pattern, in file order. The result is nil
// when no line matches.
func searchFile(path string, pattern string) []Display {
    var matches []Display
    for i, text := range splitIntoLines(fileFromPath(path)) {
        result, found := searchLine(pattern, text, i)
        if !found {
            continue
        }
        matches = append(matches, Display{filePath: path, SearchResult: result})
    }
    return matches
}

// main takes the search pattern from os.Args[1] and the directory from
// os.Args[2], gathers every file path under the directory, starts the
// concurrent search and pretty-prints each result received on the channel.
//
// NOTE(review): indexing os.Args panics when fewer than two arguments are
// given.
func main() {
    pattern := os.Args[1]
    dirPath := os.Args[2]

    paths := getRecursiveFilePaths(dirPath)

    out := searchPaths(paths, pattern)
    // NOTE(review): this waits for every worker to call Done before anything
    // is received from out. A worker blocked on a send to the unbuffered
    // channel cannot finish until the loop below receives from it, so the
    // wait and the receive loop cannot run one after the other like this.
    wg.Wait()
    for d := range out {
        d.PrettyPrint()
    }

}

Sheen
  • 586
  • 10
  • 22
  • 6
    The sender entity should close the channel, exactly to avoid sending on a closed channel (which causes a runtime panic). If there are multiple senders, they must be coordinated and the channel only closed when all senders are done. Your `wg.Wait()` is "misplaced". See: [Closing channel of unknown length](https://stackoverflow.com/questions/34283255/closing-channel-of-unknown-length/34283635#34283635) – icza Nov 10 '22 at 11:45
  • 1
    `close(out); return out` is an immediate red flag: there is no point in returning a channel which was just closed and therefore cannot be used. – Adrian Nov 10 '22 at 15:00

1 Answer

0

2 main issues with this code were

  1. You need to close the channel only after wg.Wait() completes. You can do this in a separate goroutine, as shown below.
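  2. As the path variable in the searchPaths func is reassigned on every iteration of the for loop, it is not good practice to use that variable directly in the goroutines (before Go 1.22 all iterations share one variable, so a goroutine may see a later value than the one it was started for); a better approach is to pass it as an argument.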
package main

import (
    "fmt"
    "io/fs"
    "io/ioutil"
    "log"
    "os"
    "path/filepath"
    "strings"
    "sync"
)

// SearchResult describes a single line that matched the search pattern.
type SearchResult struct {
    // line is the full text of the matching line.
    line       string
    // lineNumber is the number stored by searchLine, which adds one to the
    // index it is given.
    lineNumber int
}

// Display pairs a SearchResult with the path of the file it was found in.
type Display struct {
    // filePath is the path of the file that was searched.
    filePath string
    // SearchResult is embedded, so line and lineNumber are accessible
    // directly on a Display value.
    SearchResult
}

// wg counts the search goroutines started by searchPaths; main waits on it
// before closing the results channel.
var wg sync.WaitGroup

// PrettyPrint writes the line number, file path and matching line of d to
// standard output, one field per line, followed by a blank line.
func (d Display) PrettyPrint() {
    report := fmt.Sprintf("Line Number: %v\nFilePath: %v\nLine: %v\n\n", d.lineNumber, d.filePath, d.line)
    fmt.Print(report)
}

// searchLine reports whether line contains pattern as a substring. On a match
// it returns a SearchResult holding the line and lineNumber+1 (callers pass a
// zero-based index) together with true; otherwise it returns the zero
// SearchResult and false.
func searchLine(pattern string, line string, lineNumber int) (SearchResult, bool) {
    if !strings.Contains(line, pattern) {
        return SearchResult{}, false
    }
    match := SearchResult{line: line, lineNumber: lineNumber + 1}
    return match, true
}

// splitIntoLines breaks file into the pieces separated by "\n". A trailing
// newline therefore yields a final empty element, and an empty input yields a
// slice holding one empty string.
func splitIntoLines(file string) []string {
    return strings.Split(file, "\n")
}

// fileFromPath reads the whole file at path and returns its contents as a
// string. If the file cannot be read, the error is logged and the program
// exits via log.Fatal.
func fileFromPath(path string) string {
    data, readErr := ioutil.ReadFile(path)
    if readErr != nil {
        log.Fatal(readErr)
    }
    return string(data)
}

// getRecursiveFilePaths walks inputDir with filepath.Walk and returns the
// path of every entry that is not a directory. An error reported for an entry
// is printed and returned to Walk, which stops the walk; the final walk error
// is printed as well, and whatever paths were collected so far are returned.
func getRecursiveFilePaths(inputDir string) []string {
    var files []string

    visit := func(current string, entry fs.FileInfo, walkErr error) error {
        switch {
        case walkErr != nil:
            fmt.Printf("prevent panic by handling failure accessing a path %q: %v\n", current, walkErr)
            return walkErr
        case entry.IsDir():
            // Directories are only descended into, never collected.
            return nil
        }
        files = append(files, current)
        return nil
    }

    if walkErr := filepath.Walk(inputDir, visit); walkErr != nil {
        fmt.Printf("Error walking the path %q: %v\n", inputDir, walkErr)
    }
    return files
}

// searchPaths starts one goroutine per entry in paths. Each goroutine runs
// searchFile on its own path and sends every resulting Display on the
// returned unbuffered channel, calling Done on the package-level wg when it
// has sent everything. The channel is not closed here; the caller closes it
// after waiting on wg.
func searchPaths(paths []string, pattern string) chan Display {
    results := make(chan Display)
    for i := range paths {
        wg.Add(1)
        // The path is handed over as an argument so each goroutine works on
        // its own copy rather than on a variable shared with the loop.
        go func(file string, group *sync.WaitGroup) {
            defer group.Done()
            for _, hit := range searchFile(file, pattern) {
                results <- hit
            }
        }(paths[i], &wg)
    }
    return results
}

// searchFile reads the file at path, splits it into lines and returns one
// Display per line that contains pattern, in file order. The result is nil
// when no line matches.
func searchFile(path string, pattern string) []Display {
    var matches []Display
    for i, text := range splitIntoLines(fileFromPath(path)) {
        result, found := searchLine(pattern, text, i)
        if !found {
            continue
        }
        matches = append(matches, Display{filePath: path, SearchResult: result})
    }
    return matches
}

// main searches every file under a directory for a pattern and prints each
// match, preceded by a running zero-based counter.
//
// Usage: <program> <pattern> <directory>
func main() {
    // Fail with a usage message instead of an index-out-of-range panic when
    // the pattern or directory argument is missing.
    if len(os.Args) < 3 {
        fmt.Fprintln(os.Stderr, "usage: <program> <pattern> <directory>")
        os.Exit(2)
    }
    pattern := os.Args[1]
    dirPath := os.Args[2]

    paths := getRecursiveFilePaths(dirPath)

    out := searchPaths(paths, pattern)

    // Close the channel from a separate goroutine once every worker is done.
    // Waiting here in main would block before the receive loop below starts,
    // and the workers need that loop to drain their unbuffered sends.
    go func() {
        wg.Wait()
        close(out)
    }()

    count := 0
    for d := range out {
        fmt.Println(count)
        d.PrettyPrint()
        count++
    }
}
  • Thank you for this explanation! Could you expand on why the wg.Wait and close functions need to be in their own go routine as opposed to just in the main function? – Sheen Nov 11 '22 at 10:49
  • Operations on an unbuffered channel are blocking in nature: if data sent to the channel is not received (in this case the range loop is the receiver), the goroutine that is sending to that channel will block indefinitely (and vice versa). A range loop also will not exit until the channel is closed, so the wg.Wait()/close call and the range loop can't reside in the same thread/goroutine. – Anurag Kumar Nov 11 '22 at 14:22