3

I have a setup where my program spawns several threads for CPU-bound computation using the std::thread::spawn.

I need a GRPC server to handle incoming commands and also stream outputs done by the worker threads. I'm using tonic for the GRPC server, and it only offers an async implementation inside a Tokio future.

I need to be able to send messages from my "normal" standard-library threads to the Tokio future.

I've boiled my code down the the minimum here:

use std::thread;
use tokio::sync::mpsc; // 1.9.0

fn main() {
    let (tx, mut rx) = mpsc::channel(1);

    let tokio_runtime = tokio::runtime::Runtime::new().unwrap();
    tokio_runtime.spawn(async move {
        // the code below starts the GRPC server in reality, here I'm just demonstrating trying to receive a message
        while let Some(v) = rx.recv().await {}
    });

    let h = thread::spawn(move || {
        // do work
        tx.send(1).await; //<------ error occurs here since I can't await in a non-async block
    });

    h.join().unwrap();
}

How can my main worker threads communicate with the Tokio-spawned GRPC server?

Shepmaster
  • 388,571
  • 95
  • 1,107
  • 1,366
l3utterfly
  • 2,106
  • 4
  • 32
  • 58
  • Use an unbounded channel, so that the non-async thread does not need to wait. – Colonel Thirty Two Jul 28 '21 at 13:21
  • `mpsc::channel(1)` here is a tokio channel, I see the function must take a buffer > 0 – l3utterfly Jul 28 '21 at 13:22
  • I need the `tokio::sync::mpsc` channel because inside the tokio runtime, I need the `recv().await` so it doesn't block – l3utterfly Jul 28 '21 at 13:22
  • 1
    https://docs.rs/tokio/1.9.0/tokio/sync/mpsc/fn.unbounded_channel.html `UnboundedSender.send` is not async and does not block. – Colonel Thirty Two Jul 28 '21 at 13:26
  • 1
    using unbounded queues is generally a very bad idea as it can lead to OOM and other kinds of resource exhaustion. Using a bounded channel with [blocking_send()](https://docs.rs/tokio/1.9.0/tokio/sync/mpsc/struct.Sender.html#method.blocking_send) is better – Svetlin Zarev Jul 28 '21 at 13:38

1 Answers1

5

You can use tokio's sync features. There are two options - UnboundedSender and Sender::blocking_send().

The issue with the unbounded sender is that it does not have back-pressure and if your producer is faster than the consumer your application may crash with an out-of-memory error or exhaust other limited resources your producer uses.

As a general rule, you should avoid using unbounded queues, which leaves us with the better option of using blocking_send():

Playground:

use std::thread;
use tokio::sync::mpsc; // 1.9.0

fn main() {
    let (tx, mut rx) = mpsc::channel(1);

    let tokio_runtime = tokio::runtime::Runtime::new().unwrap();
    tokio_runtime.spawn(async move {
        // the code below starts the GRPC server in reality, here I'm just demonstrating trying to receive a message
        while let Some(v) = rx.recv().await {
            println!("Received: {:?}", v);
        }
    });

    let h = thread::spawn(move || {
        // do work
        tx.blocking_send(1).unwrap();
    });

    h.join().unwrap();
}
Shepmaster
  • 388,571
  • 95
  • 1,107
  • 1,366
Svetlin Zarev
  • 14,713
  • 4
  • 53
  • 82