3

I have a Rust application using warp. It implements a RESTful CRUD API. I need each route handler (i.e., the function that ends up being ultimately called by the warp filters) to have access to, and (in most cases) mutate shared application state.

The only way I can get this to compile is by cloning an Arc<Mutex<State>> for each route:

    /* internal_state is loaded from a dump file earlier on and is of type `State` */

    let state: Arc<Mutex<State>> = Arc::new(Mutex::new(internal_state));
    let index_book_state: Arc<Mutex<State>> = state.clone();
    let create_book_state: Arc<Mutex<State>> = state.clone();
    let read_book_state: Arc<Mutex<State>> = state.clone();

    let create_order_state: Arc<Mutex<State>> = state.clone();
    let read_order_state: Arc<Mutex<State>> = state.clone();
    let update_order_state: Arc<Mutex<State>> = state.clone();
    let destroy_order_state: Arc<Mutex<State>> = state.clone();

/* define CRUD routes for order books */
    let book_prefix = warp::path!("book");
    let index_book_route = book_prefix
        .and(warp::get())
        .and(warp::any().map(move || index_book_state.clone()))
        .and_then(handler::index_book_handler);
    let create_book_route = book_prefix
        .and(warp::post())
        .and(warp::body::json())
        .and(warp::any().map(move || create_book_state.clone()))
        .and_then(handler::create_book_handler);
    let read_book_route = warp::path!("book" / String)
        .and(warp::get())
        .and(warp::any().map(move || read_book_state.clone()))
        .and_then(handler::read_book_handler);

    /* define CRUD routes for orders */
    let create_order_route = warp::path!("book" / String)
        .and(warp::post())
        .and(warp::body::json())
        .and(warp::any().map(move || create_order_state.clone()))
        .and_then(handler::create_order_handler);
    let read_order_route = warp::path!("book" / String / "order" / String)
        .and(warp::get())
        .and(warp::any().map(move || read_order_state.clone()))
        .and_then(handler::read_order_handler);
    let update_order_route = warp::path!("book" / String / "order" / String)
        .and(warp::put())
        .and(warp::body::json())
        .and(warp::any().map(move || update_order_state.clone()))
        .and_then(handler::update_order_handler);
    let destroy_order_route = warp::path!("book" / String / "order" / String)
        .and(warp::delete())
        .and(warp::any().map(move || destroy_order_state.clone()))
        .and_then(handler::destroy_order_handler);

    /* aggregate all of our order book routes */
    let book_routes =
        index_book_route.or(create_book_route).or(read_book_route);

    /* aggregate all of our order routes */
    let order_routes = create_order_route
        .or(read_order_route)
        .or(update_order_route)
        .or(destroy_order_route);

    /* aggregate all of our routes */
    let routes = book_routes.or(order_routes);
  1. I doubt that this is actually correct behaviour (despite compiling and running).

  2. This seems extremely ugly for what is a relatively simple requirement.

  3. Most importantly, inside my route handlers I will need to make calls to async functions, thus requiring the handlers themselves to be marked as async, etc. When I mark the handlers as async, the compiler complains due to futures being unable to be sent across threads.

How can I achieve shared application state while having route handlers themselves be async?

A signature of a route handler (they're all the same):

/* matches routes like POST `http://example.com/[market]/` */
pub async fn create_order_handler(market: String, request: CreateOrderRequest, state: Arc<Mutex<State>>, rpc_endpoint: String) -> Result<impl Reply, Rejection>
kmdreko
  • 42,554
  • 6
  • 57
  • 106
sporejack
  • 71
  • 1
  • 6

1 Answers1

10

You share state via shared ownership (such as an Arc) paired with thread-safe interior mutability (such as Mutex, RwLock, or an atomic):

use std::sync::{Arc, Mutex};
use warp::Filter;

#[tokio::main]
async fn main() {
    let state = Arc::new(Mutex::new(0));

    let market = warp::path!("market" / String).map({
        let state = state.clone();
        move |market| {
            *state.lock().unwrap() += 1;
            format!("Market: {}", market)
        }
    });

    let plaza = warp::path!("plaza" / String).map({
        let state = state.clone();
        move |plaza| {
            let state = *state.lock().unwrap();
            format!("Plaza: {} ({})", plaza, state)
        }
    });

    let routes = market.or(plaza);

    warp::serve(routes).run(([127, 0, 0, 1], 3030)).await;
}
% curl 127.0.0.1:3030/market/one
Market: one

% curl 127.0.0.1:3030/plaza/one
Plaza: one (1)

To perform asynchronous work, use Filter::and_then:

use std::{
    convert::Infallible,
    sync::{Arc, Mutex},
};
use warp::Filter;

#[tokio::main]
async fn main() {
    let state = Arc::new(Mutex::new(0));

    let market = warp::path!("market" / String).and_then({
        let state = state.clone();
        move |market| {
            let state = state.clone();
            async move {
                *state.lock().unwrap() += 1;
                Ok::<_, Infallible>(format!("Market: {}", market))
            }
        }
    });

    let plaza = warp::path!("plaza" / String).and_then({
        let state = state.clone();
        move |plaza| {
            let state = state.clone();
            async move {
                let state = *state.lock().unwrap();
                Ok::<_, Infallible>(format!("Plaza: {} ({})", plaza, state))
            }
        }
    });

    let routes = market.or(plaza);

    warp::serve(routes).run(([127, 0, 0, 1], 3030)).await;
}

These can even be separate functions:

use std::{
    convert::Infallible,
    sync::{Arc, Mutex},
};
use warp::Filter;

#[tokio::main]
async fn main() {
    let state = Arc::new(Mutex::new(0));

    let market = warp::path!("market" / String).and_then({
        let state = state.clone();
        move |m| market(m, state.clone())
    });

    let plaza = warp::path!("plaza" / String).and_then({
        let state = state.clone();
        move |p| plaza(p, state.clone())
    });

    let routes = market.or(plaza);

    warp::serve(routes).run(([127, 0, 0, 1], 3030)).await;
}

type State = Arc<Mutex<i32>>;

async fn market(market: String, state: State) -> Result<String, Infallible> {
    *state.lock().unwrap() += 1;
    Ok::<_, Infallible>(format!("Market: {}", market))
}

async fn plaza(plaza: String, state: State) -> Result<String, Infallible> {
    let state = *state.lock().unwrap();
    Ok::<_, Infallible>(format!("Plaza: {} ({})", plaza, state))
}

There's a second set of clones here because there are two distinct things owning data:

  1. The handler itself (the closure)
  2. The future returned by the closure (the async code)

See also:


[dependencies]
warp = "0.3.0"
tokio = { version = "1.2.0", features = ["full"] }
Shepmaster
  • 388,571
  • 95
  • 1,107
  • 1,366
  • 1
    A pattern I rather like is to use a "data" filter (`warp::any().map(move || thing.clone())`) to attach data to the filters chain and not have to do so by hand. That way you can just pass the `async` function to the `and_then`. – Masklinn Feb 09 '21 at 06:46
  • This only works if the resource contained with the Arc contains Copy. Is there a way to do this for a non-copyable resource? For instance, I have a big data store that I don't want to copy around (and can't because HashMap doesn't implement Copy). I also want it to be editable. So I put it on an Arc>, but still can't put that into a Filter because filters apparently require Copy all the way down. – Savanni D'Gerinel Oct 02 '21 at 16:58
  • @SavanniD'Gerinel that’s not accurate; `Arc` does not need need `Copy`, and it does not implement `Copy` itself, regardless of what it contains. – Shepmaster Oct 02 '21 at 17:01
  • It was a mistake on my part. I was having trouble because I thought Filter required Copy, which was imposing Copy upon the Arc that I was putting into the Fn. *but*, it turns out the Filter only requires Clone, and so all of my trouble went away. – Savanni D'Gerinel Oct 04 '21 at 01:15