
Our Rust application appeared to have a memory leak, and I've distilled the issue down to the code example below. I still can't see where the problem is.

My expectation is that on the 500,001st message the application's memory usage would return to low levels. Instead, I observe the following:

  • before sending 500,000 messages, memory usage is 124KB
  • after sending 500,000 messages, memory usage climbs to 27MB
  • after sending the 500,001st message, memory usage drops to 15.5MB
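One rough way to see what the `Vec<String>` itself owns at the 500,000-message mark is to add up its buffer and the String payloads. This is a sketch under stated assumptions: `estimate_bytes` is a hypothetical helper, the result depends on the target (`size_of::<String>()` is 24 bytes on 64-bit platforms) and on Vec's growth strategy, and it deliberately leaves out the allocator's per-allocation bookkeeping, which is likely not negligible for 500,000 five-byte allocations.

```rust
use std::mem::size_of;

/// Hypothetical helper: rough count of the bytes a `Vec<String>` owns after
/// pushing `n` copies of `s`. Allocator overhead per allocation is NOT included.
fn estimate_bytes(n: usize, s: &str) -> (usize, usize) {
    let mut items: Vec<String> = Vec::new();
    for _ in 0..n {
        items.push(s.to_string());
    }
    // The Vec buffer holds one String header (pointer, capacity, length) per
    // slot; capacity can exceed `n` because Vec grows geometrically.
    let buffer = items.capacity() * size_of::<String>();
    // Each String owns its own separate heap allocation for its bytes.
    let payload: usize = items.iter().map(|x| x.capacity()).sum();
    (buffer, payload)
}

fn main() {
    let (buffer, payload) = estimate_bytes(500_000, "Hello");
    println!("Vec buffer: {} bytes, String payloads: {} bytes", buffer, payload);
}
```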

After trying many things, I cannot find where the 15.5MB is hiding. The only way to free the memory is to kill the application. Valgrind did not detect any memory leaks. A workaround, a solution, or a pointer in the right direction would all be much appreciated.
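Valgrind reporting no leaks suggests every allocation is eventually freed. A sketch of how one might confirm that the Rust code has handed the memory back to the allocator: `CountingAlloc` is a hypothetical wrapper around `std::alloc::System` that tracks live bytes per thread, and `fill_then_replace` mirrors the push-then-replace pattern synchronously, without tokio. It assumes a platform with native thread-local storage (e.g. Linux on x86-64). If `after` comes out as 0, whatever resident memory remains is space the allocator kept for reuse, not memory the program still owns.

```rust
use std::alloc::{GlobalAlloc, Layout, System};
use std::cell::Cell;

// Hypothetical wrapper: forwards to the system allocator and records how many
// bytes the *current thread* has allocated but not yet freed.
struct CountingAlloc;

thread_local! {
    // `const` init of a Drop-free type, so the counter never allocates itself
    // (assumes native thread-local storage, e.g. Linux on x86-64).
    static LIVE: Cell<isize> = const { Cell::new(0) };
}

unsafe impl GlobalAlloc for CountingAlloc {
    unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
        let ptr = unsafe { System.alloc(layout) };
        if !ptr.is_null() {
            let _ = LIVE.try_with(|c| c.set(c.get() + layout.size() as isize));
        }
        ptr
    }

    unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
        unsafe { System.dealloc(ptr, layout) };
        let _ = LIVE.try_with(|c| c.set(c.get() - layout.size() as isize));
    }
    // The default `realloc` calls `alloc` + `dealloc` above, so Vec growth is counted too.
}

#[global_allocator]
static GLOBAL: CountingAlloc = CountingAlloc;

fn live_bytes() -> isize {
    LIVE.with(|c| c.get())
}

/// Pushes `n` Strings, then empties the Vec the way the question does.
/// Returns (live bytes at peak, live bytes after the reset), relative to the start.
fn fill_then_replace(n: usize) -> (isize, isize) {
    let start = live_bytes();
    let mut items: Vec<String> = Vec::new();
    for _ in 0..n {
        items.push("Hello".to_string());
    }
    let peak = live_bytes() - start;
    // Same effect as `mem::replace(&mut items, Vec::new())`: the old Vec,
    // its buffer and every String in it are freed here.
    drop(std::mem::take(&mut items));
    let after = live_bytes() - start;
    (peak, after)
}

fn main() {
    let (peak, after) = fill_then_replace(500_000);
    println!("peak: {} bytes, after reset: {} bytes", peak, after);
}
```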

A demo project with the code below can be found here: https://github.com/loriopatrick/mem-help

Notes

  • If I remove `self.items.push(data);`, memory usage does not increase, so I don't think it's an issue with Sender/Receiver
  • Wrapping `items: Vec<String>` in an `Arc<Mutex<..>>` made no observable difference in memory usage

The task where the memory should be managed

struct Processor {
    items: Vec<String>,
}

impl Processor {
    pub fn new() -> Self {
        Processor {
            items: Vec::new(),
        }
    }

    pub async fn task(mut self, mut receiver: Receiver<String>) {
        while let Some(data) = receiver.next().await {
            self.items.push(data);

            if self.items.len() > 500000 {
                {
                    std::mem::replace(&mut self.items, Vec::new());
                }
                println!("Emptied items array");
            }
        }

        println!("Processor task closing in 5 seconds");
        tokio::time::delay_for(Duration::from_secs(5)).await;
    }
}
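For reference, the extra braces and the discarded `mem::replace` result above behave the same as a plain assignment: the old Vec is dropped at the end of that statement, freeing its buffer and every String in it. A minimal synchronous sketch of the same reset logic, without tokio, so it can be checked in isolation; the `threshold` field is a hypothetical parameter standing in for the hard-coded 500000.

```rust
/// Synchronous stand-in for the question's `Processor` (no channel, no runtime).
struct Processor {
    items: Vec<String>,
    threshold: usize, // hypothetical: replaces the hard-coded 500000
}

impl Processor {
    fn new(threshold: usize) -> Self {
        Processor { items: Vec::new(), threshold }
    }

    /// Pushes one item; returns true if this push triggered a reset.
    fn push(&mut self, data: String) -> bool {
        self.items.push(data);
        if self.items.len() > self.threshold {
            // Assigning drops the old Vec (buffer and all Strings) right here;
            // `Vec::new()` itself does not allocate.
            self.items = Vec::new();
            return true;
        }
        false
    }
}

fn main() {
    let mut p = Processor::new(500_000);
    for _ in 0..500_000 {
        p.push("Hello".to_string());
    }
    let reset = p.push("Hello".to_string());
    println!("reset: {}, len: {}, capacity: {}", reset, p.items.len(), p.items.capacity());
}
```

This only shows that the Vec is emptied and its buffer released on the Rust side; whether the process's resident memory shrinks afterwards is up to the allocator.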

Full runnable example

use std::time::Duration;

use tokio::stream::StreamExt;
use tokio::runtime::Runtime;
use tokio::sync::mpsc::{channel, Receiver, Sender};

struct Processor {
    items: Vec<String>,
}

impl Processor {
    pub fn new() -> Self {
        Processor {
            items: Vec::new(),
        }
    }

    pub async fn task(mut self, mut receiver: Receiver<String>) {
        while let Some(data) = receiver.next().await {
            self.items.push(data);

            if self.items.len() > 500000 {
                {
                    std::mem::replace(&mut self.items, Vec::new());
                }
                println!("Emptied items array");
            }
        }

        println!("Processor task closing in 5 seconds");
        tokio::time::delay_for(Duration::from_secs(5)).await;
    }
}

pub fn main() {
    {
        let mut runtime: Runtime = tokio::runtime::Builder::new()
            .threaded_scheduler()
            .core_threads(1)
            .enable_all()
            .build()
            .expect("Failed to build runtime");

        let (mut sender, receiver) = channel(1024);
        let p = Processor::new();

        runtime.spawn(async move {
            println!("Before send, waiting 5 seconds");
            tokio::time::delay_for(Duration::from_secs(5)).await;

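            // `_` because the loop counter is unused; `let _ =` explicitly ignores
            // the send Result (an error only means the receiver has been dropped).
            for _ in 0..500000 {
                let _ = sender.send("Hello".to_string()).await;
            }

            println!("Sent 500,000 items, waiting 5 seconds");
            tokio::time::delay_for(Duration::from_secs(5)).await;
            let _ = sender.send("Hello".to_string()).await;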

            println!("Send message to clear items");
            tokio::time::delay_for(Duration::from_secs(3)).await;

            println!("Closing sender in 5 seconds");
            tokio::time::delay_for(Duration::from_secs(5)).await;
        });

        runtime.block_on(async move {
            {
                p.task(receiver).await;
            }
            println!("Task is done, waiting 5 seconds");
            tokio::time::delay_for(Duration::from_secs(5)).await;
        });
    }

    println!("Runtime closed, waiting 5 seconds");
    std::thread::sleep(Duration::from_secs(5));
}

Cargo.toml

[package]
name = "mem-help"
version = "0.1.0"
authors = ["Patrick Lorio <dev@plorio.com>"]
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]

futures = "0.3.1"
tokio = { version = "0.2.6", features = ["full"] }
Patrick Lorio
  • A leak is something that grows over time; you seem to be describing that this is not the case? Also, you don't use anything that could make the program leak memory. So: 1. why do you think this is not normal? 2. If the leak is real, it's certainly not your fault, so it's probably a bug. – Stargateur Jan 10 '20 at 02:19
  • In my much larger application the memory grows to the point of OOMing (16 gigs). My expectation is that after I execute `std::mem::replace(&mut self.items, Vec::new());` the memory should return to similar levels as before pushing to the vector. I might be misusing async/await. If there's a library bug, any hints on narrowing it down would be appreciated. – Patrick Lorio Jan 10 '20 at 02:22
  • I should mention that in my much larger application I am growing and replacing the Vec over time. It seems this is where the issue is. This example seems to outline the root cause, however I'm not 100% certain. Either way, this example does raise a few questions on its own, mainly why the memory isn't going back to low levels after the vector is replaced. – Patrick Lorio Jan 10 '20 at 02:24
  • "the memory should return to similar levels as before pushing to the vector" Probably not; it's very rare that memory is really released. It's probably free space that still belongs to your program but just isn't used by you. This current code isn't enough to prove a leak. You should use [`clear()`](https://doc.rust-lang.org/std/vec/struct.Vec.html#method.clear) and not this very odd way. – Stargateur Jan 10 '20 at 02:26
  • My understanding is that with Rust's ownership, after a resource is no longer owned and is dropped, the memory is freed. There is no garbage collector I need to wait for to clean up the resources. I don't think there's a memory leak in the classical sense; rather, there are resources I cannot find that are owned by something I cannot see. – Patrick Lorio Jan 10 '20 at 02:29
  • @Stargateur, just tested. Running `self.items.clear()` before `std::mem::replace(&mut self.items, Vec::new());` made no difference. I used `std::mem::replace` because `.clear()` will not reclaim the memory used by the Vec itself; it only releases the memory used by the Vec's entries. – Patrick Lorio Jan 10 '20 at 02:32
  • Worth testing; as I said, this code should not leak any memory. Async is pretty new, so don't hesitate to report it as a bug on the tokio repository. Anyway, I advise you to read https://stackoverflow.com/a/55295970/7076153 and https://stackoverflow.com/a/1119334/7076153. They concern C, but Rust shares a lot of behaviour with it. This should explain more clearly how freeing memory works. – Stargateur Jan 10 '20 at 02:35
  • Interesting, https://stackoverflow.com/questions/55294985/does-malloc-reserve-more-space-while-allocating-memory/55295970#55295970 makes me think I may need to try a different allocator. – Patrick Lorio Jan 10 '20 at 02:37
  • I don't think this will fix your leak. – Stargateur Jan 10 '20 at 02:40
  • @Stargateur: `clear` only removes the elements of the `Vec`, but does not free the memory used by the `Vec` itself contrary to `mem::replace` with an empty `Vec`. Whether that's better or worse is another matter. – Matthieu M. Jan 10 '20 at 09:49
  • On Linux, I would advise having a look at your application through `valgrind --tool=massif`: this is a memory profiler which outputs periodic snapshots of the memory usage and, for each snapshot, how much of the usage is due to a particular stack trace. This will immediately allow you to spot which part of the application is allocating the 16G (that is not released). – Matthieu M. Jan 10 '20 at 09:51
  • Can you try setting `MALLOC_ARENA_MAX=2` and then running your code? If the problem goes away after that, then this is a duplicate of [Why multiple threads using too much memory](https://stackoverflow.com/questions/58110081/why-multiple-threads-using-too-much-memory-when-holding-mutex/58119742). – edwardw Jan 10 '20 at 10:25
  • If you intend to reclaim the memory, you should use the method on Vec that does so: `Vec::shrink_to_fit()`. – Ron Thompson Feb 03 '21 at 04:56
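The comments contrast three ways of emptying the Vec: `clear()`, `clear()` followed by `shrink_to_fit()`, and replacing it with a new Vec. A small sketch comparing the capacity each leaves behind; `filled` and `capacities_after_reset` are hypothetical helpers. All three drop every String; they differ only in whether the Vec's own buffer is kept. None of them controls whether the allocator returns the freed pages to the operating system, which is what the observed memory numbers measure.

```rust
/// Hypothetical helper: a Vec holding `n` copies of "Hello".
fn filled(n: usize) -> Vec<String> {
    (0..n).map(|_| "Hello".to_string()).collect()
}

/// Returns the capacity left behind by each way of emptying the Vec.
fn capacities_after_reset(n: usize) -> (usize, usize, usize) {
    // 1. clear(): drops the Strings but keeps the Vec's buffer allocated.
    let mut a = filled(n);
    a.clear();

    // 2. clear() + shrink_to_fit(): also asks for the buffer to be shrunk.
    //    The docs only promise "as close as possible" to the length.
    let mut b = filled(n);
    b.clear();
    b.shrink_to_fit();

    // 3. mem::take (same effect as mem::replace with Vec::new()):
    //    swaps in an unallocated Vec and drops the old one with its buffer.
    let mut c = filled(n);
    drop(std::mem::take(&mut c));

    (a.capacity(), b.capacity(), c.capacity())
}

fn main() {
    let (clear, shrink, take) = capacities_after_reset(500_000);
    println!("clear: {}, clear+shrink_to_fit: {}, take: {}", clear, shrink, take);
}
```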
