
I am trying to send a serialized struct over TCP to multiple machines. The TCP handler receives the serialized struct (as a String) through a crossbeam channel from another thread. My problem is that rx.try_iter() drains the crossbeam channel, so when more than one client is connected, the clients can't all receive the same struct. I tried moving rx.try_iter() out of the per-client handle_client() function, but couldn't get that to work. Thanks for your time and help!

This is what I have so far:

(Server side)

use std::net::{TcpListener, TcpStream};
use std::thread;
use std::io::{Read,Write,Error};
use serde::{Serialize, Deserialize};
use crossbeam_channel::unbounded;

#[derive(Serialize, Deserialize)]
pub struct Serialized {
    pub buffer: Vec<u32>,
    pub timestep: u128,
}

impl Serialized {
    pub fn serialize(buffer: Vec<u32>, timestep: u128) -> String {
        let x = Serialized {
           buffer,
           timestep 
        };
        serde_json::to_string(&x).unwrap()
    }
}

fn handle_client(mut stream: TcpStream, rx: crossbeam_channel::Receiver<String>)-> Result<(), Error> {
    println!("incoming connection from: {}", stream.peer_addr()?);
    loop {
        //receive from channel
        let serialized = match rx.try_iter().last(){
            Some(x) => x,
            None => continue,
        };

        //write to stream
        stream.write_all(serialized.as_bytes())?; // write_all: a plain write may send only part of the buffer
    }
}

pub fn start_server(rx: crossbeam_channel::Receiver<String>) {
    
    let listener = TcpListener::bind("localhost:8888").expect("Could not bind");
    
    for stream in listener.incoming() {
        let rx = rx.clone();
        match stream {
            Err(e)=> {eprintln!("failed: {}", e)}
            Ok(stream) => {
                
                thread::spawn(move || {
                    handle_client(stream, rx).unwrap_or_else(|error| eprintln!("{:?}", error));
                });
                
            }
        } 

    }
}

(Client side)

use std::net::TcpStream;
use serde::{Serialize, Deserialize};
use std::error::Error as er;

#[derive(Serialize, Deserialize, Debug)]
pub struct Serialized {
    pub buffer: Vec<u32>,
    pub timestep: u128,
}

fn read_user_from_stream(tcp_stream: &mut TcpStream) -> Result<Serialized, Box<dyn er>> {
    let mut de = serde_json::Deserializer::from_reader(tcp_stream);
    let u = Serialized::deserialize(&mut de)?;

    Ok(u)
}

pub fn start_client() {
    loop {
        let mut stream = TcpStream::connect("localhost:8888").expect("could not connect");
        let serialized = read_user_from_stream(&mut stream).unwrap();
        println!("timestep: {}", serialized.timestep);
    }
}

fn main() {
    start_client();
}
tlhenvironment

2 Answers


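You can't use crossbeam to broadcast items. A crossbeam channel is a multi-producer, multi-consumer queue: each message is delivered to exactly one receiver, and cloning a Receiver only adds another consumer of the same queue. If you want to deliver every item to all receivers, you need a different mechanism.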

It seems that the bus crate provides what you need.
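For comparison, the "different mechanism" can also be sketched with the standard library alone: keep one std::sync::mpsc channel per subscriber and send a copy of each message down every one of them. This is only a minimal illustration, not how the bus crate works internally; the Broadcaster type and its subscribe and broadcast methods are hypothetical names invented for this example.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};

/// Hypothetical fan-out helper (not part of any crate): one Sender per subscriber.
#[derive(Clone, Default)]
pub struct Broadcaster {
    subscribers: Arc<Mutex<Vec<Sender<String>>>>,
}

impl Broadcaster {
    pub fn new() -> Self {
        Self::default()
    }

    /// Register a new subscriber and return its private receiving end.
    pub fn subscribe(&self) -> Receiver<String> {
        let (tx, rx) = channel();
        self.subscribers.lock().unwrap().push(tx);
        rx
    }

    /// Send a copy of `msg` to every subscriber. Subscribers whose Receiver
    /// has been dropped are removed. Returns how many subscribers remain.
    pub fn broadcast(&self, msg: &str) -> usize {
        let mut subs = self.subscribers.lock().unwrap();
        subs.retain(|tx| tx.send(msg.to_owned()).is_ok());
        subs.len()
    }
}
```

In a server like the one in the question, the accept loop would call subscribe() for each new connection and move the returned Receiver into that client's thread, while the producer thread calls broadcast(). Note that these channels are unbounded, so a slow client lets its own queue grow instead of blocking the sender; whether that trade-off is acceptable depends on the application.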

Finomnis
  • Thanks for this suggestion, it seems like the right direction for sure! I looked into the bus crate, and it seems that because bus is bounded, once the channel is full a new message can only be broadcast after all receivers have received the old messages. I'm also still not sure how to set up the sending and receiving parts properly. When I broadcast in the main loop using bus, I can't move the bus into the function that handles the client, and I don't know how to give each client its own BusReader. – tlhenvironment Aug 02 '22 at 02:50
  • That sounds like a new question. Open a new one on Stack Overflow and show us the code you have (minimized); then we can talk about it. – Finomnis Aug 02 '22 at 06:55

After some discussion on the Rust users forum, I arrived at this solution (server side):

use std::sync::{Arc, Mutex};
use std::thread;
use std::net::{TcpListener, TcpStream};
use std::io::{Error, Write};
use bus::{Bus, BusReader};

fn main() {
    let mut x: u32 = 0;
    let bus = Bus::<u32>::new(10);

    let bus_mutex = Arc::new(Mutex::new(bus));

    let bus_mutex_cp = Arc::clone(&bus_mutex);
    thread::spawn(move || {
        start_server(bus_mutex_cp);
    });

    //simulation loop
    for _ in 0..99999 {
        x = x + 1;
        println!("Simulation step: {}", x);
        bus_mutex.lock().unwrap().broadcast(x);
        thread::sleep(std::time::Duration::from_millis(1000)); // sleep_ms is deprecated
    }

    loop {}

}


pub fn start_server(bus_mutex: Arc<Mutex<Bus<u32>>>) {
    
    let listener = TcpListener::bind("0.0.0.0:8888").expect("Could not bind");
    
    for stream in listener.incoming() {
        
        match stream {
            Err(e)=> {eprintln!("failed: {}", e)}
            Ok(stream) => {
                
                let rx = bus_mutex.lock().unwrap().add_rx();
                thread::spawn(move || {
                    handle_client(stream, rx).unwrap_or_else(|error| eprintln!("{:?}", error));
                });
                
            }
        } 

    }
}

fn handle_client(mut stream: TcpStream, mut rx: BusReader<u32>)-> Result<(), Error> {
    println!("incoming connection from: {}", stream.peer_addr()?);
    loop {
        //receive from bus
        let x = rx.recv().unwrap();

        //write to stream
        stream.write_all(x.to_string().as_bytes())?; // write_all: a plain write may send only part of the buffer

        thread::sleep(std::time::Duration::from_millis(100)); // sleep_ms is deprecated
    }
}
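One thing the server code above leaves open is how a client tells consecutive values apart, since the numbers are written back to back with no separator. A possible approach, sketched here under the assumption that newline-delimited text is acceptable on the wire, is to end each value with a newline and read line by line on the other side. The helper names write_framed and read_framed are hypothetical, chosen only for this illustration.

```rust
use std::io::{BufRead, BufReader, Read, Write};

/// Write one value followed by '\n' so the reader can find message boundaries.
/// (Hypothetical helper; works with a TcpStream or any other Write.)
pub fn write_framed<W: Write>(out: &mut W, value: u32) -> std::io::Result<()> {
    out.write_all(format!("{}\n", value).as_bytes())
}

/// Read newline-delimited values until the peer closes the connection (EOF).
/// (Hypothetical helper; works with a TcpStream or any other Read.)
pub fn read_framed<R: Read>(input: R) -> std::io::Result<Vec<u32>> {
    let mut values = Vec::new();
    for line in BufReader::new(input).lines() {
        let line = line?;
        // Turn a malformed number into an io::Error instead of panicking.
        let value = line
            .trim()
            .parse::<u32>()
            .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
        values.push(value);
    }
    Ok(values)
}
```

In handle_client, write_framed(&mut stream, x)? could stand in for the bare write of x.to_string(). A long-running client would more likely process each line as it arrives instead of collecting everything until EOF; the collecting version is used here only to keep the sketch short.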
tlhenvironment