
I'm trying to create a minimal clean architecture structure with Rust: a controller that receives a request, a use case that handles the logic, an entity that holds the data model, and a repository that actually retrieves the data. Here is a diagram of the structure I'm using: diagram.

A synchronous version of this can be seen at "Generics and dependency inversion with multiple structs". Now I'm trying to do it asynchronously with Tokio: https://crates.io/crates/tokio. For starters I won't set up the repository, so I'll pull the data directly from the use case.

Here is a playground: https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=ed9c8c0e12c043ad4e3e4bea956fa0cc

Here is the same code for reference:

#![allow(dead_code)]

use async_trait::async_trait;
use std::io::Error;
use tokio::time;

// CUSTOM RETURN TYPE - - - - - - - - - - -
type AsyncResult<T> = Result<T, Error>;

// ENTITY - - - - - - - - - - - - - - - - -
#[derive(Debug)]
struct User {
  pub id: i32,
  pub name: String,
}

// USE CASE - - - - - - - - - - - - - - - - -
#[async_trait]
trait IUserGetAllUseCase {
  fn new() -> UserGetAllUseCase;
  async fn execute(&self) -> AsyncResult<Vec<User>>;
}

struct UserGetAllUseCase;

#[async_trait]
impl IUserGetAllUseCase for UserGetAllUseCase {
  fn new() -> UserGetAllUseCase {
    UserGetAllUseCase {}
  }

  async fn execute(&self) -> AsyncResult<Vec<User>> {
    // Simulating async data retrieval from persistence
    time::sleep(std::time::Duration::from_secs(3)).await;
    let user_1 = User {
      id: 1,
      name: String::from("user_1"),
    };
    let user_2 = User {
      id: 2,
      name: String::from("user_2"),
    };
    let users = vec![user_1, user_2];

    Ok(users)
  }
}

// CONTROLLER - - - - - - - - - - - - - - - - -
struct UserGetAllController<T> {
  user_get_all_use_case: T,
}

impl<T: IUserGetAllUseCase> UserGetAllController<T> {
  fn new(user_get_all_use_case: T) -> UserGetAllController<T> {
    UserGetAllController {
      user_get_all_use_case,
    }
  }

  async fn execute(&self) -> AsyncResult<Vec<User>> {
    let users = self.user_get_all_use_case.execute().await;

    users
  }
}

// MOCK SERVER - - - - - - - - - - - - - - - - -
#[tokio::main]
async fn main() -> AsyncResult<()> {
  let user_get_all_use_case = UserGetAllUseCase::new();
  let user_get_all_controller = UserGetAllController::new(user_get_all_use_case);
  let users = user_get_all_controller.execute().await?;

  println!("{:#?}", users);

  Ok(())
}

OK, this works and is asynchronous. Now I want to add a repository trait so that I can create whatever repositories I need to retrieve data from different persistence systems: PostgreSQL, filesystem, etc. To do this I create IUserRepository, and then implement it in a concrete repository (UserRepository in the code below, standing in for something like a PostgreSQLUserRepository).

Here is a playground: https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=8758491ba16eee3821df8d1f800c9c85. And here is the same code for reference:

#![allow(dead_code)]

use async_trait::async_trait;
use std::io::Error;
use tokio::time;

// CUSTOM RETURN TYPE - - - - - - - - - - -
type AsyncResult<T> = Result<T, Error>;

// ENTITY - - - - - - - - - - - - - - - - -
#[derive(Debug)]
pub struct User {
  pub id: i32,
  pub name: String,
}

// REPOSITORY - - - - - - - - - - - - - - - - -
#[async_trait]
pub trait IUserRepository {
  async fn company_get_all(&self) -> AsyncResult<Vec<User>>;
}

pub struct UserRepository;

#[async_trait]
impl IUserRepository for UserRepository {
  async fn company_get_all(&self) -> AsyncResult<Vec<User>> {
    // Simulating async data retrieval from persistence
    time::sleep(std::time::Duration::from_secs(3)).await;
    let user_1 = User {
      id: 1,
      name: String::from("user_1"),
    };
    let user_2 = User {
      id: 2,
      name: String::from("user_2"),
    };
    let users = vec![user_1, user_2];

    Ok(users)
  }
}

// USE CASE - - - - - - - - - - - - - - - - -
#[async_trait]
trait IUserGetAllUseCase {
  fn new<T: IUserRepository>(user_repository: T) -> UserGetAllUseCase<T>;
  async fn execute(&self) -> AsyncResult<Vec<User>>;
}

struct UserGetAllUseCase<T> {
  user_repository: T,
}

#[async_trait]
impl<T: IUserRepository> IUserGetAllUseCase for UserGetAllUseCase<T> {
  fn new<K: IUserRepository>(user_repository: K) -> UserGetAllUseCase<K> {
    UserGetAllUseCase { user_repository }
  }

  async fn execute(&self) -> AsyncResult<Vec<User>> {
    let users = self.user_repository.company_get_all().await;

    users
    // Two errors here:
    // «future cannot be sent between threads safely»
    // «future created by async block is not `Send`»
  }
}

// CONTROLLER - - - - - - - - - - - - - - - - -
struct UserGetAllController<T> {
  user_get_all_use_case: T,
}

impl<T: IUserGetAllUseCase> UserGetAllController<T> {
  fn new(user_get_all_use_case: T) -> UserGetAllController<T> {
    UserGetAllController {
      user_get_all_use_case,
    }
  }

  async fn execute(&self) -> AsyncResult<Vec<User>> {
    let users = self.user_get_all_use_case.execute().await;

    users
  }
}

// MOCK SERVER - - - - - - - - - - - - - - - - -
#[tokio::main]
async fn main() -> AsyncResult<()> {
  let user_repository = UserRepository {};
  let user_get_all_use_case = UserGetAllUseCase::<UserRepository>::new(user_repository);
  let user_get_all_controller = UserGetAllController::new(user_get_all_use_case);
  let users = user_get_all_controller.execute().await?;

  println!("{:#?}", users);

  Ok(())
}

As you see, I get two errors:

«future cannot be sent between threads safely»
«future created by async block is not `Send`»

I'm not sure why this won't compile when my previous structure was similar and compiled and ran without issues.

Any help will be welcome ))

miravelardo

1 Answer


async fns create "state machines", which you can think of as similar to an enum with each variant representing the data required between every await point.

These state machines still need to obey Rust's concurrency guarantees, however. Critically, any data held across await points must be Send whenever the future itself is required to be Send (as it is here), since your executor might resume it on a different thread.
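To make the rule about await points concrete, here is a minimal std-only sketch. The helper is_send and the functions tick, holds_arc, and holds_rc are hypothetical names invented for illustration; they are not part of the question's code.

```rust
use std::rc::Rc;
use std::sync::Arc;

// Hypothetical helper: this call only compiles if the value is `Send`.
fn is_send<T: Send>(_value: &T) -> bool {
    true
}

// A trivial await point.
async fn tick() {}

// `Arc<i32>` is `Send`, so keeping it alive across the await leaves the future `Send`.
async fn holds_arc() -> i32 {
    let shared = Arc::new(1);
    tick().await;
    *shared
}

// `Rc<i32>` is not `Send`; keeping it alive across the await makes the whole future `!Send`.
async fn holds_rc() -> i32 {
    let local = Rc::new(1);
    tick().await;
    *local
}

fn main() {
    // Creating a future does not run it, so no executor is needed for this check.
    println!("holds_arc is Send: {}", is_send(&holds_arc()));

    // Uncommenting the next line fails with
    // "future cannot be sent between threads safely":
    // is_send(&holds_rc());

    // The non-Send future is still a valid future as long as nothing demands `Send`.
    let _not_send = holds_rc();
}
```

The same mechanism is at work in the question: the value kept across the await is &self, and a reference is only Send when the thing it points to is Sync.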

If you run the playground link, you get the following errors:

  --> src/main.rs:61:53
   |
61 |     async fn execute(&self) -> AsyncResult<Vec<User>> {
   |  _____________________________________________________^
62 | |     let users = self.user_repository.company_get_all().await;
63 | |
64 | |     users
...  |
67 | |     // «future created by async block is not `Send`»
68 | |   }
   | |___^ future created by async block is not `Send`
   |
note: captured value is not `Send` because `&` references cannot be sent unless their referent is `Sync`
  --> src/main.rs:61:21
   |
61 |   async fn execute(&self) -> AsyncResult<Vec<User>> {
   |                     ^^^^ has type `&UserGetAllUseCase<T>` which is not `Send`, because `UserGetAllUseCase<T>` is not `Sync`
   = note: required for the cast to the object type `dyn Future<Output = Result<Vec<User>, std::io::Error>> + Send`
help: consider further restricting this bound
   |
56 | impl<T: IUserRepository + std::marker::Sync> IUserGetAllUseCase for UserGetAllUseCase<T> {
   |                         +++++++++++++++++++

error: future cannot be sent between threads safely

There's a little bit to unpack here, but the main point is on this line:

&self has type `&UserGetAllUseCase<T>` which is not `Send`, because `UserGetAllUseCase<T>` is not `Sync`

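The solution is to require your IUserRepository to be Send + Sync, either as supertraits on the trait itself or as extra bounds on T in the impl, which is what the compiler's help suggests (there's another issue that the compiler will raise which is similar but also requires the Send bound). Your first version compiled because UserGetAllUseCase was a concrete unit struct, which the compiler can verify is Sync on its own; once it holds a generic T, nothing guarantees that unless you say so.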
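As a sketch of that fix, the version below adds Send + Sync as supertraits of IUserRepository. To stay runnable without external crates it makes several assumptions that differ from the question: the traits are written by hand in roughly the shape async-trait generates (a boxed future with a Send bound) instead of using #[async_trait], the use case is a plain impl rather than a trait, the sleep is dropped, and block_on is a tiny hypothetical executor standing in for Tokio. BoxFuture, ThreadWaker, and get_all_users are likewise names made up for this example.

```rust
use std::future::Future;
use std::io::Error;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread;

type AsyncResult<T> = Result<T, Error>;

// Roughly the return type `#[async_trait]` generates: a boxed future that must be `Send`.
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

#[derive(Debug)]
pub struct User {
    pub id: i32,
    pub name: String,
}

// The fix: `Send + Sync` supertraits, so every implementor can be shared across threads.
pub trait IUserRepository: Send + Sync {
    fn company_get_all(&self) -> BoxFuture<'_, AsyncResult<Vec<User>>>;
}

pub struct UserRepository;

impl IUserRepository for UserRepository {
    fn company_get_all(&self) -> BoxFuture<'_, AsyncResult<Vec<User>>> {
        Box::pin(async {
            let users = vec![User { id: 1, name: String::from("user_1") }];
            Ok::<_, Error>(users)
        })
    }
}

pub struct UserGetAllUseCase<T> {
    user_repository: T,
}

impl<T: IUserRepository> UserGetAllUseCase<T> {
    pub fn new(user_repository: T) -> Self {
        Self { user_repository }
    }

    // Capturing `&self` is fine now: `T: Sync` makes `&UserGetAllUseCase<T>` `Send`.
    pub fn execute(&self) -> BoxFuture<'_, AsyncResult<Vec<User>>> {
        Box::pin(async move { self.user_repository.company_get_all().await })
    }
}

// Tiny std-only executor (an assumption of this sketch) so it runs without Tokio.
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(future: F) -> F::Output {
    let mut future = Box::pin(future);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut context = Context::from_waker(&waker);
    loop {
        match future.as_mut().poll(&mut context) {
            Poll::Ready(output) => return output,
            Poll::Pending => thread::park(),
        }
    }
}

fn get_all_users() -> AsyncResult<Vec<User>> {
    let use_case = UserGetAllUseCase::new(UserRepository);
    let users = block_on(use_case.execute());
    users
}

fn main() {
    println!("{:#?}", get_all_users());
}
```

With async-trait itself, the equivalent change should be either declaring the trait as IUserRepository: Send + Sync or writing the impl header as impl<T: IUserRepository + Send + Sync>, as the compiler's help text proposes.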

This is made slightly more confusing by trait objects. Generally, you don't have to worry too much about Send and Sync, because they are "auto traits", meaning they are automatically derived where possible by the compiler.

However, when you create a trait object, you erase all knowledge of the "real" type of the data other than what is specified in the trait bound.

For example, a Box<dyn Foo> isn't known to be Send even if it actually contains a (). You have to tell the compiler that the Send bound is important explicitly.
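A small sketch of that Box<dyn Foo> point, using only the standard library. The trait Foo comes from the sentence above; its name method and the helper name_on_thread are hypothetical additions for illustration.

```rust
use std::thread;

trait Foo {
    fn name(&self) -> String;
}

// `()` is `Send`, but that fact is erased once it is hidden behind `dyn Foo`.
impl Foo for () {
    fn name(&self) -> String {
        String::from("unit")
    }
}

// Moves the trait object to another thread; this only compiles because of `+ Send`.
fn name_on_thread(foo: Box<dyn Foo + Send>) -> String {
    thread::spawn(move || foo.name()).join().unwrap()
}

fn main() {
    // Without the explicit bound the compiler refuses to move the box across threads:
    // let erased: Box<dyn Foo> = Box::new(());
    // thread::spawn(move || erased.name()); // error: `dyn Foo` cannot be sent between threads safely

    let sendable: Box<dyn Foo + Send> = Box::new(());
    println!("{}", name_on_thread(sendable));
}
```

The boxed futures that async-trait produces are trait objects of this kind, which is why the Send requirement has to be spelled out rather than inferred.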

Some more reading on futures, specifically w.r.t. send-ness: https://rust-lang.github.io/async-book/07_workarounds/03_send_approximation.html

cameron1024
  • Thanks a lot, I didn't know about either `Send` or `Sync`. In the docs you sent I read: «Some async fn state machines are safe to be sent across threads, while others are not». Is this issue related to the fact that I'm using the Tokio runtime? Would I avoid having to add `+ Send + Sync` by using a different runtime? – miravelardo Nov 18 '21 at 11:58
  • Rust is different to many other languages in that the "executor/event loop" and "futures" are quite separated. Futures and async functions are part of the language, and are independent of the runtime, so no, using a different runtime wouldn't change this. Instead, it's probably worth accepting that you want the `Send + Sync` bounds on things. If you want, you can define a "combination trait" of sorts (to save some keystrokes), as in this question: https://stackoverflow.com/questions/26983355/is-there-a-way-to-combine-multiple-traits-in-order-to-define-a-new-trait – cameron1024 Nov 18 '21 at 12:19
  • Futures and async functions by themselves aren't required to be `Send` or `Sync`. `async-trait` desugars to `Pin<Box<dyn Future + Send>>`; you could define your own trait returning non-`Send` futures instead, and as long as you run them on an executor that doesn't require `Send` futures, you'll be fine. If you want any kind of concurrency from the tokio runtime, you'll need them to be `Send`, as defined by the bounds on `task::spawn`. https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=a38e6d5da0b8f6d3e40975814e8cfe28 – sebpuetz Nov 19 '21 at 17:37
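The "combination trait" mentioned in the comments can be sketched as a supertrait plus a blanket impl. This is a simplified, synchronous illustration: SharedUserRepository, InMemoryUserRepository, the count method, and count_on_thread are hypothetical names, and the IUserRepository here is a stand-in rather than the async trait from the question.

```rust
use std::thread;

// Simplified stand-in for the repository trait from the question.
trait IUserRepository {
    fn count(&self) -> usize;
}

// Hypothetical combination trait: one name that bundles all three bounds.
trait SharedUserRepository: IUserRepository + Send + Sync {}

// Blanket impl: any type that already satisfies the bounds gets the combined trait for free.
impl<T: IUserRepository + Send + Sync> SharedUserRepository for T {}

struct InMemoryUserRepository {
    names: Vec<String>,
}

impl IUserRepository for InMemoryUserRepository {
    fn count(&self) -> usize {
        self.names.len()
    }
}

// Callers write one bound instead of three; `Send` is implied by the supertrait list.
fn count_on_thread<R: SharedUserRepository + 'static>(repository: R) -> usize {
    thread::spawn(move || repository.count()).join().unwrap()
}

fn main() {
    let repository = InMemoryUserRepository {
        names: vec![String::from("user_1"), String::from("user_2")],
    };
    println!("{}", count_on_thread(repository));
}
```

The trade-off is one more trait name to learn in exchange for shorter bounds at every use site.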