
I'm writing an application using actix_web and rusoto_s3.

When I call get_object directly from main, outside of an actix request, it runs fine and returns the object as expected. When the same call is made inside an actix_web request handler, reading the body stream blocks forever.

I have a single client that is shared by all requests; it is wrapped in an Arc (web::Data does this internally).
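web::Data<T> wraps its value in an Arc<T>, so registering one client means every worker shares the same instance through cheap pointer clones rather than separate clients. The std-only sketch below models that arrangement with plain threads; FakeS3Client and share_across_workers are hypothetical stand-ins (not rusoto's S3Client, and actix's workers are not literally these threads).

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical stand-in for rusoto's S3Client (not the real type).
struct FakeS3Client {
    endpoint: String,
}

impl FakeS3Client {
    fn object_url(&self, key: &str) -> String {
        format!("{}/{}", self.endpoint, key)
    }
}

// Shares one client among `workers` threads via Arc clones, similar to how
// web::Data hands the same client to every worker. Returns each worker's
// result and the strong count left once all workers have finished.
fn share_across_workers(workers: usize) -> (Vec<String>, usize) {
    let shared = Arc::new(FakeS3Client {
        endpoint: "http://localhost:9000".to_owned(),
    });
    let handles: Vec<_> = (0..workers)
        .map(|i| {
            let client = Arc::clone(&shared); // copies the pointer, not the client
            thread::spawn(move || client.object_url(&format!("file{}", i)))
        })
        .collect();
    let urls: Vec<String> = handles.into_iter().map(|h| h.join().unwrap()).collect();
    // Each worker's clone was dropped when its closure finished.
    (urls, Arc::strong_count(&shared))
}

fn main() {
    let (urls, remaining) = share_across_workers(3);
    println!("{:?}, strong count afterwards: {}", urls, remaining);
}
```

Since the client is shared this way, cloning it or creating a new one per request (as tried below) would not be expected to change the blocking behavior.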

Full code:

fn index(
    _req: HttpRequest,
    path: web::Path<String>,
    s3: web::Data<S3Client>,
) -> impl Future<Item = HttpResponse, Error = actix_web::Error> {
    s3.get_object(GetObjectRequest {
        bucket: "my_bucket".to_owned(),
        key: path.to_owned(),
        ..Default::default()
    })
    .and_then(move |res| {
        info!("Response {:?}", res);
        let mut stream = res.body.unwrap().into_blocking_read();
        let mut body = Vec::new();
        stream.read_to_end(&mut body).unwrap();
        match process_file(body.as_slice()) {
            Ok(result) => Ok(result),
            Err(error) => Err(RusotoError::from(error)),
        }
    })
    .map_err(|e| match e {
        RusotoError::Service(GetObjectError::NoSuchKey(key)) => {
            actix_web::error::ErrorNotFound(format!("{} not found", key))
        }
        error => {
            error!("Error: {:?}", error);
            actix_web::error::ErrorInternalServerError("error")
        }
    })
    .from_err()
    .and_then(move |img| HttpResponse::Ok().body(Body::from(img)))
}

fn health() -> HttpResponse {
    HttpResponse::Ok().finish()
}

fn main() -> std::io::Result<()> {
    let name = "rust_s3_test";
    env::set_var("RUST_LOG", "debug");
    pretty_env_logger::init();
    let sys = actix_rt::System::builder().stop_on_panic(true).build();
    let prometheus = PrometheusMetrics::new(name, "/metrics");
    let s3 = S3Client::new(Region::Custom {
        name: "eu-west-1".to_owned(),
        endpoint: "http://localhost:9000".to_owned(),
    });
    let s3_client_data = web::Data::new(s3);

    Server::build()
        .bind(name, "0.0.0.0:8080", move || {
            HttpService::build().keep_alive(KeepAlive::Os).h1(App::new()
                .register_data(s3_client_data.clone())
                .wrap(prometheus.clone())
                .wrap(actix_web::middleware::Logger::default())
                .service(web::resource("/health").route(web::get().to(health)))
                .service(web::resource("/{file_name}").route(web::get().to_async(index))))
        })?
        .start();
    sys.run()
}

The thread blocks in stream.read_to_end and never resumes.

I have tried cloning the client per request and also creating a new client per request, but I've got the same result in all scenarios.

Am I doing something wrong?

It works if I use the synchronous API instead:

// .sync() blocks the current thread until the response has arrived
let mut stream = s3.get_object(GetObjectRequest {
    bucket: "my_bucket".to_owned(),
    key: path.to_owned(),
    ..Default::default()
})
.sync()
.unwrap()
.body
.unwrap()
.into_blocking_read();
let mut body = Vec::new();
io::copy(&mut stream, &mut body).unwrap();

Is this an issue with Tokio?


1 Answer

let mut stream = res.body.unwrap().into_blocking_read();

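Check the implementation of into_blocking_read(): it calls .wait(), which blocks the current thread until the stream produces data. Inside an actix handler, that thread is most likely the same single-threaded event loop that has to drive the S3 connection and deliver the body, so the read can never finish. You shouldn't call blocking code inside a Future.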
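To see why blocking stalls everything, here is a toy, std-only model of a single-threaded event loop. It is not tokio or actix; run_event_loop, download, and the task names are hypothetical. A "connection" task pushes body chunks into a queue and a "handler" task drains it. Because the handler returns control whenever the queue is empty, the connection task gets its turn and the download completes. If the handler instead sat in a loop waiting for data (effectively what .wait() does), control would never return to the connection task and both would stall forever.

```rust
use std::cell::RefCell;
use std::collections::VecDeque;
use std::rc::Rc;

// A cooperative task: returns true when finished, false to be rescheduled.
type Task = Box<dyn FnMut() -> bool>;

// Toy single-threaded event loop: runs tasks round-robin on the current thread.
// Returns false if they have not all finished after `max_turns` turns.
fn run_event_loop(mut tasks: VecDeque<Task>, max_turns: usize) -> bool {
    let mut turns = 0;
    while let Some(mut task) = tasks.pop_front() {
        turns += 1;
        if turns > max_turns {
            return false;
        }
        if !task() {
            tasks.push_back(task); // not done: let the other tasks have a turn
        }
    }
    true
}

// Hypothetical download: the "connection" task feeds chunks into a shared queue
// (None marks end of stream) while the "handler" task drains it without waiting.
fn download(chunks: Vec<Vec<u8>>) -> Option<Vec<u8>> {
    let queue: Rc<RefCell<VecDeque<Option<Vec<u8>>>>> = Rc::new(RefCell::new(VecDeque::new()));
    let body: Rc<RefCell<Vec<u8>>> = Rc::new(RefCell::new(Vec::new()));

    let mut pending: VecDeque<Vec<u8>> = chunks.into();
    let conn_queue = Rc::clone(&queue);
    let connection: Task = Box::new(move || {
        let next = pending.pop_front(); // None once every chunk has been sent
        let finished = next.is_none();
        conn_queue.borrow_mut().push_back(next);
        finished
    });

    let handler_queue = Rc::clone(&queue);
    let handler_body = Rc::clone(&body);
    let handler: Task = Box::new(move || loop {
        let next = handler_queue.borrow_mut().pop_front();
        match next {
            Some(Some(chunk)) => handler_body.borrow_mut().extend_from_slice(&chunk),
            Some(None) => return true, // end of stream: body complete
            None => return false,      // queue empty: yield instead of blocking
        }
    });

    // The handler runs first, as if it started waiting before any data arrived.
    let tasks: VecDeque<Task> = vec![handler, connection].into();
    if !run_event_loop(tasks, 1_000) {
        return None;
    }
    let result = body.borrow().clone();
    Some(result)
}

fn main() {
    let body = download(vec![b"he".to_vec(), b"llo".to_vec()]);
    assert_eq!(body, Some(b"hello".to_vec()));
}
```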

Since Rusoto's body (a ByteStream) is a Stream, you can read it asynchronously:

.and_then(move |res| {
    info!("Response {:?}", res);
    let stream = res.body.unwrap();

    // concat2 collects every chunk into one buffer without blocking the thread
    stream.concat2().map(move |file| {
        process_file(&file[..]).unwrap()
    })
    .map_err(|e| RusotoError::from(e))
})
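concat2 resolves once the stream ends, with all chunks appended into one buffer, or fails with the first error the stream produces. Below is a synchronous, std-only model of that behavior; ByteStream yields Bytes chunks, modeled here as Vec<u8>, and concat_chunks and this process_file are hypothetical stand-ins. The real concat2 does the same accumulation asynchronously.

```rust
use std::io;

// Synchronous model of futures 0.1's `Stream::concat2`: append every chunk to one
// buffer and stop at the first error. `Vec<u8>` stands in for rusoto's `Bytes` chunks.
fn concat_chunks<I>(chunks: I) -> Result<Vec<u8>, io::Error>
where
    I: IntoIterator<Item = Result<Vec<u8>, io::Error>>,
{
    let mut file = Vec::new();
    for chunk in chunks {
        file.extend_from_slice(&chunk?); // `?` returns the first stream error
    }
    Ok(file)
}

// Hypothetical stand-in for the asker's `process_file`, which needs the whole file at once.
fn process_file(bytes: &[u8]) -> Result<usize, io::Error> {
    if bytes.is_empty() {
        return Err(io::Error::new(io::ErrorKind::InvalidData, "empty file"));
    }
    Ok(bytes.len())
}

fn main() {
    let chunks = vec![Ok(b"GIF8".to_vec()), Ok(b"9a".to_vec())];
    // Propagate errors with and_then instead of unwrapping inside map.
    let size = concat_chunks(chunks).and_then(|file| process_file(&file[..]));
    assert_eq!(size.unwrap(), 6);

    let broken = vec![
        Ok(b"GIF8".to_vec()),
        Err(io::Error::new(io::ErrorKind::UnexpectedEof, "connection reset")),
    ];
    assert!(concat_chunks(broken).is_err());
}
```

Chaining with and_then rather than calling unwrap inside map means a failing process_file becomes an error value instead of a panic; in the real handler that error would still need converting, as map_err does in the snippet above.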

process_file should not block the enclosing Future either. If it has to block, consider running it on a separate thread or wrapping it with tokio_threadpool's blocking.

Note: You can use tokio_threadpool's blocking in your implementation, but I recommend you understand how it works first.
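A std-only sketch of the "separate thread" option: the blocking step runs on its own thread and the result comes back over a channel, so the calling thread stays free to do other work. This process_file and spawn_processing are hypothetical stand-ins (the real process_file wraps OpenCV). In an actix-web 1.x handler, web::block packages the same idea (a thread pool plus a future for the result) and is likely a better fit than a raw thread. As far as I know, tokio_threadpool's blocking returns an error when it is not called from a tokio thread pool worker, and actix workers run a single-threaded runtime, which is one reason to understand it before relying on it.

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical blocking, CPU-heavy step standing in for the real process_file.
fn process_file(bytes: &[u8]) -> Result<Vec<u8>, String> {
    if bytes.is_empty() {
        return Err("empty input".to_owned());
    }
    // Pretend "processing" is reversing the bytes.
    Ok(bytes.iter().rev().cloned().collect())
}

// Moves the blocking work to its own thread and hands back a receiver for the result.
fn spawn_processing(file: Vec<u8>) -> mpsc::Receiver<Result<Vec<u8>, String>> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // If the receiver was dropped (e.g. request cancelled), the result is discarded.
        let _ = tx.send(process_file(&file));
    });
    rx
}

fn main() {
    let rx = spawn_processing(b"abc".to_vec());
    // An event loop would check with try_recv between other tasks instead of blocking.
    let result = loop {
        match rx.try_recv() {
            Ok(result) => break result,
            Err(mpsc::TryRecvError::Empty) => thread::yield_now(), // other work would run here
            Err(mpsc::TryRecvError::Disconnected) => panic!("worker thread died"),
        }
    };
    assert_eq!(result, Ok(b"cba".to_vec()));
}
```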


If you don't want to load the whole file into memory, you can process it chunk by chunk with for_each:

stream.for_each(|part| {
    // process each part here
    // Warning! Do not add blocking code here either.
    // futures 0.1 for_each expects the closure to return something IntoFuture, e.g. a Result
    Ok(())
})
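A synchronous, std-only model of the for_each approach: each chunk is handled as it arrives and only a small running summary is kept, so memory use does not grow with the file size. Summary and summarize are hypothetical; with the real ByteStream, the loop body would go inside the for_each closure, which must not block either.

```rust
use std::io;

// Running totals kept across chunks; the file itself is never stored.
#[derive(Debug, Default, PartialEq)]
struct Summary {
    bytes: usize,
    newlines: usize,
}

// Synchronous model of `stream.for_each(|part| ...)`: handle each chunk as it
// arrives, stop at the first error, and keep only the small running Summary.
fn summarize<I>(chunks: I) -> Result<Summary, io::Error>
where
    I: IntoIterator<Item = Result<Vec<u8>, io::Error>>,
{
    let mut summary = Summary::default();
    for part in chunks {
        let part = part?;
        summary.bytes += part.len();
        summary.newlines += part.iter().filter(|&&b| b == b'\n').count();
        // `part` is dropped here, so memory use does not grow with the file size.
    }
    Ok(summary)
}

fn main() {
    let chunks = vec![Ok(b"line 1\nli".to_vec()), Ok(b"ne 2\n".to_vec())];
    let summary = summarize(chunks).unwrap();
    assert_eq!(summary, Summary { bytes: 14, newlines: 2 });
}
```

This only fits libraries that accept partial input; as the comments below note, OpenCV expects the whole byte array, so concat2 is the practical choice there.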

  • Makes sense, thanks for the answer. Unfortunately the code inside process_file does not support a partial stream, so I need to call it only after the stream is fully consumed. How would I chain the futures to do so? – Augusto Jul 03 '19 at 09:41
  • @Augusto I updated the answer: you can accumulate all parts into a collection (I used a Vec in the revision). I wouldn't load the whole file into memory, though. – Ömer Erden Jul 03 '19 at 10:26
  • The files won't be big enough to cause an issue, but I agree with you. The problem is I'm using a library (OpenCV) to process images and it expects the whole byte array as input... How would you recommend doing that? – Augusto Jul 03 '19 at 11:21
  • @Augusto IMO, if there is no alternative API and the file is small enough, it is OK to do that. – Ömer Erden Jul 03 '19 at 11:35
  • @ÖmerErden, it seems like Rusoto is very close to transition to async-await now with 0.43.0: https://github.com/rusoto/rusoto/pull/1664 ... would you mind editing this answer? :) – brainstorm Jan 23 '20 at 07:05
  • @brainstorm thanks for letting me know, I'll check. We can probably avoid the nested closures with async/await; I'll let you know when I'm done ^^. – Ömer Erden Jan 23 '20 at 07:36
  • As a matter of fact, I just cooked up my own hackish example over here, perhaps you and other people will find it useful: https://github.com/brainstorm/rusoto-s3-async-await ... pull requests/cleanups welcome ;P – brainstorm Jan 23 '20 at 08:27
  • @brainstorm it looks fine to me, but I still need to investigate `async-await` more, so don't trust me :). I upvoted your comment so future visitors can find the link easily. – Ömer Erden Jan 24 '20 at 06:13
  • Thanks! The example above does work indeed :) Unfortunately I'm getting some issues applying that proof of concept into another crate by using https://stackoverflow.com/questions/57810173/streamed-upload-to-s3-with-rusoto/59884256#comment102487432_57812269 ... here's the other crate if you are curious too and want to investigate on an almost-done app: https://github.com/brainstorm/htsget-aws – brainstorm Jan 24 '20 at 08:59