0

In the below snippet I tried to perform GET request using POOL of worker.

I thought it will reuse the TCP session instead of creating new one for every worker, but it creates multiple session.

Could someone please help in validating the code:

package main

import (
    "fmt"
    "net/http"
    "sync"
)

//var URL = "http://localhost:8181/api/v1/resources"
var URL = "http://httpbin.org/ip"

// Task interface
type Task interface {
    Execute(p *Pool)
}

// JobDesc struct
type JobDesc struct {
    Client *http.Client
}

// Pool struct
type Pool struct {
    mu      sync.Mutex
    size    int
    tasks   chan Task
    kill    chan struct{}
    wg      sync.WaitGroup
    Results chan interface{}
}

// Methods for Pool struct.
func (p *Pool) worker() {
    defer p.wg.Done()
    for {
        select {
        case task, ok := <-p.tasks:
            if !ok {
                return
            }
            task.Execute(p)

        case <-p.kill:
            return
        }
    }
}

// Resize methods update the number of goroutine in the pool, to execute the task.
func (p *Pool) Resize(n int) {
    p.mu.Lock()
    defer p.mu.Unlock()
    for p.size < n {
        p.size++
        p.wg.Add(1)
        go p.worker()
    }
    for p.size > n {
        p.size--
        p.kill <- struct{}{}
    }
}

// CloseJobChannel method close the jobs channel,
// once all the jobs are sending to the channel.
func (p *Pool) CloseJobChannel() {
    close(p.tasks)
}

// CloseResultChannel method close the results channel,
// once all the jobs are completed and results are collected.
func (p *Pool) CloseResultChannel() {
    close(p.Results)
}

// Wait Method of Pool struct
func (p *Pool) Wait() {
    p.wg.Wait()
}

// AddJob Method add task to the task channel
func (p *Pool) AddJob(task Task) {
    p.tasks <- task
}

// NewPool function create pool of task base on the size
// provided.
func NewPool(size int) *Pool {
    pool := &Pool{
        tasks:   make(chan Task, 10000),
        kill:    make(chan struct{}),
        Results: make(chan interface{}, 10000),
    }
    pool.Resize(size)
    return pool
}

// Execute to execute
func (job *JobDesc) Execute(p *Pool) {
    res, err := job.Client.Get(URL)
    if err != nil {
        fmt.Println(err)
    }
    //body, _ := ioutil.ReadAll(res.Body)
    fmt.Println(res.Body)
    //defer res.Body.Close()
}

func main() {
    pool := NewPool(10)
    tr := &http.Transport{DisableKeepAlives: false}
    client := &http.Client{Transport: tr}

    for i := 1; i <= 50; i++ {
        job := JobDesc{Client: client}
        pool.AddJob(&job)
    }
    pool.CloseJobChannel()
    pool.Wait()
    pool.CloseResultChannel()
    i := 1
    for k := range pool.Results {
        fmt.Printf("%d, %d", i, k)
        i++
    }
}
JimB
  • 104,193
  • 13
  • 262
  • 255
James Sapam
  • 16,036
  • 12
  • 50
  • 73
  • It looks like you're making up to 10 concurrent calls to the server. Since you can't make simultaneous requests on the same connection, it's going to make multiple connections. – JimB Dec 19 '16 at 22:22
  • oh could you please recommend any better way to GET or POST faster with multiple thread. – James Sapam Dec 19 '16 at 22:24
  • What's wrong with using multiple connections? That's the only way to send more than 1 request at a time. – JimB Dec 19 '16 at 22:24
  • as per the docs https://golang.org/pkg/net/http/, Clients and Transports are safe for concurrent use by multiple goroutines and for efficiency should only be created once and re-used. – James Sapam Dec 19 '16 at 22:24
  • When i have 100k docs to post, and it creates 100K session which is not possible. – James Sapam Dec 19 '16 at 22:25
  • btw this is working in python to reuse session. – James Sapam Dec 19 '16 at 22:28
  • If you send these serially it _will_ work exactly like it would in python. If you want concurrent connections you need to limit concurrency to a reasonable level, and be able to maintain that many connections in the Transport. Maybe this will help: https://stackoverflow.com/questions/39813587/go-client-program-generates-a-lot-a-sockets-in-time-wait-state/39834253#39834253 – JimB Dec 19 '16 at 22:31
  • Let me read, thank you so much. – James Sapam Dec 19 '16 at 22:35
  • @jimB That works after tweaking MaxIdleConnsPerHost and MaxIdleConns. Hope you may write few words in the answer, so i can mark as answer and who ever come across can get it easily. – James Sapam Dec 19 '16 at 23:04
  • @jimB it works for GET request but not with POST request. Got, panic: runtime error: invalid memory address or nil pointer dereference – James Sapam Dec 20 '16 at 00:46
  • a nil pointer deterrence is an error in your code. Please show the code and where the panic happens. – JimB Dec 20 '16 at 01:00
  • Here is the one for get request, it works intermittently https://play.golang.org/p/7oqInVO0XR and throwing this `error [Get : unsupported protocol scheme ""] panic: runtime error: invalid memory address or nil pointer dereference [signal SIGSEGV: segmentation violation code=0x1 addr=0x0 pc=0x28b7] goroutine 11 [running]:` – James Sapam Dec 20 '16 at 05:09
  • 1
    it would really help if you show where the actual panic occurred, but it's probably from your Execute method, where you continue executing after an error and `res` is probably `nil`. – JimB Dec 20 '16 at 16:01

0 Answers0