
I implemented a Tokio Future that has the following high-level logic:

  1. Make a call to a function recv_dgram. This should return a Future that is ready when a small datagram is received.
  2. When the Future is ready, keep the small datagram.
  3. If we have enough small datagrams to reconstruct a large datagram, set the Future as ready and return the reconstructed datagram. Otherwise, return to step 1.
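Ignoring asynchrony, the steps above amount to a loop like the following sketch. It uses only the standard library; `recv_msg`, the blocking `recv_dgram` closure, and the `total_len` threshold are hypothetical stand-ins for illustration, not part of the real code.

```rust
/// Calls `recv_dgram` until at least `total_len` bytes have been collected,
/// then returns the reassembled message.
fn recv_msg<R>(mut recv_dgram: R, total_len: usize) -> Vec<u8>
where
    R: FnMut() -> Vec<u8>,
{
    let mut msg = Vec::with_capacity(total_len);
    while msg.len() < total_len {
        // Steps 1 and 2: wait for one small datagram and keep it.
        let dgram = recv_dgram();
        msg.extend_from_slice(&dgram);
    }
    // Step 3: enough data has arrived, return the reconstructed datagram.
    msg
}

fn main() {
    // Hypothetical source: three 4-byte datagrams.
    let mut chunks = vec![vec![1u8; 4], vec![2u8; 4], vec![3u8; 4]].into_iter();
    let msg = recv_msg(|| chunks.next().expect("no more datagrams"), 10);
    assert_eq!(msg.len(), 12);
    println!("reassembled {} bytes", msg.len());
}
```

The asynchronous version has to express the same loop as a state machine, because `poll` must return between datagrams instead of blocking.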

I have a lifetime problem that I can't manage to understand. I created a self-contained, simplified piece of code to demonstrate it.

In the code below, RecvMsg is the Future that is ready when enough small datagrams were received. recv_dgram is a function that returns a Future that is ready when a small datagram has arrived.

I am trying to compile the following code (playground):

extern crate futures;
use self::futures::{Future, Poll, Async};

struct RecvMsg<'d,R>
where 
    R: for <'r> FnMut(&'r mut [u8]) -> Box<Future<Item=&'r mut [u8], Error=()> + 'r>,
{
    recv_dgram: R,
    temp_buff: Vec<u8>,
    opt_read_future: Option<Box<Future<Item=&'d mut [u8], Error=()> + 'd>>,
}

impl<'d,R> Future for RecvMsg<'d,R>
where 
    R: for <'r> FnMut(&'r mut [u8]) -> Box<Future<Item=&'r mut [u8], Error=()> + 'r>,
{
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Poll<Self::Item, ()> {

        // Obtain a future datagram, 
        let mut fdgram = (self.recv_dgram)(&mut self.temp_buff);
        // Code compiles if this line is commented out:
        self.opt_read_future = Some(fdgram);
        return Ok(Async::NotReady);
    }
}

fn main() {}

This is the error message I get:

error[E0495]: cannot infer an appropriate lifetime for borrow expression due to conflicting requirements
  --> src/main.rs:25:44
   |
25 |         let mut fdgram = (self.recv_dgram)(&mut self.temp_buff);
   |                                            ^^^^^^^^^^^^^^^^^^^
   |
note: first, the lifetime cannot outlive the anonymous lifetime #1 defined on the method body at 22:5...
  --> src/main.rs:22:5
   |
22 | /     fn poll(&mut self) -> Poll<Self::Item, ()> {
23 | |
24 | |         // Obtain a future datagram, 
25 | |         let mut fdgram = (self.recv_dgram)(&mut self.temp_buff);
...  |
28 | |         return Ok(Async::NotReady);
29 | |     }
   | |_____^
note: ...so that reference does not outlive borrowed content
  --> src/main.rs:25:44
   |
25 |         let mut fdgram = (self.recv_dgram)(&mut self.temp_buff);
   |                                            ^^^^^^^^^^^^^^^^^^^
note: but, the lifetime must be valid for the lifetime 'd as defined on the impl at 13:1...
  --> src/main.rs:13:1
   |
13 | / impl<'d,R> Future for RecvMsg<'d,R>
14 | | where 
15 | |     R: for <'r> FnMut(&'r mut [u8]) -> Box<Future<Item=&'r mut [u8], Error=()> + 'r>,
16 | | {
...  |
29 | |     }
30 | | }
   | |_^
note: ...so that expression is assignable (expected std::option::Option<std::boxed::Box<futures::Future<Error=(), Item=&'d mut [u8]> + 'd>>, found std::option::Option<std::boxed::Box<futures::Future<Error=(), Item=&mut [u8]>>>)
  --> src/main.rs:27:32
   |
27 |         self.opt_read_future = Some(fdgram);
   |                                ^^^^^^^^^^^^

I have some ideas about what could be wrong. I know that if I comment out the line:

self.opt_read_future = Some(fdgram);

the code compiles successfully. In addition, I suspect that the fact that self.temp_buff is used here as an argument:

let mut fdgram = (self.recv_dgram)(&mut self.temp_buff);

has something to do with the problem. (See also Why can't I store a value and a reference to that value in the same struct?)

Shepmaster
real
  • Yes, it is [Why can't I store a value and a reference to that value in the same struct?](https://stackoverflow.com/questions/32300132/why-cant-i-store-a-value-and-a-reference-to-that-value-in-the-same-struct). `opt_read_future` contains a reference to `temp_buff`. – red75prime Sep 22 '17 at 08:58
  • @red75prime: Reading your comment, I tried to simplify the code even more to find the core problem. I realize that I have both `temp_buff` and somehow a reference to its contents on the same struct, but `temp_buff` is a vector and its contents are on the heap. Why would this be a problem? I can't seem to understand what could go wrong here. In addition, do you have an idea for a workaround? Where should I put `temp_buff`? – real Sep 22 '17 at 10:54

1 Answer


Types implementing Future but not having 'static lifetime are pretty useless, as the event loop won't be able to run them.
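A standard-library analogy may help here: `std::thread::spawn` also requires its closure to be `'static`, so it can stand in for an executor that only accepts `'static` work. The sketch below is an illustration under that assumption; `fill` is a hypothetical helper, and no futures are involved.

```rust
use std::thread;

/// Fills and returns a buffer it owns; nothing is borrowed from the caller.
fn fill(mut buf: Vec<u8>) -> Vec<u8> {
    buf.extend_from_slice(b"dgram");
    buf
}

fn main() {
    let buf = Vec::new();
    // `move` transfers ownership of `buf` into the closure, which makes the
    // closure `'static`. Capturing `&mut buf` instead would be rejected,
    // because the borrow could outlive the local variable.
    let handle = thread::spawn(move || fill(buf));
    // The buffer comes back by value once the work is done.
    let buf = handle.join().expect("worker panicked");
    assert_eq!(buf, b"dgram".to_vec());
}
```

Handing the buffer over by value and getting it back with the result is the same shape the owned `Vec<u8>` signature takes for a future.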

Also I think lifetimes in a Future::Item are bad; I don't see how the type system could help you determine in the control flow when the future actually completed to release the lifetime.

So you probably want R: FnMut(Vec<u8>) -> Box<Future<Item=Vec<u8>, Error=()>> instead for the recv_dgram function. It will receive a buffer it is supposed to append new received data to (it could use reserve() and "unsafe" buffer filling + set_len() for optimization). You could also wrap Vec in some custom type to enforce "append-only" semantics.
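One way the "append-only" wrapper could look is sketched below, using only the standard library. The names `AppendBuf` and `recv_one` are hypothetical, and the sketch uses plain `extend_from_slice` rather than the `reserve()` + `set_len()` optimization.

```rust
/// Hypothetical wrapper: callers can append and read, but never truncate
/// or overwrite what has already been received.
pub struct AppendBuf {
    inner: Vec<u8>,
}

impl AppendBuf {
    pub fn new() -> Self {
        AppendBuf { inner: Vec::new() }
    }

    /// Appends `data` to the end of the buffer.
    pub fn append(&mut self, data: &[u8]) {
        self.inner.extend_from_slice(data);
    }

    pub fn len(&self) -> usize {
        self.inner.len()
    }

    pub fn as_slice(&self) -> &[u8] {
        &self.inner
    }

    /// Gives up the wrapper and returns the underlying vector.
    pub fn into_vec(self) -> Vec<u8> {
        self.inner
    }
}

/// Stand-in for one receive: takes the buffer by value, appends to it,
/// and hands it back.
fn recv_one(mut buf: AppendBuf, dgram: &[u8]) -> AppendBuf {
    buf.append(dgram);
    buf
}

fn main() {
    let buf = recv_one(AppendBuf::new(), b"ab");
    let buf = recv_one(buf, b"cd");
    assert_eq!(buf.len(), 4);
    assert_eq!(buf.as_slice(), &b"abcd"[..]);
}
```

Because the wrapper exposes no mutable access to existing bytes, a receive function cannot accidentally clobber earlier datagrams.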

Now you should realize that mutability isn't going to help this function a lot - you could call it 10 times in a row without waiting for the returned futures to complete. Instead you probably want to pass explicit state along. Also, let's avoid the boxing, and accept any result convertible with IntoFuture:

// `S` is the state, `F` something convertible to a future `F::Future`.
pub struct RecvMsg<R, F, S>
where
    F: IntoFuture<Item=(S, Vec<u8>), Error=()>,
    R: FnMut(S, Vec<u8>) -> F,
{
    recv_dgram: R,
    pending: Option<F::Future>,
}

The currently pending read operation now becomes Option<F::Future>, and the buffer lives in that pending operation.

This pending read operation needs to be polled whenever your wrapper gets polled (and you're not done completely yet)!
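The take-and-put-back handling of such an `Option` field can be sketched without any futures at all. In the following standard-library-only illustration, `PendingRead`, `Receiver`, and the `polls_left` counter are hypothetical stand-ins for the pending future and its wrapper.

```rust
/// Hypothetical pending operation: owns the buffer and becomes ready
/// once `polls_left` has reached zero.
struct PendingRead {
    polls_left: u32,
    buf: Vec<u8>,
}

struct Receiver {
    pending: Option<PendingRead>,
}

impl Receiver {
    /// Returns `Some(buf)` once the pending operation has finished, else `None`.
    fn poll(&mut self) -> Option<Vec<u8>> {
        // `take()` moves the operation out, leaving `None` in the field.
        let mut op = self.pending.take()?;
        if op.polls_left == 0 {
            // Finished: the buffer moves out together with the result.
            Some(op.buf)
        } else {
            // Not finished: put the operation back for the next poll.
            op.polls_left -= 1;
            self.pending = Some(op);
            None
        }
    }
}

fn main() {
    let mut r = Receiver {
        pending: Some(PendingRead { polls_left: 2, buf: vec![7] }),
    };
    assert_eq!(r.poll(), None);
    assert_eq!(r.poll(), None);
    assert_eq!(r.poll(), Some(vec![7]));
    // Nothing is pending any more after completion.
    assert_eq!(r.poll(), None);
}
```

Moving the operation out with `take()` avoids holding a borrow of the field while deciding whether to return the buffer or store the operation again.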

All in all it could look like this:

Playground

extern crate futures;
use self::futures::{IntoFuture, Future, Poll, Async};

pub struct RecvMsg<R, F, S>
where
    F: IntoFuture<Item=(S, Vec<u8>), Error=()>,
    R: FnMut(S, Vec<u8>) -> F,
{
    recv_dgram: R,
    pending: Option<F::Future>,
}

impl<R, F, S> RecvMsg<R, F, S>
where
    F: IntoFuture<Item=(S, Vec<u8>), Error=()>,
    R: FnMut(S, Vec<u8>) -> F,
{
    pub fn new(mut recv_dgram: R, initial_state: S) -> Self {
        let start = recv_dgram(initial_state, Vec::new()).into_future();
        RecvMsg{
            recv_dgram: recv_dgram,
            pending: Some(start)
        }
    }
}

impl<R, F, S> Future for RecvMsg<R, F, S>
where
    F: IntoFuture<Item=(S, Vec<u8>), Error=()>,
    R: FnMut(S, Vec<u8>) -> F,
{
    type Item = Vec<u8>;
    type Error = ();

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        loop {
            let mut p = match self.pending.take() {
                Some(p) => p,
                None => return Ok(Async::NotReady), // already completed
            };
            match p.poll()? {
                Async::Ready((state, buf)) => {
                    if buf.len() > 1024 {
                        // enough data
                        return Ok(Async::Ready(buf));
                    }
                    // not enough data: start the next read and loop to poll it
                    // right away, so the current task is registered for wakeup
                    let next = (self.recv_dgram)(state, buf).into_future();
                    self.pending = Some(next);
                },
                Async::NotReady => {
                    // still waiting for more data
                    self.pending = Some(p);
                    return Ok(Async::NotReady);
                },
            }
        }
    }
}
Stefan