I'd like to take a regular iterator and turn it into a stream so that I can do further stream processing. The trouble is that I may have an iterator or an error to deal with. I think I'm pretty close with this:

#[macro_use]
extern crate log;
extern crate futures; // 0.1.21
extern crate tokio;

use futures::prelude::*;
use futures::{future, stream};
use std::fmt::Debug;
use std::net::{SocketAddr, ToSocketAddrs};

fn resolve(addrs: impl ToSocketAddrs + Debug) -> impl Stream<Item = SocketAddr, Error = ()> {
    match addrs.to_socket_addrs() {
        Ok(iter) => stream::unfold(iter, |iter| match iter.next() {
            Some(a) => Some(future::ok((a, iter))),
            None => None,
        }),
        Err(e) => {
            error!("could not resolve socket addresses {:?}: {:?}", addrs, e);
            stream::empty()
        }
    }
}

fn main() {
    let task = resolve("1.2.3.4:12345")
        .map_err(|e| error!("{:?}", e))
        .for_each(|addr| info!("{:?}", addr))
        .fold();
    tokio::run(task);
}


error[E0308]: match arms have incompatible types
  --> src/main.rs:12:5
   |
12 | /     match addrs.to_socket_addrs() {
13 | |         Ok(iter) => stream::unfold(iter, |iter| match iter.next() {
14 | |             Some(a) => Some(future::ok((a, iter))),
15 | |             None => None,
...  |
20 | |         }
21 | |     }
   | |_____^ expected struct `futures::stream::Unfold`, found struct `futures::stream::Empty`
   |
   = note: expected type `futures::stream::Unfold<<impl ToSocketAddrs + Debug as std::net::ToSocketAddrs>::Iter, [closure@src/main.rs:13:42: 16:10], futures::FutureResult<(std::net::SocketAddr, <impl ToSocketAddrs + Debug as std::net::ToSocketAddrs>::Iter), _>>`
              found type `futures::stream::Empty<_, _>`
note: match arm with an incompatible type
  --> src/main.rs:17:19
   |
17 |           Err(e) => {
   |  ___________________^
18 | |             error!("could not resolve socket addresses {:?}: {:?}", addrs, e);
19 | |             stream::empty()
20 | |         }
   | |_________^

error[E0277]: the trait bound `(): futures::Future` is not satisfied
  --> src/main.rs:27:10
   |
27 |         .for_each(|addr| info!("{:?}", addr))
   |          ^^^^^^^^ the trait `futures::Future` is not implemented for `()`
   |
   = note: required because of the requirements on the impl of `futures::IntoFuture` for `()`

error[E0599]: no method named `fold` found for type `futures::stream::ForEach<futures::stream::MapErr<impl futures::Stream, [closure@src/main.rs:26:18: 26:39]>, [closure@src/main.rs:27:19: 27:45], ()>` in the current scope
  --> src/main.rs:28:10
   |
28 |         .fold();
   |          ^^^^
   |
   = note: the method `fold` exists but the following trait bounds were not satisfied:
           `&mut futures::stream::ForEach<futures::stream::MapErr<impl futures::Stream, [closure@src/main.rs:26:18: 26:39]>, [closure@src/main.rs:27:19: 27:45], ()> : futures::Stream`
           `&mut futures::stream::ForEach<futures::stream::MapErr<impl futures::Stream, [closure@src/main.rs:26:18: 26:39]>, [closure@src/main.rs:27:19: 27:45], ()> : std::iter::Iterator`

The hint is pretty obvious: the two values I'm returning from the match arms have different types, and they need to be the same. How can I do that and still return a stream?

Shepmaster
Robert Cutajar
  • Invert your control flow: always use unfold and use a nested match inside – the8472 Jun 30 '18 at 12:51
  • Thanks a lot. I did, now the `fn resolve` seems to compile, but I'm not able to use it - https://play.rust-lang.org/?gist=3f76f3d827427df945a39a910366c66a It should just stream the `SocketAddr` and I need to map it somehow so that in the end I get a `Future` to be executed by `tokio::run()` – Robert Cutajar Jun 30 '18 at 13:55
  • Got rid of that `Option<>` now... https://play.rust-lang.org/?gist=5ed8a3790f637e96ac01a491fd690849&version=stable&mode=debug&edition=2015 – Robert Cutajar Jun 30 '18 at 14:06

1 Answer

Rust is a statically typed language, which means that the return type of a function has to be a single type, known at compile time. Your match arms produce two different types (Unfold and Empty), and which one is returned would only be decided at runtime.

The closest solution to your original is to always return the Unfold stream:

fn resolve(addrs: impl ToSocketAddrs) -> impl Stream<Item = SocketAddr, Error = ()> {
    stream::unfold(addrs.to_socket_addrs(), |r| {
        match r {
            Ok(mut iter) => iter.next().map(|addr| future::ok((addr, Ok(iter)))),
            Err(_) => None,
        }
    })
}
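The same "keep the Result as the state" idea can be tried without the futures crate. The following is only a sketch that swaps in std's Iterator for Stream and std::iter::from_fn for stream::unfold; the name resolve_from_fn is made up for this illustration and is not part of any library.

```rust
use std::net::{SocketAddr, ToSocketAddrs};

// Hypothetical std-only analogue of the unfold-based `resolve`:
// the `Result` is kept as state, so there is a single return type
// regardless of whether resolution succeeded.
fn resolve_from_fn(addrs: impl ToSocketAddrs) -> impl Iterator<Item = SocketAddr> {
    let mut state = addrs.to_socket_addrs();
    std::iter::from_fn(move || match &mut state {
        // Resolution worked: hand out the next address, if any.
        Ok(iter) => iter.next(),
        // Resolution failed: behave like an empty sequence.
        Err(_) => None,
    })
}
```

Calling resolve_from_fn("127.0.0.1:8080") should yield that one address, while an input that cannot be parsed or resolved yields nothing.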

But why reinvent the wheel?

futures::stream::iter_ok

Converts an Iterator into a Stream which is always ready to yield the next value.

Subsequent versions of the futures crate implement Stream for Either, which makes this very elegant:

fn resolve(addrs: impl ToSocketAddrs) -> impl Stream<Item = SocketAddr, Error = ()> {
    match addrs.to_socket_addrs() {
        Ok(iter) => stream::iter_ok(iter).left_stream(),
        Err(_) => stream::empty().right_stream(),
    }
}

It's straightforward to backport this functionality to futures 0.1 (maybe someone should submit it as a PR for those who are stuck on 0.1...):

enum MyEither<L, R> {
    Left(L),
    Right(R),
}

impl<L, R> Stream for MyEither<L, R>
where
    L: Stream,
    R: Stream<Item = L::Item, Error = L::Error>,
{
    type Item = L::Item;
    type Error = L::Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        match self {
            MyEither::Left(l) => l.poll(),
            MyEither::Right(r) => r.poll(),
        }
    }
}

trait EitherStreamExt {
    fn left_stream<R>(self) -> MyEither<Self, R>
    where
        Self: Sized;
    fn right_stream<L>(self) -> MyEither<L, Self>
    where
        Self: Sized;
}

impl<S: Stream> EitherStreamExt for S {
    fn left_stream<R>(self) -> MyEither<Self, R> {
        MyEither::Left(self)
    }
    fn right_stream<L>(self) -> MyEither<L, Self> {
        MyEither::Right(self)
    }
}
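To experiment with the Either pattern without pulling in futures, here is a rough std-only sketch of the same shape. It substitutes Iterator for Stream, and the names EitherIter and resolve_either are invented for this example.

```rust
use std::net::{SocketAddr, ToSocketAddrs};

// Hypothetical iterator counterpart of `MyEither`: one enum type
// that wraps either of two iterators with the same item type.
enum EitherIter<L, R> {
    Left(L),
    Right(R),
}

impl<L, R> Iterator for EitherIter<L, R>
where
    L: Iterator,
    R: Iterator<Item = L::Item>,
{
    type Item = L::Item;

    // Delegate to whichever variant is present.
    fn next(&mut self) -> Option<Self::Item> {
        match self {
            EitherIter::Left(l) => l.next(),
            EitherIter::Right(r) => r.next(),
        }
    }
}

// Both match arms now have the same type, `EitherIter<_, _>`.
fn resolve_either(addrs: impl ToSocketAddrs) -> impl Iterator<Item = SocketAddr> {
    match addrs.to_socket_addrs() {
        Ok(iter) => EitherIter::Left(iter),
        Err(_) => EitherIter::Right(std::iter::empty::<SocketAddr>()),
    }
}
```

The enum is what turns "two types chosen at runtime" into one type known at compile time; the choice between variants is the only part left to runtime.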

Even better, use the fact that Result implements IntoIterator (yielding the Ok value, if there is one, and nothing for Err) and that Stream::flatten exists:

fn resolve(addrs: impl ToSocketAddrs) -> impl Stream<Item = SocketAddr, Error = ()> {
    stream::iter_ok(addrs.to_socket_addrs())
        .map(stream::iter_ok)
        .flatten()
}
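The trick is easier to see with plain iterators. As a minimal sketch, assuming std's Iterator in place of Stream (the function name resolve_flattened is hypothetical), the Result becomes a zero-or-one-element iterator whose single element is itself an iterator, and flatten splices it out:

```rust
use std::net::{SocketAddr, ToSocketAddrs};

// Hypothetical std-only analogue of the iter_ok + flatten version.
fn resolve_flattened(addrs: impl ToSocketAddrs) -> impl Iterator<Item = SocketAddr> {
    addrs
        .to_socket_addrs() // io::Result<impl Iterator<Item = SocketAddr>>
        .into_iter()       // yields the inner iterator once on Ok, nothing on Err
        .flatten()         // yields the addresses of that inner iterator
}
```

As in the stream version, the error is silently dropped here.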

Or if you really want to print errors:

fn resolve(addrs: impl ToSocketAddrs) -> impl Stream<Item = SocketAddr, Error = ()> {
    stream::once(addrs.to_socket_addrs())
        .map(stream::iter_ok)
        .map_err(|e| eprintln!("err: {}", e))
        .flatten()
}

Shepmaster