
There's an example of downloading a file with Rusoto S3 here: How to save a file downloaded from S3 with Rusoto to my hard drive?

The problem is that it appears to download the whole file into memory and then write it to disk, because it uses the write_all method, which takes a byte slice rather than a stream. How can I use StreamingBody, which implements futures::Stream, to stream the file to disk?

Shepmaster
Nicholas Bishop

1 Answer


Since StreamingBody implements Stream<Item = Vec<u8>, Error = Error>, we can construct an MCVE that represents that:

extern crate futures; // 0.1.25

use futures::{prelude::*, stream};

type Error = Box<dyn std::error::Error>;

fn streaming_body() -> impl Stream<Item = Vec<u8>, Error = Error> {
    const DUMMY_DATA: &[&[u8]] = &[b"0123", b"4567", b"89AB", b"CDEF"];
    let iter_of_owned_bytes = DUMMY_DATA.iter().map(|&b| b.to_owned());
    stream::iter_ok(iter_of_owned_bytes)
}

We can then get a "streaming body" somehow and use Stream::for_each to process each element in the Stream. Here, we just call write_all with some provided output location:

use std::{fs::File, io::Write};

fn save_to_disk(mut file: impl Write) -> impl Future<Item = (), Error = Error> {
    streaming_body().for_each(move |chunk| file.write_all(&chunk).map_err(Into::into))
}

We can then write a little testing main:

fn main() {
    let mut file = Vec::new();

    {
        let fut = save_to_disk(&mut file);
        fut.wait().expect("Could not drive future");
    }

    assert_eq!(file, b"0123456789ABCDEF");
}
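
The test above writes into a Vec<u8>, but since save_to_disk accepts any impl Write, a real file can be passed instead. Below is a minimal, std-only sketch of what the output side might look like. The helper name open_output and the temporary file name are made up for illustration; wrapping the File in a BufWriter is a suggestion to reduce the number of small writes, not something the original answer prescribes.

```rust
use std::fs::{self, File};
use std::io::{self, BufWriter, Write};
use std::path::Path;

/// Hypothetical helper: creates (or truncates) `path` and wraps the file in
/// a buffer so that many small chunks turn into fewer system calls.
fn open_output(path: &Path) -> io::Result<BufWriter<File>> {
    Ok(BufWriter::new(File::create(path)?))
}

fn main() -> io::Result<()> {
    // Made-up location, used only for this example.
    let path = std::env::temp_dir().join("streaming_body_example.bin");

    let mut out = open_output(&path)?;
    // Stand-in for the chunks a stream would deliver one at a time.
    for chunk in &[&b"0123"[..], &b"4567"[..]] {
        out.write_all(chunk)?;
    }
    // Flush explicitly: errors during the implicit flush on drop are ignored.
    out.flush()?;
    drop(out);

    assert_eq!(fs::read(&path)?, b"01234567");
    fs::remove_file(&path)
}
```

If such a writer is moved into save_to_disk, keep in mind that the BufWriter is then flushed only when it is dropped, so a failure in that final flush would go unreported.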

Important notes about the quality of this naïve implementation:

  1. The call to write_all may block, which you should not do in an asynchronous program. It would be better to hand off that blocking work to a thread pool.

  2. The usage of Future::wait forces the thread to block until the future is done, which is great for tests but may not be correct for your real use case.
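
The first note can be illustrated with the standard library alone. The sketch below moves the writer onto a dedicated thread and feeds it chunks through a bounded channel; a single thread stands in for a thread pool here, and spawn_writer is a hypothetical name, not part of Rusoto or futures. Note that send on a bounded channel can itself block when the buffer is full. That provides backpressure, but it also means this only shows the hand-off and is not a drop-in solution for an event loop.

```rust
use std::io::{self, Write};
use std::sync::mpsc;
use std::thread;

/// Hypothetical helper: moves `out` onto a dedicated thread and returns a
/// sender for chunks plus a handle that gives the writer back when done.
fn spawn_writer<W>(mut out: W) -> (mpsc::SyncSender<Vec<u8>>, thread::JoinHandle<io::Result<W>>)
where
    W: Write + Send + 'static,
{
    // Bounded channel: at most 4 chunks wait in memory at any time.
    let (tx, rx) = mpsc::sync_channel::<Vec<u8>>(4);
    let handle = thread::spawn(move || -> io::Result<W> {
        // The loop ends once every sender has been dropped.
        for chunk in rx {
            out.write_all(&chunk)?;
        }
        out.flush()?;
        Ok(out)
    });
    (tx, handle)
}

fn main() {
    const DUMMY_DATA: &[&[u8]] = &[b"0123", b"4567", b"89AB", b"CDEF"];

    let (tx, handle) = spawn_writer(Vec::new());
    for chunk in DUMMY_DATA {
        tx.send(chunk.to_vec()).expect("writer thread stopped early");
    }
    // Dropping the sender tells the writer thread that the data is complete.
    drop(tx);

    let written = handle
        .join()
        .expect("writer thread panicked")
        .expect("write failed");
    assert_eq!(written, b"0123456789ABCDEF");
}
```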


Shepmaster
  • One question about this. Where you call `streaming_body().for_each(...)`, is that more or less equivalent to doing `for chunk in streaming_body().wait() { ... }`, other than one using a closure and the other using an iterator? – Nicholas Bishop Nov 12 '18 at 02:45
  • @NicholasBishop There is a relevant difference between the two. The for loop you suggest blocks the current thread, so the thread can't do any other work until the complete stream is resolved. The stream combinator `for_each()`, on the other hand, yields control to the event loop whenever it would block. (Of course the test code in this answer does not use an event loop, and also blocks until the future is resolved. However, the whole point of asynchronous code is not to unnecessarily block the current thread, so you wouldn't do this in real code.) – Sven Marnach Nov 12 '18 at 10:12
  • For my use case I do actually want to block the thread -- just because the API provided is async doesn't mean that the calling code is. – Nicholas Bishop Nov 12 '18 at 15:11
  • @NicholasBishop Then both versions are fine. I'd probably use `Stream::wait()` in that case, since working with stream combinators can be cumbersome. – Sven Marnach Nov 12 '18 at 15:27
  • `Stream::wait` is fine for now, but it's being removed in the futures rework (just like `Future::wait`). There will be a direct replacement for `Future::wait`, but I don't know of one for `Stream::wait`. – Shepmaster Nov 12 '18 at 16:29
  • @Shepmaster wouldn't `stream.collect().wait()` work fine? – jbg Jan 28 '19 at 12:03
  • @jbg I think I'm missing something about your suggestion; `collect` will place everything in memory at once, but the OP states *without storing it entirely in memory first*. – Shepmaster Jan 28 '19 at 13:56
  • @Shepmaster apologies, I was replying to you in isolation without reading the OP's question. stream.collect().wait() is clearly not appropriate if you don't know the stream will fit in memory, or for some other reason don't want it buffered in memory. I also don't know of a replacement for Stream::wait in futures-preview. – jbg Jan 30 '19 at 04:00
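
The blocking approach discussed in the comments can be sketched as follows. In futures 0.1, Stream::wait returns a blocking iterator whose items are Result<Item, Error>, so the helper below accepts any iterator of that shape. That keeps the example runnable with the standard library alone; write_chunks is a hypothetical name, and the Vec of results stands in for the real stream. Only one chunk is held in memory at a time, unlike the collect-based variant.

```rust
use std::io::Write;

type Error = Box<dyn std::error::Error>;

/// Hypothetical helper: writes each chunk as it arrives and stops at the
/// first error. Returns the total number of bytes written.
fn write_chunks<I, W>(chunks: I, mut out: W) -> Result<u64, Error>
where
    I: IntoIterator<Item = Result<Vec<u8>, Error>>,
    W: Write,
{
    let mut total = 0u64;
    for chunk in chunks {
        // Propagate a stream error before touching the output.
        let chunk = chunk?;
        out.write_all(&chunk)?;
        total += chunk.len() as u64;
    }
    Ok(total)
}

fn main() {
    // Stand-in for what a blocking stream iterator would yield.
    let chunks = vec![Ok(b"0123".to_vec()), Ok(b"4567".to_vec())];

    let mut file = Vec::new();
    let written = write_chunks(chunks, &mut file).expect("write failed");

    assert_eq!(written, 8);
    assert_eq!(file, b"01234567");
}
```

With the stream from the answer, the call would presumably look like `write_chunks(streaming_body().wait(), file)`, which blocks the calling thread until the stream ends.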