I am using Quinn and qp2p to build a struct that looks like this:
pub struct Broker {
main_endpoint: Endpoint,
main_incoming: IncomingConnections,Connection, Bytes)>)>,
}
impl Broker {
    /// Creates a broker by opening a new peer endpoint on `local_addr()`
    /// using the supplied `config`.
    ///
    /// # Errors
    ///
    /// Returns the `EndpointError` produced by `Endpoint::new_peer` if the
    /// endpoint cannot be created.
    pub async fn new(config: Config) -> Result<Self, EndpointError> {
        // The empty slice is presumably the list of known contacts (confirm
        // against the qp2p docs); the third tuple element is not needed here.
        let (main_endpoint, main_incoming, _) =
            Endpoint::new_peer(local_addr(), &[], config).await?;

        Ok(Self {
            main_endpoint,
            main_incoming,
        })
    }

    /// Handles one message `msg` received from the peer at `src`.
    ///
    /// Currently this only prints the sender address and the payload.
    async fn on_message(&mut self, src: SocketAddr, msg: Bytes) {
        println!("Received from {:?} --> {:?}", src, msg);
    }

    /// Accepts incoming connections and forwards every message received on
    /// them to [`Broker::on_message`].
    ///
    /// Connections are served one at a time: the next connection is only
    /// accepted once the current connection's message stream has ended.
    ///
    /// Note that the returned future borrows `self`, so it is not `'static`.
    /// To run it on a spawned task, move the broker into the task, e.g.
    /// `tokio::spawn(async move { broker.on_connection().await })`.
    ///
    /// # Errors
    ///
    /// Returns the first `RecvError` reported by a connection's message
    /// stream; this also stops accepting further connections.
    async fn on_connection(&mut self) -> Result<(), RecvError> {
        // Outer loop: one iteration per accepted connection.
        while let Some((connection, mut incoming_messages)) = self.main_incoming.next().await {
            let src = connection.remote_address();
            // Inner loop: one iteration per message on this connection.
            while let Some(bytes) = incoming_messages.next().await? {
                // Futures are lazy: without `.await` the handler future would
                // be created and dropped immediately, and never run.
                self.on_message(src, bytes).await;
            }
        }
        Ok(())
    }
}
I am then trying to write some tests for this that look like this:
// Tests for `Broker`: start a broker endpoint, then connect a second
// ("worker") endpoint to it.
#[cfg(test)]
mod tests {
use super::Broker;
use qp2p::{Endpoint, Config, RetryConfig, ConfigError, utils::local_addr};
use std::{time::Duration, net::{Ipv4Addr, SocketAddr}};
use color_eyre::eyre::Result;
use futures::future;
use bytes::Bytes;
// Runs on tokio's multi-threaded runtime.
#[tokio::test(flavor = "multi_thread")]
async fn basic_usage() -> Result<()> {
// NOTE(review): MSG_HELLO is declared but never sent in this test.
const MSG_HELLO: &str = "HELLO";
let config = Config {
idle_timeout: Duration::from_secs(60 * 60).into(), // 1 hour idle timeout.
..Default::default()
};
// NOTE(review): `Broker::new` is declared with a single `config` parameter;
// the extra `None` argument here does not match that signature -- confirm
// which one is intended.
let mut broker = Broker::new(config.clone(), None).await?;
// Second endpoint acting as the client side, configured with a 500 ms
// maximum retry window and a 5 s keep-alive interval.
let (worker, _, _) = Endpoint::new_peer(
local_addr(),
&[],
Config {
retry_config: RetryConfig {
retrying_max_elapsed_time: Duration::from_millis(500),
..RetryConfig::default()
},
keep_alive_interval: Some(Duration::from_secs(5)),
..Config::default()
},
).await?;
// `on_connection` takes `&mut self`, so the future it returns borrows the
// local `broker`. `tokio::spawn` requires that borrow to last for `'static`,
// but `broker` is dropped at the end of this function -- this is the line
// that rustc rejects with E0597.
tokio::spawn(broker.on_connection());
// Connect the worker to the broker's local address; the successful result
// is discarded via `map(drop)`, so only a connection error is propagated.
worker.connect_to(&broker.main_endpoint.local_addr()).await.map(drop)?;
Ok(())
}
}
I assume that, because `on_connection` runs on a different thread, `broker`
might get destroyed while the task is still running, and that this is why
I get the following error:
error[E0597]: `broker` does not live long enough
--> src/broker.rs:143:22
|
143 | tokio::spawn(broker.on_connection());
| ^^^^^^^^^^^^^^^^^^^^^^
| |
| borrowed value does not live long enough
| argument requires that `broker` is borrowed for `'static`
...
147 | }
| - `broker` dropped here while still borrowed
Is there a different architecture for my `Broker` that is better suited to Rust? If not, how can I satisfy the Rust compiler and have `Broker` listen for messages in a new thread?