
I'm having trouble understanding how to write concurrent async code encapsulated in a single struct.

I'm not sure how to explain the problem exactly, so I'll try to do it with an example.

Let's say I have a UdpServer struct. This struct has multiple methods related to its behavior (e.g., handle_datagram, deserialize_datagram, etc.).
If I want to make the code concurrent, I spawn a tokio task. tokio::spawn requires the future passed to it to be 'static, so I can't use &self inside the task unless &self is 'static, which means I can't call self.deserialize_datagram().

I understand the problem (there is no guarantee the struct will outlive the spawned task), but I can't see a proper way of solving it. I know it's possible to just move the functions out of the impl block, but that doesn't look like a good solution to me.
Also, even if I could take &self as 'static, this code still doesn't look right to me for some reason (not Rusty enough, I guess).
Another "solution" is to take self: Arc<Self> instead of &self, but this feels even worse.

So I'm assuming there is some pattern I'm not aware of. Can someone explain how I should refactor the whole thing?

Example code:

use tokio::net::UdpSocket;

struct Datagram;

struct UdpServer {
    addr: String,
}

impl UdpServer {
    pub async fn run(&self) {
        let socket = UdpSocket::bind(&self.addr).await.unwrap();
        loop {
            // Owned buffer (1024 bytes is an arbitrary size); a zero-length
            // buffer cannot hold any of the datagram's bytes.
            let mut buf = vec![0u8; 1024];
            let (len, _) = socket.recv_from(&mut buf).await.unwrap();
            buf.truncate(len);

            // I spawn a tokio task to enable concurrency
            tokio::spawn(async move {
                // But I can't use &self in here: tokio::spawn requires a 'static
                // future, and this one borrows `self`, so this does not compile.
                let datagram = self.deserialize_datagram(&mut buf).await;
                self.handle_datagram(datagram).await;
            });
        }
    }

    pub async fn deserialize_datagram(&self, buf: &mut [u8]) -> Datagram {
        unimplemented!()
    }

    pub async fn handle_datagram(&self, datagram: Datagram) {
        unimplemented!()
    }
}
Bonanov
  • "Another 'solution' is to take `self: Arc<Self>` instead of &self, but this feels even worse." Why does that feel worse to you? It seems like a perfectly good solution to me. – Aplet123 Nov 13 '21 at 15:18
  • @Aplet123 I don't remember seeing much of it in other codebases, so I assumed it's not optimal. – Bonanov Nov 13 '21 at 15:23
  • Also gotta mention that this solution will require placing a lot of `self.clone()` around the code before moving the variable into a closure. – Bonanov Nov 13 '21 at 22:41
  • If you don't mind an external crate (and a macro), the [`closure`](https://docs.rs/closure/0.3.0/closure/) crate is designed to make it easier to clone variables sent to closures. – user4815162342 Nov 14 '21 at 13:53

2 Answers


Currently the only way to do it is to make self last arbitrarily long through the use of Arc. Since run() is a method on UdpServer, it requires the change to Arc<Self>, which you considered but rejected because it felt worse. Still, that's the way to do it:

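// Call as `Arc::new(server).run().await`.
pub async fn run(self: Arc<Self>) {
    let socket = UdpSocket::bind(&self.addr).await.unwrap();
    loop {
        // Receive into an owned buffer so it can be moved into the task.
        let mut buf = vec![0u8; 1024];
        let (len, _) = socket.recv_from(&mut buf).await.unwrap();
        buf.truncate(len);

        tokio::spawn({
            // Cloning the Arc only bumps a reference count; the task gets
            // its own owned handle, which satisfies the 'static bound.
            let me = Arc::clone(&self);
            async move {
                let datagram = me.deserialize_datagram(&mut buf).await;
                me.handle_datagram(datagram).await;
            }
        });
    }
}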

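The same pattern can be checked without an async runtime. The sketch below is a stand-in, not the answer's code: it uses std::thread::spawn, which, like tokio::spawn, requires a 'static closure, and a hypothetical Server type with an illustrative prefix field in place of UdpServer. It also addresses the concern from the comments about scattering clones: a hypothetical spawn_handler method taking self: &Arc<Self> performs the one Arc::clone itself, so call sites stay clean.

```rust
use std::sync::Arc;
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for UdpServer; `prefix` is illustrative only.
struct Server {
    prefix: String,
}

impl Server {
    /// Plays the role of deserialize_datagram + handle_datagram.
    fn handle(&self, payload: &[u8]) -> String {
        format!("{}{}", self.prefix, payload.len())
    }

    /// Clones the Arc once, inside the method, so callers don't have to
    /// write `Arc::clone(&self)` at every spawn site.
    fn spawn_handler(self: &Arc<Self>, payload: Vec<u8>) -> JoinHandle<String> {
        let me = Arc::clone(self);
        // thread::spawn requires a 'static closure, like tokio::spawn;
        // owning `me` and `payload` satisfies that bound.
        thread::spawn(move || me.handle(&payload))
    }
}

/// Spawns one worker per payload and collects the results in input order.
fn run(server: Arc<Server>, payloads: Vec<Vec<u8>>) -> Vec<String> {
    let handles: Vec<_> = payloads
        .into_iter()
        .map(|p| server.spawn_handler(p))
        .collect();
    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```

For example, run(Arc::new(Server { prefix: "len=".to_string() }), vec![vec![1, 2, 3], vec![]]) returns ["len=3", "len=0"]. With tokio the shape is the same: spawn_handler would call tokio::spawn with an async move block instead.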

Interestingly, the smol async runtime might actually provide what you're looking for, because its executor carries a lifetime. Futures spawned on an Executor<'a> only need to live for 'a rather than 'static, so they may borrow values from the caller's environment that outlive the executor. For example, this compiles:

use futures_lite::future;
use smol::{Executor, net::UdpSocket};

struct Datagram;

struct UdpServer {
    addr: String,
}

impl UdpServer {
    pub async fn run<'a>(&'a self, ex: &Executor<'a>) {
        let socket = UdpSocket::bind(&self.addr).await.unwrap();
        loop {
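            // Owned buffer so the received bytes can be moved into the task.
            let mut buf = vec![0u8; 1024];
            let (len, _) = socket.recv_from(&mut buf).await.unwrap();
            buf.truncate(len);

            // The spawned future borrows `self`, which is allowed because
            // `self` is borrowed for 'a, the executor's lifetime.
            ex.spawn(async move {
                let datagram = self.deserialize_datagram(&mut buf).await;
                self.handle_datagram(datagram).await;
            }).detach();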
        }
    }

    pub async fn deserialize_datagram(&self, _buf: &mut [u8]) -> Datagram {
        unimplemented!()
    }

    pub async fn handle_datagram(&self, _datagram: Datagram) {
        unimplemented!()
    }
}

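fn main() {
    let server = UdpServer { addr: "127.0.0.1:8080".to_string() };
    let ex = Executor::new();
    // Executor::run drives the spawned tasks while polling server.run;
    // a plain block_on(server.run(&ex)) would never execute them.
    future::block_on(ex.run(server.run(&ex)));
}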
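For comparison, the standard library offers the same idea for OS threads: std::thread::scope (stable since Rust 1.63) joins every thread spawned in the scope before it returns, so those threads may borrow self without an Arc. The sketch below is an analogy to the smol executor, not smol's API; Server, prefix and handle are hypothetical names.

```rust
use std::thread;

/// Hypothetical stand-in for UdpServer; `prefix` is illustrative only.
struct Server {
    prefix: String,
}

impl Server {
    fn handle(&self, payload: &[u8]) -> String {
        format!("{}{}", self.prefix, payload.len())
    }

    /// Handles every payload on its own scoped thread. Because thread::scope
    /// joins all threads before returning, the closures may borrow `self`
    /// directly, much like futures spawned on a smol Executor<'a>.
    fn run(&self, payloads: &[Vec<u8>]) -> Vec<String> {
        thread::scope(|s| {
            let handles: Vec<_> = payloads
                .iter()
                .map(|p| s.spawn(move || self.handle(p)))
                .collect();
            handles.into_iter().map(|h| h.join().unwrap()).collect()
        })
    }
}
```

The trade-off is the same as with the smol executor: the borrow is only possible because the scope (or executor) cannot outlive the data it borrows, so run cannot return while handlers are still running.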
user4815162342

You're absolutely right. The tokio tutorial mentions this solution:

If a single piece of data must be accessible from more than one task concurrently, then it must be shared using synchronization primitives such as Arc.
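A minimal sketch of that advice, assuming the handlers also need to update server state: an Arc only gives shared (&) access, so the mutable part goes behind a synchronization primitive such as Mutex. std::thread stands in for tokio::spawn here, and Server, handled and handle_all are hypothetical names.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical server state: a counter that handlers update through &self.
struct Server {
    handled: Mutex<u64>,
}

impl Server {
    fn handle(&self, _payload: &[u8]) {
        // The Mutex supplies the interior mutability that &self lacks.
        *self.handled.lock().unwrap() += 1;
    }
}

/// Spawns `n` workers sharing one Arc<Server> and returns the final count.
fn handle_all(n: usize) -> u64 {
    let server = Arc::new(Server { handled: Mutex::new(0) });
    let handles: Vec<_> = (0..n)
        .map(|_| {
            let me = Arc::clone(&server);
            thread::spawn(move || me.handle(b"datagram"))
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    // Bind to a local so the MutexGuard is dropped before `server`.
    let count = *server.handled.lock().unwrap();
    count
}
```

In async code, the tokio documentation notes that std::sync::Mutex is fine as long as the guard is not held across an .await.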

  • This does not provide an answer to the question. Once you have sufficient [reputation](https://stackoverflow.com/help/whats-reputation) you will be able to [comment on any post](https://stackoverflow.com/help/privileges/comment); instead, [provide answers that don't require clarification from the asker](https://meta.stackexchange.com/questions/214173/why-do-i-need-50-reputation-to-comment-what-can-i-do-instead). - [From Review](/review/late-answers/31265054) – Tugrul Ates Mar 15 '22 at 01:53