
I want to use the MongoDB Rust Driver to bulk-insert documents into a collection. I have a bulk vector that I fill and flush when it reaches a given size.

My issue is that I must .clone() the vector when passing it to the driver's bulk_write() API.

If I don't clone, I get a use-after-move error (E0382), and the API doesn't seem to accept a reference (E0308: expected struct `std::vec::Vec`, found reference).

I'm new to Rust. Is there a way to do this without cloning this big structure? (The structure isn't big in my sample, but it is in my actual code.)

Here's my code:

// Cargo.toml extract:
//
// [dependencies]
// bson = "0.10"
// mongodb = "0.3.7"

#[macro_use(bson, doc)] extern crate bson;
extern crate mongodb;
use mongodb::coll::options::WriteModel;
use mongodb::{Client, ThreadedClient};
use mongodb::db::ThreadedDatabase;

fn main() {

    // Connect to MongoDB and select collection
    let client = Client::connect("localhost", 27017).ok().expect("Failed to initialize client.");
    let coll = client.db("test").collection("mycol");

    // Make the bulk vector
    let mut bulk = Vec::new();
    for i in 0..1000 {

        // Append an item to the bulk
        bulk.push(WriteModel::UpdateOne { filter: doc!{"_id": i},
                                          update: doc!{"$set" => {"hello" => "world"}},
                                          upsert: Some(true) });

        // Every 11 items, flush the bulk
        if bulk.len() > 10 {
            println!("Upsert {} docs into collection...",bulk.len());

            // `bulk` has to be cloned here
            let result = coll.bulk_write(bulk.clone(), true); // Suboptimal: bulk cloned
            //let result = coll.bulk_write(bulk, true); // E0382: use after move
            //let result = coll.bulk_write(&bulk, true); // E0308: expected struct `std::vec::Vec`, found reference 

            // Check result
            match result.bulk_write_exception {
                Some(exception) => {
                    if exception.message.len()>0 {
                        println!("ERROR: {}",exception.message);
                    }
                }
                None => ()
            }
            bulk.clear();
        }
    }

    // Final flush
    if bulk.len() > 0 {
        println!("Upsert {} docs into collection...",bulk.len());
        let result = coll.bulk_write(bulk.clone(), true);
        match result.bulk_write_exception {
            Some(exception) => {
                if exception.message.len()>0 {
                    println!("ERROR: {}",exception.message);
                }
            }
            None => ()
        }
    }
}
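The move error can be reproduced without MongoDB; in this sketch, the hypothetical consume() plays the role of bulk_write() by taking ownership of its argument:

```rust
fn consume(v: Vec<i32>) -> usize {
    v.len() // takes the Vec by value, like bulk_write()
}

fn main() {
    let mut bulk = vec![1, 2, 3];
    let n = consume(bulk); // `bulk` is moved into consume() here
    assert_eq!(n, 3);
    // bulk.push(4); // would not compile: E0382, use of moved value `bulk`
    bulk = Vec::new(); // reassigning makes `bulk` usable again
    assert!(bulk.is_empty());
}
```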
Shepmaster
user2655800
2 Answers


As @kazemakase pointed out in the comments, the general solution to these kinds of problems is mem::replace(). In this case, however, the only reason to hang onto bulk is the final flush when bulk.len() <= 10. If we restructure the original code, we can avoid the use-after-move error entirely:

// Connect to MongoDB and select collection
let client = Client::connect("localhost", 27017).ok().expect("Failed to initialize client.");
let coll = client.db("test").collection("mycol");

let ranges: Vec<_> = (0..1000).collect();
for range in ranges.chunks(11) {
    let bulk: Vec<_> = range
        .iter()
        .map(|&i| WriteModel::UpdateOne { filter: doc!{"_id": i},
                                          update: doc!{"$set" => {"hello" => "world"}},
                                          upsert: Some(true) })
        .collect();

    println!("Upsert {} docs into collection...", bulk.len());

    let result = coll.bulk_write(bulk, true);
    if let Some(exception) = result.bulk_write_exception {
        if exception.message.len() > 0 {
            println!("ERROR: {}", exception.message);
        }
    }
}
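For completeness, the mem::replace() approach mentioned above can be sketched without MongoDB; flush() here is a hypothetical stand-in for bulk_write():

```rust
use std::mem;

// Hypothetical stand-in for coll.bulk_write(): just counts what was "sent".
fn flush(batch: Vec<i32>) -> usize {
    batch.len()
}

fn main() {
    let mut bulk = Vec::new();
    let mut sent = 0;
    for i in 0..25 {
        bulk.push(i);
        if bulk.len() > 10 {
            // Move the full vector out, leaving a fresh empty one in its place.
            let full = mem::replace(&mut bulk, Vec::new());
            sent += flush(full);
        }
    }
    sent += flush(bulk); // final partial batch; `bulk` may be moved here
    assert_eq!(sent, 25); // 11 + 11 + 3 items flushed
}
```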

The main change here is to use slice::chunks() instead of manually filling bulk inside the main iteration loop. This also removes the duplicated error handling and is more idiomatic Rust.

Wesley Wiser
    Also https://docs.rs/itertools/0.7.4/itertools/trait.Itertools.html#method.chunks – Shepmaster Jun 21 '18 at 16:48
  • Thanks! The issue with the slice::chunks() [https://doc.rust-lang.org/std/primitive.slice.html#method.chunks] solution is the first collect(): for example, if the documents to insert are generated by parsing a 3 GB file, it could be memory-costly to collect them all before starting to insert. That's why I would prefer the itertools::chunks() proposed above by Shepmaster, since it applies to an iterator instead of a slice. – user2655800 Jun 23 '18 at 21:56
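The commenter's memory concern can also be addressed with the standard library alone: batch an arbitrary iterator as it is consumed, without collecting everything up front (a sketch; itertools::chunks() offers the same more ergonomically):

```rust
fn main() {
    // Stand-in for documents produced lazily, e.g. while parsing a large file.
    let docs = (0..1000).map(|i| i * 2);

    let mut batch = Vec::with_capacity(11);
    let mut batches_sent = 0;
    for doc in docs {
        batch.push(doc);
        if batch.len() == 11 {
            // Here you would send the full batch, e.g. with
            // coll.bulk_write(std::mem::replace(&mut batch, Vec::with_capacity(11)), true);
            batch.clear();
            batches_sent += 1;
        }
    }
    if !batch.is_empty() {
        batches_sent += 1; // final partial batch
    }
    assert_eq!(batches_sent, 91); // 90 full batches of 11, plus one of 10
}
```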

Thanks to kazemakase, I solved my own issue by creating a new empty vector, swapping the two vectors, and sending the full one. Here's the new working code:

// Cargo.toml extract:
//
// [dependencies]
// bson = "0.10"
// mongodb = "0.3.7"

#[macro_use(bson, doc)] extern crate bson;
extern crate mongodb;
use mongodb::coll::options::WriteModel;
use mongodb::{Client, ThreadedClient};
use mongodb::db::ThreadedDatabase;
use std::mem;

fn main() {

    // Connect to MongoDB and select collection
    let client = Client::connect("localhost", 27017).ok().expect("Failed to initialize client.");
    let coll = client.db("test").collection("mycol");

    // Make the bulk vector
    let mut bulk = Vec::new();
    for i in 0..1000 {

        // Append an item to the bulk
        bulk.push(WriteModel::UpdateOne { filter: doc!{"_id": i},
                                          update: doc!{"$set" => {"hello" => "world"}},
                                          upsert: Some(true) });

        // Every 11 items, flush the bulk
        if bulk.len() > 10 {
            println!("Upsert {} docs into collection...",bulk.len());

            let mut bulk2 = Vec::new(); // create new empty bulk
            mem::swap(&mut bulk, &mut bulk2); // bulk <-> bulk2
            let result = coll.bulk_write(bulk2, true);  // send full bulk

            //let result = coll.bulk_write(bulk.clone(), true); // Suboptimal: bulk cloned
            //let result = coll.bulk_write(bulk, true); // E0382: use after move
            //let result = coll.bulk_write(&bulk, true); // E0308: expected struct `std::vec::Vec`, found reference 

            // Check result
            match result.bulk_write_exception {
                Some(exception) => {
                    if exception.message.len()>0 {
                        println!("ERROR: {}",exception.message);
                    }
                }
                None => ()
            }
            //bulk.clear(); // bulk is now a new empty vector, so clear() is unnecessary
        } // Compiler will drop bulk2 (the full bulk) at this point
    }

    // Final flush
    if bulk.len() > 0 {
        println!("Upsert {} docs into collection...",bulk.len());
        let result = coll.bulk_write(bulk, true); // No clone nor swap needed here
        match result.bulk_write_exception {
            Some(exception) => {
                if exception.message.len()>0 {
                    println!("ERROR: {}",exception.message);
                }
            }
            None => ()
        }
    }
}
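A side note for newer toolchains (an assumption beyond the crate versions used here): since Rust 1.40, std::mem::take(&mut bulk) is shorthand for the swap-with-a-new-empty-vector pattern above:

```rust
fn main() {
    let mut bulk = vec![1, 2, 3];
    // take() moves the contents out and leaves Default::default(), i.e. an empty Vec.
    let full = std::mem::take(&mut bulk);
    assert_eq!(full, vec![1, 2, 3]);
    assert!(bulk.is_empty()); // `bulk` is still usable afterwards
}
```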
user2655800