7

I was able to move forward with implementing my asynchronous UDP server. However, the following error shows up twice because my variable `data` has type `*mut u8`, which is not `Send`:

error: future cannot be sent between threads safely
 help: within `impl std::future::Future`, the trait `std::marker::Send` is not implemented for `*mut u8`
note: captured value is not `Send`

And the code (MRE):

use std::error::Error;
use std::time::Duration;
use std::env;
use tokio::net::UdpSocket;
use tokio::{sync::mpsc, task, time}; // 1.4.0
use std::alloc::{alloc, Layout};
use std::mem;
use std::mem::MaybeUninit;
use std::net::SocketAddr;

/// Size in bytes of a UDP header.
const UDP_HEADER: usize = 8;
/// Size in bytes of an IP header (presumably IPv4 without options — confirm).
const IP_HEADER: usize = 20;
/// Size in bytes of the application header at the start of each datagram:
/// `run_server` reads the packet index from bytes 0-1 and the chunk count from bytes 2-3.
const AG_HEADER: usize = 4;
/// Largest datagram payload: 65 535 minus the UDP and IP headers (65 507 bytes).
const MAX_DATA_LENGTH: usize = (64 * 1024 - 1) - UDP_HEADER - IP_HEADER;
/// Largest file chunk carried by one datagram: the payload minus the application header.
const MAX_CHUNK_SIZE: usize = MAX_DATA_LENGTH - AG_HEADER;
/// 64 KiB; `run_server` shifts this value left to size the file-buffer allocation.
const MAX_DATAGRAM_SIZE: usize = 0x10000;

/// `memcpy`-style wrapper over [`std::ptr::copy_nonoverlapping`]: the destination comes
/// first and the source second, as in the C function.
///
/// # Safety
///
/// Same contract as [`std::ptr::copy_nonoverlapping`]: `src_ptr` must be valid for reads
/// of `len` bytes, `dst_ptr` must be valid for writes of `len` bytes, and the two regions
/// must not overlap.
unsafe fn memcpy(dst_ptr: *mut u8, src_ptr: *const u8, len: usize) {
    src_ptr.copy_to_nonoverlapping(dst_ptr, len);
}

// Different from https://doc.rust-lang.org/std/primitive.u32.html#method.next_power_of_two
/// Returns the *exponent* of the smallest power of two greater than or equal to `n`.
///
/// For example `n = 5` yields `3`, because `2^3 = 8` is the smallest power of two `>= 5`.
/// Both `n = 0` and `n = 1` yield `0` (`2^0 = 1`).
const fn next_power_of_two_exponent(n: u32) -> u32 {
    // `saturating_sub` instead of `n - 1`: for `n == 0` the plain subtraction overflows
    // (panic in debug builds, wrap-around and a result of 32 in release builds).
    32 - n.saturating_sub(1).leading_zeros()
}

/// Receives datagrams on `socket` in an endless loop and copies each packet's payload
/// into a raw heap buffer at an offset derived from the packet index.
///
/// Header layout as parsed below: bytes 0-1 are the big-endian packet index and
/// bytes 2-3 the big-endian chunk count; the payload starts at `AG_HEADER`.
///
/// NOTE(review): this version does not compile — the `server` task captures `data`
/// (`*mut u8`), which is not `Send`, so the future passed to `task::spawn` is rejected.
async fn run_server(socket: UdpSocket) {
    // Per-chunk flags; an entry is set to 1 once that packet index has been copied.
    let mut missing_indexes: Vec<u16> = Vec::new();
    // Written with the first packet's peer address; never read in the code shown.
    let mut peer_addr = MaybeUninit::<SocketAddr>::uninit();
    let mut data = std::ptr::null_mut(); // ptr for the file bytes
    // NOTE(review): `len` is never read or updated in the code shown.
    let mut len: usize = 0; // total len of bytes that will be written
    // Layout of the `data` allocation; written together with `data` on the first packet.
    let mut layout = MaybeUninit::<Layout>::uninit();
    // Receive buffer filled by `socket.recv_from` at the bottom of the loop.
    let mut buf = [0u8; MAX_DATA_LENGTH];
    // Set to true by the server task when it handles its first packet.
    let mut start = false;
    // `debounce_*`: server task -> debouncer task. `network_*`: receive loop -> server task.
    let (debounce_tx, mut debounce_rx) = mpsc::channel::<(usize, SocketAddr)>(3300);
    let (network_tx, mut network_rx) = mpsc::channel::<(usize, SocketAddr)>(3300);

    // NOTE(review): the body below moves `debounce_rx` and `network_rx` into tasks and
    // drops `debounce_tx`, so it is only valid for one iteration; the compiler will
    // presumably reject the loop for using moved values — confirm.
    loop {
        // Listen for events
        // NOTE(review): the returned `JoinHandle` is never awaited in the code shown.
        let debouncer = task::spawn(async move {
            let duration = Duration::from_millis(3300);

            loop {
                // Wait for the next activity notification, giving up after `duration`.
                match time::timeout(duration, debounce_rx.recv()).await {
                    Ok(Some((size, peer))) => {
                        eprintln!("Network activity");
                    }
                    // `recv` returned `None`: presumably the channel is closed — confirm.
                    Ok(None) => {
                        // NOTE(review): `start` is a `bool` (`Copy`), so this `async move`
                        // block holds its own copy; the `start = true` performed in the
                        // server task is not visible here.
                        if start == true {
                            eprintln!("Debounce finished");
                            break;
                        }
                    }
                    // The timeout elapsed before anything arrived on the channel.
                    Err(_) => {
                        eprintln!("{:?} since network activity", duration);
                    }
                }
            }
        });
        // Listen for network activity
        // NOTE(review): the returned `JoinHandle` is never awaited in the code shown.
        let server = task::spawn({
            // async{
            // Clone so the original sender can be dropped below while this task keeps one.
            let debounce_tx = debounce_tx.clone();
            async move {
                while let Some((size, peer)) = network_rx.recv().await {
                    // Received a new packet
                    debounce_tx.send((size, peer)).await.expect("Unable to talk to debounce");
                    eprintln!("Received a packet {} from: {}", size, peer);

                    // NOTE(review): `buf` is a `[u8; N]` array (`Copy`), so this `async move`
                    // block reads its own copy rather than the buffer `recv_from` fills — confirm.
                    // Bytes 0-1: big-endian packet index.
                    let packet_index: u16 = (buf[0] as u16) << 8 | buf[1] as u16;

                    if start == false { // first bytes of a new file: initialization // TODO: ADD A MUTEX to prevent many initializations
                        start = true;
                        // Bytes 2-3: big-endian total chunk count.
                        let chunks_cnt: u32 = (buf[2] as u32) << 8 | buf[3] as u32;
                        // Allocation size in bytes: MAX_DATAGRAM_SIZE * 2^exponent.
                        let n: usize = MAX_DATAGRAM_SIZE << next_power_of_two_exponent(chunks_cnt);
                        unsafe {
                            layout.as_mut_ptr().write(Layout::from_size_align_unchecked(n, mem::align_of::<u8>()));
                            
                            
                            // /!\  data has type `*mut u8` which is not `Send`
                            // NOTE(review): the pointer returned by `alloc` is not checked for
                            // null, and no matching `dealloc` appears in this function.
                            data = alloc(layout.assume_init());
                            
                            peer_addr.as_mut_ptr().write(peer);
                        }
                        let a: Vec<u16> = vec![0; chunks_cnt as usize]; //(0..chunks_cnt).map(|x| x as u16).collect(); // create a sorted vector with all the required indexes
                        missing_indexes = a;
                    }
                    // NOTE(review): panics if `packet_index >= missing_indexes.len()`.
                    missing_indexes[packet_index as usize] = 1;
                    unsafe {
                        // Each chunk lands `packet_index * MAX_CHUNK_SIZE` bytes into the buffer.
                        let dst_ptr = data.offset((packet_index as usize * MAX_CHUNK_SIZE) as isize);
                        // NOTE(review): `size - AG_HEADER` underflows when a datagram is
                        // shorter than the header.
                        memcpy(dst_ptr, &buf[AG_HEADER], size - AG_HEADER);
                    };
                    println!("receiving packet {} from: {}", packet_index, peer);
                }
            }
        });

        // Prevent deadlocks
        drop(debounce_tx);

        // Wait for one datagram and forward its size and sender to the server task.
        match socket.recv_from(&mut buf).await {
            Ok((size, src)) => {
                network_tx.send((size, src)).await.expect("Unable to talk to network");
            }
            Err(e) => {
                eprintln!("couldn't recieve a datagram: {}", e);
            }
        }
    }
}

/// Entry point: binds a UDP socket and runs the server on it.
///
/// The bind address is the first command-line argument, falling back to
/// `127.0.0.1:8080` when none is given.
///
/// # Errors
///
/// Returns an error if the socket cannot be bound or its local address cannot be read.
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let addr = env::args().nth(1).unwrap_or_else(|| "127.0.0.1:8080".to_string());
    let socket = UdpSocket::bind(&addr).await?;
    println!("Listening on: {}", socket.local_addr()?);
    // Futures are lazy: without `.await` the server future would be dropped unpolled
    // and `main` would return immediately without serving anything.
    run_server(socket).await;
    Ok(())
}

Since I am converting synchronous code to asynchronous code, I know that multiple threads could potentially be writing to `data`, and that is probably why I encounter this error. But I don't know which syntax I could use to "clone" the mut ptr and make it unique for each thread (and the same for the buffer).

As suggested by user4815162342, I think the best approach would be:

to make pointer Send by wrapping it in a struct and declaring unsafe impl Send for NewStruct {}.

Any help strongly appreciated!

PS: Full code can be found on my github repository

kmdreko
  • 42,554
  • 6
  • 57
  • 106
Antonin GAVREL
  • 9,682
  • 8
  • 54
  • 81
  • 1
    Why `as_mut_ptr()`? You can't `Send` that. You need another approach, ideally one that can move it or clone/copy it. – tadman Mar 30 '21 at 02:32
  • Exactly! Could you show me an example where you move, clone or copy it? – Antonin GAVREL Mar 30 '21 at 02:48
  • Not sure I can show you an example. That's just basic Rust stuff. I think you need to reconsider your use of `as_mut_ptr()` here as that's the source of your troubles. What's the motivation behind that? – tadman Mar 30 '21 at 02:51
  • The initial reason was to memcpy bytes from packets to different offset of data (depending on the packet index), and data act as the buffer of the file bytes, until the file is written (and then the data mut ptr is finally deallocated). There is no risk that data would be written twice at the same spot. I think the risk is more about buffer, I have to make sure I have a local buffer for each thread – Antonin GAVREL Mar 30 '21 at 03:01
  • It's hard to tell what's going on here as it's all sort of smashed together with not much in the way of clear structure. This really needs more functions to break it down into smaller, more easily understood parts. As a note, Tokio apps tend to use [Bytes](https://crates.io/crates/bytes) rather than `memcpy`. – tadman Mar 30 '21 at 03:03
  • I agree, I was intending to do this once its working – Antonin GAVREL Mar 30 '21 at 03:05
  • 1
    I'm not sure you can win this fight without pivoting to that approach. What you have here is something that seems more C or possibly C++ flavoured, where that sort of misses what Rust can and should be doing for you. – tadman Mar 30 '21 at 03:05
  • 1
    Ok, I will give it a try and edit the post accordingly, that was in my TODO anyways – Antonin GAVREL Mar 30 '21 at 03:06
  • If you do get it working, it's worth adding a self-answer as untangling this and fixing it is likely quite an adventure and the outcome will be interesting. – tadman Mar 30 '21 at 03:07
  • Apparently https://doc.rust-lang.org/std/rc/struct.Rc.html is the way to go to write to shared memory – Antonin GAVREL Mar 30 '21 at 03:40
  • `Arc` (thread-safe) or `Rc`, sure. – tadman Mar 30 '21 at 03:46
  • I dont need thread-safe as I told, because the threads write at different index of data, so Arc would just add a lot of overhead for no reason. BUT that said, I am getting `has type std::rc::Rc<*mut u8> which is not Send` in my current WIP... – Antonin GAVREL Mar 30 '21 at 03:49
  • Remember `task::spawn` *can* be threaded depending on which Tokio model you're using, so it's safer to assume it is threaded than not. The problem is not `Rc`, the problem is *pointers*. – tadman Mar 30 '21 at 03:50
  • I don't really get your last comment, I thought that task::spawn would be threaded for sure – Antonin GAVREL Mar 30 '21 at 03:52
  • 1
    Not necessarily, it depends on what [runtime](https://docs.rs/tokio/1.4.0/tokio/runtime/struct.Builder.html) you're using. Both single-threaded and multi-threaded versions exist. – tadman Mar 30 '21 at 03:56
  • Interesting, I will look more into that once I solve the current piece ;) – Antonin GAVREL Mar 30 '21 at 04:17
  • I also get `has type std::sync::Arc<*mut u8> which is not Send`, really struggling with the syntax... – Antonin GAVREL Mar 30 '21 at 04:21
  • Again, it's the pointer. No amount of dressing it up will help. You can't `Send` a pointer. – tadman Mar 30 '21 at 06:14
  • Rust does not judge a book by its cover... how annoying! – Antonin GAVREL Mar 30 '21 at 06:16
  • 1
    You can make a pointer `Send` by wrapping it in a struct and declaring `unsafe impl Send for NewStruct {}`. Then it's up to you to prove the invariants such as readers^writer. – user4815162342 Mar 30 '21 at 07:25
  • yes I was going this road right now actually! – Antonin GAVREL Mar 30 '21 at 07:29
  • 1
    It's hard to answer your question because it doesn't include a [MRE]. We can't tell what crates (and their versions), types, traits, fields, etc. are present in the code. It would make it easier for us to help you if you try to reproduce your error on the [Rust Playground](https://play.rust-lang.org) if possible, otherwise in a brand new Cargo project, then [edit] your question to include the additional info. There are [Rust-specific MRE tips](//stackoverflow.com/tags/rust/info) you can use to reduce your original code for posting here. Thanks! – Shepmaster Mar 30 '21 at 14:03
  • I have modified the question with a MRE, thank you! – Antonin GAVREL Mar 30 '21 at 15:01

1 Answer

1

Short version

Thanks to the comment from user4815162342, I decided to wrap the mut ptr in a struct and implement `Send` and `Sync` for it, which solved this part (there are still other issues, but they are beyond the scope of this question):

/// Wrapper around the raw pointer to the file bytes, so that the pointer can be
/// captured by a spawned task.
pub struct FileBuffer {
    data: *mut u8,
}

// SAFETY: NOTE(review) — these impls are an unchecked promise. Nothing in this type
// synchronizes access through `data`; every user must guarantee that no two threads
// touch the same bytes concurrently.
unsafe impl Sync for FileBuffer {}
unsafe impl Send for FileBuffer {}

// Before: a bare `*mut u8`, which is not `Send`:
//let mut data = std::ptr::null_mut(); // ptr for the file bytes
// After: the same null pointer, held inside the `FileBuffer` wrapper:
let mut fileBuffer: FileBuffer = FileBuffer { data:  std::ptr::null_mut() };

Long version

use std::error::Error;
use std::time::Duration;
use std::env;
use tokio::net::UdpSocket;
use tokio::{sync::mpsc, task, time}; // 1.4.0
use std::alloc::{alloc, Layout};
use std::mem;
use std::mem::MaybeUninit;
use std::net::SocketAddr;

/// Size in bytes of a UDP header.
const UDP_HEADER: usize = 8;
/// Size in bytes of an IP header (presumably IPv4 without options — confirm).
const IP_HEADER: usize = 20;
/// Size in bytes of the application header at the start of each datagram:
/// `run_server` reads the packet index from bytes 0-1 and the chunk count from bytes 2-3.
const AG_HEADER: usize = 4;
/// Largest datagram payload: 65 535 minus the UDP and IP headers (65 507 bytes).
const MAX_DATA_LENGTH: usize = (64 * 1024 - 1) - UDP_HEADER - IP_HEADER;
/// Largest file chunk carried by one datagram: the payload minus the application header.
const MAX_CHUNK_SIZE: usize = MAX_DATA_LENGTH - AG_HEADER;
/// 64 KiB; `run_server` shifts this value left to size the file-buffer allocation.
const MAX_DATAGRAM_SIZE: usize = 0x10000;

/// `memcpy`-style wrapper over [`std::ptr::copy_nonoverlapping`]: the destination comes
/// first and the source second, as in the C function.
///
/// # Safety
///
/// Same contract as [`std::ptr::copy_nonoverlapping`]: `src_ptr` must be valid for reads
/// of `len` bytes, `dst_ptr` must be valid for writes of `len` bytes, and the two regions
/// must not overlap.
unsafe fn memcpy(dst_ptr: *mut u8, src_ptr: *const u8, len: usize) {
    src_ptr.copy_to_nonoverlapping(dst_ptr, len);
}

// Different from https://doc.rust-lang.org/std/primitive.u32.html#method.next_power_of_two
/// Returns the *exponent* of the smallest power of two greater than or equal to `n`.
///
/// For example `n = 5` yields `3`, because `2^3 = 8` is the smallest power of two `>= 5`.
/// Both `n = 0` and `n = 1` yield `0` (`2^0 = 1`).
const fn next_power_of_two_exponent(n: u32) -> u32 {
    // `saturating_sub` instead of `n - 1`: for `n == 0` the plain subtraction overflows
    // (panic in debug builds, wrap-around and a result of 32 in release builds).
    32 - n.saturating_sub(1).leading_zeros()
}

 /// Wrapper around the raw pointer to the file bytes, so that the pointer can be
 /// captured by a spawned task.
 pub struct FileBuffer {
     data: *mut u8,
 }

 // SAFETY: NOTE(review) — these impls are an unchecked promise. Nothing in this type
 // synchronizes access through `data`; every user must guarantee that no two threads
 // touch the same bytes concurrently.
 unsafe impl Sync for FileBuffer {}
 unsafe impl Send for FileBuffer {}

/// Receives datagrams on `socket` in an endless loop and copies each packet's payload
/// into a raw heap buffer at an offset derived from the packet index.
///
/// Header layout as parsed below: bytes 0-1 are the big-endian packet index and
/// bytes 2-3 the big-endian chunk count; the payload starts at `AG_HEADER`.
///
/// Unlike a bare `*mut u8`, the pointer is held in a `FileBuffer`, which is declared
/// `Send`/`Sync` via `unsafe impl`, so capturing it no longer makes the spawned
/// future non-`Send`.
async fn run_server(socket: UdpSocket) {
    // Per-chunk flags; an entry is set to 1 once that packet index has been copied.
    let mut missing_indexes: Vec<u16> = Vec::new();
    // Written with the first packet's peer address; never read in the code shown.
    let mut peer_addr = MaybeUninit::<SocketAddr>::uninit();
    //let mut data = std::ptr::null_mut(); // ptr for the file bytes
    // Pointer to the file bytes; null until the first packet triggers the allocation.
    let mut fileBuffer: FileBuffer = FileBuffer { data:  std::ptr::null_mut() };
    // NOTE(review): `len` is never read or updated in the code shown.
    let mut len: usize = 0; // total len of bytes that will be written
    // Layout of the file-buffer allocation; written together with it on the first packet.
    let mut layout = MaybeUninit::<Layout>::uninit();
    // Receive buffer filled by `socket.recv_from` at the bottom of the loop.
    let mut buf = [0u8; MAX_DATA_LENGTH];
    // Set to true by the server task when it handles its first packet.
    let mut start = false;
    // `debounce_*`: server task -> debouncer task. `network_*`: receive loop -> server task.
    let (debounce_tx, mut debounce_rx) = mpsc::channel::<(usize, SocketAddr)>(3300);
    let (network_tx, mut network_rx) = mpsc::channel::<(usize, SocketAddr)>(3300);

    // NOTE(review): the body below moves `debounce_rx`, `network_rx` and `fileBuffer`
    // into tasks and drops `debounce_tx`, so it is only valid for one iteration; the
    // compiler will presumably reject the loop for using moved values — confirm.
    loop {
        // Listen for events
        // NOTE(review): the returned `JoinHandle` is never awaited in the code shown.
        let debouncer = task::spawn(async move {
            let duration = Duration::from_millis(3300);

            loop {
                // Wait for the next activity notification, giving up after `duration`.
                match time::timeout(duration, debounce_rx.recv()).await {
                    Ok(Some((size, peer))) => {
                        eprintln!("Network activity");
                    }
                    // `recv` returned `None`: presumably the channel is closed — confirm.
                    Ok(None) => {
                        // NOTE(review): `start` is a `bool` (`Copy`), so this `async move`
                        // block holds its own copy; the `start = true` performed in the
                        // server task is not visible here.
                        if start == true {
                            eprintln!("Debounce finished");
                            break;
                        }
                    }
                    // The timeout elapsed before anything arrived on the channel.
                    Err(_) => {
                        eprintln!("{:?} since network activity", duration);
                    }
                }
            }
        });
        // Listen for network activity
        // NOTE(review): the returned `JoinHandle` is never awaited in the code shown.
        let server = task::spawn({
            // async{
            // Clone so the original sender can be dropped below while this task keeps one.
            let debounce_tx = debounce_tx.clone();
            async move {
                while let Some((size, peer)) = network_rx.recv().await {
                    // Received a new packet
                    debounce_tx.send((size, peer)).await.expect("Unable to talk to debounce");
                    eprintln!("Received a packet {} from: {}", size, peer);

                    // NOTE(review): `buf` is a `[u8; N]` array (`Copy`), so this `async move`
                    // block reads its own copy rather than the buffer `recv_from` fills — confirm.
                    // Bytes 0-1: big-endian packet index.
                    let packet_index: u16 = (buf[0] as u16) << 8 | buf[1] as u16;

                    if start == false { // first bytes of a new file: initialization // TODO: ADD A MUTEX to prevent many initializations
                        start = true;
                        // Bytes 2-3: big-endian total chunk count.
                        let chunks_cnt: u32 = (buf[2] as u32) << 8 | buf[3] as u32;
                        // Allocation size in bytes: MAX_DATAGRAM_SIZE * 2^exponent.
                        let n: usize = MAX_DATAGRAM_SIZE << next_power_of_two_exponent(chunks_cnt);
                        unsafe {
                            layout.as_mut_ptr().write(Layout::from_size_align_unchecked(n, mem::align_of::<u8>()));

                            // The raw pointer is stored inside `fileBuffer` rather than in a
                            // bare `*mut u8` local.
                            // NOTE(review): the pointer returned by `alloc` is not checked for
                            // null, and no matching `dealloc` appears in this function.
                            fileBuffer.data = alloc(layout.assume_init());

                            peer_addr.as_mut_ptr().write(peer);
                        }
                        let a: Vec<u16> = vec![0; chunks_cnt as usize]; //(0..chunks_cnt).map(|x| x as u16).collect(); // create a sorted vector with all the required indexes
                        missing_indexes = a;
                    }
                    // NOTE(review): panics if `packet_index >= missing_indexes.len()`.
                    missing_indexes[packet_index as usize] = 1;
                    unsafe {
                        // Each chunk lands `packet_index * MAX_CHUNK_SIZE` bytes into the buffer.
                        let dst_ptr = fileBuffer.data.offset((packet_index as usize * MAX_CHUNK_SIZE) as isize);
                        // NOTE(review): `size - AG_HEADER` underflows when a datagram is
                        // shorter than the header.
                        memcpy(dst_ptr, &buf[AG_HEADER], size - AG_HEADER);
                    };
                    println!("receiving packet {} from: {}", packet_index, peer);
                }
            }
        });

        // Prevent deadlocks
        drop(debounce_tx);

        // Wait for one datagram and forward its size and sender to the server task.
        match socket.recv_from(&mut buf).await {
            Ok((size, src)) => {
                network_tx.send((size, src)).await.expect("Unable to talk to network");
            }
            Err(e) => {
                eprintln!("couldn't recieve a datagram: {}", e);
            }
        }
    }
}

/// Entry point: binds a UDP socket and runs the server on it.
///
/// The bind address is the first command-line argument, falling back to
/// `127.0.0.1:8080` when none is given.
///
/// # Errors
///
/// Returns an error if the socket cannot be bound or its local address cannot be read.
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let addr = env::args().nth(1).unwrap_or_else(|| "127.0.0.1:8080".to_string());
    let socket = UdpSocket::bind(&addr).await?;
    println!("Listening on: {}", socket.local_addr()?);
    // Futures are lazy: without `.await` the server future would be dropped unpolled
    // and `main` would return immediately without serving anything.
    run_server(socket).await;
    Ok(())
}
Antonin GAVREL
  • 9,682
  • 8
  • 54
  • 81