
I know this question has been asked many times, but I still can't figure out what to do (more below).

I'm trying to spawn a new thread using std::thread::spawn and then run an async loop inside of it.

The async function I want to run:

#[tokio::main] 
pub async fn pull_tweets(pg_pool2: Arc<PgPool>, config2: Arc<Settings>) {
    let mut scheduler = AsyncScheduler::new();

    scheduler.every(10.seconds()).run(move || {
        let arc_pool = pg_pool2.clone();
        let arc_config = config2.clone();
        async {
            pull_from_main(arc_pool, arc_config).await;
        }
    });

    tokio::spawn(async move {
        loop {
            scheduler.run_pending().await;
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    });
}

Spawning a thread to run in:

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let handle = thread::spawn(move || async {
        pull_tweets(pg_pool2, config2).await;
    });
}

The error:

error[E0277]: `()` is not a future
  --> src/main.rs:89:9
   |
89 |         pull_tweets(pg_pool2, config2).await;
   |         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ `()` is not a future
   |
   = help: the trait `std::future::Future` is not implemented for `()`
   = note: required by `poll`

The last comment here does an amazing job at generalizing the problem. It seems at some point a return value is expected that implements IntoFuture and I don't have that. I tried adding Ok(()) to both the closure and the function, but no luck.

  • Adding it to the closure does literally nothing.
  • Adding it to the async function gives me a new, but similar-sounding, error:
`Result<(), ()>` is not a future
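Why `Ok(())` cannot help: `.await` requires its operand to implement `Future` (newer compilers check `IntoFuture`, which every `Future` implements). Once `#[tokio::main]` has turned `pull_tweets` into a plain synchronous function, calling it yields `()` or `Result<(), ()>`, and neither is a future. The std-only sketch below shows the same bound that `.await` checks. The function names are hypothetical stand-ins, not the real `pull_tweets`.

```rust
use std::future::Future;

// Hypothetical stand-in: after `#[tokio::main]` rewrites the function,
// it is synchronous and simply returns its value.
fn pull_tweets_sync() -> Result<(), ()> {
    Ok(())
}

// Hypothetical stand-in: a plain `async fn`. Calling it only builds a future.
async fn pull_tweets_async() -> Result<(), ()> {
    Ok(())
}

// Accepts only futures, which is the same requirement `.await` imposes.
fn expects_future<F: Future<Output = Result<(), ()>>>(fut: F) -> F {
    fut
}

fn check_shapes() -> Result<(), ()> {
    // Compiles: the call produces a future (it is never polled here).
    let _fut = expects_future(pull_tweets_async());

    // Would fail with E0277 ("`Result<(), ()>` is not a future"),
    // just like `pull_tweets_sync().await` inside an async block:
    // let _ = expects_future(pull_tweets_sync());

    // The synchronous version is just called, with no `.await`.
    pull_tweets_sync()
}
```

Uncommenting the marked line reproduces the error from the question, which shows that the return value was never the issue. The issue is that the function is no longer `async`.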

Then I noticed that the answer specifically talks about extension functions, which I'm not using. This also talks about extension functions.

Some other SO answers:

  • This is caused by a missing async.
  • This and this are reqwest library specific.

So none seem to work. Could someone help me understand 1) why the error exists here and 2) how to fix it?

Note 1: All of this can be easily solved by replacing std::thread::spawn with tokio::task::spawn_blocking. But I'm purposefully experimenting with thread spawn as per this article.

Note 2: Broader context on what I want to achieve: I'm pulling 150,000 tweets from Twitter in an async loop. I want to compare 2 implementations: running on the main runtime vs. running on a separate thread. The latter is where I struggle.

Note 3: In my mind, threads and async tasks are two different primitives that don't mix, i.e. spawning a new thread doesn't affect how tasks behave, and spawning a new task only adds work to an existing thread. Please let me know if this worldview is mistaken (and what I can read).

ilmoi
    You **await** `pull_tweets()`. At the same time, `#[tokio::main]` tells tokio that `pull_tweets()` is a "main" function, i.e. a sync function that will create an executor and block on it. You cannot have both: it must be either async or main. Just get rid of `#[tokio::main]` and see how it works then. – user4815162342 Jun 14 '21 at 08:40
  • Thanks for the answer. I picked up the idea of adding #[tokio::main] to a non-main function from another SO post I can't find right now. Your solution makes the code compile, but unfortunately it doesn't do anything (i.e. the scheduler doesn't run). But going the other way around, removing async/await and keeping #[tokio::main], actually works! – ilmoi Jun 14 '21 at 17:06
  • Yeah, `thread::spawn(|| async ...)` doesn't make sense, you'd need something like `block_on()` there. You're right that it's ok to use `tokio::main` for something that's not `main()`, it's just a tool that converts an async function to sync. Note however that the default tokio executor is multi-threaded, so while you might consider threads and async different worlds, tokio doesn't agree, at least not by default. – user4815162342 Jun 14 '21 at 17:12

1 Answer

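#[tokio::main] converts your function into roughly the following:

// No `async` and no `#[tokio::main]` left: this is an ordinary function
pub fn pull_tweets(pg_pool2: Arc<PgPool>, config2: Arc<Settings>) {
    // Build a fresh multi-threaded runtime for this call
    let rt = tokio::runtime::Builder::new_multi_thread()
        .enable_all()
        .build()
        .unwrap();
    // Block the calling thread until the async body completes
    rt.block_on(async move {
        let mut scheduler = AsyncScheduler::new();
        // ...
    });
}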

Notice that it is a synchronous function that creates a new runtime and runs the inner future to completion. You do not await it. It is a separate runtime with its own dedicated thread pool and scheduler, so you simply call it:

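#[actix_web::main]
async fn main() -> std::io::Result<()> {
    // `pull_tweets` is synchronous now, so it is called without `.await`
    let handle = thread::spawn(move || {
        pull_tweets(pg_pool2, config2);
    });
    // ... start the actix-web server here ...
    Ok(())
}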
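The shape of that transformation can be reproduced with std alone. In this sketch, `block_on` is a hypothetical minimal single-threaded executor (not tokio's, which is multi-threaded and also drives I/O and timers), and `fetch_batch` is a hypothetical stand-in for `pull_from_main`. The wrapper is an ordinary function, so a plain `thread::spawn` can call it directly and no `.await` is involved.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Waker that unparks the thread blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Minimal stand-in for a runtime's `block_on`: polls the future on the
// current thread and parks between polls until it is woken.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

// Hypothetical async work standing in for `pull_from_main`.
async fn fetch_batch(n: u32) -> u32 {
    n * 2
}

// Roughly what `#[tokio::main]` produces: a synchronous function that
// owns its executor and runs the async body to completion.
fn pull_tweets_demo() -> u32 {
    block_on(async {
        let a = fetch_batch(1).await;
        let b = fetch_batch(2).await;
        a + b
    })
}
```

For example, `thread::spawn(pull_tweets_demo).join().unwrap()` returns `6`, because the spawned thread runs the executor itself.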

Note that your original example was wrong in another way:

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let handle = thread::spawn(move || async {
        pull_tweets(pg_pool2, config2).await;
    });
}

Even if pull_tweets were an async function, the thread would not do anything: the closure only creates another future (the async block) and returns it. That future is never executed, because futures are lazy and nothing in that thread ever polls it (there is no executor context in that thread anyway).
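This laziness is easy to observe with std alone. In the sketch below, `block_on` is again a hypothetical minimal executor standing in for a real runtime, and `laziness_demo` is an illustrative name. The spawned thread hands the unpolled future back through its `JoinHandle`. The flag is still unset after the thread has exited, and the body runs only once something polls the future.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Waker that unparks the thread blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Minimal stand-in executor: poll on the current thread, park when pending.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

// Returns (ran_while_thread_alive, ran_after_block_on).
fn laziness_demo() -> (bool, bool) {
    let ran = Arc::new(AtomicBool::new(false));
    let flag = Arc::clone(&ran);

    // Same shape as `thread::spawn(move || async { ... })`: the closure
    // only builds a future and returns it. Nothing polls it.
    let handle = thread::spawn(move || async move {
        flag.store(true, Ordering::SeqCst);
    });
    let unpolled = handle.join().unwrap(); // the thread has exited
    let ran_while_thread_alive = ran.load(Ordering::SeqCst);

    // Only an executor polling the future actually runs its body.
    block_on(unpolled);
    let ran_after_block_on = ran.load(Ordering::SeqCst);

    (ran_while_thread_alive, ran_after_block_on)
}
```

`laziness_demo()` returns `(false, true)`. The thread finished without running the async block, and the body ran only when `block_on` polled it.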

I would structure the code to build the runtime directly in the new thread, and call any async functions you want from there:

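use std::sync::Arc;
use std::thread;

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    // pg_pool2 / config2 are created earlier, as in the question
    let handle = thread::spawn(move || {
        // A dedicated tokio runtime owned by this OS thread
        let rt = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .unwrap();
        // Drive the async function to completion on that runtime
        rt.block_on(async move {
            pull_tweets(pg_pool2, config2).await;
        });
    });
    // ... start the actix-web server here ...
    Ok(())
}

// A plain async fn: no #[tokio::main] on it
pub async fn pull_tweets(pg_pool2: Arc<PgPool>, config2: Arc<Settings>) {
    // ...
}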
Ibraheem Ahmed
  • Hey - thanks for the comment! The part about removing .await was on point. I had to do one extra bit (which is weird, and I think is clokwerk library specific) - I had to remove the tokio::spawn block around the loop (so keep just the loop). Then everything works. Btw the #[tokio::main] with no .await also works. – ilmoi Jun 14 '21 at 17:02