
I have something similar to the tokio connect example with a method that accepts a sink:

pub async fn connect(
    addr: &SocketAddr,
    mut stdin: impl Stream<Item = Result<Request, io::Error>> + Unpin,
    mut stdout: impl Sink<Response, Error = io::Error> + Unpin,
) -> Result<(), Box<dyn Error>> {

Is there a standard/easy way to adapt a function to a sink for printing and/or transformation?

E.g., something like:

connect(.., .., sink::from_function(|r| match r {
    Ok(response) => println!("received a response: {:?}", response),
    Err(e) => println!("error! {:?}", e),
}))
.await;
arbe

1 Answer


You can use the drain() function (which creates a sink that just discards all items) chained with the .with() method (which maps the inputs of a sink) to create a sink from a function:

use futures::prelude::*;
use futures::sink::drain;

let sink = drain().with(|value| async move { // <-- note async block
    // do something with the input...

    // then return a result
    Ok(())
});

You can also use .with() to inspect or transform the inputs of an existing sink; you just need to ensure that the success type you return from the closure is the same as the item type of the sink you're wrapping.


Frxstrem
  • Thank you, this is very helpful! Unfortunately, in my particular example I seem to be running into an issue with `Unpin`. I've [wrapped up the example](https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=bbf528dc7735c7e79fc5341f2fcb2aa4). It sounds like it might be solvable with the `pin_mut` macro? The compiler error is: within `impl core::future::future::Future`, the trait `std::marker::Unpin` is not implemented for `std::future::GenFuture<[static generator@src/main.rs:9:39: 13:10 value:_ _]>` – arbe Apr 10 '20 at 15:10
  • @arbe You can pin the future by wrapping the `async` block in `Box::pin`, which will pin it on the heap: `|value| Box::pin(async move { ... })`. – Frxstrem Apr 10 '20 at 15:52
  • @arbe Also, in your example you have an `Unpin` constraint on line 20, which is not necessary in this case. You could just remove that constraint, which will also fix the problem without moving the future onto the heap (and it is still safe as long as it compiles and you're not using `unsafe`). But you can only do this if nothing inside the function requires it to be pinned; otherwise you can do what I described above or use `pin_mut!` inside the function, yes. – Frxstrem Apr 10 '20 at 16:03
  • Adding a `pin_mut!(sink);` following the declaration solved my issue. Thanks again! – arbe Apr 11 '20 at 02:25
  • Does this work if my `FnMut` has some state? I want to write to an `AsyncWrite`r for each item. – piegames Nov 27 '20 at 16:47
  • @piegames It sounds like you want `futures::io::AsyncWriteExt::into_sink`? You might also need `tokio_util::compat::TokioAsyncWriteCompatExt` if you had a `tokio::io::AsyncWrite`. – Simon Buchan Feb 23 '21 at 05:49