When using a channel, let's say at a given moment we have the following messages in the queue for a given receiver:

abc
blah
abc
something
something
blah
something

Usually messages are processed like this:

let (sender, receiver) = mpsc::channel::<Msg>();
for msg in receiver {
   // Process a message...
}

What is the easiest way to de-duplicate messages that have already piled up?

Using the above example, I'd like to only process 3 (not 7) messages:

abc
blah
something

Note: After a message abc has been processed, it's fine to receive (and process) another abc message in the future. The goal is only to de-duplicate messages that have already piled up in the queue.

at54321

1 Answer

Solution 1

Here is an example solution:

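use std::collections::HashSet;
use std::{thread, time};

loop {
    // Drain whatever is queued right now; the set drops duplicates.
    let deduped_msgs: HashSet<Msg> = receiver.try_iter().collect();
    for msg in deduped_msgs {
        // Process a message...
    }
    // Brief pause so the loop does not spin while the queue is empty.
    thread::sleep(time::Duration::from_millis(10));
}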

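The above uses the non-blocking try_iter method, so each pass picks up only the messages that are already queued. Note that Msg must implement Eq and Hash to be stored in a HashSet. I've added a short sleep() to avoid busy-waiting on the CPU while the queue is empty.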

Solution 2

Here is a similar solution, following Chayim's suggestions, that blocks on recv() instead of polling every X milliseconds and reuses the set between iterations:

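use std::collections::HashSet;

// Reuse one set across iterations to avoid reallocating it each time.
let mut deduped_msgs = HashSet::new();
loop {
    // Block until a message arrives. Note: unwrap() panics once every
    // sender has been dropped and the queue is empty.
    deduped_msgs.insert(receiver.recv().unwrap());
    // Then pick up everything else already queued, without blocking.
    deduped_msgs.extend(receiver.try_iter());
    for msg in deduped_msgs.drain() {
        // Process a message...
    }
}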
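
For reference, here is a minimal, self-contained sketch of the same idea that shuts down cleanly instead of panicking when all senders are dropped. The helper name next_batch and the use of String as the message type are assumptions made for illustration; they are not part of the code above.

```rust
use std::collections::HashSet;
use std::hash::Hash;
use std::sync::mpsc::{self, Receiver};
use std::thread;

/// Hypothetical helper: blocks until at least one message arrives, then
/// adds everything that is already queued to `batch`, dropping duplicates.
/// Returns `false` once the queue is empty and every sender has been dropped.
fn next_batch<T: Eq + Hash>(receiver: &Receiver<T>, batch: &mut HashSet<T>) -> bool {
    match receiver.recv() {
        Ok(first) => {
            batch.insert(first);
            // Non-blocking: stops as soon as the queue is empty.
            batch.extend(receiver.try_iter());
            true
        }
        // All senders are gone and nothing is left to read.
        Err(_) => false,
    }
}

fn main() {
    let (sender, receiver) = mpsc::channel::<String>();

    // Queue the seven messages from the question, then drop the sender.
    let producer = thread::spawn(move || {
        for msg in ["abc", "blah", "abc", "something", "something", "blah", "something"] {
            sender.send(msg.to_string()).unwrap();
        }
    });
    producer.join().unwrap();

    // The set is reused between batches, as in Solution 2.
    let mut batch = HashSet::new();
    while next_batch(&receiver, &mut batch) {
        for msg in batch.drain() {
            // Iteration order of a HashSet is unspecified.
            println!("processing {msg}");
        }
    }
}
```

Because the producer finishes before the consumer starts, all seven messages land in one batch and only three distinct ones are processed. In a real program the batches depend on timing.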

Order of processing

If the order in which messages are processed matters, HashSet (alone) might not be the best option, because it iterates over its elements in an arbitrary order.

  • To process messages in the order they arrived (only taking into account the first occurrence of each message), one could use IndexSet for example.
  • In some cases the natural order of messages might make more sense and then BTreeSet could be the simplest option.
  • To apply a custom priority ordering, one could, for example, collect the messages into a vector and sort it via sort_unstable_by or sort_unstable_by_key. It is probably simpler to dedup them with a HashSet first, but another option might be to sort the vector and then remove adjacent duplicates with dedup, provided equal messages end up next to each other.
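
The three orderings above can be sketched with the standard library alone. The function names below are hypothetical, and the first one uses a Vec plus a HashSet of already-seen messages as a stand-in for IndexSet (which lives in the external indexmap crate), so treat it as one possible approach rather than the recommended one.

```rust
use std::cmp::Reverse;
use std::collections::{BTreeSet, HashSet};
use std::hash::Hash;

/// Hypothetical helper: keeps the first occurrence of each message,
/// in arrival order.
fn dedup_keep_order<T: Eq + Hash + Clone>(msgs: impl IntoIterator<Item = T>) -> Vec<T> {
    let mut seen = HashSet::new();
    msgs.into_iter()
        // `insert` returns false if the message was already seen.
        .filter(|m| seen.insert(m.clone()))
        .collect()
}

/// Hypothetical helper: de-duplicates and returns messages in their natural (Ord) order.
fn dedup_sorted<T: Ord>(msgs: impl IntoIterator<Item = T>) -> Vec<T> {
    msgs.into_iter().collect::<BTreeSet<_>>().into_iter().collect()
}

/// Hypothetical helper: de-duplicates with a HashSet, then sorts by a
/// caller-supplied priority key. Messages with equal keys come out in an
/// unspecified relative order.
fn dedup_by_priority<T: Eq + Hash, K: Ord>(
    msgs: impl IntoIterator<Item = T>,
    priority: impl FnMut(&T) -> K,
) -> Vec<T> {
    let mut unique: Vec<T> = msgs.into_iter().collect::<HashSet<_>>().into_iter().collect();
    unique.sort_unstable_by_key(priority);
    unique
}

fn main() {
    let queued = ["blah", "abc", "blah", "something", "abc"];

    println!("{:?}", dedup_keep_order(queued)); // ["blah", "abc", "something"]
    println!("{:?}", dedup_sorted(queued)); // ["abc", "blah", "something"]
    // Example priority (an assumption): longest message first.
    println!("{:?}", dedup_by_priority(queued, |m| Reverse(m.len()))); // ["something", "blah", "abc"]
}
```

In the channel loop, any of these could be fed with receiver.try_iter() in place of the HashSet.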
at54321
  • Two things I would change: keeping the set between iterations to avoid excessive allocations, and using `recv()` instead of `sleep()`. Other than that I believe this is the best you can get. – Chayim Friedman Jun 27 '23 at 07:37
  • [Like that](https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=30cf988e078c8aec925da3c73df1cc89). – Chayim Friedman Jun 27 '23 at 07:40
  • Thanks @ChayimFriedman! I've updated my answer to include your suggestions as well. – at54321 Jun 27 '23 at 07:57
  • An important note: this does not keep the order of received messages. If this is important, you can use [`IndexSet`](https://docs.rs/indexmap/latest/indexmap/set/struct.IndexSet.html). – Chayim Friedman Jun 28 '23 at 06:07
  • @ChayimFriedman That's a good point. I've added some notes to my answer. Thanks! – at54321 Jun 28 '23 at 06:47