I'm struggling to express my code in a way that satisfies the borrow checker.

I have a function, create_task, which creates a future that performs some database operations. A stream yields values, and each element needs to be inserted into a database within a transaction. The problem is sharing the transaction between multiple closures, because the transaction also mutably borrows the connection object.

#![feature(conservative_impl_trait)]

extern crate futures;
extern crate rusqlite;

use futures::prelude::*;
use futures::{future, stream};
use rusqlite::Connection;

fn main() {
    let task = create_task();
    task.wait().unwrap();
}

fn create_task() -> impl Future<Item = (), Error = ()> {
    let mut conn = Connection::open("temp.db").unwrap();
    conn.execute("CREATE TABLE IF NOT EXISTS temp (val INTEGER)", &[]).unwrap();

    // tx takes a mut ref to conn!
    let tx = conn.transaction().unwrap();

    stream::iter_ok::<_, ()>(vec![1, 2, 3])
        .for_each(|val| {
            // tx borrowed here!
            tx.execute("INSERT INTO temp (val) VALUES (?1)", &[&val]).unwrap();
            future::ok(())
        })
        .map(|_| {
            // tx moved/consumed here!
            tx.commit().unwrap();
        })
}

There are multiple issues with the code:

  • conn does not live long enough. It also needs to be moved into the closures. Perhaps as an Rc<Connection>, since there are two closures?
  • conn can't simply be shared as an Rc because of the mutability requirements. Perhaps Rc<RefCell<Connection>> is a more suitable type?
  • the borrow checker does not know that the borrow of tx ends after the first for_each closure, so tx cannot be moved into the second map closure. Once again, moving it as an Rc<Transaction> into both closures might be reasonable?

I've been fiddling around with those ideas and know that the desired lifetimes are possible and make sense, but I have not been able to express them to the compiler correctly.

Shepmaster
Jaanus Varus

1 Answer

I believe your first problem is that you haven't fully grasped how lazy futures are. You are creating a Connection inside of create_task, taking a reference to it, putting that reference into a stream/future, then attempting to return that future. None of the closures have even executed at this point.

You cannot return a reference to a value created in a function. Don't try to store the transaction and the connection in the same struct, either.

Instead, accept a reference to a Connection and return a Future containing that lifetime.
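The shape of that signature can be tried out without futures or rusqlite. The following is only a sketch using the standard library: Conn is a hypothetical stand-in for a connection, and a closure plays the role of the lazy future. The `+ 'a` bound ties the returned work to the borrowed connection.

```rust
// Hypothetical stand-in for a database connection; this is not rusqlite's type.
struct Conn {
    rows: Vec<i32>,
}

// Returns lazy work that borrows `conn`. The `+ 'a` bound tells the compiler
// that the returned closure must not outlive the connection it borrows.
fn create_task<'a>(conn: &'a mut Conn) -> impl FnOnce() -> usize + 'a {
    // Nothing is inserted here; the closure only captures the borrow.
    move || {
        conn.rows.extend([1, 2, 3]);
        conn.rows.len()
    }
}

fn main() {
    // The caller owns the connection, so it outlives the task.
    let mut conn = Conn { rows: Vec::new() };
    let task = create_task(&mut conn);

    // The work happens only when the closure is called.
    let inserted = task();
    assert_eq!(inserted, 3);

    // The mutable borrow ended when the task was consumed.
    assert_eq!(conn.rows, vec![1, 2, 3]);
}
```

Creating the Conn inside create_task instead would be rejected, because the returned closure would then refer to a local that is dropped when the function returns.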

The next problem is that the compiler doesn't know how the closures will be called or in what order. Instead of trying to close over the transaction, let it "flow" from one to the other, letting the ownership system ensure that it's always in the right place.

#![feature(conservative_impl_trait)]

extern crate futures;
extern crate rusqlite;

use futures::prelude::*;
use futures::{future, stream};
use rusqlite::Connection;

fn main() {
    let mut conn = Connection::open("temp.db").unwrap();
    conn.execute("CREATE TABLE IF NOT EXISTS temp (val INTEGER)", &[]).unwrap();

    let task = create_task(&mut conn);
    task.wait().unwrap();
}

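fn create_task<'a>(conn: &'a mut Connection) -> impl Future<Item = (), Error = ()> + 'a {
    let tx = conn.transaction().unwrap();
    stream::iter_ok::<_, ()>(vec![1, 2, 3])
        // The transaction is the accumulator: each step takes ownership of it
        // and hands it on to the next one.
        .fold(tx, |tx, val| {
            tx.execute("INSERT INTO temp (val) VALUES (?1)", &[&val]).unwrap();
            future::ok(tx)
        })
        // After the last element, the fold yields the transaction, which can
        // now be consumed by commit.
        .map(|tx| {
            tx.commit().unwrap();
        })
}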
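The same "flow" of ownership can be seen in isolation with a plain iterator fold. This is a sketch only: Tx is a hypothetical stand-in with an execute method that needs access to the value and a commit method that consumes it, and it does not mirror rusqlite's actual method signatures.

```rust
// Hypothetical stand-in for a transaction; this is not rusqlite's API.
struct Tx {
    pending: Vec<i32>,
}

impl Tx {
    // Records an insert; only needs mutable access to the transaction.
    fn execute(&mut self, val: i32) {
        self.pending.push(val);
    }

    // Consumes the transaction, so it cannot be used afterwards.
    fn commit(self) -> Vec<i32> {
        self.pending
    }
}

fn insert_all(values: Vec<i32>) -> Vec<i32> {
    let tx = Tx { pending: Vec::new() };

    // The transaction is the accumulator: every step owns it, uses it,
    // and returns it so the next step can own it in turn.
    let tx = values.into_iter().fold(tx, |mut tx, val| {
        tx.execute(val);
        tx
    });

    // Exactly one owner is left at the end, so consuming it is allowed.
    tx.commit()
}

fn main() {
    assert_eq!(insert_all(vec![1, 2, 3]), vec![1, 2, 3]);
}
```

Because no closure ever captures the transaction, the compiler does not need to reason about the order in which the closures run.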

A giant word of warning: If execute isn't asynchronous, you really shouldn't be using it inside of a future like that. Any blocking operation will cause all of your futures to stall out. You probably should be running the synchronous workload on a separate thread / threadpool.
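As a rough illustration of moving the synchronous workload elsewhere, here is a sketch that uses only the standard library. The function blocking_insert is a hypothetical stand-in for a blocking database call, and run_on_worker is an illustrative name, not part of any library.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical stand-in for a blocking database insert.
fn blocking_insert(val: i32) -> i32 {
    val * 10
}

// Runs all inserts on a dedicated worker thread and collects the results.
fn run_on_worker(values: Vec<i32>) -> Vec<i32> {
    let (sender, receiver) = mpsc::channel();

    let worker = thread::spawn(move || {
        for val in values {
            sender.send(blocking_insert(val)).unwrap();
        }
        // `sender` is dropped here, which closes the channel.
    });

    // Iterating ends once the channel is closed and drained.
    let results: Vec<i32> = receiver.iter().collect();
    worker.join().unwrap();
    results
}

fn main() {
    assert_eq!(run_on_worker(vec![1, 2, 3]), vec![10, 20, 30]);
}
```

Note that this sketch still blocks the calling thread while it collects the results. Inside an event loop, the results would instead be received through something that can be polled as a future, so that other futures keep making progress.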

Shepmaster
  • Thanks! I'll dig deeper into the answer tomorrow. One note I wanted to make is that the caller (in this case the `main` function) should not be aware of the storage mechanism. Therefore, creating and dropping the connection should be driven lazily by the closure(s) within `create_task`. Ideally, the connection would be opened when the first value from the stream arrives. – Jaanus Varus Feb 03 '18 at 18:12