
I want to create a function that returns a tokio_postgres client. However, I can't find a way to take ownership of a variable (a database connection from the tokio_postgres library) inside an async task (the one that drives the connection to the database).

Here is my code:

use std::sync::{Arc, Mutex};
use tokio_postgres::tls::NoTlsStream;
use tokio_postgres::{Client, Connection, Error, NoTls, Socket};

#[tokio::main] // By default, tokio_postgres uses the tokio crate as its runtime.
async fn main() -> Result<(), Error> {
    let pg_client = create_pg_client("postgres://postgres:root@localhost:5432");
    // Use pg_client for the program
    Ok(())
}

//
// Creates a pg client
//
pub async fn create_pg_client(
    config: &str,
) -> Result<Arc<Mutex<(Client, Connection<Socket, NoTlsStream>)>>, Error> {
    // Connect to the database.
    let mut connect_result = Arc::new(Mutex::new(tokio_postgres::connect(config, NoTls).await?));

    let connect_result_thread = connect_result.clone();
    // The connection object performs the actual communication with the database,
    // so spawn it off to run on its own.
    tokio::spawn(async move {
        let mut result = connect_result_thread.lock().unwrap();
        if let Err(e) = (&mut result.1).await {
            eprintln!("An error occurred while trying to connect to the database");
        }
    });

    Ok(connect_result)
}

My code doesn't compile:

error: future cannot be sent between threads safely
   --> src\pg_client.rs:18:5
    |
18  |     tokio::spawn(async move {
    |     ^^^^^^^^^^^^ future created by async block is not `Send`
    | 
   ::: src\github.com-1ecc6299db9ec823\tokio-1.5.0\src\task\spawn.rs:129:21
    |
129 |         T: Future + Send + 'static,
    |                     ---- required by this bound in `tokio::spawn`
    |
    = help: within `impl Future`, the trait `Send` is not implemented for `std::sync::MutexGuard<'_, (Client, tokio_postgres::Connection<Socket, NoTlsStream>)>`
note: future is not `Send` as this value is used across an await
   --> src\pg_client.rs:20:25
    |
19  |         let mut result = connect_result_thread.lock().unwrap();
    |             ---------- has type `std::sync::MutexGuard<'_, (Client, tokio_postgres::Connection<Socket, NoTlsStream>)>` which is not `Send`
20  |         if let Err(e) = (*result).1.await {
    |                         ^^^^^^^^^^^^^^^^^ await occurs here, with `mut result` maybe used later
...
23  |     });
    |     - `mut result` is later dropped here

The compiler says that the future cannot be sent between threads safely. Is it possible to achieve what I want?

Crates used:

tokio = { version = "1.5.0", features = ["full"]}
tokio-postgres = "0.7.2"
Shepmaster
Léo Coletta
  • You can't hold a lock across awaits. Try establishing the connection before you put it inside the lock. – user1937198 May 11 '21 at 15:02
  • You can use a tokio mutex, whose guard can be sent across `await` points. For example, [this compiles](https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=3c4929e382e7b7b703898cf17d274c98). But it's not really clear what the goal is here - if the connection is behind a mutex, you are effectively serializing its access from various tasks. Should it even be shared among tasks, then? – user4815162342 May 11 '21 at 16:25
  • @user4815162342 Thank you for your answer, indeed it works with the `tokio` `Mutex`. After reading your message and some reflection, sharing the `Connection` was a bad idea. I only had to return the `Client`, which doesn't require any lock. If you want to add your answer, I can mark it as the accepted answer :). Thanks! – Léo Coletta May 11 '21 at 17:23

1 Answer


The standard library `Mutex` is designed to hold a lock within the context of a single thread, so its guard (`std::sync::MutexGuard`) is not `Send`. Since a task spawned with `tokio::spawn` can be picked up by a different thread after an await point, every value that lives across an `.await` must be `Send`, which is exactly what the compiler is complaining about. Additionally, even if it did work, using a blocking mutex here would be a bad idea because acquiring the mutex can take arbitrarily long, during which time other tasks cannot run on the same executor thread.

You can fix both issues by switching to tokio's mutex, whose guard is `Send` and whose `lock()` method is async. For example, this compiles:

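use std::sync::Arc;
// Pick up an async-aware Mutex: its guard is Send and lock() is awaited
use tokio::sync::Mutex;
use tokio_postgres::tls::NoTlsStream;
use tokio_postgres::{Client, Connection, Error, NoTls, Socket};

pub async fn create_pg_client(
    config: &str,
) -> Result<Arc<Mutex<(Client, Connection<Socket, NoTlsStream>)>>, Error> {
    let connect_result = Arc::new(Mutex::new(tokio_postgres::connect(config, NoTls).await?));
    let connect_result_thread = connect_result.clone();
    // Drive the connection on its own task, locking the shared pair first
    tokio::spawn(async move {
        let mut result = connect_result_thread.lock().await;
        if let Err(e) = (&mut result.1).await {
            eprintln!("An error occurred while trying to connect to the database: {}", e);
        }
    });

    Ok(connect_result)
}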


What remains unclear is the ultimate goal of this design. If the connection is behind a mutex, you are effectively serializing access to it from various tasks. Such a connection probably shouldn't be shared among tasks in the first place.

user4815162342
  • It's not generally a bad idea to use a blocking Mutex in an async program. This is a quote from the tokio `Mutex` docs: `If the data stored behind the mutex is just data, it is often better to use a blocking mutex such as the one in the standard library`. In this case it's a connection, so your advice is correct here. – sebpuetz May 11 '21 at 17:54
  • @sebpuetz Agreed - some mutexes are held for only a very short while (e.g. those that just guard the update to a single number). But the answer tries to provide a zeroth-approximation advice to a beginner, and as such it makes sense. You are unlikely to do _wrong_ by always using an async mutex in async code, whereas you could do wrong by always using the stdlib one. – user4815162342 May 11 '21 at 18:02
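The point made in these comments about short-lived blocking locks can be illustrated with the standard library alone. This is a sketch, not code from the thread: `bump`, `require_send`, and `block_on` are hypothetical helper names. `require_send` stands in for the `Send` bound that `tokio::spawn` places on its future, and the tiny `block_on` loop exists only to drive the future without an external runtime. The key idea is that the `std::sync::MutexGuard` is confined to an inner block, so it is dropped before the `.await` and never becomes part of the future's state.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

// Waker that does nothing; enough for futures that are ready immediately.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Minimal executor for the demo: polls the future until it completes.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        std::thread::yield_now();
    }
}

// Stand-in for the `Send` bound on `tokio::spawn`: compiles only for Send futures.
fn require_send<F: Future + Send>(fut: F) -> F {
    fut
}

// Increments the counter and returns the new value.
async fn bump(counter: Arc<Mutex<u32>>) -> u32 {
    let snapshot = {
        let mut guard = counter.lock().unwrap();
        *guard += 1;
        *guard
    }; // `guard` is dropped here, before any await point

    // An await point after the lock has been released.
    std::future::ready(()).await;
    snapshot
}
```

Calling `block_on(require_send(bump(counter.clone())))` compiles because no guard is alive across the await. Moving the `.await` inside the inner block would reproduce the "future is not `Send`" error from the question.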