Our Rust application appeared to have a memory leak and I've distilled down the issue to the code example below. I still can't see where the problem is.
My expectation is that on the (500,000 + 1)'th message the memory of the application would return to low levels. Instead I observe the following:
- before sending 500,000 messages the memory usage is 124KB
- after sending 500,000 message the memory usage climbs to 27MB
- after sending 500,000 + 1 message the memory usage drops to 15.5MB
After trying many things, I cannot find where the 15.5MB is hiding. The only way to free the memory is to kill the application. Valgrind did not detect any memory leaks. A work around, solution, or point in the right direction would all be much appreciated.
A demo project with the code below can be found here: https://github.com/loriopatrick/mem-help
Notes
- If I remove
self.items.push(data);
memory usage does not increase so I don't think it's an issue with Sender/Receiver - Wrapping
items: Vec<String>
in anArc<Mutex<..>>
made no observable memory difference
The task where the memory should be managed
struct Processor {
items: Vec<String>,
}
impl Processor {
pub fn new() -> Self {
Processor {
items: Vec::new(),
}
}
pub async fn task(mut self, mut receiver: Receiver<String>) {
while let Some(data) = receiver.next().await {
self.items.push(data);
if self.items.len() > 500000 {
{
std::mem::replace(&mut self.items, Vec::new());
}
println!("Emptied items array");
}
}
println!("Processor task closing in 5 seconds");
tokio::time::delay_for(Duration::from_secs(5)).await;
}
}
Full runnable example
use std::time::Duration;
use tokio::stream::StreamExt;
use tokio::runtime::Runtime;
use tokio::sync::mpsc::{channel, Receiver, Sender};
struct Processor {
items: Vec<String>,
}
impl Processor {
pub fn new() -> Self {
Processor {
items: Vec::new(),
}
}
pub async fn task(mut self, mut receiver: Receiver<String>) {
while let Some(data) = receiver.next().await {
self.items.push(data);
if self.items.len() > 500000 {
{
std::mem::replace(&mut self.items, Vec::new());
}
println!("Emptied items array");
}
}
println!("Processor task closing in 5 seconds");
tokio::time::delay_for(Duration::from_secs(5)).await;
}
}
pub fn main() {
{
let mut runtime: Runtime = tokio::runtime::Builder::new()
.threaded_scheduler()
.core_threads(1)
.enable_all()
.build()
.expect("Failed to build runtime");
let (mut sender, receiver) = channel(1024);
let p = Processor::new();
runtime.spawn(async move {
println!("Before send, waiting 5 seconds");
tokio::time::delay_for(Duration::from_secs(5)).await;
for i in 0..500000 {
sender.send("Hello".to_string()).await;
}
println!("Sent 500,000 items, waiting 5 seconds");
tokio::time::delay_for(Duration::from_secs(5)).await;
sender.send("Hello".to_string()).await;
println!("Send message to clear items");
tokio::time::delay_for(Duration::from_secs(3)).await;
println!("Closing sender in 5 seconds");
tokio::time::delay_for(Duration::from_secs(5)).await;
});
runtime.block_on(async move {
{
p.task(receiver).await;
}
println!("Task is done, waiting 5 seconds");
tokio::time::delay_for(Duration::from_secs(5)).await;
});
}
println!("Runtime closed, waiting 5 seconds");
std::thread::sleep(Duration::from_secs(5));
}
Cargo.toml
[package]
name = "mem-help"
version = "0.1.0"
authors = ["Patrick Lorio <dev@plorio.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
futures = "0.3.1"
tokio = { version = "0.2.6", features = ["full"] }