5

I need to explore a directory and all its sub-directories. I can explore the directory easily with recursion in a synchronous way:

use failure::Error;
use std::fs;
use std::path::Path;

/// Entry point: walks the current directory (`"."`) and returns whatever
/// `visit` returns, so the first error ends the program.
fn main() -> Result<(), Error> {
    let root = Path::new(".");
    visit(root)
}

/// Recursively walks `path`, printing every regular file that is found.
///
/// Directories are descended into depth-first. Failing to read a directory
/// or one of its entries stops the walk and the error is returned.
fn visit(path: &Path) -> Result<(), Error> {
    for entry in fs::read_dir(path)? {
        let child = entry?.path();

        // Directories are handled by recursion; nothing else to do for them.
        if child.is_dir() {
            visit(&child)?;
            continue;
        }

        // Anything that is neither a directory nor a regular file is skipped.
        if child.is_file() {
            println!("File: {:?}", child);
        }
    }

    Ok(())
}

When I try to do the same in an asynchronous manner using tokio_fs:

use failure::Error; // 0.1.6
use futures::Future; // 0.1.29
use std::path::PathBuf;
use tokio::{fs, prelude::*}; // 0.1.22

fn visit(path: PathBuf) -> impl Future<Item = (), Error = Error> {
    let task = fs::read_dir(path)
        .flatten_stream() // turn the future that resolves to a stream into a stream of entries
        .for_each(|entry| {
            println!("{:?}", entry.path());
            let path = entry.path();
            if path.is_dir() {
                let task = visit(entry.path()); // recursive call to the function whose `impl Future` return type is being defined
                tokio::spawn(task.map_err(drop)); // handed to `tokio::spawn`; `map_err(drop)` turns any error into `()`
            }
            future::ok(())
        })
        .map_err(Error::from);

    task
}

Playground

I get the following error:

error[E0391]: cycle detected when processing `visit::{{opaque}}#0`
 --> src/lib.rs:6:28
  |
6 | fn visit(path: PathBuf) -> impl Future<Item = (), Error = Error> {
  |                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  |
note: ...which requires processing `visit`...
 --> src/lib.rs:6:1
  |
6 | fn visit(path: PathBuf) -> impl Future<Item = (), Error = Error> {
  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  = note: ...which requires evaluating trait selection obligation `futures::future::map_err::MapErr<impl futures::future::Future, fn(failure::error::Error) {std::mem::drop::<failure::error::Error>}>: std::marker::Send`...
  = note: ...which again requires processing `visit::{{opaque}}#0`, completing the cycle
note: cycle used when checking item types in top-level module
 --> src/lib.rs:1:1
  |
1 | / use failure::Error; // 0.1.6
2 | | use futures::Future; // 0.1.29
3 | | use std::path::PathBuf;
4 | | use tokio::{fs, prelude::*}; // 0.1.22
... |
20| |     task
21| | }
  | |_^

error[E0391]: cycle detected when processing `visit::{{opaque}}#0`
 --> src/lib.rs:6:28
  |
6 | fn visit(path: PathBuf) -> impl Future<Item = (), Error = Error> {
  |                            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  |
note: ...which requires processing `visit`...
 --> src/lib.rs:6:1
  |
6 | fn visit(path: PathBuf) -> impl Future<Item = (), Error = Error> {
  | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  = note: ...which again requires processing `visit::{{opaque}}#0`, completing the cycle
note: cycle used when checking item types in top-level module
 --> src/lib.rs:1:1
  |
1 | / use failure::Error; // 0.1.6
2 | | use futures::Future; // 0.1.29
3 | | use std::path::PathBuf;
4 | | use tokio::{fs, prelude::*}; // 0.1.22
... |
20| |     task
21| | }
  | |_^

What is the correct way of exploring a directory and its sub-directories asynchronously while propagating all the errors?

Shepmaster
  • 388,571
  • 95
  • 1,107
  • 1,366
Nick
  • 10,309
  • 21
  • 97
  • 201

2 Answers

16

I would make several modifications to rodrigo's existing answer:

  1. Return a Stream from the function, allowing the caller to do what they need with a given file entry.
  2. Return an impl Stream instead of a Box<dyn Stream>. This leaves room for more flexibility in implementation. For example, a custom type could be created that uses an internal stack instead of the less-efficient recursive types.
  3. Return io::Error from the function to allow the user to deal with any errors.
  4. Accept an impl Into<PathBuf> to allow a nicer API.
  5. Create an inner hidden implementation function that uses concrete types in its API.

Futures 0.3 / Tokio 0.2

In this version, I avoided the deeply recursive calls, keeping a local stack of paths to visit (to_visit).

use futures::{stream, Stream, StreamExt}; // 0.3.1
use std::{io, path::PathBuf};
use tokio::fs::{self, DirEntry}; // 0.2.4

/// Walks `path` and every directory below it, yielding each entry that is
/// not itself a directory.
///
/// Directories are not yielded; they are pushed onto an internal stack
/// (`to_visit`) and read one at a time. If reading a directory fails, that
/// failure is yielded as a single `Err` item (entries already collected from
/// that directory are dropped) and the walk carries on with the rest of the
/// stack.
fn visit(path: impl Into<PathBuf>) -> impl Stream<Item = io::Result<DirEntry>> + Send + 'static {
    /// Reads exactly one directory: sub-directories are pushed onto
    /// `to_visit`, every other entry is returned to the caller.
    async fn one_level(path: PathBuf, to_visit: &mut Vec<PathBuf>) -> io::Result<Vec<DirEntry>> {
        let mut reader = fs::read_dir(path).await?;
        let mut found = Vec::new();

        while let Some(entry) = reader.next_entry().await? {
            let is_dir = entry.metadata().await?.is_dir();
            if is_dir {
                to_visit.push(entry.path());
                continue;
            }
            found.push(entry)
        }

        Ok(found)
    }

    // The unfold state is the stack of directories still to be read; each
    // step pops one directory and produces a sub-stream for it.
    let start = vec![path.into()];
    stream::unfold(start, |mut to_visit| {
        async {
            // An empty stack ends the whole stream.
            let next_dir = to_visit.pop()?;
            let level = match one_level(next_dir, &mut to_visit).await {
                Err(err) => stream::once(async { Err(err) }).right_stream(),
                Ok(found) => stream::iter(found).map(Ok).left_stream(),
            };

            Some((level, to_visit))
        }
    })
    .flatten()
}

/// Walks the directory given as the first command-line argument, printing
/// each yielded entry to stdout and each error to stderr.
///
/// Panics with "One argument required" when no argument is supplied.
#[tokio::main]
async fn main() {
    let root = std::env::args().nth(1).expect("One argument required");

    visit(root)
        .for_each(|item| async move {
            match item {
                Err(err) => eprintln!("encountered an error: {}", err),
                Ok(found) => println!("visiting {:?}", found),
            }
        })
        .await;
}

Futures 0.1 / Tokio 0.1

use std::path::PathBuf;
use tokio::{fs, prelude::*}; // 0.1.22
use tokio_fs::DirEntry; // 1.0.6

/// Returns a stream of the non-directory entries found under `path`,
/// descending into every sub-directory along the way.
///
/// Failures are reported through the stream's `std::io::Error` error type.
fn visit(
    path: impl Into<PathBuf>,
) -> impl Stream<Item = DirEntry, Error = std::io::Error> + Send + 'static {
    // Hidden worker with a concrete, boxed return type: it calls itself, so
    // its return type has to be nameable rather than `impl Stream`.
    fn visit_inner(
        path: PathBuf,
    ) -> Box<dyn Stream<Item = DirEntry, Error = std::io::Error> + Send + 'static> {
        Box::new({
            fs::read_dir(path)
                // Turn the future that resolves to a stream into a stream of entries.
                .flatten_stream()
                // Map every entry to a stream of its own: a nested walk for a
                // directory, a single-item stream for anything else.
                .map(|entry| {
                    let path = entry.path();
                    // NOTE(review): `is_dir` here is the std `Path` check, which is a
                    // synchronous filesystem query — confirm blocking is acceptable.
                    if path.is_dir() {
                        // Optionally include `entry` if you want to
                        // include directories in the resulting
                        // stream.
                        visit_inner(path)
                    } else {
                        Box::new(stream::once(Ok(entry)))
                    }
                })
                // Splice the per-entry streams into one flat stream.
                .flatten()
        })
    }

    visit_inner(path.into())
}

/// Walks the directory given as the first command-line argument on the
/// tokio runtime, printing each yielded entry to stdout and each error to
/// stderr.
///
/// Panics with "One argument required" when no argument is supplied.
fn main() {
    let root = std::env::args().nth(1).expect("One argument required");

    // `then` sees both items and errors, so an error is reported instead of
    // ending the stream; `for_each` then simply drives it to completion.
    let report = visit(root)
        .then(|outcome| {
            match outcome {
                Err(err) => eprintln!("encountered an error: {}", err),
                Ok(found) => println!("visiting {:?}", found),
            };

            Ok(())
        })
        .for_each(|_| Ok(()));

    tokio::run(report);
}

See also:

Shepmaster
  • 388,571
  • 95
  • 1,107
  • 1,366
  • When I try to build this example I get the following error: error[E0432]: unresolved import `tokio_fs` What is missing in Cargo.toml? – Martin Bammer Nov 13 '19 at 07:41
  • @MartinBammer the dependencies and their versions are listed on the first 3 lines of the example code: tokio and tokio_fs. – Shepmaster Nov 13 '19 at 12:28
  • Ah I've found the problem. I had added "tokio_fs" to Cargo.toml instead of "tokio-fs". – Martin Bammer Nov 14 '19 at 13:31
  • @Shepmaster thanks, it would be great if you could also provide an updated async await solution – Nick Dec 14 '19 at 12:21
  • @Shepmaster I'm having trouble grokking this answer. I think I understand how it works, but I'm wondering why `fn one_level()` is written to return `impl Future<Output = io::Result<Vec<DirEntry>>>`. Why not return something that is `impl Stream<Item = io::Result<DirEntry>>` instead? I.e. wouldn't it be preferable to return a stream that returns dir entries one by one, instead of a vec once all of them have been read? (I don't know how to write such a thing, so maybe it would just be unnecessarily complex.) – Casey Rodarmor Dec 29 '19 at 21:38
  • 1
    @CaseyRodarmor that would be ideal, but it kind of requires making things recursive in an awkward way. See the various answers in [How can I create a stream where the items are based on items that the stream previously returned?](https://stackoverflow.com/q/59697795/155423). If you wanted to go down that path, I'd recommend the `Rc`-based solution today. – Shepmaster Jan 14 '20 at 18:56
  • 1
    This is the most clever thing that I saw so far in this journey of learning rust. My mind is exploding of ideas. Thanks @Shepmaster – Juan Manuel Cañabate Jul 10 '22 at 05:01
  • I get this error in 2021: `method cannot be called on \`impl futures::Future<Output = impl futures::Stream<Item = Result<DirEntry, std::io::Error>> + std::marker::Send>\` due to unsatisfied trait bounds` Good news! I was able to sort it out by replacing `visit(root_path);` with `visit(root_path).await;` – Pedro Paulo Amorim Oct 12 '22 at 20:32
  • @PedroPauloAmorim then you changed the code from what is in the answer (probably by making `visit` async, when mine isn't). You can see that the code from this answer [compiles fine](https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=d71e832e0c57541f698c711cac2756d9) – Shepmaster Oct 13 '22 at 01:37
4

Your code has two errors:

First, a function returning impl Trait cannot currently be recursive, because the actual type returned would depend on itself.

To make your example work, you need to return a sized type. The simple candidate is a trait object, that is, a Box<dyn Future<...>>:

fn visit(path: PathBuf) -> Box<dyn Future<Item = (), Error = Error>> {
    // ...
            let task = visit(entry.path());
            tokio::spawn(task.map_err(drop));
    // ...

    Box::new(task)
}

There is still your second error:

error[E0277]: `dyn futures::future::Future<Item = (), Error = failure::error::Error>` cannot be sent between threads safely
   --> src/lib.rs:14:30
    |
14  |                 tokio::spawn(task.map_err(drop));
    |                              ^^^^^^^^^^^^^^^^^^ `dyn futures::future::Future<Item = (), Error = failure::error::Error>` cannot be sent between threads safely
    | 
   ::: /root/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.1.22/src/executor/mod.rs:131:52
    |
131 | where F: Future<Item = (), Error = ()> + 'static + Send
    |                                                    ---- required by this bound in `tokio::executor::spawn`
    |
    = help: the trait `std::marker::Send` is not implemented for `dyn futures::future::Future<Item = (), Error = failure::error::Error>`
    = note: required because of the requirements on the impl of `std::marker::Send` for `std::ptr::Unique<dyn futures::future::Future<Item = (), Error = failure::error::Error>>`
    = note: required because it appears within the type `std::boxed::Box<dyn futures::future::Future<Item = (), Error = failure::error::Error>>`
    = note: required because it appears within the type `futures::future::map_err::MapErr<std::boxed::Box<dyn futures::future::Future<Item = (), Error = failure::error::Error>>, fn(failure::error::Error) {std::mem::drop::<failure::error::Error>}>`

This means that your trait object is not Send so it cannot be scheduled for execution in another thread using tokio::spawn(). Fortunately, this is easy to fix: just add + Send to your trait object:

fn visit(path: PathBuf) -> Box<dyn Future<Item = (), Error = Error> + Send> {
    //...
}

See the full code in the Playground.

Shepmaster
  • 388,571
  • 95
  • 1,107
  • 1,366
rodrigo
  • 94,151
  • 12
  • 143
  • 190
  • Thanks for your answer, could you elaborate when you say cannot *currently* be recursive? Is there any plan to change this? – Nick Jun 22 '19 at 18:01
  • Also, is there a way to propagate the errors instead of dropping them? – Nick Jun 22 '19 at 18:06
  • Currently, functions returning `impl Trait` must still return a concrete type, it is just that this type isn't named explicitly. See for example this [playground](https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=aba1f67fa2cd362328dda5b17b956279). A recursive function is just a particular more complicated case. I think there has been some discussion about extending this feature to more use cases, but nothing is implemented AFAIK. – rodrigo Jun 23 '19 at 08:40
  • About propagating errors, since the function is executed asynchronously, there is nowhere to propagate the errors. However, I think you can easily create an `mpsc::channel`, post any errors there and collect them all in the main thread. – rodrigo Jun 23 '19 at 08:42