7

I want to create a simple websocket server. I want to process the incoming messages and send a response, but I get an error:

error: captured variable cannot escape `FnMut` closure body
  --> src\main.rs:32:27
   |
32 |       incoming.for_each(|m| async {
   |  _________________________-_^
   | |                         |
   | |                         inferred to be a `FnMut` closure
33 | |         match m {
34 | |             // Error here...
35 | |             Ok(message) => do_something(message, db, &mut outgoing).await,
36 | |             Err(e) => panic!(e)
37 | |         }
38 | |     }).await;
   | |_____^ returns a reference to a captured variable which escapes the closure body
   |
   = note: `FnMut` closures only have access to their captured variables while they are executing...
   = note: ...therefore, they cannot allow references to captured variables to escape

This gives a few hits on Stack Overflow but I don't see anywhere in my code where a variable is escaping. The async block won't run concurrently, so I don't see any problem. Furthermore, I feel like I am doing something very simple: I get a type which allows me to send data back to the client, but when using a reference to it in the async block, it gives a compile error. The error only occurs when I use the outgoing or db variable in the async code.

This is my code (error is in the handle_connection function):

main.rs

use tokio::net::{TcpListener, TcpStream};
use std::net::SocketAddr;
use std::sync::Arc;
use futures::{StreamExt, SinkExt};
use tungstenite::Message;
use tokio_tungstenite::WebSocketStream;

struct DatabaseConnection;

#[tokio::main]
async fn main() -> Result<(), ()> {
    listen("127.0.0.1:3012", Arc::new(DatabaseConnection)).await
}

async fn listen(address: &str, db: Arc<DatabaseConnection>) -> Result<(), ()> {
    let try_socket = TcpListener::bind(address).await;
    let mut listener = try_socket.expect("Failed to bind on address");

    while let Ok((stream, addr)) = listener.accept().await {
        tokio::spawn(handle_connection(stream, addr, db.clone()));
    }

    Ok(())
}

async fn handle_connection(raw_stream: TcpStream, addr: SocketAddr, db: Arc<DatabaseConnection>) {
    let db = &*db;
    let ws_stream = tokio_tungstenite::accept_async(raw_stream).await.unwrap();

    let (mut outgoing, incoming) = ws_stream.split();

    // Adding 'move' does also not work
    incoming.for_each(|m| async {
        match m {
            // Error here...
            Ok(message) => do_something(message, db, &mut outgoing).await,
            Err(e) => panic!(e)
        }
    }).await;
}

async fn do_something(message: Message, db: &DatabaseConnection, outgoing: &mut futures_util::stream::SplitSink<WebSocketStream<TcpStream>, Message>) {
    // Do something...

    // Send some message
    let _ = outgoing.send(Message::Text("yay".to_string())).await;
}

Cargo.toml

[dependencies]
futures = "0.3.*"
futures-channel = "0.3.*"
futures-util = "0.3.*"
tokio = { version = "0.2.*", features = [ "full" ] }
tokio-tungstenite = "0.10.*"
tungstenite = "0.10.*"

When using async move, I get the following error:

code

incoming.for_each(|m| async move {
    let x = &mut outgoing;
    let b = db;
}).await;

error

error[E0507]: cannot move out of `outgoing`, a captured variable in an `FnMut` closure
  --> src\main.rs:33:38
   |
31 |       let (mut outgoing, incoming) = ws_stream.split();
   |            ------------ captured outer variable
32 | 
33 |       incoming.for_each(|m| async move {
   |  ______________________________________^
34 | |         let x = &mut outgoing;
   | |                      --------
   | |                      |
   | |                      move occurs because `outgoing` has type `futures_util::stream::stream::split::SplitSink<tokio_tungstenite::WebSocketStream<tokio::net::tcp::stream::TcpStream>, tungstenite::protocol::message::Message>`, which does not implement the `Copy` trait
   | |                      move occurs due to use in generator
35 | |         let b = db;
36 | |     }).await;
   | |_____^ move out of `outgoing` occurs here
Ömer Erden
  • 7,680
  • 5
  • 36
  • 45
NoKey
  • 129
  • 11
  • 32
  • Which variable is the error message complaining about? – Solomon Ucko Jun 24 '20 at 14:27
  • @SolomonUcko It's this line which gives me an error, I don't know the variable since the error message does not say that: `Ok(message) => do_something(message, db, &mut outgoing).await,` – NoKey Jun 24 '20 at 14:28
  • Maybe try making the closure `move` (`... move |m| async ...`) – Solomon Ucko Jun 24 '20 at 14:35
  • @SolomonUcko yes, that's what I saw in a similar question on SO. It didn't work :( – NoKey Jun 24 '20 at 14:36
  • @Shepmaster ok, I added the compile error I got from running `cargo run` – NoKey Jun 24 '20 at 14:40
  • Does this highlights/solves your problem? https://stackoverflow.com/questions/60645623/how-to-return-the-captured-variable-from-fnmut-closure-which-is-a-captor-at-t – Ömer Erden Jun 24 '20 at 14:44
  • I wonder if the issue is from returning the result of `do_something(...).await` where `do_something` is passed references... – Solomon Ucko Jun 24 '20 at 14:44
  • What happens if, in addition to the `move` keyword, you also move `let db = &*db;` to inside the closure? – Solomon Ucko Jun 24 '20 at 14:46
  • @ÖmerErden I don't see any highlights when clicking the provided link. I am having a hard time understanding the code (I don't code long in Rust). I am not sure how I can implement the answer in my code to make it work. Do I need some variables to wrap inside a Arc/Mutex? – NoKey Jun 24 '20 at 14:51
  • @SolomonUcko Still does not work, tried it with this code: `incoming.for_each(move |m| async { let x = db;let y = &mut outgoing;}).await;` – NoKey Jun 24 '20 at 14:53
  • Playground demo: https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=bf51492ef0a233e32618b7fdd6fab180. Are you getting both of these error messages? – Solomon Ucko Jun 24 '20 at 15:03
  • You can also try changing `async` to `async move`. – Solomon Ucko Jun 24 '20 at 15:05
  • Try `|m| async move` and returning something other than `do_something(...).await` (e.g. `panic!(...)`), and make sure that compiles. https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=0a236558eb6d14eb13ae0b4bed59b989 – Solomon Ucko Jun 24 '20 at 15:12
  • 1
    From the @SolomonUcko 's link https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=76c966938016955cb90d3303e99c0299 , you can use `fold` instead `for_each` to keep outgoing in hand, this will fix the compile errors but i don't know if there is any logical error. I've explained this problem in the link that i've shared at comment above. – Ömer Erden Jun 24 '20 at 15:15
  • @SolomonUcko When using this code: `incoming.for_each(|m| async move {let x = &mut outgoing;let b = db;}).await;`, I get a more specific error, which I added in the question – NoKey Jun 24 '20 at 15:17
  • @ÖmerErden I checked out fold, but I get the same compile error: `incoming.fold(0, |i, m| async move {let x = &mut outgoing;let b = db;i}).await;` – NoKey Jun 24 '20 at 15:20
  • 2
    @NoKey Why do you use like that Please check the `fold` solution in this link: https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=76c966938016955cb90d3303e99c0299 (which i've shared before) – Ömer Erden Jun 24 '20 at 15:22
  • @ÖmerErden Thanks a million times!! I got it now working, and now I understand your post better and fold in general. If you post a short answer describing I should use fold, rather than for_each, I would be happy to accept it :). Thanks again (took a few frustating hours of my life). – NoKey Jun 24 '20 at 16:16

1 Answers1

14

FnMut is an anonymous struct, since FnMutcaptured the &mut outgoing, it becomes a field inside of this anonymous struct and this field will be used on each call of FnMut , it can be called multiple times. If you lose it somehow (by returning or moving into another scope etc...) your program will not able to use that field for further calls, due to safety Rust Compiler doesn't let you do this(for your both case).

In your case instead of capturing the &mut outgoing we can use it as argument for each call, with this we'll keep the ownership of outgoing. You can do this by using fold from futures-rs:

incoming
    .fold(outgoing, |mut outgoing, m| async move {
        match m {
            // Error here...
            Ok(message) => do_something(message, db, &mut outgoing).await,
            Err(e) => panic!(e),
        }

        outgoing
    })
    .await;

This may seem a bit tricky but it does the job, we are using constant accumulator(outgoing) which will be used as an argument for our FnMut.

Playground (Thanks @Solomon Ucko for creating reproducible example)

See also :

Ömer Erden
  • 7,680
  • 5
  • 36
  • 45