5

Consider the following code

extern crate futures; // v0.1 (old)

use std::sync::{atomic, Arc};
use futures::*;

struct F(Arc<atomic::AtomicBool>);

impl Future for F {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
        println!("Check if flag is set");
        if self.0.load(atomic::Ordering::Relaxed) {
            Ok(Async::Ready(()))
        } else {
            Ok(Async::NotReady)
        }
    }
}

fn main() {
    let flag = Arc::new(atomic::AtomicBool::new(false));
    let future = F(flag.clone());
    ::std::thread::spawn(move || {
        ::std::thread::sleep_ms(10);
        println!("set flag");
        flag.store(true, atomic::Ordering::Relaxed);
    });
    // ::std::thread::sleep_ms(20);
    let result = future.wait();
    println!("result: {:?}", result);
}

The spawned thread sets a flag, which the future waits for. We also sleep the spawned thread, so the initial .poll() call from .wait() is before the flag is set. This causes .wait() to block (seemingly) indefinitely. If we uncomment the other thread::sleep_ms, .wait() returns, and prints out the result (()).

I would expect the current thread to try to resolve the future by calling poll multiple times, since we're blocking the current thread. However, this is not happening.

I have tried to read some docs, and it seems like the problem is that the thread is parked after getting NotReady from the poll the first time. However, it is not clear to me why this is, or how it is possible to work around this.

What am I missing?

Kornel
  • 97,764
  • 37
  • 219
  • 309
MartinHaTh
  • 1,417
  • 1
  • 13
  • 25

1 Answers1

5

Why would you need to park a waiting future instead of polling it repeatedly? The answer is rather obvious, IMHO. Because at the end of the day it's faster and more efficient!

To repeatedly poll a future (which might be dubbed "busy-waiting") the library would have to decide whether to do it often or seldom and neither answer is satisfactory. Do it often and you're wasting the CPU cycles, do it seldom and the code is slow to react.

So yeah, you need to park the task when you're waiting for something and then unpark it when you've done waiting. Like this:

#![allow(deprecated)]

extern crate futures;

use std::sync::{Arc, Mutex};
use futures::*;
use futures::task::{park, Task};

struct Status {
    ready: bool,
    task: Option<Task>,
}

#[allow(dead_code)]
struct F(Arc<Mutex<Status>>);

impl Future for F {
    type Item = ();
    type Error = ();

    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
        println!("Check if flag is set");
        let mut status = self.0.lock().expect("!lock");
        if status.ready {
            Ok(Async::Ready(()))
        } else {
            status.task = Some(park());
            Ok(Async::NotReady)
        }
    }
}

#[test]
fn test() {
    let flag = Arc::new(Mutex::new(Status {
                                       ready: false,
                                       task: None,
                                   }));
    let future = F(flag.clone());
    ::std::thread::spawn(move || {
        ::std::thread::sleep_ms(10);
        println!("set flag");
        let mut status = flag.lock().expect("!lock");
        status.ready = true;
        if let Some(ref task) = status.task {
            task.unpark()
        }
    });
    let result = future.wait();
    println!("result: {:?}", result);
}

Note that Future::poll is doing several things here: it's checking for an external condition and it's parking the task, so it's possible to have a race, like when:

  1. the poll checks the variable and finds it to be false;
  2. the outer code sets the variable to true;
  3. the outer code checks if the task is parked and finds that it's not;
  4. the poll parks the task, but boom! it is too late, nobody is going to unpark it any longer.

In order to avoid any races, I've used a Mutex to synchronize these interactions.

P.S. If all you need is to wrap a thread result into a Future then consider using the oneshot channel: it has the Receiver that implements the Future interface already.

ArtemGr
  • 11,684
  • 3
  • 52
  • 85
  • 11
    *The answer is rather obvious, IMHO* — if the answer were obvious, people wouldn't need to ask the question ^_^ – Shepmaster Apr 19 '17 at 21:52
  • @Shepmaster It's a valid jibe, but people also ask questions to confirm something or to vent their frustrations, et cetera. = ) By saying that the answer is obvious I'm stating that the matter is simple, no need to overthink it. Besides, "Elementary, my dear Watson" : ) – ArtemGr Apr 19 '17 at 21:53
  • 1
    Alright! I understood the motivation behind the parking scheme, but I didn't realize that I had to handle the `park`ing stuff manually (alrhough it makes sense - how would `futures` know when the value is ready?). Thanks! – MartinHaTh Apr 20 '17 at 08:54
  • When I read it the first time, I really don't understand what's park()/unpark() mean, now the function has changed it's name to current()/notify(). Thanks to @Shepmaster for answer my question again. To someone may not very understand this, you could read https://stackoverflow.com/questions/58377995/why-is-my-future-implementation-blocked-after-it-is-polled-once-and-notready/58378572 for more. – wonderflow Oct 15 '19 at 03:41