0

I wonder about MongoDB session management in Go using mgo, especially about how to correctly ensure a session is closed and how to react on write failures.

I have read the following:

Best practice to maintain a mgo session

Should I copy session for each operation in mgo?

Still, cannot apply it to my situation.

I have two goroutines which store event after event into MongoDB sharing the same *mgo.Session, both looking essiantially like the following:

func storeEvents(session *mgo.Session) {
    session_copy := session.Copy()
    // *** is it correct to defer the session close here? <-----
    defer session_copy.Close()
    col := session_copy.DB("DB_NAME").C("COLLECTION_NAME")
    for {
        event := GetEvent()
        err := col.Insert(&event)
        if err != nil {
            // *** insert FAILED - how to react properly? <-----
            session_copy = session.Copy()
            defer session_copy.Close()
        }
    }
}

col.Insert(&event) after some hours returns the error

read tcp 127.0.0.1:46954->127.0.0.1:27017: i/o timeout

and I am unsure how to react on this. After this error occurs, it occurs on all subsequent writes, hence it seems I have to create a new session. Alternatives for me seem:

1) restart the whole goroutine, i.e.

if err != nil {
    go storeEvents(session)
    return
}

2) create a new session copy

if err != nil {
    session_copy = session.Copy()
    defer session_copy.Close()
    col := session_copy.DB("DB_NAME").C("COLLECTION_NAME")
    continue
}

--> Is it correct how I use defer session_copy.Close()? (Note the above defer references the Close() function of another session. Anyway, those sessions will never be closed since the function never returns. I.e., with time, many sessions will be created and not closed.

Other options?

Community
  • 1
  • 1
alex
  • 2,252
  • 4
  • 23
  • 34
  • 1
    You should take a look here https://blog.golang.org/defer-panic-and-recover.. basically if err != nil you should panic the err, then catch it with a recover, create a copy of the session and defer that copy in the session. You're closing the connection properly already with your defer. However if your throwing an error on insert you should also investigate why instead of relying on trying to reinsert. I've done millions of inserts with mgo and I've never had an error thrown. In fact most Mongo errors I've experienced are with Mongo itself not the driver. – reticentroot Nov 20 '16 at 00:40
  • @reticentroot thx for the info on defer; regarding the reason behind insert error: do you have advice where to look? I already searched for the error message and found posts on stackoverflow, which indicate that it's because of timeout; for me timeout does not seem probable, since a) the inserts are rather small and b) once the error occurs, it re-occurs on every subsequent insert; still, I wrote code to measure the time for insert and log it in case an error is returned, but since then (yesterday), no such error re-occurred yet – alex Nov 20 '16 at 13:15
  • Check your mongod instance. It can time out if you are opening more connections then your instance can handle. Can you add the error to your post? – reticentroot Nov 20 '16 at 16:45
  • @reticentroot there were no errors in the mongodb logs; there were just many "connection accepted" followed by "end connection" log entries (with timestamps separated by merely 10 ms); no inserts were mentioned for failed inserts - normally, for inserts the log has an entry "insert . query: " – alex Nov 20 '16 at 20:25
  • actually, in my original setup where the error occured, I used the session pointer directly from two different goroutines, without creating a Copy in each goroutine; now, since I changed to the code shown above in the storeEvents() func, the error did not yet appear; I will wait for Monday and see what happens on business days with more events – alex Nov 20 '16 at 20:27

1 Answers1

0

So I don't know if this is going to help you any, but I don't have any issues with this set up.

I have a mongo package that I import from. This is a template of my mongo.go file

package mongo

import (
    "time"

    "gopkg.in/mgo.v2"
)

var (
    // MyDB ...
    MyDB DataStore
)

// create the session before main starts
func init() {
    MyDB.ConnectToDB()
}

// DataStore containing a pointer to a mgo session
type DataStore struct {
    Session *mgo.Session
}

// ConnectToTagserver is a helper method that connections to pubgears' tagserver
// database
func (ds *DataStore) ConnectToDB() {
    mongoDBDialInfo := &mgo.DialInfo{
        Addrs:    []string{"ip"},
        Timeout:  60 * time.Second,
        Database: "db",
    }
    sess, err := mgo.DialWithInfo(mongoDBDialInfo)
    if err != nil {
        panic(err)
    }
    sess.SetMode(mgo.Monotonic, true)
    MyDB.Session = sess
}

// Close is a helper method that ensures the session is properly terminated
func (ds *DataStore) Close() {
    ds.Session.Close()
}

Then in another package, for example main Updated Based on the comment below

package main

import (
    "../models/mongo"
)

func main() {
    // Grab the main session which was instantiated in the mongo package init function
    sess := mongo.MyDB.Session
    // pass that session in
    storeEvents(sess)
}

func storeEvents(session *mgo.Session) {
    session_copy := session.Copy()
    defer session_copy.Close()

    // Handle panics in a deferred fuction
    // You can turn this into a wrapper (middleware)
    // remove this this function, and just wrap your calls with it, using switch cases
    // you can handle all types of errors
    defer func(session *mgo.Session) {
        if err := recover(); err != nil {
            fmt.Printf("Mongo insert has caused a panic: %s\n", err)
            fmt.Println("Attempting to insert again")
            session_copy := session.Copy()
            defer session_copy.Close()
            col := session_copy.DB("DB_NAME").C("COLLECTION_NAME")
            event := GetEvent()
            err := col.Insert(&event)
            if err != nil {
                fmt.Println("Attempting to insert again failed")
                return
            }
            fmt.Println("Attempting to insert again succesful")
        }
    }(session)

    col := session_copy.DB("DB_NAME").C("COLLECTION_NAME")
    event := GetEvent()
    err := col.Insert(&event)
    if err != nil {
        panic(err)
    }
}

I use a similar setup on my production servers on AWS. I do over 1 million inserts an hour. Hope this helps. Another things I've done to ensure that the mongo servers can handle the connections is increate the ulimit on my production machines. It's talked about in this stack

Community
  • 1
  • 1
reticentroot
  • 3,612
  • 2
  • 22
  • 39
  • the storeEvents() is just a copy of the code I posted above - you do not panic on error and use Recover to create a new session as you suggested in your comment to my post – alex Nov 20 '16 at 20:19
  • That's because in my production code, I'm using a go server using gollira mux and custom handlers (middleware), which wrap my routes and recover the server on panics. Whereas in your code I didn't see if you were using a go server. So my recover example may not work for you. You'd have to write your own recovery methods. And yes I copied your code above so that you can see how I'd use my example with your code, which is the typical thing to do when giving someone an example. Lastly the main thing to note is the use of the struct holding my session and init function. – reticentroot Nov 20 '16 at 20:48
  • @alex also, I've updated the code above to provide an example of recoveries, it's still something you should research yourself and understand if you're going to use Go. If you're coming other languages this is the same as trying and catching exceptions. You don't want to create sessions in an infinite loop like you did above with the for {}, because you can spawn too many connections and because there is always the possibility that your break condition won't be met. If the inserts are causing timeouts, with low volume, then there is something critically wrong with the setup of mongo server. – reticentroot Nov 20 '16 at 21:54