I have an application (in Scala, though this question is probably driver-agnostic) that collates inserts into batches of a thousand and then does a bulk insert. Because of the way the data is received and processed, it's possible that we have duplicate ids; we just want to ignore these.
According to the documentation for bulkWrite (https://docs.mongodb.com/manual/reference/method/db.collection.bulkWrite/#bulkwrite-example-unordered-bulk-write), using the ordered: false option should allow all inserts to be attempted without the whole write failing because of a single error:
"Since this was an unordered operation, the writes remaining in the queue were processed despite the exception."
However, my error log is coming up with entries like this:
I COMMAND [conn696515] command db.Matches command: insert { insert: "Matches", ordered: false, documents: 1000 } ninserted:4 keysInserted:52 exception: E11000 duplicate key error collection: db.Matches index: _id_ dup key: { : "3000758-3000343-3845342998431744-5-1646-----10-1" } code:11000 numYields:0 reslen:183245 locks:{ Global: { acquireCount: { r: 1017, w: 1017 } }, Database: { acquireCount: { w: 1017 } }, Collection: { acquireCount: { w: 1013 } }, Metadata: { acquireCount: { w: 4 } }, oplog: { acquireCount: { w: 4 } } } protocol:op_query 1003ms
This suggests to me that only 4 of my 1000 documents were actually inserted.
Does this mean:
- 996 of my 1000 inserts were duplicates (this is very unlikely)?
- The ordered: false option doesn't actually allow the remaining inserts to continue?
- The documentation literally means a single write failing, and the operation errors out on a second failure?
- I'm reading the log output wrong and all of these writes are actually going through? (The sketch after this list is one way I could check.)
- The above only works on mongos? I'm using mongod.
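One way I could test the "am I actually getting all these writes?" idea is to count the collection before and after a single batch, using the insertMatchesIgnoreDuplicates method from the edit below. This is only a rough sketch with names and timeouts of my own choosing, and it assumes a driver version that has countDocuments (older versions call it count):

import scala.collection.immutable.HashMap
import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import org.mongodb.scala._
import org.mongodb.scala.bson.BsonString

// Rough check: how many documents does one batch actually add?
def countAfterBatch(batch: HashMap[BsonString, Document], database: MongoDatabase): Long = {
  val coll: MongoCollection[Document] = database.getCollection("Matches")
  val before = Await.result(coll.countDocuments().toFuture, 10.seconds)
  // Swallow the bulk write error so we still reach the second count.
  Await.result(insertMatchesIgnoreDuplicates(batch, database).recover { case _ => () }, 60.seconds)
  val after = Await.result(coll.countDocuments().toFuture, 10.seconds)
  after - before
}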
EDIT: Here's the code that runs the query (note that, as far as I can tell, it's the driver that batches it; we send tens of thousands of documents to this method):
// Imports assume the MongoDB Scala driver (inferred from the types used here).
import scala.collection.immutable.HashMap
import scala.concurrent.Future
import org.mongodb.scala._
import org.mongodb.scala.bson.BsonString
import org.mongodb.scala.model.{BulkWriteOptions, InsertOneModel}

def insertMatchesIgnoreDuplicates(
    matches: HashMap[BsonString, Document],
    database: MongoDatabase
): Future[_] = {
  val matchesCollection: MongoCollection[Document] =
    database.getCollection("Matches")
  // One InsertOneModel per document; the driver splits these into batches.
  val inserts = matches.values.map(doc => new InsertOneModel(doc))
  // Unordered, so a duplicate key error shouldn't stop the remaining inserts.
  val bulkWriteOptions = new BulkWriteOptions().ordered(false)
  matchesCollection.bulkWrite(inserts.toSeq, bulkWriteOptions).toFuture
}
And when catching the output, here's a successful result:
AcknowledgedBulkWriteResult{insertedCount=15107, matchedCount=0, removedCount=0, modifiedCount=0, upserts=[]}
and an unsuccessful one:
com.mongodb.MongoBulkWriteException: Bulk write operation error on server 10.131.58.143:27017. Write errors: [BulkWriteError{index=52621, code=11000, message='E11000 duplicate key error collection: betbrain_v1.Matches index: id dup key: { : "3000177-3000012-3858875929331712-5---2.5---14-1" }', details={ }}].
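Since the failure surfaces as com.mongodb.MongoBulkWriteException, I assume the partial result is still attached to the exception. A minimal sketch of how I could recover the failed future and see how many documents went in despite the duplicate key errors (the recovery code is my own, not what we currently run):

import scala.concurrent.ExecutionContext.Implicits.global
import com.mongodb.MongoBulkWriteException

// Inspect the partial bulk write result carried by the exception.
insertMatchesIgnoreDuplicates(matches, database).recover {
  case e: MongoBulkWriteException =>
    println(s"inserted despite errors: ${e.getWriteResult.getInsertedCount}")
    println(s"duplicate key errors:    ${e.getWriteErrors.size}")
}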