2

I'm trying to build a system using the fan-out / fan-in design to call multiple rest API.

My fan out would receive a list of users. Foreach user, execute an other time the fan-out to create multiple http get and then aggregate the data in that user.

This is my fan out method.

in <- chan struct{} is my channel with the user information

f func(struct{}) struct{} is the method that will take a user as a parameter, call an other instance of my fan out, merge the data in the user and then return the aggregated user

<- chan struct{} is the new channel with the result

func split(in <- chan struct{}, f func(struct{}) struct{}) <- chan struct{} {
    out := make(chan struct{})

    go func() {
        for data := range in {
            out <- f(data)
        }

        close(out)
    }()

    return out
}

The merge method is used to take multiple channel and merge them to a single channel

func merge(in  ...<- chan struct{}) <- chan struct{} {
    out := make(chan struct{})
    var wg sync.WaitGroup

    output := func(channel <- chan struct{}) () {
        for data := range channel {
            out <- data
        }

        wg.Done()
    }

    for _, channel := range in {
        go output(channel)
    }

    go func() {
        wg.Wait()
        close(out)
    }()

    wg.Add(len(in))

    return out
}

Process method used to create a fan-out / fan-in system

func Process(dataSlice []struct{}, f func(struct{}) struct{}) <-chan struct{} {

    in := groupData(dataSlice) // this method is simply taking a slice of struct and returning a channel of struct with the values inside

    out1 := split(in, f)
    out2 := split(in, f)
    out3 := split(in, f)

    return merge(out1, out2, out3)
}

Finally the method calling this thing out

type Request struct {
    Name string
}

type Response struct {
    Name string
    Age int
}

func main() {

    users := make([]Request, 0)

    users = append(users, Request{Name: "A"})
    users = append(users, Request{Name: "B"})
    users = append(users, Request{Name: "C"})
    users = append(users, Request{Name: "D"})

    for data := range parallel.Process(users, apply) {
        fmt.Println(data)
    }

}

func apply(request Request) Response {
    return Response{request.Name, 1}
}

So my apply method is really small here. In reality i am calling the process method again to split my rest api call in multiple channels and retrieve all of the information for that request. My response struct would depend on the api i called.

I'm getting the following errors.

  • cannot use users (type []Request) as type []struct {} in argument to parallel.Process
  • cannot use apply (type func(Request) struct {}) as type parallel.ProcessData in argument to parallel.Process
  • cannot use Response literal (type Response) as type struct {} in return argument

So basically i can't make a method that accepts any struct or some sort of interface of a request.

Thanks :)

Update:

Using interface{} and the reflect lib i managed to get this piece of code working.

func apply(request interface{}) interface{} {

    if(reflect.TypeOf(request).Name() != reflect.TypeOf(Request{}).Name()) {
        panic("invalid request type")
    }

    value := reflect.ValueOf(request)
    typedRequest := value.Interface().(Request)


    return Response{Name: typedRequest.Name, Age: rand.Intn(100)}
}

This feels wrong.

Since there is no generic type, are we supposed to re implement the same method multiple time for each type we want to use? What would be the suggested approach to solve in a proper way?

  • 3
    `struct{}` is just an empty struct. Maybe the closest thing what you want is `interface{}`. However, you cannot cast `[]Request` to `[]interface{}`. http://stackoverflow.com/questions/12753805/type-converting-slices-of-interfaces-in-go/12754757 – ymonad Feb 21 '17 at 01:51
  • However, in general, using something like `func(interface{}) interface{}` is not a good idea since you can pass anything, which means it would not be type safe. – ymonad Feb 21 '17 at 02:02
  • So how would you suggest proceeding? Create some kind of abstract structure that everything inherit from and keep some kind of type constant in it? I feel like strict typing of channels is reducing the possibilities. I come from a Scala akka actor background and i might not be using the channels the way i should be. – Elliot Laurendeau Feb 21 '17 at 03:52
  • You can define interface, and make those structures implement that interface. Using golang as dynamic type language would be a pain job. – ymonad Feb 21 '17 at 04:00
  • Your call to `wg.Add` should appear before the call to `wg.Wait`. – jub0bs Aug 07 '21 at 12:49

0 Answers0