I'm trying to use async_std to receive UDP datagrams from the network.

There is a UdpSocket that implements an async recv_from. This method returns a future, but I need an async_std::stream::Stream that yields UDP datagrams, because that is a better abstraction.

I've found tokio::net::UdpFramed that does exactly what I need but it is not available in current versions of tokio.

Generally speaking, the question is: how do I convert futures from a given async function into a Stream?

Shepmaster
Superuzir

2 Answers

For a single item, use FutureExt::into_stream:

use futures::prelude::*; // 0.3.1

fn outer() -> impl Stream<Item = i32> {
    inner().into_stream()
}

async fn inner() -> i32 {
    42
}

For a stream from a number of futures generated by a closure, use stream::unfold:

use futures::prelude::*; // 0.3.1

fn outer() -> impl Stream<Item = i32> {
    stream::unfold((), |()| async { Some((inner().await, ())) })
}

async fn inner() -> i32 {
    42
}
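
To make the state threading in stream::unfold easier to see, here is a minimal synchronous analogue built only on the standard library. The names unfold_sync and countdown are hypothetical helpers for illustration and are not part of futures. The shape mirrors stream::unfold: the closure takes the current state and returns Some((item, next_state)) to yield an item, or None to end the sequence. In the async version the closure returns a future that resolves to that Option instead.

```rust
/// Hypothetical synchronous analogue of `futures::stream::unfold`:
/// `step` receives the current state and returns either
/// `Some((item, next_state))` or `None` to stop.
fn unfold_sync<S, T>(
    init: S,
    mut step: impl FnMut(S) -> Option<(T, S)>,
) -> impl Iterator<Item = T> {
    // The state lives in an Option so it can be moved into `step` by value.
    let mut state = Some(init);
    std::iter::from_fn(move || {
        // Once `step` has returned `None`, the state is gone and we stay finished.
        let (item, next) = step(state.take()?)?;
        state = Some(next);
        Some(item)
    })
}

/// Yields `from, from - 1, ..., 1` and then stops.
fn countdown(from: u32) -> Vec<u32> {
    unfold_sync(from, |n| if n == 0 { None } else { Some((n, n - 1)) }).collect()
}
```

With a unit state and a closure that always returns Some, as in the example above, the sequence never ends on its own, so the consumer has to bound it (for instance with take).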

In your case, you can use stream::unfold:

use async_std::{io, net::UdpSocket}; // 1.4.0, features = ["attributes"]
use futures::prelude::*; // 0.3.1

fn read_many(s: UdpSocket) -> impl Stream<Item = io::Result<Vec<u8>>> {
    stream::unfold(s, |s| {
        async {
            let data = read_one(&s).await;
            Some((data, s))
        }
    })
}

async fn read_one(s: &UdpSocket) -> io::Result<Vec<u8>> {
    let mut data = vec![0; 1024];
    let (len, _) = s.recv_from(&mut data).await?;
    data.truncate(len);
    Ok(data)
}

#[async_std::main]
async fn main() -> io::Result<()> {
    let s = UdpSocket::bind("0.0.0.0:9876").await?;

    read_many(s)
        .for_each(|d| {
            async {
                match d {
                    Ok(d) => match std::str::from_utf8(&d) {
                        Ok(s) => println!("{}", s),
                        Err(_) => println!("{:x?}", d),
                    },
                    Err(e) => eprintln!("Error: {}", e),
                }
            }
        })
        .await;

    Ok(())
}
Farcaller
Shepmaster

Generally speaking, the question is: how do I convert futures from a given async function into a Stream?

There is FutureExt::into_stream, but don't let the name fool you; it is not a good fit for your situation.

There is a UdpSocket that implements an async recv_from. This method returns a future, but I need an async_std::stream::Stream that yields UDP datagrams, because that is a better abstraction.

It is not necessarily a better abstraction here.

Specifically, async-std's UdpSocket::recv_from returns a future whose output is io::Result<(usize, SocketAddr)>: the size of the data received and the peer address. If you were to use into_stream to convert it to a stream, it would give you just that, a single time, and not the data received.

I've found tokio::net::UdpFramed that does exactly what I need but it is not available in current versions of tokio.

It has been moved to the tokio-util crate. Unfortunately, you can't (easily) use that either. It requires a tokio::net::UdpSocket, which is not the same type as async_std::net::UdpSocket.

You can, of course, use futures utility functions such as futures::stream::poll_fn or futures::stream::unfold to give UdpSocket::recv_from a futures::stream::Stream facade, but then what will you do with that? If you end up calling StreamExt::next to poll a value out of it, you could have used recv_from directly.
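
As a rough sketch of what "using recv_from directly" looks like, the loop below receives a fixed number of datagrams without any Stream in between. It uses the blocking std::net::UdpSocket as a stand-in so that it runs with the standard library alone; that substitution is an assumption made for illustration. With async_std::net::UdpSocket the loop has the same shape, except that the call becomes recv_from(&mut buf).await inside an async fn. The function name recv_datagrams is hypothetical.

```rust
use std::io;
use std::net::{SocketAddr, UdpSocket};

/// Receives `count` datagrams by calling `recv_from` directly in a loop.
/// Blocking std socket used as a stand-in for the async-std one.
fn recv_datagrams(
    socket: &UdpSocket,
    count: usize,
) -> io::Result<Vec<(Vec<u8>, SocketAddr)>> {
    let mut received = Vec::with_capacity(count);
    let mut buf = [0u8; 1024];
    for _ in 0..count {
        // `recv_from` reports only the length and the peer address;
        // the payload itself is whatever was written into `buf`.
        let (len, peer) = socket.recv_from(&mut buf)?;
        received.push((buf[..len].to_vec(), peer));
    }
    Ok(received)
}
```

Note that the payload has to be copied out of the buffer using the returned length, which is the step a plain into_stream conversion would not do for you.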

It is only necessary to reach for Stream if some API you are using requires a Stream input, such as rusoto:

Shepmaster
edwardw