I am trying to implement tokio::stream::Stream on a struct that should repeatedly produce the result of calling an async method on that same struct. I am having a difficult time trying to get the lifetimes to work out. Here is the code that I have so far:

use std::future::Future;
use std::pin::Pin;
use std::task::{Poll, Context};
use futures::ready;
use hyper::{
    client::{Client, HttpConnector},
    body::Body,
};
use pin_project::pin_project;
use tokio::stream::{Stream, StreamExt};

// Placeholder for actual result type
#[derive(Debug)]
struct MyResult;

#[pin_project]
struct ResultProducer {
    client: Client<HttpConnector, Body>,
    #[pin]
    current_request: Option<Pin<Box<dyn Future<Output=MyResult>>>>,
}

impl ResultProducer {
    fn new() -> ResultProducer {
        ResultProducer {
            client: Client::new(),
            current_request: None,
        }
    }

    // This is the method that should be called repeatedly
    async fn next_result(&self) -> MyResult {
        let url = "http://www.example.com".parse().unwrap();
        let _ = self.client.get(url).await.unwrap();

        MyResult
    }
}

impl Stream for ResultProducer {
    type Item = MyResult;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut fut = self.project().current_request;

        if fut.is_none() {
            fut.set(Some(Box::pin(self.next_result())));
        }

        let res = ready!(fut.as_mut().as_pin_mut().unwrap().poll(cx));
        fut.set(None);

        Poll::Ready(Some(res))
    }
}

#[tokio::main]
async fn main() {
    let requester = ResultProducer::new();
    while let Some(res) = requester.next().await {
        println!("Got a result: {:?}", res);
    }
}

When I try compiling this, I get the following error:

error[E0495]: cannot infer an appropriate lifetime for lifetime parameter in function call due to conflicting requirements
  --> src/main.rs:46:40
   |
46 |             fut.set(Some(Box::pin(self.next_result())));
   |                                        ^^^^^^^^^^^

It seems like the compiler thinks that the future returned from next_result could outlive the ResultProducer instance and wants it to have a 'static lifetime, but it seems like this should be possible. What am I doing wrong here?

Update

Based on the related questions, I can see that there is no safe way to guarantee that the &self reference taken by next_result will be valid on any given call to poll on the future that it returns.

In the case of the code that I am writing, I can guarantee that the referent will not be dropped, so I could perform an unsafe cast of a self reference to give it an arbitrary lifetime, but I decided to go the safe route and extract the state that this method needs into another struct and let ResultProducer own an Arc of this struct (actually an Arc<Mutex<...>> since I need mutable access in my actual code). The async function then takes the Arc<Mutex<...>>, which it can safely access, since the lifetime is no longer tied to self.

This is the general approach suggested in the accepted answer to How to implement a `Future` / `Stream` that polls `async fn(&mut self)`?
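
For illustration, here is a minimal sketch of that approach. It is not my actual code: it uses only std so that it compiles without hyper or tokio, a hypothetical `Shared` struct with a counter stands in for the HTTP client, `poll_next` is written as an inherent method with the same shape as `Stream::poll_next`, and `noop_waker` is a helper made up for the demo. The point is that `next_result` takes an owned `Arc<Shared>` instead of `&self`, so the boxed future borrows nothing from the producer.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

// Placeholder result type; carries a number so successive results differ.
#[derive(Debug, PartialEq)]
struct MyResult(u32);

// Hypothetical shared state, standing in for the client and anything else
// the async function needs.
struct Shared {
    counter: Mutex<u32>,
}

// Takes an owned Arc rather than &self, so the returned future is 'static.
async fn next_result(shared: Arc<Shared>) -> MyResult {
    let mut n = shared.counter.lock().unwrap();
    *n += 1;
    MyResult(*n)
}

struct ResultProducer {
    shared: Arc<Shared>,
    current_request: Option<Pin<Box<dyn Future<Output = MyResult>>>>,
}

impl ResultProducer {
    fn new() -> Self {
        ResultProducer {
            shared: Arc::new(Shared { counter: Mutex::new(0) }),
            current_request: None,
        }
    }

    // Same shape as Stream::poll_next; with tokio or futures available this
    // body could move into an `impl Stream for ResultProducer` block.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<MyResult>> {
        // Every field is Unpin, so no pin projection is needed.
        let this = self.get_mut();

        if this.current_request.is_none() {
            let fut: Pin<Box<dyn Future<Output = MyResult>>> =
                Box::pin(next_result(Arc::clone(&this.shared)));
            this.current_request = Some(fut);
        }

        let fut = this.current_request.as_mut().unwrap();
        match fut.as_mut().poll(cx) {
            Poll::Ready(res) => {
                // Drop the finished future so the next poll starts a new call.
                this.current_request = None;
                Poll::Ready(Some(res))
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

// Waker that does nothing; only needed to poll by hand in this demo.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn noop_waker() -> Waker {
    Waker::from(Arc::new(NoopWake))
}

fn main() {
    let mut producer = ResultProducer::new();
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);

    for _ in 0..3 {
        if let Poll::Ready(Some(res)) = Pin::new(&mut producer).poll_next(&mut cx) {
            println!("Got a result: {:?}", res);
        }
    }
}
```

In real code the future would await the HTTP request and run on an executor instead of being polled by hand. A std `Mutex` guard should not be held across an `.await`, so either release it before awaiting or use an async-aware mutex.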

  • It looks like your question might be answered by the answers of [Creating a stream of values while calling async fns?](https://stackoverflow.com/q/56719990/155423); [How to implement a `Future` / `Stream` that polls `async fn(&mut self)`?](https://stackoverflow.com/q/61295176/155423); [Is there any way to create a async stream generator that yields the result of repeatedly calling a function?](https://stackoverflow.com/q/58700741/155423). If not, please **[edit]** your question to explain the differences. Otherwise, we can mark this question as already answered. – Shepmaster Jul 21 '20 at 12:29
  • Thanks, @Shepmaster. This question has no substantial differences from https://stackoverflow.com/q/61295176/155423 and can be closed as soon as the bounty expires. – Andrew Jul 21 '20 at 17:40