1

I have this websocket code that uses tokio and serde here:

use async_once::AsyncOnce;
use common_wasm::models::status::{CommandMessage, StatusMessage};
use futures_util::{SinkExt, StreamExt};
use lazy_static::lazy_static;
use std::{collections::VecDeque, net::SocketAddr};
use tokio::{
    net::{TcpListener, TcpStream}, sync::{broadcast, mpsc}
};
use tokio_tungstenite::{
    accept_async, tungstenite::{Error, Message, Result}
};
use tracing::*;

// https://stackoverflow.com/questions/67650879/rust-lazy-static-with-async-await
lazy_static! {
    /// Shared status server, built by the async block below.
    ///
    /// NOTE(review): the address literal carries a `ws://` scheme, yet `StatusWs::init`
    /// hands it straight to `TcpListener::bind` — confirm it resolves as a socket address.
    pub static ref STATUS_REPORTER: AsyncOnce<StatusWs> = AsyncOnce::new(async {
        info!("Init lazy static WS");
        StatusWs::init("ws://localhost:44444").await
    });
}

use StatusMessage as SenderType;
use CommandMessage as ReceiveType;

/// Handle to the WebSocket status server: buffers messages received from
/// clients and broadcasts status messages to them.
pub struct StatusWs {
    /// Received messages waiting to be handed out by `next`.
    buf: VecDeque<ReceiveType>,
    /// Receiving end of the channel the per-connection tasks push client messages into.
    rx_client_msg: mpsc::Receiver<ReceiveType>,
    /// Sending end of the broadcast channel every connection task subscribes to.
    tx_server_msg: broadcast::Sender<SenderType>,
}

impl StatusWs {
    pub async fn init(addr: &str) -> StatusWs {
        info!("Init Status WS on {}", addr);

        let listener = TcpListener::bind(&addr).await.expect("Can't listen");

        // Clients producting to server, they use the tx to send and server uses the rx to read
        let (tx_client_msg, rx_client_msg) = mpsc::channel::<ReceiveType>(32);

        // spmc for server to broadcast status to listeners. Server uses tx to send and client uses rx to read
        let (tx_server_msg, _rx_server_msg) = broadcast::channel::<SenderType>(10);

        let tx_server_2 = tx_server_msg.clone();
        tokio::spawn(async move {
            while let Ok((stream, peer)) = listener.accept().await {
                info!("Peer address connected: {}", peer);

                let tx_client = tx_client_msg.clone();
                let rx_server = tx_server_msg.subscribe();
                tokio::spawn(async move {
                    accept_connection(peer, stream, tx_client, rx_server).await;
                });
            }
        });

        StatusWs { buf: VecDeque::new(), rx_client_msg, tx_server_msg: tx_server_2 }
    }

    pub async fn reportinfo(&self, msg: &SenderType) {
        let my_msg = msg.clone();

        match &self.tx_server_msg.send(my_msg) {
            Ok(_size) => {
                //trace!("Server Sending OK {}", size)
            },
            Err(_err) => {
                //trace!("Server Sending ERR {:?}", err)
            },
        }
    }

    pub async fn next(&mut self) -> Result<Option<ReceiveType>> {
        loop {
            // If buffer contains data, we can directly return it.
            if let Some(data) = self.buf.pop_front() {
                return Ok(Some(data));
            }

            // Fetch new response if buffer is empty.
            let response = self.next_response().await?;

            // Handle the response, possibly adding to the buffer
            self.handle_response(response)?;
        }
    }

    async fn next_response(&mut self) -> Result<ReceiveType> {
        loop {
            tokio::select! { // TODO don't need select if there's only one thing?
                Some(msg) = self.rx_client_msg.recv() => {
                    return Ok(msg)
                },
            }
        }
    }

    fn handle_response(&mut self, response: ReceiveType) -> Result<()> {
        self.buf.push_back(response);
        Ok(())
    }
}

async fn accept_connection(peer: SocketAddr, stream: TcpStream, tx_client: mpsc::Sender<ReceiveType>, rx_server: broadcast::Receiver<SenderType>) {
    info!("Accepting connection from {}", peer);
    if let Err(e) = handle_connection(peer, stream, tx_client, rx_server).await {
        match e {
            Error::ConnectionClosed | Error::Protocol(_) | Error::Utf8 => error!("Connection closed"),
            err => error!("Error processing connection: {}", err),
        }
    }
}

/// Serves one WebSocket client until it disconnects or a socket error occurs.
///
/// Text frames from the client are parsed as JSON `ReceiveType` values and
/// forwarded on `tx_client`; every message received from `rx_server` is
/// serialised to JSON and sent to the client as a text frame.
///
/// # Errors
///
/// Returns the tungstenite error if the WebSocket handshake fails or if
/// reading from / writing to the socket fails.
async fn handle_connection(
    _peer: SocketAddr, stream: TcpStream, tx_client: mpsc::Sender<ReceiveType>, mut rx_server: broadcast::Receiver<SenderType>,
) -> Result<()> {
    // A failed handshake is reported to the caller instead of panicking the task.
    let ws_stream = accept_async(stream).await?;
    let (mut ws_sender, mut ws_receiver) = ws_stream.split();

    loop {
        tokio::select! {
            remote_msg = ws_receiver.next() => {
                let msg = match remote_msg {
                    Some(msg) => msg?,
                    // The stream has ended.
                    None => break,
                };
                match msg {
                    Message::Text(resptxt) => match serde_json::from_str::<ReceiveType>(&resptxt) {
                        // Best effort: a closed command channel is ignored.
                        Ok(cmd) => { let _ = tx_client.send(cmd).await; },
                        Err(err) => error!("Error deserializing: {}", err),
                    },
                    Message::Close(_) => break,
                    _ => {},
                }
            }
            Ok(msg) = rx_server.recv() => {
                match serde_json::to_string(&msg) {
                    Ok(txt) => ws_sender.send(Message::Text(txt)).await?,
                    // Skip a message that cannot be serialised rather than panicking via `todo!()`.
                    Err(err) => error!("Error serializing: {}", err),
                }
            }
        }
    }

    Ok(())
}

The sender and receiver types are simple (simple types all the way down):

use std::{collections::BTreeMap, fmt::Debug};



use serde::{Deserialize, Serialize};

/// Status payload: a name plus a map of entries keyed by `i32`.
/// The websocket code above uses this type as its `SenderType`.
#[derive(Default, Clone, Debug, Serialize, Deserialize)]
pub struct StatusMessage {
    pub name: String,
    /// Entries ordered by their integer key.
    pub entries: BTreeMap<i32, GuiEntry>,
}

/// Command payload. The websocket code above uses this type as its
/// `ReceiveType`, i.e. it is what gets parsed from incoming text frames.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CommandMessage {
    pub sender: String,
    // presumably a key into `StatusMessage::entries` — TODO confirm
    pub entryid: i32,
    pub command: GuiValue,
}

Now I want to generalize the code so that I can create a struct that takes some other kind of Sender and Receiver type. Yes, I could just change the aliases, but I want to be able to use the generic type arguments rather than duplicate the whole file. The problem is that as I follow the suggestions from the compiler, I end up in a place where I don't know what to do next. It's telling me `resptxt` does not live long enough:

`resptxt` does not live long enough
borrowed value does not live long enough rustc(E0597)
status_ws.rs(133, 29): `resptxt` dropped here while still borrowed
status_ws.rs(115, 28): lifetime `'a` defined here
status_ws.rs(129, 39): argument requires that `resptxt` is borrowed for `'a`

Here's what I have thus far:

use async_once::AsyncOnce;
use common_wasm::models::status::{CommandMessage, StatusMessage};
use futures_util::{SinkExt, StreamExt};
use lazy_static::lazy_static;
use serde::{Serialize, Deserialize};
use std::{collections::VecDeque, net::SocketAddr};
use tokio::{
    net::{TcpListener, TcpStream}, sync::{broadcast, mpsc}
};
use tokio_tungstenite::{
    accept_async, tungstenite::{Error, Message, Result}
};
use tracing::*;

// https://stackoverflow.com/questions/67650879/rust-lazy-static-with-async-await
lazy_static! {
    pub static ref STATUS_REPORTER: AsyncOnce<StatusWs<CommandMessage, StatusMessage>> = AsyncOnce::new(async { // shared server instance; the generic parameters are fixed to the concrete message types here
        info!("Init lazy static WS");
        let server = StatusWs::init("ws://localhost:44444").await; // NOTE(review): `init` passes this string to `TcpListener::bind` — confirm the `ws://` prefix is accepted there
        server
    });
}

// use StatusMessage as SenderType;
// use CommandMessage as ReceiveType;

pub struct StatusWs<ReceiveType, SenderType> { // server handle, generic over the message type read from clients and the one broadcast to them
    buf: VecDeque<ReceiveType>, // received messages waiting to be handed out by `next`
    rx_client_msg: mpsc::Receiver<ReceiveType>, // fed by the per-connection tasks
    tx_server_msg: broadcast::Sender<SenderType>, // every connection task subscribes to this
}

impl <'a, ReceiveType: Deserialize<'a> + Send, SenderType: Serialize + Clone + Send + Sync> StatusWs <ReceiveType, SenderType> { // NOTE(review): `'a` is picked by whoever uses this impl; the bound only says ReceiveType can be deserialized from data that lives for `'a`
    pub async fn init(addr: &str) -> StatusWs<ReceiveType, SenderType> { // binds `addr` and starts the accept loop in a background task; panics if binding fails
        info!("Init Status WS on {}", addr);

        let listener = TcpListener::bind(&addr).await.expect("Can't listen");

        // Clients producing to server, they use the tx to send and server uses the rx to read
        let (tx_client_msg, rx_client_msg) = mpsc::channel::<ReceiveType>(32);

        // spmc for server to broadcast status to listeners. Server uses tx to send and client uses rx to read
        let (tx_server_msg, _rx_server_msg) = broadcast::channel::<SenderType>(10);

        let tx_server_2 = tx_server_msg.clone(); // this clone is kept in the returned struct; the original moves into the accept task
        tokio::spawn(async move { // the task takes ownership of the listener and both senders
            while let Ok((stream, peer)) = listener.accept().await { // loop ends on the first accept error
                info!("Peer address connected: {}", peer);

                let tx_client = tx_client_msg.clone();
                let rx_server = tx_server_msg.subscribe();
                tokio::spawn(async move { // one task per connection; ReceiveType and SenderType values are moved into it
                    accept_connection(peer, stream, tx_client, rx_server).await;
                });
            }
        });

        StatusWs { buf: VecDeque::new(), rx_client_msg, tx_server_msg: tx_server_2 }
    }

    pub async fn reportinfo(&self, msg: &SenderType) { // broadcasts a clone of `msg`; the send result is ignored either way
        let my_msg = msg.clone();

        match &self.tx_server_msg.send(my_msg) {
            Ok(_size) => {
                //trace!("Server Sending OK {}", size)
            },
            Err(_err) => {
                //trace!("Server Sending ERR {:?}", err)
            },
        }
    }

    pub async fn next(&mut self) -> Result<Option<ReceiveType>> { // returns the next client message, buffered ones first
        loop {
            // If buffer contains data, we can directly return it.
            if let Some(data) = self.buf.pop_front() {
                return Ok(Some(data));
            }

            // Fetch new response if buffer is empty.
            let response = self.next_response().await?;

            // Handle the response, possibly adding to the buffer
            self.handle_response(response)?;
        }
    }

    async fn next_response(&mut self) -> Result<ReceiveType> { // waits for one message from the client channel
        loop {
            tokio::select! { // TODO don't need select if there's only one thing?
                Some(msg) = self.rx_client_msg.recv() => { // NOTE(review): if `recv()` yields `None` this pattern does not match and no other branch exists — confirm what `select!` does then
                    return Ok(msg)
                },
            }
        }
    }

    fn handle_response(&mut self, response: ReceiveType) -> Result<()> { // queues the message for `next`
        self.buf.push_back(response);
        Ok(())
    }
}

async fn accept_connection<'a, ReceiveType: Deserialize<'a>, SenderType: Clone + Serialize>(peer: SocketAddr, stream: TcpStream, tx_client: mpsc::Sender<ReceiveType>, rx_server: broadcast::Receiver<SenderType>) { // runs `handle_connection` for one peer and logs the error it ends with, if any
    info!("Accepting connection from {}", peer);
    if let Err(e) = handle_connection(peer, stream, tx_client, rx_server).await {
        match e {
            Error::ConnectionClosed | Error::Protocol(_) | Error::Utf8 => error!("Connection closed"), // these variants are all logged as a plain disconnect
            err => error!("Error processing connection: {}", err),
        }
    }
}

async fn handle_connection<'a, ReceiveType: Deserialize<'a>, SenderType: Clone + Serialize>( // serves one client: JSON text frames in -> `tx_client`, `rx_server` messages -> JSON text frames out
    _peer: SocketAddr, stream: TcpStream, tx_client: mpsc::Sender<ReceiveType>, mut rx_server: broadcast::Receiver<SenderType>,
) -> Result<()> {
    let ws_stream = accept_async(stream).await.expect("Failed to accept"); // panics this task if the WebSocket handshake fails
    let (mut ws_sender, mut ws_receiver) = ws_stream.split();

    loop {
        tokio::select! {
            remote_msg = ws_receiver.next() => {
                match remote_msg {
                    Some(msg) => {
                        let msg = msg?;
                        match msg {
                            Message::Text(resptxt) => { // `resptxt` is an owned local that is dropped at the end of this arm
                                match serde_json::from_str::<ReceiveType>(&resptxt) { // the `Deserialize<'a>` bound makes this borrow of `resptxt` have to last for `'a`
                                    Ok(cmd) => { let _ = tx_client.send(cmd).await; }, // send failure is ignored
                                    Err(err) => error!("Error deserializing: {}", err),
                                }
                            },
                            Message::Close(_) => break,
                            _ => { }, // other frame kinds are ignored
                        }
                    }
                    None => break, // stream ended
                }
            }
            Ok(msg) = rx_server.recv() => { // an `Err` from `recv()` does not match, so this branch is skipped for that iteration
                match serde_json::to_string(&msg) {
                    Ok(txt) => ws_sender.send(Message::Text(txt)).await?,
                    Err(_) => todo!(), // panics if the message cannot be serialised
                }
            }
        }
    }

    Ok(())
}

I think there's some confusion about the necessary lifetimes and bounds, in particular the lifetime on Serde's `Deserialize` trait and the Send/Sync auto trait markers on the message types.

In any case, it seems a bit brute force to just copy the whole original file and change out the aliases, which would definitely work, when it seems there's some sort of useful lesson here.

Carlos
  • 5,991
  • 6
  • 43
  • 82
  • Please take a look at: https://stackoverflow.com/help/minimal-reproducible-example – Netwave Dec 24 '21 at 17:18
  • What's the problem? Do you think it's too long, or is it not reproducible? – Carlos Dec 24 '21 at 20:10
  • 1
    I think it is too long. Usually it is better if you can add something that is reproducible in https://play.rust-lang.org/ and goes straight to the problem with as little boilerplate as possible. – Netwave Dec 25 '21 at 10:03
  • Ok I'll have a go at that. – Carlos Dec 25 '21 at 16:20
  • I ran into the problem that the playground doesn't seem to have tokio-tungstenite installed – Carlos Dec 25 '21 at 16:46
  • The Rust playground only has a select few crates (top 100 I believe) so you shouldn't worry too much about making it reproducible on the playground. But yeah, reducing the code so people have less to read and comprehend makes it easier to get an answer. – kmdreko Dec 31 '21 at 18:40

1 Answer

1

You should use serde::de::DeserializeOwned instead of Deserialize<'a>.

The Deserialize trait takes a lifetime parameter to support zero-copy deserialization, but you can't take advantage of that since the source, resptxt, is a transient value that isn't persisted anywhere. The DeserializeOwned trait can be used to constrain that the deserialized type does not keep references to the source and can therefore be used beyond it.

After fixing that, you'll get errors that ReceiveType and SenderType must be 'static to be used in a tokio::spawn'd task. Adding that constraint finally makes your code compile.

See the full compiling code on the playground for brevity.

kmdreko
  • 42,554
  • 6
  • 57
  • 106
  • The compiler doesn't complain. However, I have another question. The static lifetimes, how does that work? Is this just a formality, because there's no references to those types? Meaning that if I introduced a reference, due to how the tokio async works, it would have to have static lifetime? It seems to me that actual instances of those types would not live as long as say a static string. – Carlos Jan 01 '22 at 19:59
  • 1
    The `'static` bound means the type itself shouldn't hold any local references. This is required to call `spawn` since the lifetime of the task is independent of where you spawned it. If it weren't, local variables could be dropped while the task is still running. – kmdreko Jan 01 '22 at 20:03