
I'm using rust_bert for summarising text. I need to set a model with rust_bert::pipelines::summarization::SummarizationModel::new, which fetches the model from the internet. It does this asynchronously using tokio and the issue that (I think) I'm running into is that I am running the Tokio runtime within another Tokio runtime, as indicated by the error message:

Downloading https://cdn.huggingface.co/facebook/bart-large-cnn/config.json to "/home/(censored)/.cache/.rustbert/bart-cnn/config.json"
thread 'main' panicked at 'Cannot start a runtime from within a runtime. This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks.', /home/(censored)/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.21/src/runtime/enter.rs:38:5
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

I've tried running the model fetching synchronously with tokio::task::spawn_blocking and tokio::task::block_in_place, but neither of them is working for me. block_in_place gives the same error as if it weren't there, and spawn_blocking doesn't really seem to be of use to me. I've also tried making summarize_text async, but that didn't help much. GitHub issue tokio-rs/tokio#2194 and the Reddit post "'Cannot start a runtime from within a runtime.' with Actix-Web And Postgresql" seem similar (same-ish error message), but they weren't of much help in finding a solution.

The code I've got issues with is as follows:

use egg_mode::tweet;
use rust_bert::pipelines::summarization::SummarizationModel;

fn summarize_text(model: SummarizationModel, text: &str) -> String {
    let output = model.summarize(&[text]);
    // @TODO: output summarization
    match output.is_empty() {
        false => "FALSE".to_string(),
        true => "TRUE".to_string(),
    }
}

#[tokio::main]
async fn main() {
    let model = SummarizationModel::new(Default::default()).unwrap();

    let token = egg_mode::auth::Token::Bearer("obviously not my token".to_string());
    let tweet_id = 1221552460768202756; // example tweet

    println!("Loading tweet [{id}]", id = tweet_id);
    let status = tweet::show(tweet_id, &token).await;
    match status {
        Err(err) => println!("Failed to fetch tweet: {}", err),
        Ok(tweet) => {
            println!(
                "Original tweet:\n{orig}\n\nSummarized tweet:\n{sum}",
                orig = tweet.text,
                sum = summarize_text(model, &tweet.text)
            );
        }
    }
}
Shepmaster
Mib
  • It's hard to answer your question because it doesn't include a [MRE]. We can't tell what crates (and their versions), types, traits, fields, etc. are present in the code. It would make it easier for us to help you if you try to reproduce your error in a brand new Cargo project, then [edit] your question to include the complete example. There are [Rust-specific MRE tips](//stackoverflow.com/tags/rust/info) you can use to reduce your original code for posting here. Thanks! – Shepmaster Jun 23 '20 at 14:05
  • Do MREs also apply when the issue is "caused" by the crate in question? (rust_bert) —Either way, I'll add one now. One moment. (Edit: i.e. is it okay to use rust_bert in the mre?) – Mib Jun 23 '20 at 14:13
  • *is it okay to use rust_bert in the mre* — yes, but, like I said: *crates (**and their versions**)*. That version is required to be able to _reproduce_ the problem. – Shepmaster Jun 23 '20 at 14:18

2 Answers


Solving the problem

This is a reduced example:

use tokio; // 1.0.2

#[tokio::main]
async fn inner_example() {}

#[tokio::main]
async fn main() {
    inner_example();
}
thread 'main' panicked at 'Cannot start a runtime from within a runtime. This happens because a function (like `block_on`) attempted to block the current thread while the thread is being used to drive asynchronous tasks.', /playground/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.0.2/src/runtime/enter.rs:39:9

To avoid this, you need to run the code that creates the second Tokio runtime on a completely independent thread. The easiest way to do this is to use std::thread::spawn:

use std::thread;

#[tokio::main]
async fn inner_example() {}

#[tokio::main]
async fn main() {
    thread::spawn(|| {
        inner_example();
    }).join().expect("Thread panicked")
}
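If the blocking call produces a value that the rest of the program needs, the thread can simply return it, and join hands it back to the caller. The following is a minimal sketch using only the standard library; load_model is a hypothetical stand-in for a blocking setup call (such as a constructor that starts its own runtime internally), not part of any real crate.

```rust
use std::thread;

/// Hypothetical stand-in for a blocking setup call, such as a constructor
/// that starts its own runtime internally.
fn load_model() -> String {
    String::from("model ready")
}

/// Runs the blocking setup on its own thread and hands the result back.
/// `join` blocks the calling thread until the spawned thread finishes.
fn load_model_on_thread() -> String {
    thread::spawn(load_model)
        .join()
        .expect("Thread panicked")
}

fn main() {
    let model = load_model_on_thread();
    println!("{}", model);
}
```

The closure's return value becomes the Ok value of join, so no shared state or extra synchronization is needed for a one-off result.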

For improved performance, you may wish to use a threadpool instead of creating a new thread each time. Conveniently, Tokio itself provides such a threadpool via spawn_blocking:

#[tokio::main]
async fn inner_example() {}

#[tokio::main]
async fn main() {
    tokio::task::spawn_blocking(|| {
        inner_example();
    }).await.expect("Task panicked")
}

In some cases you don't need to actually create a second Tokio runtime and can instead reuse the parent runtime. To do so, you pass in a Handle to the outer runtime. You can optionally use a lightweight executor like futures::executor to block on the result, if you need to wait for the work to finish:

use tokio::runtime::Handle; // 1.0.2

fn inner_example(handle: Handle) {
    futures::executor::block_on(async {
        handle
            .spawn(async {
                // Do work here
            })
            .await
            .expect("Task spawned in Tokio executor panicked")
    })
}

#[tokio::main]
async fn main() {
    let handle = Handle::current();

    tokio::task::spawn_blocking(|| {
        inner_example(handle);
    })
    .await
    .expect("Blocking task panicked")
}

Avoiding the problem

A better path is to avoid creating nested Tokio runtimes in the first place. Ideally, if a library uses an asynchronous executor, it would also offer the direct asynchronous function so you could use your own executor.

It's worth looking at the API to see if there is a non-blocking alternative, and if not, raising an issue on the project's repository.

You may also be able to reorganize your code so that the Tokio runtimes are not nested but are instead sequential:

struct Data;

#[tokio::main]
async fn inner_example() -> Data {
    Data
}

#[tokio::main]
async fn core(_: Data) {}

fn main() {
    let data = inner_example();
    core(data);
}
Shepmaster
  • Would this allow me to retrieve a variable in the spawned thread? – Mib Jun 23 '20 at 14:16
  • As per "Avoiding the problem"/"looking at the API to see if there is a non-blocking alternative": so it's an issue with `rust_bert`? the first thing I did was look for an async version and there is none. (this kind of thing kind of "needs" to be blocking — everything else is dependent on it in my case) – Mib Jun 23 '20 at 14:19
  • @Mib "retrieve a variable" is not a well-defined concept, so I cannot answer that question. You can pass values into and return them from a thread, if that's what you mean. – Shepmaster Jun 23 '20 at 14:20
  • @Mib that's not what "blocking" means in this context. You have a set of operations that work in a specific sequence and require a specific order, sure. You don't want to block the entire thread from executing if there's another one of those sequences you could be working on while waiting for more input. In your case, you could be retrieving hundreds of tweets at the same time on a single thread. – Shepmaster Jun 23 '20 at 14:22
  • right, sorry: the blocking code I need to run in the thread returns a variable and I need to use that outside of the spawned thread. — I'm only doing this ONCE and everything else is dependent on it, therefore I'm fine (and want) with blocking before I begin fetching tweets. – Mib Jun 23 '20 at 14:22
  • @Mib give the examples on [the documentation for `thread::spawn`](https://doc.rust-lang.org/std/thread/fn.spawn.html) a read and see if your question is answered. – Shepmaster Jun 23 '20 at 14:24
  • @Mib although my last edit probably does what you want. – Shepmaster Jun 23 '20 at 14:27
  • ah, yep! The edit with running two tokio runtimes sequentially worked out for me. Code is a bit ugly, but it's fine by me. I'll wait a bit to see if any other answers are posted, but thank you very much. :) – Mib Jun 23 '20 at 14:42
  • I am also using `rust-bert` and tried the sequential pattern above, but still seeing the error. @Mib can you post your working code somewhere? – Alex Moore-Niemi Dec 25 '20 at 20:42
  • You can in fact return a value from a spawned thread. The same as you do for functions. So `let variable = std::thread::spawn(|| { ... value }).join().expect("Thread panicked");` – bobajeff Jan 21 '21 at 15:20
  • regarding retrieving the variable - you can always use channels to pass data back and forth. – WhiteStork Sep 09 '21 at 15:04
  • Can you mix and match "futures executor" with "tokio runtime"? – Brandon Ros May 26 '22 at 04:26
  • @BrandonRos sometimes. Some futures require a specific executor/runtime. For example, Tokio's time-related futures, such as [`Sleep`](https://docs.rs/tokio/1.19.2/tokio/time/struct.Sleep.html), require the [time driver](https://docs.rs/tokio/1.19.2/tokio/runtime/struct.Builder.html#method.enable_time) to be present. – Shepmaster Jun 21 '22 at 17:00
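As a rough illustration of the channel suggestion in the comments above, results can be sent back from the worker thread over a std::sync::mpsc channel instead of being returned from it. This is only a sketch: blocking_work and run_with_channel are hypothetical names, and the doubling is a placeholder for whatever blocking work the thread actually does.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for the blocking work done on the other thread.
fn blocking_work(input: u32) -> u32 {
    input * 2
}

/// Sends each result back over a channel instead of returning it from the thread.
fn run_with_channel(inputs: Vec<u32>) -> Vec<u32> {
    let (tx, rx) = mpsc::channel();

    let worker = thread::spawn(move || {
        for input in inputs {
            // `send` only fails if the receiver has been dropped.
            tx.send(blocking_work(input)).expect("Receiver dropped");
        }
        // `tx` is dropped here, which ends the receiving loop below.
    });

    // Collect results as they arrive; iteration stops once the sender is dropped.
    let results: Vec<u32> = rx.iter().collect();
    worker.join().expect("Thread panicked");
    results
}

fn main() {
    println!("{:?}", run_with_channel(vec![1, 2, 3]));
}
```

A channel is mostly worthwhile when the thread produces several values over time; for a single result, returning it through join is simpler.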

I had a similar issue with the QA model while loading it into warp (which runs on a Tokio runtime). Sequential runtimes still weren't working for me, but I found my solution in the GitHub issues of rust-bert. The solution was simply to wrap the initial loading call in task::spawn_blocking. This is fine for me because I can't accept any requests before the model is loaded anyway. A snippet is below in case it helps others.

use tokio::task;
use warp::Filter;

// QaModel, QaQuery, qa_model_config, qa_model and ask are defined elsewhere (not shown).

fn with_model(
    qa: QaModel, // alias for Arc<Mutex<QuestionAnsweringModel>>
) -> impl Filter<Extract = (QaModel,), Error = std::convert::Infallible> + Clone {
    warp::any().map(move || qa.clone())
}

#[tokio::main]
async fn main() {
    env_logger::init();

    // NOTE: have to download the model before booting up
    // The key change: run the blocking model setup inside spawn_blocking.
    let qa_model: QaModel = task::spawn_blocking(move || {
        log::debug!("setting up qa model config");
        let c = qa_model_config();
        log::debug!("finished setting up qa model config");

        log::debug!("setting up qa model");
        let m = qa_model(c);
        log::debug!("finished setting up qa model");
        m
    })
    .await
    .expect("got model");

    let ask_handler = warp::path!("ask")
        .and(warp::get())
        .and(warp::query::<QaQuery>())
        .and(with_model(qa_model))
        .and_then(ask);

    warp::serve(ask_handler).run(([127, 0, 0, 1], 3030)).await;
}
Alex Moore-Niemi