1

I am trying to take the tonic routeguide tutorial, and turn the client into a rocket server. I am just taking the response and converting from the gRPC to a string.

service RouteGuide {
    rpc GetFeature(Point) returns (Feature) {}
    rpc ListFeatures(Rectangle) returns (stream Feature) {}
}

This works well enough for GetFeature. For the ListFeatures query, just as Tonic allows the client the stream in the response, I wanted to pass this on to the Rocket client. I see that Rocket supports streaming responses, but I need to implement the AsyncRead trait.

Is there any way to do something like this? Below is a trimmed down version of about what I was doing:

struct FeatureStream {
    stream: tonic::Streaming<Feature>,
}

impl AsyncRead for FeatureStream {
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        // Write out as utf8 any response messages.
        match Pin::new(&mut self.stream.message()).poll(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(feature) => Poll::Pending,
        }
    }
}

#[get("/list_features")]
async fn list_features(client: State<'_, RouteGuideClient<Channel>>) -> Stream<FeatureStream> {
    let rectangle = Rectangle {
        low: Some(Point {
            latitude: 400_000_000,
            longitude: -750_000_000,
        }),
        high: Some(Point {
            latitude: 420_000_000,
            longitude: -730_000_000,
        }),
    };
    let mut client = client.inner().clone();
    let stream = client
        .list_features(Request::new(rectangle))
        .await
        .unwrap()
        .into_inner();
    Stream::from(FeatureStream { stream })
}

#[rocket::launch]
async fn rocket() -> rocket::Rocket {
    rocket::ignite()
        .manage(
            create_route_guide_client("http://[::1]:10000")
                .await
                .unwrap(),
        )
        .mount("/", rocket::routes![list_features,])
}

With the error:

error[E0277]: `from_generator::GenFuture<[static generator@Streaming<Feature>::message::{closure#0} for<'r, 's, 't0, 't1, 't2> {ResumeTy, &'r mut Streaming<Feature>, [closure@Streaming<Feature>::message::{closure#0}::{closure#0}], rocket::futures::future::PollFn<[closure@Streaming<Feature>::message::{closure#0}::{closure#0}]>, ()}]>` cannot be unpinned
   --> src/web_user.rs:34:15
    |
34  |         match Pin::new(&mut self.stream.message()).poll(cx) {
    |               ^^^^^^^^ within `impl std::future::Future`, the trait `Unpin` is not implemented for `from_generator::GenFuture<[static generator@Streaming<Feature>::message::{closure#0} for<'r, 's, 't0, 't1, 't2> {ResumeTy, &'r mut Streaming<Feature>, [closure@Streaming<Feature>::message::{closure#0}::{closure#0}], rocket::futures::future::PollFn<[closure@Streaming<Feature>::message::{closure#0}::{closure#0}]>, ()}]>`
    | 
   ::: /home/matan/.cargo/registry/src/github.com-1ecc6299db9ec823/tonic-0.4.0/src/codec/decode.rs:106:40
    |
106 |     pub async fn message(&mut self) -> Result<Option<T>, Status> {
    |                                        ------------------------- within this `impl std::future::Future`
    |
    = note: required because it appears within the type `impl std::future::Future`
    = note: required because it appears within the type `impl std::future::Future`
    = note: required by `Pin::<P>::new`
matanmarkind
  • 219
  • 3
  • 13
  • 1
    It looks like `message()` function is a helper to pick next message from the tonic Streaming. You don't need `message()` function to pick next message for `AsyncRead` you already have the `Stream`, you can pick the next message by yourself, here is the code https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=1a1637cf47eafd720470228cecd5f1dc (it returns `Pending` for all cases as your code does, you can change it as you desire) – Ömer Erden Mar 08 '21 at 10:44

2 Answers2

2

The problem is generated Future from tonic::Streaming<Feature>::message() doesn't implement Unpin since it is an async function. Let's label this type as MessageFuture, you cannot pin &mut MessageFuture pointer safely because the dereferenced type MessageFuture doesn't implement Unpin.

Why it is not safe?

From reference, implementation of Unpin brings:

Types that can be safely moved after being pinned.

It means if T:!Unpin then Pin<&mut T> is not movable, this is important because Futures created by async block has no Unpin implementation since it might hold reference of a member from itself, and if you move the T the pointee of this reference will also be moved, but the reference will still point the same address, to prevent this it should not be movable. Please read "Pinning" section from async-book to visualize the reason.

Note: T:!Unpin means T is the type that has no Unpin implementation.

Solution

message() function is helper to pick next message from the tonic::Streaming<T>. You don't particularly need to call message() to pick next element from the stream, you already have actual stream in your structure.

struct FeatureStream {stream: tonic::Streaming<Feature>}

You can await for the next message for AsyncRead like:

impl AsyncRead for FeatureStream {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {

       //it returns Pending for all cases as your code does, you can change it as you want
        match self.stream.poll_next_unpin(cx) {
            Poll::Ready(Some(Ok(m))) => Poll::Pending,
            Poll::Ready(Some(Err(e))) => Poll::Pending,
            Poll::Ready(None) => Poll::Pending,
            Poll::Pending => Poll::Pending
        }
    }
}

Please note that tonic::Streaming<T> has implementation of Unpin(reference)

Ömer Erden
  • 7,680
  • 5
  • 36
  • 45
1

Thank you Omer Erden for answering this. So it came down to implementing AsyncRead based on the futures::Stream trait, which tonic::Streaming implements. Here is the code I actually used.

impl AsyncRead for FeatureStream {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<std::io::Result<()>> {
        use futures::stream::StreamExt;
        use std::io::{Error, ErrorKind};

        match self.stream.poll_next_unpin(cx) {
            Poll::Ready(Some(Ok(m))) => {
                buf.put_slice(format!("{:?}\n", m).as_bytes());
                Poll::Ready(Ok(()))
            }
            Poll::Ready(Some(Err(e))) => {
                Poll::Ready(Err(Error::new(ErrorKind::Other, format!("{:?}", e))))
            }
            Poll::Ready(None) => {
                // None from a stream means the stream terminated. To indicate
                // that from AsyncRead we return Ok and leave buf unchanged.
                Poll::Ready(Ok(()))
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

In the meantime my workaround was to create both ends of a TcpStream (which implements AsyncRead) and return one end of it while spawning a separate task to actually write out the results.

#[get("/list_features")]
async fn list_features(
    client: State<'_, RouteGuideClient<Channel>>,
    tasks: State<'_, Mutex<Vec<tokio::task::JoinHandle<()>>>>,
) -> Result<Stream<TcpStream>, Debug<std::io::Error>> {
    let mut client = client.inner().clone();
    let mut feature_stream = client
        .list_features(Request::new(Rectangle {
            low: Some(Point {
                latitude: 400000000,
                longitude: -750000000,
            }),
            high: Some(Point {
                latitude: 420000000,
                longitude: -730000000,
            }),
        }))
        .await
        .unwrap()
        .into_inner();

    // Port 0 tells to operating system to choose an unused port.
    let tcp_listener = TcpListener::bind(("127.0.0.1", 0)).await?;
    let socket_addr = tcp_listener.local_addr().unwrap();
    tasks.lock().unwrap().push(tokio::spawn(async move {
        let mut tcp_stream = TcpStream::connect(socket_addr).await.unwrap();

        while let Some(feature) = feature_stream.message().await.unwrap() {
            match tcp_stream
                .write_all(format!("{:?}\n", feature).as_bytes())
                .await
            {
                Ok(()) => (),
                Err(e) => panic!(e),
            }
        }
        println!("End task");
        ()
    }));
    Ok(Stream::from(tcp_listener.accept().await?.0))
}
matanmarkind
  • 219
  • 3
  • 13
  • 1
    You shouldn't run blocking code while polling, instead you can use tokio timer to create proper delay, more information : https://stackoverflow.com/questions/48735952/why-does-futureselect-choose-the-future-with-a-longer-sleep-period-first – Ömer Erden Mar 09 '21 at 21:32
  • I assume you are referring to the sleep that I had? That was from a test I was running and forgot to take that out when posting here, thanks for pointing it out and for pasting that link. – matanmarkind Mar 10 '21 at 11:52