I am quite new to Rust and Database handling in general and so i have clearly bitten off more than I can chew with my current async project.
I am using actix
actors, actix_web
, diesel
and r2d2
for my connection pool.
In essence, the user sends a request in form of a list of n lists for n items. In a try_join_all
each item is iterated over and individually sent to an actix actor where a connection is gotten from a pool and then the database request is made.
The problem arises in a situation where there are more requested items in a list than max connections in the connection pool, the connection_pool.get()
function will return a timeout error.
An abstracted form of the main function:
#[async_trait]
impl CreateAccounts for Bank {
async fn create_accounts(
&self,
database_address: &Addr<Database>,
request: Vec<Vec<Account>>,
) -> Result<()> {
for accounts in request.iter() {
try_join_all(
accounts // connection timeout if this is longer than pool.max_size()
.iter()
.map(|account| async move {
// the actor that handles the database request
let mut account_entry = database_address
.send(CreateAccountEntry(
account,
))
.await??;
...
...
})
.collect::<Vec<_>>(),
)
.await?;
}
Ok(())
}
}
The actor in which the connection is called:
#[derive(Message)]
#[rtype(result = "Result<AccountEntry>")]
pub struct CreateAccountEntry(pub Account);
impl Handler<CreateAccountEntry> for Database {
type Result = ResponseActFuture<Self, Result<AccountEntry>>;
fn handle(&mut self, msg: CreateAccountEntry, _ctx: &mut Self::Context) -> Self::Result {
let CreateAccountEntry(new_account) = msg;
let connection_result = self.get_connection(); // connection is gotten from the pool
Box::pin(
async move {
let account = block(move || {
new_account
.create_insert()
.execute(&mut connection)?;
let account = Account::by_id(new_account.id)
.first(&mut connection)?;
Ok(account)
})
.await??;
Ok(account)
}
.into_actor(self),
)
}
}
And another file where the diesel
/ r2d2
connection pool is created:
pub struct Database {
connection_pool: Pool<ConnectionManager<MysqlConnection>>
}
impl Actor for Database {
type Context = actix::Context<Self>;
}
impl Database {
pub fn try_new(database_url: &str) -> Result<Self> {
let manager = ConnectionManager::<MysqlConnection>::new(database_url);
let connection_pool = Pool::builder()
.build(manager).unwrap();
let mut connection = connection_pool
.get().unwrap();
Ok(Self { connection_pool })
}
pub fn get_connection(&self) -> Result<PooledConnection<ConnectionManager<MysqlConnection>>> {
Ok(self.connection_pool.get()?)
}
}
My expected behavior was that once the number of max connections has been reached, it would wait for one to become available and then continue. However, the connections are seemingly never returned to the pool after the actor is done with them.
I have tried to manually drop()
the connection variable right after the database request or move the entire logic of the CreateAccountEntry out of the Box::pin(async move{...})
and only put the final result into it but none of it changed the behavior of the connection pool.
To confirm my suspicions i wrote a simple while loop for the get_connection()
that tries to get a working connection but once the pool is depleted a new connection never becomes available.
One of the workarounds would be to limit the amount of items in the accounts
list to be less than the amount of max connections but in reality this is not practical, neither is increasing max amount of connections in the pool as the users request could easily surpass that.
I assume it has something to do with the try_join_all()
but i don't quite know how to move on from here...