11

I don't know what to do next. It looks like I misunderstand something, or maybe I have not learned some critical topic.

use std::sync::Arc;

use reqwest::{Error, Response}; // 0.11.4
use tokio::sync::mpsc::{self, Receiver, Sender}; // 1.9.0

pub struct Task {
    pub id: u32,
    pub url: String,
}
pub enum Message {
    Failure(Task, Error),
    Success(Task, Response),
}

struct State {
    client: reqwest::Client,
    res_tx: Sender<Message>,
    res_rx: Receiver<Message>,
}

pub struct Proxy {
    state: Arc<State>,
    max_rps: u16,
    max_pending: u16,
    id: u32,
    parent_tx: Sender<String>,
}

async fn send_msg<T>(tx: &Sender<T>, msg: T) {
    match tx.send(msg).await {
        Err(error) => {
            eprintln!("{}", error)
        }
        _ => (),
    };
}

impl Proxy {
    // Starts loop for input channel
    async fn start_chin(&mut self) -> Sender<Task> {
        let (chin_tx, mut chin_rx) = mpsc::channel::<Task>(self.max_pending as usize + 1 as usize);
        let state_outer = self.state.clone();

        tokio::spawn(async move {
            loop {
                match chin_rx.recv().await {
                    Some(task) => {
                        let res_tx = state_outer.res_tx.clone();
                        let state = state_outer.clone();
                        tokio::spawn(async move {
                            match state.client.get(&task.url).send().await {
                                Ok(res) => send_msg(&res_tx, Message::Success(task, res)).await,
                                Err(err) => send_msg(&res_tx, Message::Failure(task, err)).await,
                            }
                        });
                    }
                    None => (),
                }
            }
        });
        chin_tx
    }

    async fn start_chres(&self) {
        let state = self.state.clone();

        tokio::spawn(async move {
            loop {
                match state.res_rx.recv().await { // LINE PRODUCES ERROR
                    Some(task) => {}
                    None => (),
                }
            }
        });
    }
}

impl Proxy {
    pub fn new(
        id: u32,
        parent_tx: Sender<String>,
        proxy_addr: &str,
        max_rps: u16,
        max_pending: u16,
    ) -> Result<Self, Error> {
        let client = reqwest::Client::builder();
        if proxy_addr != "none" {
            client = client.proxy(reqwest::Proxy::all(proxy_addr)?)
        }
        let (res_tx, res_rx) = mpsc::channel::<Message>(max_pending as usize + 1 as usize); // TODO: check size

        Ok(Proxy {
            id,
            state: Arc::new(State {
                client: client.build()?,
                res_tx,
                res_rx,
            }),
            max_rps,
            max_pending,
            parent_tx,
        })
    }
}
error[E0596]: cannot borrow data in an `Arc` as mutable
  --> src/lib.rs:69:23
   |
69 |                 match state.res_rx.recv().await {
   |                       ^^^^^^^^^^^^ cannot borrow as mutable
   |
   = help: trait `DerefMut` is required to modify through a dereference, but it is not implemented for `Arc<State>`
Shepmaster
  • 388,571
  • 95
  • 1,107
  • 1,366
Lex
  • 194
  • 1
  • 2
  • 11
  • 2
    Your question might be answered by the answers of [How do I share a mutable object between threads using Arc?](https://stackoverflow.com/q/31373255/155423). If not, please **[edit]** your question to explain the differences. Otherwise, we can mark this question as already answered. – Shepmaster Jul 27 '21 at 15:03
  • 1
    From the [documentation of `Arc`](https://doc.rust-lang.org/std/sync/struct.Arc.html), emphasis mine: *Shared references in Rust disallow mutation by default, and `Arc` is no exception: you cannot generally obtain a mutable reference to something inside an `Arc`. **If you need to mutate through an `Arc`, use `Mutex`, `RwLock`, or one of the `Atomic` types**.* – Shepmaster Jul 27 '21 at 15:05
  • See also [`tokio::sync::Mutex`](https://docs.rs/tokio/1.9.0/tokio/sync/struct.Mutex.html). – Shepmaster Jul 27 '21 at 15:06
  • 1
    Does this answer your question? [How do I share a mutable object between threads using Arc?](https://stackoverflow.com/questions/31373255/how-do-i-share-a-mutable-object-between-threads-using-arc) – Jmb Jul 27 '21 at 16:08
  • 1
    A single consumer channel can only have a single consumer, so the `res_rx` should be owned by the task and not be shared behind an `Arc`. – Jan Hudec Jul 28 '21 at 08:06
  • Sorry for misleading, yesterday was a hard day. I meant the following: how can I avoid this situation in the most optimal way. Each link helped me understand the big picture and I came up with a great solution. – Lex Jul 28 '21 at 14:03

2 Answers2

7
use std::sync::Arc;

struct Something {
    size: usize
}

impl Something {
    fn increase(&mut self) {
        self.size = self.size + 1;
    }
}

fn main() {
    let something = Something{size: 1};
    let arc = Arc::new(something);
    arc.increase();
}

gives

error[E0596]: cannot borrow data in an `Arc` as mutable
  --> src/main.rs:16:5
   |
16 |     arc.increase();
   |     ^^^ cannot borrow as mutable
   |
   = help: trait `DerefMut` is required to modify through a dereference, but it is not implemented for `Arc<Something>`

error: aborting due to previous error; 1 warning emitted

because it tries to borrow arc as mutable. For it to happen, DerefMut would have to be implemented for Arc but it's not because Arc is not meant to be mutable.

Wraping your object in a Mutex works:

use std::sync::{Arc, Mutex};

struct Something {
    size: usize
}

impl Something {
    fn increase(&mut self) {
        self.size = self.size + 1;
    }
}

fn main() {
    let something = Something{size: 1};
    let arc = Arc::new(Mutex::new(something));
    arc.lock().unwrap().increase();
}

Now it can be shared and can be increased.

PPP
  • 1,279
  • 1
  • 28
  • 71
  • The question asks how to do this, but in context where it shouldn't have been attempted in the first place. – Jan Hudec Jul 28 '21 at 08:07
  • 2
    Better not use `std::sync::Mutex` for async applications as it will block whole system thread causing programm deadlock. If you use `tokio` then it will be better to use `tokio::sync::Mutex` - it doesn't block system thread. – Lex Jul 29 '21 at 16:38
  • 2
    According [this](https://github.com/actix/actix-website/issues/201) the Actix team recommends using the standard library mutex. – fuzzylogical Mar 15 '22 at 10:47
  • Locking a `tokio::sync::Mutex` is more expensive than a `std::sync::Mutex`. If you don't actually need to hold the lock across `.await`s then you should use the one from `std`, or even better, `parking_lot::Mutex`. – Malted_Wheaties May 31 '23 at 12:21
2

Lucas Zanella's answer and Shepmaster's comments helped alot to refactor and simplify code. I've desided to pass ownership inside Proxy::new() function instead of using shared reference. The code became more readable, and I've avoided shared reference for mutable tokio::sync::mpsc::Receiver. Perhaps the question turned out to be too unstructured, but I came to a new approach thanks to the community. Refactored code is listed below.

use reqwest::{Client, Error, Response};
use tokio::sync::mpsc;
use tokio::sync::mpsc::{Sender, Receiver};


pub struct Task {
    pub id: u32,
    pub url:  String,
}
pub enum Message{
    Failure(Task, Error),
    Success(Task, Response),
}
pub struct Proxy{
    id: u32,
    max_rps: u16,
    max_pending: u16,
    in_tx: Sender<Task>,
}


async fn send_msg<T>(tx: &Sender<T>, msg: T){
    match tx.send(msg).await {
        Err(error) => { eprintln!("{}", error) },
        _ => (),
    };
}


async fn start_loop_in(client: Client, mut in_rx: Receiver<Task>, res_tx: Sender<Message>){
    loop {
        if let Some(task) = in_rx.recv().await {
            let client_clone = client.clone();
            let res_tx_clone = res_tx.clone();
            tokio::spawn(async move {
                println!("SENDING: {}", &task.url); // TODO: DELETE DEBUG
                match client_clone.get(&task.url).send().await {
                    Ok(res) => send_msg(&res_tx_clone, Message::Success(task, res)).await,
                    Err(err) => send_msg(&res_tx_clone, Message::Failure(task, err)).await,
                }
            });
        }
    }
}


async fn start_loop_res(mut res_rx: Receiver<Message>, out_tx: Sender<String>){
    loop {
        if let Some(message) = res_rx.recv().await {
            match message {
                Message::Success(task, res) => { 
                    send_msg(
                        &out_tx, 
                        format!("{:#?}", res.text().await.unwrap()) // TODO: change in release!
                    ).await;
                },
                Message::Failure(task, err) => {
                    send_msg(&out_tx, err.to_string()).await;
                },
            }
        }
    }
}


impl Proxy{

    pub fn new(id: u32, parent_tx: Sender<String>, proxy_addr: &str, max_rps: u16, max_pending: u16) -> Result<Self, Error> {
        
        let mut client = Client::builder();
        if proxy_addr != "none" { client = client.proxy(reqwest::Proxy::all(proxy_addr)?) }
        let (res_tx, res_rx) = mpsc::channel::<Message>(max_pending as usize + 1 as usize); // TODO: check size

        let client = client.build()?;
        let (in_tx, in_rx) = mpsc::channel::<Task>(max_pending as usize + 1 as usize);
        let res_tx_clone = res_tx.clone();
        tokio::spawn(async move { start_loop_in(client, in_rx, res_tx_clone).await });

        tokio::spawn(async move { start_loop_res(res_rx, parent_tx).await });
        
        Ok(Proxy{
            id,
            max_rps,
            max_pending,
            in_tx,
        })
    }

    pub fn get_in_tx(&self) -> Sender<Task> {
        self.in_tx.clone()
    }
}
Lex
  • 194
  • 1
  • 2
  • 11