1

I am writing an application which should read from a serial port in a loop (like a watcher) and also write commands to it.

The main thread is allowed to only write while the created thread can only read.

I created a simple example to reproduce my problem here. tx_thread reads from the serial port in a loop and on a certain condition it sends a message via MPSC channel. rx_thread looks for messages; when there is anything available it processes and it should also mutate the current state of the struct.

use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// this will also implement Drop trait to wait threads to
// be finished (message will be Enum instead of number in this case)

#[derive(Debug)]
struct MyStruct {
    num: u32,
    tx_thread: Option<thread::JoinHandle<()>>,
    rx_thread: Option<thread::JoinHandle<()>>,
}

impl MyStruct {
    fn new() -> MyStruct {
        MyStruct {
            num: 0,
            tx_thread: None,
            rx_thread: None,
        }
    }

    fn start(&mut self) {
        let (tx, rx) = mpsc::channel();

        // tx thread will read from serial port infinitely,
        // and send data to mpsc channel after certain condition
        // to be processed.
        let tx_thread = thread::spawn(move || {
            let mut i = 0;

            loop {
                tx.send(i).unwrap();
                i += 1;
                thread::sleep(Duration::from_secs(1));
            }
        });

        // after this will receive message, it will start
        // processing and mutate `self` state if needed.
        let rx_thread = thread::spawn(move || loop {
            let num = rx.recv().unwrap();
            println!("{:?}", num);

            /* here, how do I save `num` to `self`? */

            thread::sleep(Duration::from_secs(1));
        });

        self.tx_thread = Some(tx_thread);
        self.rx_thread = Some(rx_thread);
    }
}

fn main() {
    let mut s = MyStruct::new();
    s.start();
    thread::sleep(Duration::from_secs(999999));
}
Shepmaster
  • 388,571
  • 95
  • 1,107
  • 1,366
Nika
  • 1,864
  • 3
  • 23
  • 44
  • Not a duplicate, the approach I saw in those topics, they use different things (e.g. putting self into arc directly, then there's no way to access it anymore from other methods). And it's not about sharing a mutable object between threads, it's solution on the question. – Nika Jan 06 '19 at 22:15

1 Answers1

3

One amazing guy (Broken pen) on discord channel told me pretty much great solution to this, all credits to him.

So solution is to put properties that we want to be mutated in Arc<Mutex<>> instead and move cloned reference into the thread. So basically code will look like this:

use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::Duration;

type MyType = Arc<Mutex<u32>>;

#[derive(Debug)]
struct MyStruct {
    num: MyType,
    tx_thread: Option<thread::JoinHandle<()>>,
    rx_thread: Option<thread::JoinHandle<()>>,
}

impl MyStruct {
    fn new() -> MyStruct {
        MyStruct {
            num: Arc::new(Mutex::new(0)),
            tx_thread: None,
            rx_thread: None,
        }
    }

    fn start(&mut self) {
        let (tx, rx) = mpsc::channel();

        // tx thread will read from serial port infinitely,
        // and send data to mpsc channel after certain condition
        // to be processed.
        let tx_thread = thread::spawn(move || {
            let mut i = 0;

            loop {
                tx.send(i).unwrap();
                i += 1;
                thread::sleep(Duration::from_secs(1));
            }
        });

        // clone here.
        let arc_num = self.num.clone();
        let rx_thread = thread::spawn(move || loop {
            let num = rx.recv().unwrap();
            // println!("{:?}", num);

            // now we can use it for writing/reading.
            *arc_num.lock().unwrap() = num;
            println!("{:?}", *arc_num.lock().unwrap());

            thread::sleep(Duration::from_secs(1));
        });

        self.tx_thread = Some(tx_thread);
        self.rx_thread = Some(rx_thread);
    }
}

fn main() {
    let mut s = MyStruct::new();
    s.start();
    thread::sleep(Duration::from_secs(999999));
}

EDIT: another solution is to create inner struct with Arc<Mutex<>> and do work there, which gives you access to everything you need.

See code below:

use std::default::Default;
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::Duration;

// this will also implement Drop trait to wait threads to
// be finished (message will be Enum instead of number in this case)

#[derive(Debug, Default)]
struct MyStructInner {
    num: u32,
    tx_thread: Option<thread::JoinHandle<()>>,
    rx_thread: Option<thread::JoinHandle<()>>,
}

#[derive(Debug, Default)]
struct MyStruct {
    inner: Arc<Mutex<MyStructInner>>,
}

impl MyStruct {
    fn new() -> MyStruct {
        MyStruct {
            inner: Arc::new(Mutex::new(MyStructInner {
                num: 0,
                ..Default::default()
            })),
        }
    }

    fn start(&mut self) {
        let (tx, rx) = mpsc::channel();

        // tx thread will read from serial port infinitely,
        // and send data to mpsc channel after certain condition
        // to be processed.
        let tx_thread = thread::spawn(move || {
            let mut i = 0;

            loop {
                tx.send(i).unwrap();
                i += 1;
                thread::sleep(Duration::from_secs(1));
            }
        });

        // after this will receive message, it will start
        // processing and mutate `self` state if needed.
        let local_self = self.inner.clone();
        let rx_thread = thread::spawn(move || loop {
            let num = rx.recv().unwrap();

            local_self.lock().unwrap().num = num;
            println!("{:?}", local_self.lock().unwrap().num);

            thread::sleep(Duration::from_secs(1));
        });

        self.inner.lock().unwrap().tx_thread = Some(tx_thread);
        self.inner.lock().unwrap().rx_thread = Some(rx_thread);
    }
}

fn main() {
    let mut s = MyStruct::new();
    s.start();
    thread::sleep(Duration::from_secs(999999));
}
Nika
  • 1,864
  • 3
  • 23
  • 44