
I'm working with Tokio doing some UDP stuff.

I want to record the amount of time my UDP probe future takes to resolve. I came up with the following function, time_future(), to wrap a future and give me the result and a duration. The function seems very naive and I think Rust has the power to express the concept much more cleanly.

My working code (Playground):

extern crate futures; // 0.1.25
extern crate tokio; // 0.1.11

use std::time::{Duration, Instant};

use futures::future::{lazy, ok};
use futures::Future;
use tokio::runtime::current_thread::Runtime;
use tokio::timer::Delay;

struct TimedFutureResult<T, E> {
    elapsed: Duration,
    result: Result<T, E>,
}

impl<T, E> TimedFutureResult<T, E> {
    pub fn elapsed_ms(&self) -> i64 {
        return (self.elapsed.as_secs() * 1000 + (self.elapsed.subsec_nanos() / 1000000) as u64)
            as i64;
    }
}

fn time_future<F: Future>(f: F) -> impl Future<Item = TimedFutureResult<F::Item, F::Error>> {
    lazy(|| {
        let start = Instant::now();

        f.then(move |result| {
            ok::<TimedFutureResult<F::Item, F::Error>, ()>(TimedFutureResult {
                elapsed: start.elapsed(),
                result: result,
            })
        })
    })
}

fn main() {
    let when = Instant::now() + Duration::from_millis(100);

    let f = time_future(Delay::new(when)).then(|r| match r {
        Ok(r) => {
            println!("resolved in {}ms", r.elapsed_ms());
            r.result
        }
        _ => unreachable!(),
    });

    let mut runtime = Runtime::new().unwrap();
    runtime.block_on(f).unwrap();
}

How can I improve this and make it more idiomatic? Can I somehow get the interface to work similarly to inspect() or then()?

Delay::new(when)
  .timed(|res, elapsed| println!("{}ms!", elapsed))
  .and_then(...);

I tried creating a Timed trait and implementing it for Future but I didn't feel at all confident in how I was going about it. The types just really threw me for a loop.

Am I at least barking up the right tree?

Shepmaster
  • *Questions asking us to recommend or find a book, tool, software library, tutorial or other off-site resource are off-topic for Stack Overflow* — I've removed your sentence pertaining to that. Besides that, this is a top-notch first question; it's clear, concise, provides a code example, etc. Nicely done! – Shepmaster Nov 14 '18 at 02:33

2 Answers


Shepmaster's answer is great. However, it targets futures 0.1, which is out of date and incompatible with the standard library's std::future::Future. Here's my rewrite using stdlib futures (with tokio 0.2 and the pin_project crate).

use pin_project::pin_project;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
use tokio::time; // 0.2

/// A wrapper around a Future which adds timing data.
#[pin_project]
struct Timed<Fut, F>
where
    Fut: Future,
    F: FnMut(&Fut::Output, Duration),
{
    #[pin]
    inner: Fut,
    f: F,
    start: Option<Instant>,
}

impl<Fut, F> Future for Timed<Fut, F>
where
    Fut: Future,
    F: FnMut(&Fut::Output, Duration),
{
    type Output = Fut::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        let start = this.start.get_or_insert_with(Instant::now);

        match this.inner.poll(cx) {
            // If the inner future is still pending, this wrapper is still pending.
            Poll::Pending => Poll::Pending,

            // If the inner future is done, measure the elapsed time and finish this wrapper future.
            Poll::Ready(v) => {
                let elapsed = start.elapsed();
                (this.f)(&v, elapsed);

                Poll::Ready(v)
            }
        }
    }
}

trait TimedExt: Sized + Future {
    fn timed<F>(self, f: F) -> Timed<Self, F>
    where
        F: FnMut(&Self::Output, Duration),
    {
        Timed {
            inner: self,
            f,
            start: None,
        }
    }
} 

// All futures can use the `.timed` method defined above
impl<F: Future> TimedExt for F {}

#[tokio::main]
async fn main() {
    let when = Instant::now() + Duration::from_millis(100);

    let fut = time::delay_until(when.into())
        .timed(|res, elapsed| println!("{:?} elapsed, got {:?}", elapsed, res));
    fut.await;
}
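
If you only need the duration back and not a callback, a plain async fn may be enough with stdlib futures. The following is a minimal sketch, not part of the code above: `time_it`, `NoopWaker` and `block_on` are hypothetical names, and `block_on` is a tiny busy-polling stand-in for a real runtime so that the snippet depends on std only. Because async fn bodies are lazy, the clock here also starts on first poll.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::time::{Duration, Instant};

/// Hypothetical helper: awaits `fut` and returns its output plus the elapsed time.
/// The body only runs once the returned future is polled, so the clock
/// starts at first poll, not when `time_it` is called.
async fn time_it<Fut: Future>(fut: Fut) -> (Fut::Output, Duration) {
    let start = Instant::now();
    let output = fut.await;
    (output, start.elapsed())
}

/// Waker that does nothing; sufficient for the busy-polling loop below.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Stand-in for a runtime's `block_on` (an assumption for this sketch):
/// polls the future in a loop until it is ready.
fn block_on<Fut: Future>(fut: Fut) -> Fut::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
        std::thread::yield_now();
    }
}
```

For example, `block_on(time_it(async { 21 * 2 }))` returns the value 42 together with the measured `Duration`. Under a real runtime you would drop `block_on` and simply write `time_it(fut).await`.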
Adam

The act of writing the future is easy enough, and adding a chainable method uses the same technique as shown in "How can I add new methods to Iterator?".
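
For reference, that technique is an extension trait plus a blanket impl. Here is a minimal sketch of it applied to Iterator; `EveryOtherExt`, `EveryOther` and `every_other` are hypothetical names made up for illustration, not anything from the standard library.

```rust
/// Hypothetical adapter that yields every other element of the wrapped iterator.
struct EveryOther<I> {
    inner: I,
}

impl<I: Iterator> Iterator for EveryOther<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<I::Item> {
        // Take one element, stopping early if the inner iterator is done...
        let item = self.inner.next()?;
        // ...then discard the element that follows it.
        self.inner.next();
        Some(item)
    }
}

/// Extension trait: declares the chainable method with a default body.
trait EveryOtherExt: Iterator + Sized {
    fn every_other(self) -> EveryOther<Self> {
        EveryOther { inner: self }
    }
}

// Blanket impl: every iterator gets `.every_other()` once the trait is in scope.
impl<I: Iterator> EveryOtherExt for I {}
```

With the trait in scope, `vec![1, 2, 3, 4, 5].into_iter().every_other()` yields 1, 3, 5. The future version has the same shape: a wrapper struct, a trait with a default method, and a blanket impl.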

The only really tricky aspect is deciding when the clock starts: when the future is created, or when it is first polled?

I chose to start it when the future is first polled, as that seems more useful:

extern crate futures; // 0.1.25
extern crate tokio; // 0.1.11

use std::time::{Duration, Instant};

use futures::{try_ready, Async, Future, Poll};
use tokio::{runtime::current_thread::Runtime, timer::Delay};

struct Timed<Fut, F>
where
    Fut: Future,
    F: FnMut(&Fut::Item, Duration),
{
    inner: Fut,
    f: F,
    start: Option<Instant>,
}

impl<Fut, F> Future for Timed<Fut, F>
where
    Fut: Future,
    F: FnMut(&Fut::Item, Duration),
{
    type Item = Fut::Item;
    type Error = Fut::Error;

    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        let start = self.start.get_or_insert_with(Instant::now);

        let v = try_ready!(self.inner.poll());

        let elapsed = start.elapsed();
        (self.f)(&v, elapsed);

        Ok(Async::Ready(v))
    }
}

trait TimedExt: Sized + Future {
    fn timed<F>(self, f: F) -> Timed<Self, F>
    where
        F: FnMut(&Self::Item, Duration),
    {
        Timed {
            inner: self,
            f,
            start: None,
        }
    }
}

impl<F: Future> TimedExt for F {}

fn main() {
    let when = Instant::now() + Duration::from_millis(100);

    let f = Delay::new(when).timed(|res, elapsed| println!("{:?} elapsed, got {:?}", elapsed, res));

    let mut runtime = Runtime::new().unwrap();
    runtime.block_on(f).unwrap();
}
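
To make the creation-versus-first-poll choice concrete, here is a small sketch that records both clocks. It is written against stdlib futures rather than the futures 0.1 used above, purely so that it runs with std alone; `BothClocks`, `measure_after_idle`, `NoopWaker` and `block_on` are hypothetical names for illustration.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical wrapper that reports both candidate measurements.
struct BothClocks<Fut> {
    inner: Pin<Box<Fut>>, // boxed so this sketch needs no pin projection
    created: Instant,
    first_polled: Option<Instant>,
}

impl<Fut: Future> BothClocks<Fut> {
    fn new(inner: Fut) -> Self {
        BothClocks { inner: Box::pin(inner), created: Instant::now(), first_polled: None }
    }
}

impl<Fut: Future> Future for BothClocks<Fut> {
    /// (inner output, time since creation, time since first poll)
    type Output = (Fut::Output, Duration, Duration);

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut(); // fine: every field is Unpin
        let first_polled = *this.first_polled.get_or_insert_with(Instant::now);
        match this.inner.as_mut().poll(cx) {
            Poll::Pending => Poll::Pending,
            Poll::Ready(v) => {
                let now = Instant::now();
                let since_created = now.duration_since(this.created);
                Poll::Ready((v, since_created, now.duration_since(first_polled)))
            }
        }
    }
}

/// Waker that does nothing; sufficient for the busy-polling loop below.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Stand-in for a runtime's `block_on`: polls in a loop until ready.
fn block_on<Fut: Future>(fut: Fut) -> Fut::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
        thread::yield_now();
    }
}

/// Leaves the future idle for `idle` before driving it, then returns
/// (time since creation, time since first poll).
fn measure_after_idle(idle: Duration) -> (Duration, Duration) {
    let fut = BothClocks::new(async {});
    thread::sleep(idle); // the future exists but has not been polled yet
    let (_, since_created, since_first_poll) = block_on(fut);
    (since_created, since_first_poll)
}
```

Calling `measure_after_idle(Duration::from_millis(50))` gives a since-creation value of at least 50ms, while the since-first-poll value excludes the idle period. That idle time is usually not something the future itself is responsible for, which is why first poll seems like the more useful starting point.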
Shepmaster