
I'm trying to use futures-rs to manage some tasks on a thread pool. I see how to wait for each created future and how to process them one after another, but I wasn't able to poll a future during its execution to check its state. I always get the error:

thread 'main' panicked at 'no Task is currently running'

I want to do something during the future's processing until it finishes. Perhaps I'm not using it the right way? I managed to make it work using a channel, but I think it should be possible to poll the future and, when it's ready, get the result. The code I use to test this is:

extern crate futures;
extern crate futures_cpupool;

use futures::{Async, Future};
use futures_cpupool::CpuPool;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::{thread, time};

fn main() {
    println!("test future");
    let thread_pool = CpuPool::new(4);
    let mut future_execution_list = vec![];
    let mutex = Arc::new(AtomicUsize::new(0));
    //create the future to process
    for _ in 0..10 {
        let send_mutex = mutex.clone();
        let future = thread_pool.spawn_fn(move || {
            //Simulate long processing
            thread::sleep(time::Duration::from_millis(10));
            let num = send_mutex.load(Ordering::Relaxed);
            send_mutex.store(num + 1, Ordering::Relaxed);
            let res: Result<usize, ()> = Ok(num);
            res
        });
        future_execution_list.push(future);
    }
    // do the job
    loop {
        for future in &mut future_execution_list {
            match future.poll() {
                Ok(Async::NotReady) => (), //do nothing
                Ok(Async::Ready(num)) => {
                    //update task status
                    println!(" future {:?}", num);
                }
                Err(_) => {
                    //log error and set task status to err
                    ()
                }
            };
        }
        //do something else
    }
}

I'm completing my question after Shepmaster's answer. Your remarks are very interesting, but I still can't find a solution to my problem, so I'll add some information about it. I want to schedule tasks on a controller that can manage several tasks at a time. There is a loop where events are handled and task scheduling is calculated. When a task is scheduled, it is spawned. When a task ends, a new scheduling pass is done. While tasks execute, events are still handled. In pseudo-code:

loop {
   event.try_recv() { ... } // manage user commands, for example
   if schedule {
      let tasks_to_spawn = schedule_task();
      let futures = tasks_to_spawn.map(|task| {
          thread_pool.spawn_fn(....)
      });
      let mut one = future::select_all(futures);
      while let Ok((value, _idx, remaining)) = one.wait() { .. } // wait here
   }
   // depending on task end states and events, set schedule to true or false
}

I can join the scheduling and the task in a single future, like:

let future = schedule.and_then(|task| execute_task(task));

But I still need to wait for the end of the first task's execution. I could put everything in a future (event management, scheduling, tasks) and wait for the first one that ends, like you propose. I tried, but I didn't see how to make a Vec of futures with different Item and Error types. And with this design I have to share more data between threads; event management and scheduling don't have to run on a different thread.

I see another problem: select_all takes ownership of the Vec. If a new task has to be scheduled while the others are executing, how can I change the Vec and add the new future?

I don't know if you have a simple solution. I was thinking it would be simple to get the state of a future during its execution with a method like isDone(), without waiting. Perhaps it's planned; I didn't see a PR about that. If you have a simple solution that would be great; otherwise I'll rethink my design.

Phil D
  • There's a 99.9% chance that you do **not** want to use atomic variables in that manner. Instead, you want `fetch_add` and the vast majority of people do not want `Relaxed` ordering. – Shepmaster Jan 23 '17 at 16:19
  • You're right; I just copied/pasted some code to show that I want to get a result from a future whose execution depends on the execution of the other futures. – Phil D Jan 23 '17 at 20:55
  • It is [*extremely bad form* to change your question **after you received answers**, especially when the change invalidates those answers](http://meta.stackoverflow.com/q/309237/155423). It is on the question asker to ask a good question that includes any pertinent details from the beginning. – Shepmaster Jan 24 '17 at 14:17
  • So I can't change my question (I understand why), and I can't reply to the answer; my reply was deleted. So how can I add a deeper explanation to the initial question, especially when it refers to the answers? Use the comments? I'm really not used to the Stack Overflow way of working. Usually the flow of discussion lets you follow the evolution. Next time I have a complex question I'll use Reddit or other forums. Thanks for your remarks and your help improving my use of Stack Overflow. – Phil D Jan 24 '17 at 16:12
  • You absolutely **can** reply to an answer. That's what the comments below each answer are for (just like the comments below this question). Stack Overflow is a question and answer site. That doesn't fit everyone's style, and that's OK. [/r/rust](https://www.reddit.com/r/rust/), [the user's forum](https://users.rust-lang.org/) or [IRC](https://www.rust-lang.org/en-US/community.html) are great places to have actual discussions about your Rust code. – Shepmaster Jan 24 '17 at 16:26
  • One thing to recall is that answering *an individual's* question isn't the main purpose of SO. Answering a question that many people may have (which the original formulation seemed to fit) once, so that it can be found and used again and again, is. Building up a library of good answers allows us to move forward as an industry and avoid having to repeat ourselves over and over. – Shepmaster Jan 24 '17 at 16:32
  • I didn't know. Now I'm aware, thanks to your help. I understand Stack Overflow's positioning, so I hope I'll use it properly now. – Phil D Jan 24 '17 at 17:19
  • Any idea if futures-rs will offer access to a future's state during processing? – Phil D Jan 24 '17 at 17:20
  • I'm not even sure I understand what you mean by that. However, you may want to jump into https://gitter.im/tokio-rs/tokio to ask tokio- / future-related questions about long-term planning. – Shepmaster Jan 24 '17 at 22:10
  • My last comment, after I reposted my question on a forum to allow a better discussion (I don't have access to the chat). What I have in mind is, in Python: [done](https://docs.python.org/3/library/asyncio-task.html#asyncio.Future.done), or in Java: [isDone](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Future.html). I'll investigate a little more to get a better understanding. – Phil D Jan 25 '17 at 08:11

2 Answers


To poll a Future you must have a Task. To get a Task, you can poll from inside a future passed to futures::executor::spawn(). If you rewrite the loop of your example like so:

futures::executor::spawn(futures::lazy(|| {
    // existing loop goes here
})).wait_future();

It runs.

As to why a Future can only be polled inside a task, I believe it's so that polling can call Task::unpark.

Shepmaster
Euan Rochester

I want to do something during the future processing

As I understand it, that's what futures are - things that can happen in parallel. If you want to do something else, then make another future and throw it in!

You are basically already doing this - each of your threads is "doing something else".

poll the future and when it's ready get the result

Using future::select_all, you can combine multiple futures and get whichever finishes first. It is then up to you to decide whether to wait for the next one.

One potential implementation:

extern crate rand;
extern crate futures;
extern crate futures_cpupool;

use rand::Rng;
use futures::{future, Future};
use futures_cpupool::CpuPool;

use std::{thread, time};

fn main() {
    let thread_pool = CpuPool::new(4);

    let futures = (0..10).map(|i| {
        thread_pool.spawn_fn(move || -> Result<usize, ()> {
            let mut rng = rand::thread_rng();
            // Simulate long processing
            let sleep_time = rng.gen_range(10, 100);
            let sleep_time = time::Duration::from_millis(sleep_time);
            for _ in 0..10 {
                println!("Thread {} sleeping", i);
                thread::sleep(sleep_time);
            }
            Ok(i)
        })
    });

    let mut one = future::select_all(futures);
    while let Ok((value, _idx, remaining)) = one.wait() {
        println!("Future #{} finished", value);
        if remaining.is_empty() {
            break;
        }
        one = future::select_all(remaining);
    }
}

During the call to wait, multiple things are happening concurrently! This can be seen by the interleaved output:

Thread 2 sleeping
Thread 0 sleeping
Thread 3 sleeping
Thread 1 sleeping
Thread 3 sleeping
Thread 0 sleeping
Thread 1 sleeping
Thread 2 sleeping
Thread 3 sleeping

You can verify that things are happening in parallel by setting the sleep time to 1 second for each thread and timing the overall program. Since there are 10 futures, each taking 1 second, with a parallelism of 4, the overall program takes about 3 seconds to run (three batches of at most 4 futures).


Bonus code review:

  1. Don't split loading and setting an atomic variable to implement incrementing - the stored value might have been changed by another thread between the two calls. Use fetch_add.
  2. You really, really should know what the orderings mean before using them. I do not know them, so I always use SeqCst.
  3. Since it wasn't important for this example, I removed the atomic variable completely.
  4. Always prefer collecting into a Vec instead of pushing into it inside a loop. This allows for more optimized allocation.
  5. In this case, there's no need for a Vec at all as select_all accepts anything that can be turned into an iterator.
Shepmaster