3

I need to create a distributed system comprised of various processes; to simplify matters a bit, let's say they are three: A, B and C. Every process knows the IP and port of the others from/to which to receive/send UDP datagrams. They all use the same UDP port (let's say port 8080).

Assuming that A's IP is 192.168.1.1, B's IP is 192.168.1.2 and C's IP is 192.168.1.3:

  • A has two UDP sockets, both bind to 0.0.0.0:8080. The first is connected to 192.168.1.2:8080, the second to 192.168.1.3:8080. It uses the first socket to receive and send datagrams from/to B, and the second socket to receive and send datagrams from/to C.
  • B has two UDP sockets, both bind to 0.0.0.0:8080. The first is connected to 192.168.1.1:8080, the second to 192.168.1.3:8080. It uses the first socket to receive and send datagrams from/to A, and the second socket to receive and send datagrams from/to C.
  • C has two UDP sockets, both bind to 0.0.0.0:8080. The first is connected to 192.168.1.1:8080, the second to 192.168.1.2:8080. It uses the first socket to receive and send datagrams from/to A, and the second socket to receive and send datagrams from/to B.

The processes communicate with each other and reject datagrams from others.

The communication I want to enable can be described by the following tuples. All rows are different, thus they should describe a valid setup.

Source address Source port Dest address Dest port Protocol
192.168.1.1 8080 192.168.1.2 8080 UDP
192.168.1.1 8080 192.168.1.3 8080 UDP
192.168.1.2 8080 192.168.1.1 8080 UDP
192.168.1.2 8080 192.168.1.3 8080 UDP
192.168.1.3 8080 192.168.1.1 8080 UDP
192.168.1.3 8080 192.168.1.2 8080 UDP

How can I achieve this in Rust + Tokio?

In Rust, connecting a socket means setting the default destination for send() and limiting packets that are read via recv from the specified address. This concept is not specific to Rust: POSIX sockets (such as UDP sockets) behave the same way.

This is what I have in mind:

// A's startup code

let sockforb = UdpSocket::bind("0.0.0.0:8080").await?;
let sockforc = UdpSocket::bind("0.0.0.0:8080").await?;

let b_addr = "192.168.1.2:8080".parse::<SocketAddr>().unwrap();
let c_addr = "192.168.1.3:8080".parse::<SocketAddr>().unwrap();

sockforb.connect(b_addr).await?;
sockforc.connect(c_addr).await?;

The code above would be handy: I have two distinct socket variables, and if I call send/recv on them I send/ receive datagrams to/from the desired process.

However, the code produces the following error:

Error: Os { code: 98, kind: AddrInUse, message: "Address already in use" }

As a workaround, I can define one socket variable only and pass multiple addresses to the connect method (the argument has type ToSocketAddrs). This lets me send/receive datagrams only to/from the designated processes. However, although this solution is free of errors, it is not handy as I would have one single socket variable, in contrast to multiple variables for different processes. My intent is to have different socket variables in order to put them in different structs for each process.

How can I achieve this in Rust + Tokio, possibly with portable (non OS-dependent) code?

cafce25
  • 15,907
  • 4
  • 25
  • 31
steddy
  • 146
  • 1
  • 9
  • I think you just need a layer of abstraction. Create something like `struct SocketEndpoint` that has fields for both updsocket and address. Then you can have two `SocketEndpoint`s like you want but they can both be tied to the same underlying connection. – etchesketch Mar 23 '23 at 18:07

1 Answers1

1

It's possible setting SO_REUSEADDRESS and SO_REUSEPORT on unix like OS' like MacOS and Linux. I'm not sure it's possible on Windows but it seems to need only SO_REUSEADDRESS, in the following I'm using the net2 crate to do so with a safe abstraction in the UdpBuilder and UnixUdpBuilderExt:

use std::io;

use net2::{unix::UnixUdpBuilderExt, UdpBuilder};
use tokio::net::UdpSocket;

async fn build_socket(me: u16, to: u16) -> io::Result<std::net::UdpSocket> {
    tokio::task::spawn_blocking(move || -> io::Result<_> {
        let socket = UdpBuilder::new_v4()?
            .reuse_address(true)?
            .reuse_port(true)?
            .bind(("0.0.0.0", me))?;
        socket.set_nonblocking(true)?;
        socket.connect(("127.0.0.1", to))?;
        Ok(socket)
    })
    .await
    .unwrap()
}

#[tokio::main]
async fn main() -> io::Result<()> {
    let handles = (0..3)
        .map(|i| {
            tokio::task::spawn(async move {
                let me = 8080 + i;
                let to1 = 8080 + (i + 1) % 3;
                let to2 = 8080 + (i + 2) % 3;
                let a = build_socket(me, to1).await.unwrap();
                let a = UdpSocket::from_std(a).unwrap();

                let b = build_socket(me, to2).await.unwrap();
                let b = UdpSocket::from_std(b).unwrap();

                a.send(format!("{me} -> {to1}").as_bytes()).await.unwrap();
                b.send(format!("{me} -> {to2}").as_bytes()).await.unwrap();
                let mut buf = [0; 1024];

                if let Ok(n) = a.recv(&mut buf).await {
                    println!("{me} received: {:?}", String::from_utf8_lossy(&buf[..n]));
                }
                if let Ok(n) = b.recv(&mut buf).await {
                    println!("{me} received: {:?}", String::from_utf8_lossy(&buf[..n]));
                }
            })
        })
        .collect::<Vec<_>>();

    for handle in handles {
        handle.await.unwrap();
    }
    println!("finished all sends, returning");
    std::process::exit(0);
}

Note: the code above uses different ports on the same machine to represent the different IPs of the OP for easier testing.

cafce25
  • 15,907
  • 4
  • 25
  • 31