60

How do I batch sql statements with Go's database/sql package?

In Java I would do it like this:

// Create a prepared statement
String sql = "INSERT INTO my_table VALUES(?)";
PreparedStatement pstmt = connection.prepareStatement(sql);

// Insert 10 rows of data
for (int i=0; i<10; i++) {
    pstmt.setString(1, ""+i);
    pstmt.addBatch();
}

// Execute the batch
int [] updateCounts = pstmt.executeBatch();

How would I achieve the same in Go?

Jonathan Hall
barnardh

13 Answers

94

Since the db.Exec function is variadic, one option (which really does make only a single network round trip) is to construct the statement yourself, explode the arguments, and pass them in.

Sample code:

func BulkInsert(unsavedRows []*ExampleRowStruct) error {
    valueStrings := make([]string, 0, len(unsavedRows))
    valueArgs := make([]interface{}, 0, len(unsavedRows)*3)
    for _, post := range unsavedRows {
        valueStrings = append(valueStrings, "(?, ?, ?)")
        valueArgs = append(valueArgs, post.Column1, post.Column2, post.Column3)
    }
    // db is an open *sql.DB; a single Exec sends the whole multi-row INSERT
    stmt := fmt.Sprintf("INSERT INTO my_sample_table (column1, column2, column3) VALUES %s",
        strings.Join(valueStrings, ","))
    _, err := db.Exec(stmt, valueArgs...)
    return err
}

In a simple test I ran, this solution was about 4 times faster at inserting 10,000 rows than the Begin/Prepare/Commit approach presented in another answer (sketched below), though the actual improvement will depend a lot on your individual setup, network latency, and so on.
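For context, here is a rough sketch of that Begin/Prepare/Commit baseline (that answer isn't reproduced here), assuming the same placeholder table and struct as above and an open *sql.DB named db:

tx, err := db.Begin()
if err != nil {
    log.Fatal(err)
}
stmt, err := tx.Prepare("INSERT INTO my_sample_table (column1, column2, column3) VALUES (?, ?, ?)")
if err != nil {
    log.Fatal(err)
}
defer stmt.Close()
// One Exec (and hence one round trip) per row
for _, post := range unsavedRows {
    if _, err := stmt.Exec(post.Column1, post.Column2, post.Column3); err != nil {
        log.Fatal(err)
    }
}
if err := tx.Commit(); err != nil {
    log.Fatal(err)
}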

A.Villegas
Andrew C
  • hmm, that's an interesting approach. I kinda like it, would you be able to post your test for this to show the benchmarking you did? I'm interested in this sort of solution as a way of reducing the number of transactions that are started. – nathj07 Sep 15 '14 at 16:36
  • To be fair, you should use a prepared statement and a transaction then try it again, instead of begin/prepare/commit each time. That would be a more accurate comparison. – TrippyD Apr 23 '15 at 21:52
  • Worth noting that this is X number of separate queries - it isn't a transaction so any (or multiple) of these repeats could fail while the rest go through. It also doesn't protect against SQL injections. – Xeoncross Mar 01 '17 at 21:38
  • @Xeoncross the way I wrote it here, you're correct that they're different transactions. But I believe you can just as easily do `db.Begin(); ; db.Commit()` to run it all as one transaction. Also, this *is* safe against SQL injection because it uses the `?` placeholders. – Andrew C Mar 02 '17 at 22:58
  • I used this answer to build something and discovered that, as of this writing, there appears to be a limit of 2^16-1 (65,535) for placeholders in MySQL. I ended up running multiple inserts inside a loop (roughly 10,000 rows inserted at a time) just to be on the safe side. – j boschiero Sep 01 '17 at 15:56
  • I did the same as @jboschiero . The only issue I'm having now is, if the batch is 100 lines (arbitrary num), and line 50 is a duplicate, I need to know that line 50 failed, but still insert lines 1-49 without rolling it back, which happens with this method. – Nathan Hyland Feb 01 '18 at 18:41
  • Just in case someone comes across this answer and doesn't realise (like me): the `(?, ?, ?)` syntax is MySQL specific and needs to be changed to `($1, $2, $3), ... ($n, $n+1, $n+2)` for PostgreSQL. – amwill04 Sep 03 '19 at 10:59
  • @AndrewC I have tried the same in MySQL but the statement comes out as INSERT INTO (a,b,c) VALUES (?,?,?),(?,?,?); it's not replacing the ?. Any suggestions? – user3067170 Sep 29 '20 at 03:20
  • Is there a way to support nullable fields in this approach? i.e. one of the values (or both) post.Column2, post.Column3 can be nil – Avishay28 Apr 26 '22 at 19:33
20

If you’re using PostgreSQL then pq supports bulk imports: https://godoc.org/github.com/lib/pq#hdr-Bulk_imports
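For reference, a minimal sketch of pq's COPY-based import, assuming an open *sql.DB named db and the same placeholder table and rows as the other answers (later answers show fuller versions):

txn, err := db.Begin()
if err != nil {
    log.Fatal(err)
}
stmt, err := txn.Prepare(pq.CopyIn("my_sample_table", "column1", "column2", "column3"))
if err != nil {
    log.Fatal(err)
}
for _, row := range unsavedRows {
    if _, err := stmt.Exec(row.Column1, row.Column2, row.Column3); err != nil {
        log.Fatal(err)
    }
}
// An Exec with no arguments flushes the buffered COPY data
if _, err := stmt.Exec(); err != nil {
    log.Fatal(err)
}
if err := stmt.Close(); err != nil {
    log.Fatal(err)
}
if err := txn.Commit(); err != nil {
    log.Fatal(err)
}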

Avi Flax
16

Adapting Andrew's solution for PostgreSQL, which doesn't support the ? placeholder, the following works:

func BulkInsert(unsavedRows []*ExampleRowStruct) error {
    valueStrings := make([]string, 0, len(unsavedRows))
    valueArgs := make([]interface{}, 0, len(unsavedRows)*3)
    for i, post := range unsavedRows {
        valueStrings = append(valueStrings, fmt.Sprintf("($%d, $%d, $%d)", i*3+1, i*3+2, i*3+3))
        valueArgs = append(valueArgs, post.Column1, post.Column2, post.Column3)
    }
    stmt := fmt.Sprintf("INSERT INTO my_sample_table (column1, column2, column3) VALUES %s", strings.Join(valueStrings, ","))
    _, err := db.Exec(stmt, valueArgs...)
    return err
}
MasterCarl
  • Nice snippet! Isn't this open to SQL injection? – Jona Rodrigues Jan 24 '18 at 17:59
  • I don't see the risk, the string interpolation only uses the incrementing `i` parameter (valueStrings will be `($1, $2, $3),($4, $5, $6),...`). The `ExampleRowStruct`s are passed only to db.Exec, where the database driver takes care of substituting the placeholders. – MasterCarl Feb 02 '18 at 17:50
7

Expanding on Avi Flax's answer, I needed an ON CONFLICT DO UPDATE clause in my INSERT.

The solution is to COPY into a temporary table (set to be dropped at the end of the transaction) and then INSERT from the temporary table into the permanent table.

Here's the code I settled on:

func (fdata *FDataStore) saveToDBBulk(items map[fdataKey][]byte) (err error) {
    tx, err := fdata.db.Begin()
    if err != nil {
        return errors.Wrap(err, "begin transaction")
    }
    txOK := false
    defer func() {
        if !txOK {
            tx.Rollback()
        }
    }()

    // The ON COMMIT DROP clause at the end makes sure that the table
    // is cleaned up at the end of the transaction.
    // While the "for{..} state machine" goroutine in charge of delayed
    // saving ensures this function is not running twice at any given time.
    _, err = tx.Exec(sqlFDataMakeTempTable)
    // CREATE TEMPORARY TABLE fstore_data_load
    // (map text NOT NULL, key text NOT NULL, data json)
    // ON COMMIT DROP
    if err != nil {
        return errors.Wrap(err, "create temporary table")
    }

    stmt, err := tx.Prepare(pq.CopyIn(_sqlFDataTempTableName, "map", "key", "data"))
    if err != nil {
        return errors.Wrap(err, "prepare COPY stmt")
    }
    for key, val := range items {
        _, err = stmt.Exec(string(key.Map), string(key.Key), string(val))
        if err != nil {
            return errors.Wrap(err, "loading COPY data")
        }
    }

    _, err = stmt.Exec()
    if err != nil {
        return errors.Wrap(err, "flush COPY data")
    }
    err = stmt.Close()
    if err != nil {
        return errors.Wrap(err, "close COPY stmt")
    }

    _, err = tx.Exec(sqlFDataSetFromTemp)
    // INSERT INTO fstore_data (map, key, data)
    // SELECT map, key, data FROM fstore_data_load
    // ON CONFLICT (map, key) DO UPDATE SET data = EXCLUDED.data
    // (a DO UPDATE clause needs a conflict target; this assumes a
    // unique constraint on (map, key))
    if err != nil {
        return errors.Wrap(err, "move from temporary to real table")
    }

    err = tx.Commit()
    if err != nil {
        return errors.Wrap(err, "commit transaction")
    }
    txOK = true
    return nil
}
Riking
3

Here is a take on @Debasish Mitra's solution if you are using Postgres.

Functioning example: https://play.golang.org/p/dFFD2MrEy3J

Alternate example: https://play.golang.org/p/vUtW0K4jVMd

data := []Person{{"John", "Doe", 27}, {"Leeroy", "Jenkins", 19}}

vals := []interface{}{}
for _, row := range data {
    vals = append(vals, row.FirstName, row.LastName, row.Age)
}

sqlStr := `INSERT INTO test(column1, column2, column3) VALUES %s`
sqlStr = ReplaceSQL(sqlStr, "(?, ?, ?)", len(data))

// Prepare and execute the statement, checking errors instead of discarding them
stmt, err := db.Prepare(sqlStr)
if err != nil {
    log.Fatal(err)
}
defer stmt.Close()
res, err := stmt.Exec(vals...)
if err != nil {
    log.Fatal(err)
}

func ReplaceSQL

// ReplaceSQL repeats the value pattern count times after VALUES and
// rewrites each ? placeholder as $1, $2, ... for Postgres
func ReplaceSQL(stmt, pattern string, count int) string {
    pattern += ","
    stmt = fmt.Sprintf(stmt, strings.Repeat(pattern, count))
    n := 0
    for strings.IndexByte(stmt, '?') != -1 {
        n++
        param := "$" + strconv.Itoa(n)
        stmt = strings.Replace(stmt, "?", param, 1)
    }
    return strings.TrimSuffix(stmt, ",")
}
Matt Wright
  • Using sqlx you can rebind all the placeholders for a given dialect: https://github.com/jmoiron/sqlx/blob/master/bind.go#L44 I guess it is battle tested. –  Mar 14 '19 at 12:28
3

In case anyone is using pgx (supposedly the best Postgres driver for Go), see this solution: https://github.com/jackc/pgx/issues/764#issuecomment-685249471
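A rough sketch of the batch API the linked comment builds on, assuming pgx v4, an established *pgx.Conn named conn, and the same placeholder table and rows as the answers above:

batch := &pgx.Batch{}
for _, row := range unsavedRows {
    batch.Queue(
        "INSERT INTO my_sample_table (column1, column2, column3) VALUES ($1, $2, $3)",
        row.Column1, row.Column2, row.Column3,
    )
}
// SendBatch pipelines all queued statements in a single round trip
br := conn.SendBatch(context.Background(), batch)
if err := br.Close(); err != nil {
    log.Fatal(err)
}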

James
2

I took Andrew C's idea and adapted it for a need in my work, using SQL scalar variables (named parameters such as @p1). It works perfectly for that specific requirement, and it may be useful to anyone who needs to simulate batched SQL statements in Go.

func BulkInsert(unsavedRows []*ExampleRowStruct) error {
    valueStrings := make([]string, 0, len(unsavedRows))
    valueArgs := make([]interface{}, 0, len(unsavedRows)*3)
    for i, post := range unsavedRows {
        valueStrings = append(valueStrings, fmt.Sprintf("(@p%d, @p%d, @p%d)", i*3+1, i*3+2, i*3+3))
        valueArgs = append(valueArgs, post.Column1, post.Column2, post.Column3)
    }
    sqlQuery := fmt.Sprintf("INSERT INTO my_sample_table (column1, column2, column3) VALUES %s", strings.Join(valueStrings, ","))

    // Wrap each value in a sql.NamedArg so it binds to its @p{n} variable
    params := make([]interface{}, 0, len(valueArgs))
    for i, v := range valueArgs {
        params = append(params, sql.Named(fmt.Sprintf("p%d", i+1), v))
    }

    _, err := db.Exec(sqlQuery, params...)
    return err
}
Stephen Rauch
2

For Postgres, lib/pq supports bulk imports: https://godoc.org/github.com/lib/pq#hdr-Bulk_imports

The same can also be achieved with the code below; where it really helps is when you need to perform a bulk conditional update (change the query accordingly).

To perform similar bulk inserts for Postgres, you can use the following function.

// ReplaceSQL replaces each occurrence of the given search pattern with an
// increasing $n placeholder, as Postgres expects
func ReplaceSQL(old, searchPattern string) string {
    tmpCount := strings.Count(old, searchPattern)
    for m := 1; m <= tmpCount; m++ {
        old = strings.Replace(old, searchPattern, "$"+strconv.Itoa(m), 1)
    }
    return old
}

So the sample above becomes:

sqlStr := "INSERT INTO test(n1, n2, n3) VALUES "
vals := []interface{}{}

for _, row := range data {
   sqlStr += "(?, ?, ?)," // Put "?" symbol equal to number of columns
   vals = append(vals, row["v1"], row["v2"], row["v3"]) // Put row["v{n}"] blocks equal to number of columns
}

//trim the last ,
sqlStr = strings.TrimSuffix(sqlStr, ",")

//Replacing ? with $n for postgres
sqlStr = ReplaceSQL(sqlStr, "?")

//prepare the statement
stmt, _ := db.Prepare(sqlStr)

//format all vals at once
res, _ := stmt.Exec(vals...)
Debasish Mitra
2

I got pq.CopyIn working, and it's actually 2.4x faster than the string values/args approach (which was super helpful and an elegant solution, btw, so thank you!)

I generated 10 million test rows (an int and a varchar each) in a slice of structs, and loaded them with the following function. I'm kinda new to Go, so bear with me ...

func copyData(client *client.DbClient, dataModels []*dataModel) error {
    db := *client.DB
    txn, err := db.Begin()
    if err != nil {
        return err
    }
    // Roll back on any early return; Rollback is a no-op after a successful Commit
    defer txn.Rollback()

    stmt, err := txn.Prepare(pq.CopyIn("_temp", "a", "b"))
    if err != nil {
        return err
    }

    for _, model := range dataModels {
        if _, err := stmt.Exec(model.a, model.b); err != nil {
            return err
        }
    }

    // An Exec with no arguments flushes the buffered COPY data
    if _, err = stmt.Exec(); err != nil {
        return err
    }

    if err = stmt.Close(); err != nil {
        return err
    }

    return txn.Commit()
}


Elapsed (stringValues/args): 1m30.60s.

Elapsed (copyIn): 37.57s.

Jeremy Giaco
1

Batching is not possible via the interfaces available in database/sql. A particular database driver may support it separately, however. For instance https://github.com/ziutek/mymysql appears to support batching with MySQL.

Matt
  • Just learned https://github.com/go-sql-driver/mysql has an option called multiStatements that works pretty nicely – lucapette Jan 01 '17 at 22:52
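A minimal sketch of that option, assuming go-sql-driver/mysql and placeholder connection details; note that the driver's README warns that multiStatements increases the risk of SQL injection, so be careful with string-built queries:

// The multiStatements=true DSN flag lets a single Exec carry several statements
db, err := sql.Open("mysql", "user:password@tcp(127.0.0.1:3306)/mydb?multiStatements=true")
if err != nil {
    log.Fatal(err)
}
_, err = db.Exec("INSERT INTO t (a) VALUES (1); INSERT INTO t (a) VALUES (2)")
if err != nil {
    log.Fatal(err)
}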
1

One more good library to look at, with a chainable query syntax, is go-pg:

https://github.com/go-pg/pg/wiki/Writing-Queries#insert

Insert multiple books with a single query:

err := db.Model(book1, book2).Insert()
gandharv garg
0

Here's a more generic version to generate the query & value args based on answers from @andrew-c and @mastercarl:

// bulk/insert.go

import (
    "strconv"
    "strings"
)

type ValueExtractor = func(int) []interface{}

func Generate(tableName string, columns []string, numRows int, postgres bool, valueExtractor ValueExtractor) (string, []interface{}) {
    numCols := len(columns)
    var queryBuilder strings.Builder
    queryBuilder.WriteString("INSERT INTO ")
    queryBuilder.WriteString(tableName)
    queryBuilder.WriteString("(")
    for i, column := range columns {
        queryBuilder.WriteString("\"")
        queryBuilder.WriteString(column)
        queryBuilder.WriteString("\"")
        if i < numCols-1 {
            queryBuilder.WriteString(",")
        }
    }
    queryBuilder.WriteString(") VALUES ")
    var valueArgs []interface{}
    valueArgs = make([]interface{}, 0, numRows*numCols)
    for rowIndex := 0; rowIndex < numRows; rowIndex++ {
        queryBuilder.WriteString("(")
        for colIndex := 0; colIndex < numCols; colIndex++ {
            if postgres {
                queryBuilder.WriteString("$")
                queryBuilder.WriteString(strconv.Itoa(rowIndex*numCols + colIndex + 1))
            } else {
                queryBuilder.WriteString("?")
            }
            if colIndex < numCols-1 {
                queryBuilder.WriteString(",")
            }
        }
        queryBuilder.WriteString(")")
        if rowIndex < numRows-1 {
            queryBuilder.WriteString(",")
        }
        valueArgs = append(valueArgs, valueExtractor(rowIndex)...)
    }
    return queryBuilder.String(), valueArgs
}

// bulk/insert_test.go

import (
    "fmt"
    "strconv"
)

func valueExtractor(index int) []interface{} {
    return []interface{}{
        "trx-" + strconv.Itoa(index),
        "name-" + strconv.Itoa(index),
        index,
    }
}

func ExampleGeneratePostgres() {
    query, valueArgs := Generate("tbl_persons", []string{"transaction_id", "name", "age"}, 3, true, valueExtractor)
    fmt.Println(query)
    fmt.Println(valueArgs)
    // Output:
    // INSERT INTO tbl_persons("transaction_id","name","age") VALUES ($1,$2,$3),($4,$5,$6),($7,$8,$9)
    // [trx-0 name-0 0 trx-1 name-1 1 trx-2 name-2 2]
}

func ExampleGenerateOthers() {
    query, valueArgs := Generate("tbl_persons", []string{"transaction_id", "name", "age"}, 3, false, valueExtractor)
    fmt.Println(query)
    fmt.Println(valueArgs)
    // Output:
    // INSERT INTO tbl_persons("transaction_id","name","age") VALUES (?,?,?),(?,?,?),(?,?,?)
    // [trx-0 name-0 0 trx-1 name-1 1 trx-2 name-2 2]
}
James
0

If anyone is using Postgres and building strings for this, I highly recommend trying the UNNEST function instead, which expands an array into a set of rows:

INSERT INTO "your_table" ("your_column") SELECT UNNEST($1::int2[])

Replace int2 with the type of your array, passed in as a parameter.
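A minimal sketch of calling this from Go with lib/pq, assuming an open *sql.DB named db; here the column holds text, so the cast is text[] per the note above:

names := []string{"alice", "bob", "carol"}
// pq.Array wraps a Go slice as a Postgres array parameter
_, err := db.Exec(
    `INSERT INTO "your_table" ("your_column") SELECT UNNEST($1::text[])`,
    pq.Array(names),
)
if err != nil {
    log.Fatal(err)
}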

Ben Guild