
I'm currently staring at a beefed-up version of the following code:

func embarrassing(data []string) []string {
  resultChan := make(chan string)
  var waitGroup sync.WaitGroup
  for _, item := range data {
    waitGroup.Add(1)
    go func(item string) {
      defer waitGroup.Done()
      resultChan <- doWork(item)
    }(item)
  }

  go func() {
    waitGroup.Wait()
    close(resultChan)
  }()

  var results []string
  for result := range resultChan {
    results = append(results, result)
  }
  return results
}

This is just blowing my mind. Everything this function does can be expressed in other languages as

results = parallelMap(data, doWork)

Even if it can't be done quite this easily in Go, isn't there still a better way than the above?

icza
Sebastian Oberhoff
    Whether your parallel map is actually faster than a sequential one depends on a lot of factors. Unbounded parallelism typically is not a good idea. – Volker Sep 28 '18 at 08:25

2 Answers

10

If you need all the results, you don't need the channel (and the extra goroutine to close it) to communicate them; you can write directly into the results slice:

func cleaner(data []string) []string {
    results := make([]string, len(data))

    wg := &sync.WaitGroup{}
    wg.Add(len(data))
    for i, item := range data {
        go func(i int, item string) {
            defer wg.Done()
            results[i] = doWork(item)
        }(i, item)
    }
    wg.Wait()

    return results
}

This is possible because slice elements act as distinct variables, so each goroutine can write its own element without synchronization, as long as no two goroutines write the same element and the slice itself is not resized meanwhile. For details, see "Can I concurrently write different slice elements". You also get the results in the same order as your input for free.

Another variation: if doWork() did not return the result but instead received the address where the result should be stored, along with the sync.WaitGroup used to signal completion, then doWork() could be launched "directly" as a new goroutine.

We can create a reusable wrapper for doWork():

func doWork2(item string, result *string, wg *sync.WaitGroup) {
    defer wg.Done()
    *result = doWork(item)
}

If your processing logic has this shape, this is how it can be executed concurrently:

func cleanest(data []string) []string {
    results := make([]string, len(data))

    wg := &sync.WaitGroup{}
    wg.Add(len(data))
    for i, item := range data {
        go doWork2(item, &results[i], wg)
    }
    wg.Wait()

    return results
}

Yet another variation could be to pass a channel to doWork() on which it is supposed to deliver the result. This solution doesn't even require a sync.WaitGroup, as we know how many elements we want to receive from the channel:

func cleanest2(data []string) []string {
    ch := make(chan string)
    for _, item := range data {
        go doWork3(item, ch)
    }

    results := make([]string, len(data))
    for i := range results {
        results[i] = <-ch
    }
    return results
}

func doWork3(item string, res chan<- string) {
    res <- "done:" + item
}

The "weakness" of this last solution is that it may collect the results "out-of-order" (which may or may not be a problem). This approach can be improved to retain order by letting doWork() receive and return the index of the item. For details and examples, see "How to collect values from N goroutines executed in a specific order?"
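One way the index-passing idea might look is sketched below. The indexed type and the doWorkIndexed and ordered functions are hypothetical names introduced for this illustration; the worker body simply mirrors doWork3 above.

```go
package main

import "fmt"

// indexed pairs a result with the position of the input it came from.
type indexed struct {
	i     int
	value string
}

// doWorkIndexed is a hypothetical variant of doWork3 that also receives the
// index of its item and sends it back together with the result.
func doWorkIndexed(i int, item string, res chan<- indexed) {
	res <- indexed{i: i, value: "done:" + item}
}

// ordered collects exactly len(data) results and uses the returned index to
// place each one at the position of its input.
func ordered(data []string) []string {
	ch := make(chan indexed)
	for i, item := range data {
		go doWorkIndexed(i, item, ch)
	}

	results := make([]string, len(data))
	for range data {
		r := <-ch
		results[r.i] = r.value
	}
	return results
}

func main() {
	fmt.Println(ordered([]string{"a", "b", "c"})) // prints [done:a done:b done:c]
}
```

The results may still arrive in any order; only their final placement in the slice is fixed by the index.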

icza
0

You can also use reflection to achieve something similar.

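In this example, it distributes the handler function over 4 goroutines and returns the results in a new slice of the same type as the given source slice. Note that the results are stored in the order they arrive, which may differ from the input order.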

package main

import (
    "fmt"
    "reflect"
    "strings"
    "sync"
)

func parralelMap(some interface{}, handle interface{}) interface{} {
    rSlice := reflect.ValueOf(some)
    rFn := reflect.ValueOf(handle)
    dChan := make(chan reflect.Value, 4)
    rChan := make(chan []reflect.Value, 4)
    var waitGroup sync.WaitGroup
    for i := 0; i < 4; i++ {
        waitGroup.Add(1)
        go func() {
            defer waitGroup.Done()
            for v := range dChan {
                rChan <- rFn.Call([]reflect.Value{v})
            }
        }()
    }
    nSlice := reflect.MakeSlice(rSlice.Type(), rSlice.Len(), rSlice.Cap())
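    // Feed the workers from a separate goroutine so the collecting loop
    // below can drain rChan at the same time; sending everything first
    // would deadlock once the inputs outnumber the buffered slots.
    go func() {
        for i := 0; i < rSlice.Len(); i++ {
            dChan <- rSlice.Index(i)
        }
        close(dChan)
    }()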
    go func() {
        waitGroup.Wait()
        close(rChan)
    }()
    i := 0
    for v := range rChan {
        nSlice.Index(i).Set(v[0])
        i++
    }
    return nSlice.Interface()
}

func main() {
    fmt.Println(
        parralelMap([]string{"what", "ever"}, strings.ToUpper),
    )
}

Test here https://play.golang.org/p/iUPHqswx8iS