1

I am using actix web and I am trying to return an async function in a closure but I am getting the following error:



error[E0525]: expected a closure that implements the `Fn` trait, but this closure only implements `FnOnce`
   --> src/server.rs:134:33
    |
133 |    ...                   web::get().to(
    |                                     -- the requirement to implement `Fn` derives from here
134 |  / ...                       move |router: web::Data<Arc<Router>>,
135 |  | ...                             headers: web::Data<Arc<Headers>>,
136 |  | ...                             stream: web::Payload,
137 |  | ...                             req: HttpRequest| async {
    |  |_________________________________________________________-
138 | || ...                           start_web_socket(req, stream, params).await
139 | || ...                       },
    | ||                           ^
    | ||___________________________|
    | |____________________________this closure implements `FnOnce`, not `Fn`
    |                              closure is `FnOnce` because it moves the variable `params` out of its environment


error: aborting due to previous error; 1 warning emitted

This is the snippet that is resulting in the error. I have tried to document the code as much as I could.

I have tried moving out the variables in and out of the move blocks and have tried placing them at various places but without success.

What should be done instead of this?

    pub fn start(
        &mut self,
        py: Python,
        url: String,
        port: u16,
        socket: &PyCell<SocketHeld>,
        name: String,
        workers: usize,
    ) -> PyResult<()> {
        if STARTED
            .compare_exchange(false, true, SeqCst, Relaxed)
            .is_err()
        {
            println!("Already running...");
            return Ok(());
        }

        println!("{}", name);

        let borrow = socket.try_borrow_mut()?;
        let held_socket: &SocketHeld = &*borrow;

        let raw_socket = held_socket.get_socket();
        println!("Got our socket {:?}", raw_socket);

        let router = self.router.clone();
        let headers = self.headers.clone();
        let directories = self.directories.clone();
        let workers = Arc::new(workers);

        let asyncio = py.import("asyncio").unwrap();

        let event_loop = asyncio.call_method0("new_event_loop").unwrap();
        asyncio
            .call_method1("set_event_loop", (event_loop,))
            .unwrap();
        let event_loop_hdl = PyObject::from(event_loop);

        thread::spawn(move || {
            //init_current_thread_once();
            actix_web::rt::System::new().block_on(async move {
                let addr = format!("{}:{}", url, port);

                println!("The number of workers are {}", workers.clone());

                HttpServer::new(move || {
                    let mut app = App::new();
                    let event_loop_hdl = event_loop_hdl.clone();
                    let directories = directories.read().unwrap();
                    let router_copy = router.clone();

                    // this loop matches three types of directory serving
                    // 1. Serves a build folder. e.g. the build folder generated from yarn build
                    // 2. Shows file listing
                    // 3. Just serves the file without any redirection to sub links
                    for directory in directories.iter() {
                        if let Some(index_file) = &directory.index_file {
                            app = app.service(
                                Files::new(&directory.route, &directory.directory_path)
                                    .index_file(index_file)
                                    .redirect_to_slash_directory(),
                            );
                        } else if directory.show_files_listing {
                            app = app.service(
                                Files::new(&directory.route, &directory.directory_path)
                                    .redirect_to_slash_directory()
                                    .show_files_listing(),
                            );
                        } else {
                            app = app
                                .service(Files::new(&directory.route, &directory.directory_path));
                        }
                    }

                    app = app
                        .app_data(web::Data::new(router.clone()))
                        .app_data(web::Data::new(headers.clone()));

                    let web_socket_map = router_copy.get_web_socket_map().unwrap();
                    for elem in (web_socket_map).iter() {
                        let route = elem.key().clone();
                        let params = elem.value().clone();
                        app = app.route(
                            &route,
                            web::get().to(
                                move |router: web::Data<Arc<Router>>,
                                      headers: web::Data<Arc<Headers>>,
                                      stream: web::Payload,
                                      req: HttpRequest| async {
                                    start_web_socket(req, stream, params).await
                                },
                            ),
                        )
                    }

                    app.default_service(web::route().to(move |router, headers, payload, req| {
                        pyo3_asyncio::tokio::scope_local(event_loop_hdl.clone(), async move {
                            index(router, headers, payload, req).await
                        })
                    }))
                })
                .keep_alive(KeepAlive::Os)
                .workers(*workers.clone())
                .client_timeout(0)
                .listen(raw_socket.try_into().unwrap())
                .unwrap()
                .run()
                .await
                .unwrap();
            });
        });

----UPDATE----

Thank you for the suggestions. On updating the params, I was able to get rid of the original error but I am getting a similar but new error:

error[E0525]: expected a closure that implements the `Fn` trait, but this closure only implements `FnOnce`
   --> src/server.rs:134:33
    |
133 |    ...                   web::get().to(
    |                                     -- the requirement to implement `Fn` derives from here
134 |  / ...                       |router: web::Data<Arc<Router>>,
135 |  | ...                        headers: web::Data<Arc<Headers>>,
136 |  | ...                        stream: web::Payload,
137 |  | ...                        req: HttpRequest| async move {
    |  |_________________________________________________________-
138 | || ...                           start_web_socket(req, stream, params.clone()).await
139 | || ...                       },
    | ||                           ^
    | ||___________________________|
    | |____________________________this closure implements `FnOnce`, not `Fn`
    |                              closure is `FnOnce` because it moves the variable `params` out of its environment

The new snippet looks like this:

                    let web_socket_map = router_copy.get_web_socket_map().unwrap();
                    for elem in (web_socket_map).iter() {
                        let route = elem.key().clone();
                        let params = elem.value().clone();
                        app = app.route(
                            &route,
                            web::get().to(
                                |router: web::Data<Arc<Router>>,
                                 headers: web::Data<Arc<Headers>>,
                                 stream: web::Payload,
                                 req: HttpRequest| async move {
                                    start_web_socket(req, stream, params.clone()).await
                                },
                            ),
                        )
                    }

I have tried cloning params inside the async move but it still is giving the same error.

Sebastián Palma
  • 32,692
  • 6
  • 40
  • 59
Sanskar Jethi
  • 544
  • 5
  • 17
  • I think https://stackoverflow.com/questions/30177395/when-does-a-closure-implement-fn-fnmut-and-fnonce could be helpful; though I'm not very versed in Rust, my guess is that use of `move` here means that this closure can't implement `Fn`. – msbit Nov 27 '21 at 01:29
  • @msbit , I've seen this answer. But this answer doesn't tell me about any changes that can be done here. – Sanskar Jethi Nov 27 '21 at 01:38
  • No, it's more general information. – msbit Nov 27 '21 at 01:44
  • Towards the very end, in the `start_web_socket`, try and replace `params` with `elem.value().clone()` – cadolphs Nov 27 '21 at 02:24
  • Instead of `start_web_socket(req, stream, params)`, call `start_web_socket(req, stream, params.clone())`. (Or modify `start_web_socket` to accept params by reference, and pass it `&params`.) – user4815162342 Nov 27 '21 at 08:47
  • 1
    @msbit `move` **doesn't** mean that the closure can't implement `Fn`. `move` causes the values from the environment to be owned by the closure. It doesn't apply that the values are consumed when the closure is invoked, which is what `FnOnce` means. – user4815162342 Nov 27 '21 at 08:48
  • @Lagerbaer , thank you for the suggestions but after updating, It is giving a new but similar error. – Sanskar Jethi Nov 27 '21 at 14:19

1 Answers1

2

web::get() returns a Route, and web::get().to(...) is a Route method that expects a Handler. The Handler is expected to be an async function (async fn) - a function that returns a Future when called.

The problem is that in your code you are passing an async block, which IS a future.

An async block is a variant of a block expression which evaluates to a future.

So your code:

async move |...| { // Future
    start_web_socket(req, stream, params.clone()).await
}

Could be converted to:

move |...| { // Handler Fn
    async move { // Future
        start_web_socket(req, stream, params.clone()).await
    }
}

and that is equivalent to:

move |...| { // Handler Fn
    start_web_socket(req, stream, params.clone()) // Future
}

because when you call an async fn start_web_socket without .await, you get a Future.

A tip for debugging such things is to assign things to intermediate variables, and checking the types that compiler deduces for them.

battlmonstr
  • 5,841
  • 1
  • 23
  • 33