
I want to use a container to manage tokio::sync::oneshot::Senders. I'm using a Vec, but it seems that the values I get back out of the Vec are references, and send takes self rather than a reference, so I can't call it:

use bytes::BytesMut;
use tokio::sync::oneshot;

#[derive(Clone)]
pub enum ChannelData {
    Video { timestamp: u32, data: BytesMut },
    Audio { timestamp: u32, data: BytesMut },
    MetaData {},
}

pub type PlayerPublisher = oneshot::Sender<ChannelData>;

pub struct Channel {
    player_producers: Vec<PlayerPublisher>, // consumers who subscribe to this channel.
}

impl Channel {
    fn new() -> Self {
        Self {
            player_producers: Vec::new(),
        }
    }

    async fn transmit(&mut self) {
        let b = BytesMut::new();
        let data = ChannelData::Video {
            timestamp: 234,
            data: b,
        };

        for i in self.player_producers {
            i.send(data);
        }
    }
}

The errors:

error[E0507]: cannot move out of `self.player_producers` which is behind a mutable reference
  --> src/lib.rs:31:18
   |
31 |         for i in self.player_producers {
   |                  ^^^^^^^^^^^^^^^^^^^^^
   |                  |
   |                  move occurs because `self.player_producers` has type `Vec<tokio::sync::oneshot::Sender<ChannelData>>`, which does not implement the `Copy` trait
   |                  help: consider iterating over a slice of the `Vec<_>`'s content: `&self.player_producers`

error[E0382]: use of moved value: `data`
  --> src/lib.rs:32:20
   |
26 |         let data = ChannelData::Video {
   |             ---- move occurs because `data` has type `ChannelData`, which does not implement the `Copy` trait
...
32 |             i.send(data);
   |                    ^^^^ value moved here, in previous iteration of loop

How can I achieve my goal?

For reference, this is the source of oneshot::Sender::send, which takes self by value:

pub fn send(mut self, t: T) -> Result<(), T> {
    let inner = self.inner.take().unwrap();

    inner.value.with_mut(|ptr| unsafe {
        *ptr = Some(t);
    });

    if !inner.complete() {
        unsafe {
            return Err(inner.consume_value().unwrap());
        }
    }

    Ok(())
}
kmdreko
Harlan Chen
  • It's hard to answer your question because it doesn't include a [MRE]. We can't tell what crates (**and their versions**), types, traits, fields, etc. are present in the code. It would make it easier for us to help you if you try to reproduce your error on the [Rust Playground](https://play.rust-lang.org) if possible, otherwise in a brand new Cargo project, then [edit] your question to include the additional info. There are [Rust-specific MRE tips](//stackoverflow.com/tags/rust/info) you can use to reduce your original code for posting here. Thanks! – Shepmaster Apr 05 '21 at 16:56

2 Answers


Calling send requires ownership of the oneshot Sender. To get that ownership, you can take ownership of the container that holds it. In this case, the easiest way is to have transmit take ownership of the Channel:

async fn transmit(self) { // Changed to `self`
    for i in self.player_producers {
        let data = ChannelData::Video {
            timestamp: 234,
            data: BytesMut::new(),
        };
        if i.send(data).is_err() {
            panic!("Unable to send data");
        }
    }
}

Other options are to drain the collection:

for i in self.player_producers.drain(..) {

Or swap the collection with an empty one:

use std::mem;
for i in mem::take(&mut self.player_producers) {

In each case, the data payload has to be constructed (or cloned) each time it is sent.
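The two &mut self options above can be sketched as complete methods. This is only an illustration under some assumptions: OneShot is a hypothetical std-only stand-in for tokio::sync::oneshot::Sender (its send also takes self and returns Result<(), T>), Vec<u8> replaces BytesMut so that no external crates are needed, and the names transmit_drain, transmit_take and payload are made up for the example.

```rust
use std::mem;
use std::sync::mpsc;

// Hypothetical stand-in for tokio::sync::oneshot::Sender so the sketch
// builds with std only: like the real type, `send` consumes `self`.
pub struct OneShot<T>(pub mpsc::Sender<T>);

impl<T> OneShot<T> {
    pub fn send(self, value: T) -> Result<(), T> {
        // Hand the value back if the receiving side is gone.
        self.0.send(value).map_err(|err| err.0)
    }
}

// Vec<u8> is used in place of BytesMut (an assumption for this sketch).
#[derive(Debug, PartialEq)]
pub enum ChannelData {
    Video { timestamp: u32, data: Vec<u8> },
}

pub struct Channel {
    pub player_producers: Vec<OneShot<ChannelData>>,
}

impl Channel {
    // Build a fresh payload for every send, because `send` moves it.
    fn payload() -> ChannelData {
        ChannelData::Video { timestamp: 234, data: Vec::new() }
    }

    /// Option 1: `drain(..)` yields owned senders and leaves the Vec empty
    /// while keeping its allocated capacity. Returns the number of sends
    /// that succeeded.
    pub fn transmit_drain(&mut self) -> usize {
        let mut delivered = 0;
        for sender in self.player_producers.drain(..) {
            if sender.send(Self::payload()).is_ok() {
                delivered += 1;
            }
        }
        delivered
    }

    /// Option 2: `mem::take` swaps in an empty Vec and returns the old one,
    /// which is then consumed by value.
    pub fn transmit_take(&mut self) -> usize {
        let mut delivered = 0;
        for sender in mem::take(&mut self.player_producers) {
            if sender.send(Self::payload()).is_ok() {
                delivered += 1;
            }
        }
        delivered
    }
}
```

Either way the Channel is left with an empty player_producers afterwards, which fits a oneshot sender: once it has sent its one message there is nothing left to keep.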

Shepmaster

A Tokio oneshot sender can only send a single message, so send consumes the Sender. To call send on a Sender stored in a Vec, you first have to remove it from the Vec. The way to remove all elements from a &mut Vec while iterating over them is to drain it:

for i in self.player_producers.drain(..) {
    i.send(data);
}

Your other error is from data being consumed by the call to send. Since you want to send the same data to multiple senders, you'll have to clone it:

i.send(data.clone());

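Also note that send returns a Result, so the compiler will warn if you ignore it. It is an Err, carrying the unsent value, when the receiver has already been dropped.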
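Putting the clone and the Result handling together might look like the sketch below. It rests on a few assumptions: OneShot is a hypothetical std-only stand-in for tokio::sync::oneshot::Sender with the same by-value send signature, Vec<u8> replaces BytesMut, and broadcast is a made-up helper name rather than anything from Tokio.

```rust
use std::sync::mpsc;

// Hypothetical stand-in for tokio::sync::oneshot::Sender (std only):
// `send` consumes the sender and returns the value on failure.
pub struct OneShot<T>(pub mpsc::Sender<T>);

impl<T> OneShot<T> {
    pub fn send(self, value: T) -> Result<(), T> {
        self.0.send(value).map_err(|err| err.0)
    }
}

// Vec<u8> is used in place of BytesMut (an assumption for this sketch).
#[derive(Clone, Debug, PartialEq)]
pub enum ChannelData {
    Video { timestamp: u32, data: Vec<u8> },
    MetaData {},
}

/// Sends a clone of `data` to every sender, emptying `senders`, and returns
/// how many subscribers had already dropped their receiver.
pub fn broadcast(senders: &mut Vec<OneShot<ChannelData>>, data: &ChannelData) -> usize {
    let mut dropped = 0;
    for sender in senders.drain(..) {
        // Each sender gets its own copy; the original stays borrowed.
        if let Err(_unsent) = sender.send(data.clone()) {
            // Nobody is listening any more; count it instead of panicking.
            dropped += 1;
        }
    }
    dropped
}
```

Counting failures rather than panicking is just one possible policy; a receiver that went away is often a normal event (the player disconnected) and not a bug.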

Shepmaster
kmdreko