I am trying to send a serialized struct over tcp to multiple machines. The tcp handler receives the serialized struct (String type) by a crossbeam channel from another thread. My problem is that the rx.try_iter() will drain the crossbeam channel, and if more than one client is connected the clients can't receive the same struct. I tried moving the rx.try_iter() out of the individual handle_client() function, but couldn't achieve a good result. Thanks for your time and help!
This is what I have so far:
(Server side)
use std::net::{TcpListener, TcpStream};
use std::thread;
use std::io::{Read,Write,Error};
use serde::{Serialize, Deserialize};
use crossbeam_channel::unbounded;
#[derive(Serialize, Deserialize)]
pub struct Serialized {
pub buffer: Vec<u32>,
pub timestep: u128,
}
impl Serialized {
pub fn serialize(buffer: Vec<u32>, timestep: u128) -> String {
let x = Serialized {
buffer,
timestep
};
serde_json::to_string(&x).unwrap()
}
}
fn handle_client(mut stream: TcpStream, rx: crossbeam_channel::Receiver<String>)-> Result<(), Error> {
println!("incoming connection from: {}", stream.peer_addr()?);
loop {
//receive from channel
let serialized = match rx.try_iter().last(){
Some(x) => x,
None => continue,
};
//write to stream
stream.write(serialized.as_bytes())?;
}
}
pub fn start_server(rx: crossbeam_channel::Receiver<String>) {
let listener = TcpListener::bind("localhost:8888").expect("Could not bind");
for stream in listener.incoming() {
let rx = rx.clone();
match stream {
Err(e)=> {eprintln!("failed: {}", e)}
Ok(stream) => {
thread::spawn(move || {
handle_client(stream, rx).unwrap_or_else(|error| eprintln!("{:?}", error));
});
}
}
}
}
(Client side)
use std::net::TcpStream;
use serde::{Serialize, Deserialize};
use std::error::Error as er;
#[derive(Serialize, Deserialize, Debug)]
pub struct Serialized {
pub buffer: Vec<u32>,
pub timestep: u128,
}
fn read_user_from_stream(tcp_stream: &mut TcpStream) -> Result<Serialized, Box<dyn er>> {
let mut de = serde_json::Deserializer::from_reader(tcp_stream);
let u = Serialized::deserialize(&mut de)?;
Ok(u)
}
pub fn start_client() {
loop {
let mut stream = TcpStream::connect("localhost:8888").expect("could not connect");
let serialized = read_user_from_stream(&mut stream).unwrap();
println!("timestep: {}", serialized.timestep);
}
}
fn main() {
start_client();
}