I am trying to implement `tokio::stream::Stream` on a struct that should repeatedly produce the result of calling an async method on that same struct. I am having trouble getting the lifetimes to work out. Here is the code that I have so far:
    use std::future::Future;
    use std::pin::Pin;
    use std::task::{Poll, Context};

    use futures::ready;
    use hyper::{
        client::{Client, HttpConnector},
        body::Body,
    };
    use pin_project::pin_project;
    use tokio::stream::{Stream, StreamExt};

    // Placeholder for actual result type
    #[derive(Debug)]
    struct MyResult;

    #[pin_project]
    struct ResultProducer {
        client: Client<HttpConnector, Body>,
        #[pin]
        current_request: Option<Pin<Box<dyn Future<Output=MyResult>>>>,
    }

    impl ResultProducer {
        fn new() -> ResultProducer {
            ResultProducer {
                client: Client::new(),
                current_request: None,
            }
        }

        // This is the method that should be called repeatedly
        async fn next_result(&self) -> MyResult {
            let url = "http://www.example.com".parse().unwrap();
            let _ = self.client.get(url).await.unwrap();
            MyResult
        }
    }

    impl Stream for ResultProducer {
        type Item = MyResult;

        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            let mut fut = self.project().current_request;
            if fut.is_none() {
                fut.set(Some(Box::pin(self.next_result())));
            }
            let res = ready!(fut.as_mut().as_pin_mut().unwrap().poll(cx));
            fut.set(None);
            Poll::Ready(Some(res))
        }
    }

    #[tokio::main]
    async fn main() {
        let requester = ResultProducer::new();
        while let Some(res) = requester.next().await {
            println!("Got a result: {:?}", res);
        }
    }
When I try compiling this, I get the following error:
    error[E0495]: cannot infer an appropriate lifetime for lifetime parameter in function call due to conflicting requirements
      --> src/main.rs:46:40
       |
    46 |             fut.set(Some(Box::pin(self.next_result())));
       |                                        ^^^^^^^^^^^
It seems like the compiler thinks that the future returned from `next_result` could outlive the `ResultProducer` instance and wants it to have a `'static` lifetime, but it seems like this should be possible. What am I doing wrong here?
Update
Based on the related questions
- Creating a stream of values while calling async fns?
- How to implement a `Future` / `Stream` that polls `async fn(&mut self)`?
- Is there any way to create a async stream generator that yields the result of repeatedly calling a function?
I can see that there is no safe way to guarantee that the `&self` reference taken by `next_result` will be valid on any given call to `poll` on the future that it returns.
In the case of the code that I am writing, I can guarantee that the referent will not be dropped, so I could perform an unsafe cast of the `self` reference to give it an arbitrary lifetime. I decided to go the safe route instead: I extracted the state that this method needs into another struct and let `ResultProducer` own an `Arc` of this struct (actually an `Arc<Mutex<...>>`, since I need mutable access in my actual code). The `async` function then takes the `Arc<Mutex<...>>`, which it can safely access, since its lifetime is no longer tied to `self`.
This is the general approach suggested in the accepted answer to How to implement a `Future` / `Stream` that polls `async fn(&mut self)`?
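For anyone landing here, this is a minimal sketch of that shape, not my actual code. To keep it compilable with only the standard library, it makes a few assumptions: the `Stream` trait is a local stand-in with the same `poll_next` signature as `tokio::stream::Stream`, the `State` struct with its `counter` field is a hypothetical replacement for the HTTP client, `MyResult` carries a number purely for illustration, and `take_results` with its no-op waker is a throwaway driver rather than a real executor.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

// Assumption: local stand-in for `tokio::stream::Stream` (same method shape).
trait Stream {
    type Item;
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

// Placeholder result type; the number is carried only for illustration.
#[derive(Debug, PartialEq)]
struct MyResult(u32);

// Hypothetical shared state extracted out of the producer.
struct State {
    counter: u32,
}

// Takes the `Arc` by value, so the returned future borrows nothing from the
// producer and can be stored as a `'static` boxed future.
async fn next_result(state: Arc<Mutex<State>>) -> MyResult {
    // A real implementation would await the HTTP request here.
    let mut guard = state.lock().unwrap();
    guard.counter += 1;
    MyResult(guard.counter)
}

struct ResultProducer {
    state: Arc<Mutex<State>>,
    current_request: Option<Pin<Box<dyn Future<Output = MyResult>>>>,
}

impl ResultProducer {
    fn new() -> Self {
        ResultProducer {
            state: Arc::new(Mutex::new(State { counter: 0 })),
            current_request: None,
        }
    }
}

impl Stream for ResultProducer {
    type Item = MyResult;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Both fields are `Unpin`, so no pin projection is needed.
        let this = self.get_mut();
        if this.current_request.is_none() {
            // The future owns a clone of the `Arc`, not a borrow of `self`.
            let fut = next_result(Arc::clone(&this.state));
            this.current_request = Some(Box::pin(fut));
        }
        let polled = this.current_request.as_mut().unwrap().as_mut().poll(cx);
        match polled {
            Poll::Ready(res) => {
                // Clear the slot so the next poll starts a fresh request.
                this.current_request = None;
                Poll::Ready(Some(res))
            }
            Poll::Pending => Poll::Pending,
        }
    }
}

// Waker that does nothing; enough here because the sketch's future never pends.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Demonstration-only driver: polls the stream directly until `n` items arrive.
fn take_results(n: usize) -> Vec<MyResult> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut producer = ResultProducer::new();
    let mut out = Vec::new();
    while out.len() < n {
        if let Poll::Ready(Some(res)) = Pin::new(&mut producer).poll_next(&mut cx) {
            out.push(res);
        }
    }
    out
}
```

Calling `take_results(3)` polls the producer three times, and each poll starts a new future from a fresh clone of the `Arc`. With tokio, the local trait would be replaced by `tokio::stream::Stream` and the driver by `StreamExt::next`. If the future needs to hold the lock across an `.await`, an async-aware mutex is probably a better fit than `std::sync::Mutex`.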