0

I have this piece of code which is supposed to serialize a futures::stream::Stream to a Write. I want this code to return Err if write_all fails, but I don't see a way to get that out of the callback to for_each. I found How to send data through a futures Stream by writing through the io::Write trait?, but I don't understand how to make it work in my situation.

use std::io::Write;
use std::error::Error;
use futures::stream::StreamExt;

pub async fn download(url: &str, mut dest: impl Write) -> Result<(), Box<dyn Error>> {
    let byte_stream = reqwest::get(url).await?.bytes_stream();
    
    byte_stream.for_each(|bytes| {
        if let Ok(bytes) = bytes {
            dest.write_all(&bytes).expect("failed to write");
        }
        futures::future::ready(())
    }).await;

    Ok(())
}

Cargo.toml

[dependencies]
reqwest = { version = "0.11", features = ["json", "stream"] }
tokio = { version = "1", features = ["full"] }
futures = "0.3"
bytes = "1"
Shepmaster
  • 388,571
  • 95
  • 1,107
  • 1,366
Brady Dean
  • 3,378
  • 5
  • 24
  • 50
  • It appears that you are doing sync IO in an async program. [That's a bad idea](https://stackoverflow.com/q/41932137/155423). – Shepmaster Jul 12 '21 at 14:45
  • I might try using `reqwest::Response::bytes` to skip the stream stuff, then save to disk in a tokio blocking task as your other linked answer shows. – Brady Dean Jul 12 '21 at 14:53

1 Answers1

1

I'd probably use StreamExt::next and write the loop out a bit more manually:

use futures::stream::StreamExt;
use std::{error::Error, io::Write};

pub async fn download(url: &str, mut dest: impl Write) -> Result<(), Box<dyn Error>> {
    let mut byte_stream = reqwest::get(url).await?.bytes_stream();

    while let Some(bytes) = byte_stream.next().await {
        let bytes = bytes?;
        dest.write_all(&bytes)?;
    }

    Ok(())
}

Notes:

  1. These is TryStreamExt::try_for_each which can be used here, but it wants to take ownership of dest so I went in a different direction.
  2. Mixing synchronous IO (std::io::Write) and asynchronous IO (reqwest) is a bad idea without taking special care, which is not demonstrated in this answer.

See also:

Shepmaster
  • 388,571
  • 95
  • 1,107
  • 1,366