-3

I have encountered a very weird problem, while using crossbeam channels in combination with tokio.

Seemingly without allocating much memory, my program just stack overflows in debug mode. But when I decided to replace the unbounded channels with bounded ones and thus allocating more memory to the stack everything seems to work, even in debug mode.

On another application I am writing, this is happening too, but without using tokio and I am required to run it in release mode.

I tried to wrap everything that might consume a lot of memory and is stack-allocated in a Box, but without any success.

My question is if I should be worried about this, or just run my program in release mode without any further concerns.

I am writing a server for my game, so stability is very important.

EDIT

I found out that the rust compiler apparently performs "tail-call optimization" which allow an infinite loop in release mode, source

And because I am running 2 infinite loops in both programs this explains, that they only work in release mode I think.

minimal working example

communication.rs

use crate::channel::*;
use std::mem::forget;
use tokio::net::{TcpStream, ToSocketAddrs};
use tokio::runtime;
use tokio::io::AsyncWriteExt;

pub struct Communicator {
    event_sender: Sender<[u8; 255]>,
    event_receiver: Receiver<[u8; 255]>,
}
impl Communicator {
    pub fn init<H: ToSocketAddrs>(host: H) -> Option<Self> {
        let rt = runtime::Builder::new_multi_thread()
            .enable_io()
            .build()
            .unwrap();
        
        if let Some((event_sender, event_receiver)) = rt.block_on(async move {
            if let Ok(socket) = TcpStream::connect(host).await {
                let (mut read, mut write) = socket.into_split();

                let (ev_sender_tx, ev_sender_rx): (Sender<[u8; 255]>, Receiver<[u8; 255]>) = channel();
                let (ev_receiver_tx, ev_receiver_rx) = channel();

                // rx
                tokio::spawn(async move {
                    loop {
                        let mut buffer: [u8; 255] = [0; 255];
                        ev_receiver_tx.send(buffer).unwrap();
                    }
                });
                // tx
                tokio::spawn(async move {
                    loop {
                        if let Some(event) = ev_sender_rx.recv() {
                            write.write_all(&event).await;
                        }
                    }
                });

                Some((ev_sender_tx, ev_receiver_rx))
            } else {
                None
            }
        }) {
            // not allowed to run destructor, must leak
            forget(rt);
            Some(Self { event_sender, event_receiver })
        } else {
            None
        }
    }
}

channel.rs

use crossbeam::channel;
use crossbeam::channel::SendError;

#[derive(Debug, Clone)]
pub struct Receiver<T> {
    inner: channel::Receiver<T>,
}
impl<T> Receiver<T> {
    pub fn recv(&self) -> Option<T> {
        self.inner.recv().ok()
    }

    #[allow(dead_code)]
    pub fn try_recv(&self) -> Option<T> {
        self.inner.try_recv().ok()
    }
}

#[derive(Debug, Clone)]
pub struct Sender<T> {
    inner: channel::Sender<T>,
}
impl<T> Sender<T> {
    pub fn send(&self, data: T) -> Result<(), SendError<T>> {
        self.inner.send(data)
    }
}

pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
    let (s, r) = channel::unbounded();
    (
        Sender {inner: s},
        Receiver {inner: r},
    )
}

// will block if full
pub fn bounded_channel<T>(bound: usize) -> (Sender<T>, Receiver<T>) {
    let (s, r) = channel::bounded(bound);
    (
        Sender {inner: s},
        Receiver {inner: r},
    )
}
BrunoWallner
  • 417
  • 3
  • 10
  • This question lacks a [minimal, reproducible example](https://stackoverflow.com/help/minimal-reproducible-example). – kotatsuyaki Jun 12 '22 at 16:11
  • Please add a `main` function to your example and make sure it reproduces your problem. – Sprite Jun 12 '22 at 16:48
  • 1
    Tail-call optimization has to do with recursive functions, not infinite loops. Also, your example code is syntactically invalid -- it doesn't even parse as valid Rust code. – cdhowie Jun 12 '22 at 17:45

2 Answers2

1
loop {
    let mut buffer: [u8; 255] = [0; 255];
    ev_receiver_tx.send(buffer).unwrap();
}

Unless I'm understanding something wrong, this loop just fills up the channel completely. And for an unbounded channel, I'm not surprised that this results in an infinite amount of memory being used. What's the purpose of this loop, or is that just for the example?

If you just run this loop on its own, with an unbounded channel, you can watch the memory get eaten until the program crashes.

But when I decided to replace the unbounded channels with bounded ones and thus allocating more memory to the stack everything seems to work, even in debug mode.

I don't quite follow ... how does replacing the channels change the amount of memory allocated to the stack? This actually limits the amount of memory a channel could take if flooded with items, which seems to be the root cause of the crash in this case. I'm still a bit uncertain, though, because this should cause an out-of-memory error, not a stack overflow.


// not allowed to run destructor, must leak
forget(rt);

That one is also highly suspicious to me. According to its documentation, dropping the runtime handle is the official way of shutting down the runtime, so calling forget on it sounds to me like a memory leak and incorrect behaviour. forget really is one of those functions that don't serve much of a purpose unless paired with some form of unsafe code.

If you added this line because it complained that it cannot destroy it while it is borrowed by event_sender and event_receiver, then that was definitely the wrong way to 'fix' that problem.

I think the general approach (if I understand it correctly) of spawning an async runtime for every connection is not possible the way you implemented it. Especially a multi_thread runtime will spawn a lot of threads as you get more and more connections, and as you forget the runtime handle, those threads will never go away again, even if the users disconnect.

I think a more useful way would be to have a reusable multi_thread runtime somewhere (which is already capable of using 100% of your cpu) and then to use tokio::spawn to create new tasks for the new connection in your init function, as you already do.


My question is if I should be worried about this, or just run my program in release mode without any further concerns.

Yes, in my opinion, you should definitely be worried about this. Normal healthy Rust programs do not just randomly stack overflow in debug mode. This is a strong indication of a programming error.

Try to use memory profiling tools to find out where the memory usage actually comes from.


I found out that the rust compiler apparently performs "tail-call optimization" which allow an infinite loop in release mode

I think you misunderstood that post. It means:

In recursion, the compiler might reduce the usage of the stack by replacing the recursive function with a loop. Here is a discussion about how to the compiler might achieve this.

This does not and under no circumstance mean that the compiler just randomly decides to deadlock in an infinite loop in release. That would be horrible and would take all credibility from the Rust compiler and the language as a whole.

The only reason this article mentioned "infinite" loops is because it was an infinite unbounded recursion to begin with. That's what the article was actually about, to show how fast a stack overflow happens if you cause one on purpose, not how to prevent it.

Unless you perform recursion, it's almost impossible to cause a stack overflow in Rust (and in most languages, for that matter). Memory leaks almost exclusively happen on the heap, as memory usage on the stack is in almost all circumstances already known at compile time and stays within a constant limit. (again, recursions excluded) Further, almost all data structures that store large amounts of data store the data on the heap. Among other things, this is done to prevent excessive copies; memory on the stack usually gets moved around a lot, while memory on the heap stays at a constant location.

Finomnis
  • 18,094
  • 1
  • 20
  • 27
  • This is code from the client and I did not plan to handle more than one connection, nor to spawn any more async tasks. My only goal was to spawn 2 "green" threads and a `TcpStream` and then let the runtime run until the program is terminated. and it at least seems to work like I have expected. – BrunoWallner Jun 12 '22 at 20:49
  • I thought that sending data through an `unbounded` channel results in the data being allocated on the heap, but if this is not the case I think I might have found the problem. I am sending huge amount of data that implements `Copy`, does that mean it gets stack allocated? – BrunoWallner Jun 12 '22 at 20:54
  • It is getting allocated on the heap. But why would you want it to be unbounded if you just push data into it as fast as possible? You definitly want something with backpressure imo. Fyi I'm fairly certain that the bounded channels also lie on the heap. – Finomnis Jun 12 '22 at 23:12
0

I falsely assumed that sending data through an unbounded channel results in the data that is being sent to be stored on the heap and data that is being sent through an bounded channel to be stored on the stack if the datatype allows that.

What seems to fix this issue was to just wrap the data that is sent in a Box and thus forcing a heap allocation.

The code would now look somewhat like this:

use crate::channel::*;
use std::mem::forget;
use tokio::net::{TcpStream, ToSocketAddrs};
use tokio::runtime;
use tokio::io::AsyncWriteExt;

pub struct Communicator {
    event_sender: Sender<Box<[u8; 255]>>,
    event_receiver: Receiver<Box<[u8; 255]>>,
}
impl Communicator {
    pub fn init<H: ToSocketAddrs>(host: H) -> Option<Self> {
        let rt = runtime::Builder::new_multi_thread()
            .enable_io()
            .build()
            .unwrap();
        
        if let Some((event_sender, event_receiver)) = rt.block_on(async move {
            if let Ok(socket) = TcpStream::connect(host).await {
                let (mut read, mut write) = socket.into_split();

                let (ev_sender_tx, ev_sender_rx): (Sender<Box<[u8; 255]>>, Receiver<Box<[u8; 255]>>) = channel();
                let (ev_receiver_tx, ev_receiver_rx) = channel();

                // rx
                tokio::spawn(async move {
                    loop {
                        let mut buffer: [u8; 255] = [0; 255];
                        ev_receiver_tx.send(Box::new(buffer)).unwrap();
                    }
                });
                // tx
                tokio::spawn(async move {
                    loop {
                        if let Some(event) = ev_sender_rx.recv() {
                            write.write_all(&(*event)).await;
                        }
                    }
                });

                Some((ev_sender_tx, ev_receiver_rx))
            } else {
                None
            }
        }) {
            // not allowed to run destructor, must leak
            forget(rt);
            Some(Self { event_sender, event_receiver })
        } else {
            None
        }
    }

main.rs

mod communicator;
mod channel;

fn main() {
    let comm = com::Communicator::init("0.0.0.0:8000");;

    std::thread::park();
}

EDIT

The problem was that I had a data structure somewhere else in the code, which tried to store a big 3-dimensional array.

Just converting [[[u16; 32]; 32]; 32] to Box<[[[u16; 32]; 32]; 32]> worked.

BrunoWallner
  • 417
  • 3
  • 10
  • That is just incorrect. I don't think you understand the difference between bounded and unbounded channels. Both most likely live on the heap. The difference is: the bounded channel only buffers a limited amount of items and then applies backpressure, meaning it blocks at the `send` method until items get retrieved with the `receive` method. The unbounded channel can buffer an infinite amount of items, so it should be obvious why sending an infinite amount of items into it is a really bad idea. – Finomnis Jun 12 '22 at 23:18
  • I'm really confused by all of this. Don't you want to use the `read` object somewhere? What's the point of an `event_receiver` object that just endlessly throws out arrays of zeros? Or is that just for demonstration purposes? – Finomnis Jun 12 '22 at 23:20
  • @Finomnis, this is just for demonstration purposes. In the original code the read will read into the Buffer – BrunoWallner Jun 13 '22 at 11:17
  • Then I think it's almost impossible to help you without a minimal reproducible example :/ If you manage to reproduce the crash in a short and concise way, and post that code here, that's when we can help. – Finomnis Jun 13 '22 at 13:36