
I am processing a bunch of files and then dumping the results into PostgreSQL. I would like to have many workers process files at the same time, but I keep getting the error "pq: sorry, too many clients already". This seems to happen whenever workers is greater than about 100. (For simplicity, the code below demonstrates the flow, but instead of processing a file I am simply inserting 1M rows into each table.)

Since I am reusing the same *db, why am I getting this error? Does each transaction count as a client, or am I doing something wrong?

package main

import (
    "database/sql"
    "flag"
    "fmt"
    "log"
    "sync"

    "github.com/lib/pq"
)

func process(db *sql.DB, table string) error {
    if _, err := db.Exec(fmt.Sprintf(`DROP TABLE IF EXISTS %v;`, table)); err != nil {
        return err
    }

    col := "age"
    s := fmt.Sprintf(`
        CREATE TABLE %v (
            pk serial PRIMARY KEY,
            %v int NOT NULL
    )`, table, col)

    _, err := db.Exec(s)
    if err != nil {
        return err
    }

    tx, err := db.Begin()
    if err != nil {
        return err
    }

    defer func() {
        if err != nil {
            tx.Rollback()
            return
        }
        err = tx.Commit()
    }()

    stmt, err := tx.Prepare(pq.CopyIn(table, col))
    if err != nil {
        return err
    }

    defer func() {
        err = stmt.Close()
    }()

    for i := 0; i < 1e6; i++ {
        if _, err = stmt.Exec(i); err != nil {
            return err
        }
    }

    return err
}

func main() {
    var u string
    flag.StringVar(&u, "user", "", "user")

    var pass string
    flag.StringVar(&pass, "pass", "", "pass")

    var host string
    flag.StringVar(&host, "host", "", "host")

    var database string
    flag.StringVar(&database, "database", "", "database")

    var workers int
    flag.IntVar(&workers, "workers", 10, "workers")

    flag.Parse()

    db, err := sql.Open("postgres",
        fmt.Sprintf(
            "user=%s password=%s host=%s database=%s sslmode=require",
            u, pass, host, database,
        ),
    )

    if err != nil {
        log.Fatalln(err)
    }

    defer db.Close()
    db.SetMaxIdleConns(0)

    var wg sync.WaitGroup
    ch := make(chan int)

    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for i := range ch {
                table := fmt.Sprintf("_table%d", i)
                log.Println(table)
                if err := process(db, table); err != nil {
                    log.Fatalln(err)
                }
            }
        }()
    }

    for i := 0; i < 300; i++ {
        ch <- i
    }

    close(ch)
    wg.Wait()
}

I realize I can simply increase the PostgreSQL settings (as in How to increase the max connections in postgres?), but I would like to understand why this happens.

  • In "plain" Postgres a transaction most definitely does not count as a "client connection". I can run millions of transactions with a single connection to the database. Your program apparently does not (physically) close connections to the database properly. –  Oct 23 '17 at 20:35
  • FYI, your deferred assignments to `err` aren't doing anything. You can only assign to named return values (see the first sketch after these comments): https://play.golang.org/p/FKF2Z2KyzF – JimB Oct 23 '17 at 20:46
  • @JimB I did not know that. Thanks! –  Oct 23 '17 at 20:58
  • You should parameterize your SQL queries. – squiguy Oct 23 '17 at 21:27
  • @squiguy I get 'pq: syntax error at or near "$1"'... it seems you can't pass a table name or column as a parameter? (See the second sketch after these comments.) –  Oct 23 '17 at 23:29
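
To make JimB's point concrete: a deferred assignment to err only reaches the caller when err is a named return value, because return copies the result before the deferred function runs. A minimal standalone sketch (illustrative, not from the thread):

package main

import (
    "errors"
    "fmt"
)

// bad: err is a plain local variable, so the deferred assignment
// happens after the return value has already been copied out.
func bad() error {
    var err error
    defer func() {
        err = errors.New("from defer") // lost: never reaches the caller
    }()
    return err
}

// good: err is a named return value, so the deferred assignment
// rewrites the value the caller actually receives.
func good() (err error) {
    defer func() {
        err = errors.New("from defer")
    }()
    return nil
}

func main() {
    fmt.Println(bad())  // <nil>
    fmt.Println(good()) // from defer
}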
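
And on the parameterization exchange: $1-style placeholders bind values only, never identifiers, which is why passing the table name as a parameter fails. If table names ever come from untrusted input, lib/pq's pq.QuoteIdentifier can escape them before interpolation. A sketch of the idea; dropTable is a hypothetical helper, not part of the question's code:

package main

import (
    "database/sql"
    "fmt"

    "github.com/lib/pq"
)

// dropTable interpolates a safely quoted identifier: placeholders
// such as $1 cannot stand in for table or column names.
func dropTable(db *sql.DB, table string) error {
    _, err := db.Exec(fmt.Sprintf("DROP TABLE IF EXISTS %s;", pq.QuoteIdentifier(table)))
    return err
}

func main() {} // stub so the sketch compiles on its own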

1 Answer

Since I am reusing the same *db, why am I getting this error?

I suspect the Postgres driver is using a separate connection for each of your workers, which is a smart decision for most cases.

Does each transaction count as a client, or am I doing something wrong?

In your case, yes, each transaction counts as a client, because you are calling process() in a goroutine: you create as many concurrent transactions as you have workers. Since each of your transactions is long-lived, each one probably holds its own connection to the database at the same time, and hence you hit the limit.

go func() {
    defer wg.Done()
    for i := range ch {
        table := fmt.Sprintf("_table%d", i)
        log.Println(table)
        if err := process(db, table); err != nil {
            log.Fatalln(err)
        }
    }
}()
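
The answer stops short of a fix, but database/sql itself can enforce a ceiling: SetMaxOpenConns caps the pool, and once the cap is reached the next db.Begin() or db.Exec() blocks until a connection is released instead of dialing a new one. A minimal sketch against the question's main() (the value 50 is an arbitrary example, not a recommendation):

// Right after sql.Open in main():
db.SetMaxOpenConns(50) // at most 50 physical connections; extra workers queue
db.SetMaxIdleConns(50) // keep released connections around for reuse

Note that the question's db.SetMaxIdleConns(0) adds churn on top: every connection returned to the pool is physically closed, so the next transaction must dial a fresh one.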
Hector Correa
  • Why is it smart in my case to make separate connections, but OK to have just one in the example @a_horse_with_no_name gives? –  Oct 23 '17 at 21:07
  • By "smart decision for most cases" I mean that database drivers do their own database connection pooling (since making a brand-new database connection is a relatively slow process), and their logic is usually pretty sound. – Hector Correa Oct 23 '17 at 21:16
  • In this case the driver is probably creating a brand-new connection to the DB because each worker starts a new transaction (i.e. you are asking the database to keep each unit of work in isolation); see the sketch after these comments. – Hector Correa Oct 23 '17 at 21:18
  • @kristen, may I recommend reading this (free to get) e-book? [The Ultimate Guide to Building Database-Driven Apps with Go](https://www.vividcortex.com/resources/the-ultimate-guide-to-building-database-driven-apps-with-go) — it deals with such pooling issues quite nicely. – kostix Oct 24 '17 at 09:36
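
For anyone who wants to observe the pooling behaviour these comments describe, database/sql exposes pool counters through db.Stats(). A small sketch (the connection string is a placeholder):

package main

import (
    "database/sql"
    "log"
    "time"

    _ "github.com/lib/pq"
)

func main() {
    // Placeholder DSN; substitute real credentials.
    db, err := sql.Open("postgres", "user=u password=p host=h database=d sslmode=require")
    if err != nil {
        log.Fatalln(err)
    }
    defer db.Close()

    // While the workers run, this reports how many physical
    // connections the pool currently holds; with long concurrent
    // transactions it climbs toward the number of workers.
    for range time.Tick(time.Second) {
        log.Printf("open connections: %d", db.Stats().OpenConnections)
    }
}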