I have encountered a very weird problem, while using crossbeam
channels in combination with tokio
.
Seemingly without allocating much memory, my program just stack overflows in debug mode.
But when I decided to replace the unbounded
channels with bounded
ones and thus allocating more memory to the stack everything seems to work, even in debug mode.
On another application I am writing, this is happening too, but without using tokio
and I am required to run it in release
mode.
I tried to wrap everything that might consume a lot of memory and is stack-allocated in a Box
, but without any success.
My question is if I should be worried about this, or just run my program in release
mode without any further concerns.
I am writing a server for my game, so stability is very important.
EDIT
I found out that the rust compiler apparently performs "tail-call optimization" which allow an infinite loop in release mode, source
And because I am running 2 infinite loops in both programs this explains, that they only work in release
mode I think.
minimal working example
communication.rs
use crate::channel::*;
use std::mem::forget;
use tokio::net::{TcpStream, ToSocketAddrs};
use tokio::runtime;
use tokio::io::AsyncWriteExt;
pub struct Communicator {
event_sender: Sender<[u8; 255]>,
event_receiver: Receiver<[u8; 255]>,
}
impl Communicator {
pub fn init<H: ToSocketAddrs>(host: H) -> Option<Self> {
let rt = runtime::Builder::new_multi_thread()
.enable_io()
.build()
.unwrap();
if let Some((event_sender, event_receiver)) = rt.block_on(async move {
if let Ok(socket) = TcpStream::connect(host).await {
let (mut read, mut write) = socket.into_split();
let (ev_sender_tx, ev_sender_rx): (Sender<[u8; 255]>, Receiver<[u8; 255]>) = channel();
let (ev_receiver_tx, ev_receiver_rx) = channel();
// rx
tokio::spawn(async move {
loop {
let mut buffer: [u8; 255] = [0; 255];
ev_receiver_tx.send(buffer).unwrap();
}
});
// tx
tokio::spawn(async move {
loop {
if let Some(event) = ev_sender_rx.recv() {
write.write_all(&event).await;
}
}
});
Some((ev_sender_tx, ev_receiver_rx))
} else {
None
}
}) {
// not allowed to run destructor, must leak
forget(rt);
Some(Self { event_sender, event_receiver })
} else {
None
}
}
}
channel.rs
use crossbeam::channel;
use crossbeam::channel::SendError;
#[derive(Debug, Clone)]
pub struct Receiver<T> {
inner: channel::Receiver<T>,
}
impl<T> Receiver<T> {
pub fn recv(&self) -> Option<T> {
self.inner.recv().ok()
}
#[allow(dead_code)]
pub fn try_recv(&self) -> Option<T> {
self.inner.try_recv().ok()
}
}
#[derive(Debug, Clone)]
pub struct Sender<T> {
inner: channel::Sender<T>,
}
impl<T> Sender<T> {
pub fn send(&self, data: T) -> Result<(), SendError<T>> {
self.inner.send(data)
}
}
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let (s, r) = channel::unbounded();
(
Sender {inner: s},
Receiver {inner: r},
)
}
// will block if full
pub fn bounded_channel<T>(bound: usize) -> (Sender<T>, Receiver<T>) {
let (s, r) = channel::bounded(bound);
(
Sender {inner: s},
Receiver {inner: r},
)
}