I'm trying to use Go to insert a row of data into a Postgres table for each new message received from RabbitMQ, using a single connection to the DB, which is opened in the init function of the code below.

Rather than opening just one connection, the code opens 497 connections and maxes out the Postgres connection limit, which causes the row inserts to stop...

I have tried using the info in these questions: opening and closing DB connection in Go app and open database connection inside a function. They say I should open one connection and use a global db so that the main function can pass the SQL statement to the connection opened in the init function.

I thought I had done this; however, a new connection is being opened for each new row, so the code stops working once the Postgres connection limit is reached...

I am new to Go and have limited programming experience. I have been trying to understand/resolve this issue for the past two days, and I could really do with some help understanding where I am going wrong with this...

var db *sql.DB

func init() {
    var err error
    db, err = sql.Open("postgres", "postgres://postgres:postgres@SERVER:PORT/DB")
    if err != nil {
        log.Fatal("Invalid DB config:", err)
    }
    if err = db.Ping(); err != nil {
        log.Fatal("DB unreachable:", err)
    }
}

func main() {

    // RABBITMQ CONNECTION CODE IS HERE

    go func() {
        for d := range msgs {
            // EACH MESSAGE RECEIVED IS SPLIT INTO LEGEND, STATUS, TIMESTAMP VARIABLES

            // VARIABLES ARE PASSED TO sqlStatement
            sqlStatement := `
            INSERT INTO heartbeat ("Legend", "Status", "TimeStamp")
            VALUES ($1, $2, $3)
            `
            // sqlStatement IS THEN PASSED TO db.QueryRow
            db.QueryRow(sqlStatement, Legend, Status, TimeStamp)
        }
    }()

    <-forever
}

Full code is shown below:

package main

import (
    "database/sql"
    "log"
    "strings"

    _ "github.com/lib/pq"
    "github.com/streadway/amqp"
)

var db *sql.DB

func failOnError(err error, msg string) {
    if err != nil {
        log.Fatalf("%s: %s", msg, err)
    }
}

func init() {
    var err error
    db, err = sql.Open("postgres", "postgres://postgres:postgres@192.168.1.69:5432/test?sslmode=disable")
    if err != nil {
        log.Fatal("Invalid DB config:", err)
    }
    if err = db.Ping(); err != nil {
        log.Fatal("DB unreachable:", err)
    }
}

func main() {
    conn, err := amqp.Dial("amqp://Admin:Admin@192.168.1.69:50003/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "HEARTBEAT", // name
        false,       // durable
        false,       // delete when unused
        false,       // exclusive
        false,       // no-wait
        nil,         // arguments
    )
    failOnError(err, "Failed to declare a queue")

    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        false,  // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool)

    go func() {

        for d := range msgs {
            myString := string(d.Body[:])
            result := strings.Split(myString, ",")
            Legend := result[0]
            Status := result[1]
            TimeStamp := result[2]

            sqlStatement := `
    INSERT INTO heartbeat ("Legend", "Status", "TimeStamp")
    VALUES ($1, $2, $3)
    `
            //
            db.QueryRow(sqlStatement, Legend, Status, TimeStamp)
        }
    }()

    <-forever
}
Mark Smith
  • Provided code should not create a connection per query. But you may create a statement just once (see the sketch after these comments). – zerkms Dec 08 '17 at 00:58
  • @zerkms It's late in the night here and I don't quite understand what you have said, but I will check again in the morning once I've had some sleep, as my brain is fried - do you mean that I need to move the statement to the init function??? Forgive my newness and thanks for replying... – Mark Smith Dec 08 '17 at 01:06
  • You don't *need* to do that, it's just an improvement. My main point though is that I don't believe the excerpt of the code you provided may cause the symptoms you're observing. – zerkms Dec 08 '17 at 01:08
  • @zerkms, okay thanks, this is why I am pulling my hair out. I'm trying to understand where I am going wrong, as the code always stops inserting after 497 rows, and if I check the number of connections to Postgres using pgAdmin I see that there are 497! – Mark Smith Dec 08 '17 at 01:13
  • @MarkSmith Show more code. The problem is in the elided code. – Charlie Tumahai Dec 08 '17 at 02:22
  • @CeriseLimón - I've updated the question to show the full code that I am using - thanks in advance for any help you can give with this! – Mark Smith Dec 08 '17 at 02:27
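
Following up on the first comment above, preparing the INSERT statement once and reusing it would look roughly like this. This is a sketch based on the question's consumer loop; the error handling is illustrative and not from the original code:

// Sketch: prepare the INSERT once, outside the consumer loop, and reuse it.
stmt, err := db.Prepare(`
    INSERT INTO heartbeat ("Legend", "Status", "TimeStamp")
    VALUES ($1, $2, $3)
`)
if err != nil {
    log.Fatal("preparing insert:", err)
}
defer stmt.Close()

go func() {
    for d := range msgs {
        result := strings.Split(string(d.Body), ",")
        if _, err := stmt.Exec(result[0], result[1], result[2]); err != nil {
            log.Println("insert failed:", err)
        }
    }
}()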

1 Answer

First off, *sql.DB is not a connection but a pool of connections; it will open as many connections as it needs, up to as many as the Postgres server allows. It only opens a new connection when there is no idle one in the pool ready for use.
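
If you want to make sure the pool never approaches the server's limit, you can cap it. Here's a minimal sketch based on the init function from the question; the cap values are arbitrary examples, not values taken from the question:

func init() {
    var err error
    db, err = sql.Open("postgres", "postgres://postgres:postgres@192.168.1.69:5432/test?sslmode=disable")
    if err != nil {
        log.Fatal("Invalid DB config:", err)
    }
    // Cap the pool well below Postgres' max_connections.
    // These numbers are arbitrary examples.
    db.SetMaxOpenConns(50)
    db.SetMaxIdleConns(10)
    if err = db.Ping(); err != nil {
        log.Fatal("DB unreachable:", err)
    }
}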


So the issue is that the connections the DB opens aren't being released. Why? Because you're using QueryRow without calling Scan on the returned *Row value.

Under the hood, *Row holds a *Rows instance, which has its own connection; that connection is released automatically when Scan is called. If Scan is not called, the connection is never released, which causes the DB pool to open a new connection on the next call to QueryRow. Since you're not releasing any connections, the DB keeps opening new ones until it hits the limit specified by the Postgres settings, and then the next call to QueryRow hangs while waiting for a connection to become idle.
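
For illustration, a QueryRow call that does release its connection looks roughly like this. Note that the RETURNING id clause and the id column are assumptions made for the example; the question's table may not have such a column:

// Sketch only: assumes an "id" column exists, which the question does not show.
// Scan consumes the result and returns the connection to the pool.
var id int64
err := db.QueryRow(
    `INSERT INTO heartbeat ("Legend", "Status", "TimeStamp")
     VALUES ($1, $2, $3) RETURNING id`,
    Legend, Status, TimeStamp,
).Scan(&id)
if err != nil {
    log.Println("insert failed:", err)
}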

So you either need to use Exec if you don't care about the output, or you need to call Scan on the returned *Row.
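
In the consumer loop from the question, the Exec variant would look roughly like this (a sketch; the error handling is illustrative):

// Exec runs the INSERT and releases the connection as soon as it returns.
if _, err := db.Exec(sqlStatement, Legend, Status, TimeStamp); err != nil {
    log.Println("insert failed:", err)
}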

mkopriva