1

I am using Quinn and qp2p to build a struct that looks like this:

pub struct Broker {
    main_endpoint: Endpoint,
    main_incoming: IncomingConnections,Connection, Bytes)>)>,
}

impl Broker {
    pub async fn new(
        config: Config
    ) -> Result<Self, EndpointError> {
        let (main_endpoint, main_incoming, _) = Endpoint::new_peer(
            local_addr(),
            &[],
            config,
        )
            .await?;

        let mut broker = Self {
            main_endpoint,
            main_incoming,
        };

        Ok(broker)
    }

    async fn on_message(&mut self, src: SocketAddr, msg: Bytes) {
        println!("Received from {:?} --> {:?}", src, msg);
    }

    async fn on_connection(&mut self) -> Result<(), RecvError> {
        // loop over incoming connections
        while let Some((connection, mut incoming_messages)) = self.main_incoming.next().await {
            let src = connection.remote_address();
            // loop over incoming messages
            while let Some(bytes) = incoming_messages.next().await? {
                self.on_message(src, bytes);
            }
        }
        Ok(())
    }
}

I am then trying to write some tests for this that look like this:

#[cfg(test)]
mod tests {
    use super::Broker;
    use qp2p::{Endpoint, Config, RetryConfig, ConfigError, utils::local_addr};
    use std::{time::Duration, net::{Ipv4Addr, SocketAddr}};
    use color_eyre::eyre::Result;
    use futures::future;
    use bytes::Bytes;

    #[tokio::test(flavor = "multi_thread")]
    async fn basic_usage() -> Result<()> {
        const MSG_HELLO: &str = "HELLO";

        let config = Config {
            idle_timeout: Duration::from_secs(60 * 60).into(), // 1 hour idle timeout.
            ..Default::default()
        };

        let mut broker = Broker::new(config.clone(), None).await?;

        let (worker, _, _) = Endpoint::new_peer(
            local_addr(),
            &[],
            Config {
                retry_config: RetryConfig {
                    retrying_max_elapsed_time: Duration::from_millis(500),
                    ..RetryConfig::default()
                },
                keep_alive_interval: Some(Duration::from_secs(5)),
                ..Config::default()
            },
        ).await?;

        tokio::spawn(broker.on_connection());
        worker.connect_to(&broker.main_endpoint.local_addr()).await.map(drop)?;

        Ok(())
    }
}

I assume because on_connection lives on a different thread broker might get destroyed while it is running and thus I get this error.

error[E0597]: `broker` does not live long enough
   --> src/broker.rs:143:22
    |
143 |         tokio::spawn(broker.on_connection());
    |                      ^^^^^^^^^^^^^^^^^^^^^^
    |                      |
    |                      borrowed value does not live long enough
    |                      argument requires that `broker` is borrowed for `'static`
...
147 |     }
    |     - `broker` dropped here while still borrowed

Is there a different architecture for my Broker that is better suited to Rust? If not how can I satisfy Rust and have Broker listen for messages in a new thread?

dearn44
  • 3,198
  • 4
  • 30
  • 63
  • Does this answer your question? [Spawn non-static future with Tokio](https://stackoverflow.com/questions/65269738/spawn-non-static-future-with-tokio) – Chayim Friedman Apr 07 '22 at 02:26
  • @ChayimFriedman, I had seen the use of `Arc` before but I'm afraid it won't do for me. Because I think in order to use `on_message`, `broker` has to be mutable. But and `Arc` object cannot be mutable, correct? – dearn44 Apr 07 '22 at 03:08
  • @dearn44 An `Arc>>` is a shared, thread-safe, mutable `T`. – cdhowie Apr 07 '22 at 04:01
  • 1
    You're trying to have a mutable and an immutable reference to the same struct, which can't be done. Without seeing more of your code (e.g. the complete `Broker` struct), we can't tell you what else to do. Btw, [`Endpoint`](https://docs.rs/quinn/latest/quinn/struct.Endpoint.html): "May be cloned to obtain another handle to the same endpoint." - it probably contains an `Arc<…>` or something similar internally. – Caesar Apr 07 '22 at 04:07
  • 2
    @cdhowie The `RefCell` is a mistake, right? – Chayim Friedman Apr 07 '22 at 04:34
  • @ChayimFriedman Probably. – cdhowie Apr 07 '22 at 04:37
  • I added a more complete example of what I have, hopefully it makes more sense now. – dearn44 Apr 07 '22 at 13:08

0 Answers0