
I'm trying to perform a parallel operation on several chunks of strings at a time, and I'm having an issue with the borrow checker:

(for context, identifiers is a Vec<String> from a CSV file, client is reqwest and target is an Arc<String> that is write once read many)

use futures::{stream, StreamExt};
use std::sync::Arc;

async fn nop(
    person_ids: &[String],
    target: &str,
    url: &str,
) -> String {
    let noop = format!("{} {}", target, url);
    let noop2 = person_ids.iter().for_each(|f| {f.as_str();});
    "Some text".into()
}

#[tokio::main]
async fn main() {
    let target = Arc::new(String::from("sometext"));
    let url = "http://example.com";
    let identifiers = vec!["foo".into(), "bar".into(), "baz".into(), "qux".into(), "quux".into(), "quuz".into(), "corge".into(), "grault".into(), "garply".into(), "waldo".into(), "fred".into(), "plugh".into(), "xyzzy".into()];

    let id_sets: Vec<&[String]> = identifiers.chunks(2).collect();

    let responses = stream::iter(id_sets)
        .map(|person_ids| {
            let target = target.clone();
            tokio::spawn( async move {
                let resptext = nop(person_ids, target.as_str(), url).await;
            })
        })
        .buffer_unordered(2);

    responses
        .for_each(|b| async { })
        .await;
}

Playground: https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=e41c635e99e422fec8fc8a581c28c35e

Since chunks yields borrowed &[String] slices (collected here into a Vec<&[String]>), the compiler complains that identifiers doesn't live long enough, because it could go out of scope while the slices are still being referenced. Realistically this won't happen because there's an await. Is there a way to tell the compiler that this is safe, or is there another way of getting the chunks as sets of owned Strings for each thread?

There was a similarly asked question that used into_owned() as a solution, but when I try that, rustc complains about the slice size not being known at compile time in the request_user function.

EDIT: Some other questions as well:

  1. Is there a more direct way of using target in each thread without needing Arc? From the moment it is created, it never needs to be modified, just read from. If not, is there a way of pulling it out of the Arc that doesn't require the .as_str() method?

  2. How do you handle multiple error types within the tokio::spawn() block? In the real world use, I'm going to receive quick_xml::Error and reqwest::Error within it. It works fine without tokio spawn for concurrency.

kmdreko
Dragoon
  • I don't think it's true that the `.await` means the chunks can't outlive `identifiers`; at least, that's not clear to me from this snippet. Maybe it depends on a part of the code you're not showing. Please try to create a [mre] to help potential answerers. – trent Mar 02 '21 at 20:09
  • I added a playground link, it reproduces the problem – Dragoon Mar 02 '21 at 22:16
  • Regarding your edit, please only ask one question per post. 1) Since `tokio::spawn` requires `'static` yes, either `Arc` or cloning is required. You can just use `&target` if `as_str()` bothers you. 2) Ask a new question with the representable problem you're having. – kmdreko Mar 03 '21 at 00:12
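The coercion mentioned in the last comment can be sketched as follows. This is a minimal illustration, not code from the question: `shout` and `call_both_ways` are hypothetical names, and `shout` stands in for any function that takes a `&str`. Because `Arc<String>` dereferences to `String`, which in turn dereferences to `str`, passing `&target` should behave the same as passing `target.as_str()`.

```rust
use std::sync::Arc;

// Hypothetical helper standing in for any function that accepts `&str`.
fn shout(target: &str) -> String {
    target.to_uppercase()
}

// Calls the helper with an explicit `.as_str()` and with plain `&target`.
fn call_both_ways(target: Arc<String>) -> (String, String) {
    // Explicit conversion, as written in the question.
    let explicit = shout(target.as_str());
    // Deref coercion: &Arc<String> -> &String -> &str.
    let coerced = shout(&target);
    (explicit, coerced)
}
```

Both calls borrow the same string, so no extra clone is made in either form.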

2 Answers


Is there a way to tell the compiler that this is safe, or is there another way of getting chunks as a set of owned Strings for each thread?

You can chunk a Vec<T> into a Vec<Vec<T>> without cloning by using the itertools crate:

use itertools::Itertools;

fn main() {
    let items = vec![
        String::from("foo"),
        String::from("bar"),
        String::from("baz"),
    ];
    
    let chunked_items: Vec<Vec<String>> = items
        .into_iter()
        .chunks(2)
        .into_iter()
        .map(|chunk| chunk.collect())
        .collect();
        
    for chunk in chunked_items {
        println!("{:?}", chunk);
    }
}

Output:

["foo", "bar"]
["baz"]

This is based on the answers here.
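If adding itertools is not an option, roughly the same result can be had with the standard library alone. The following is a sketch under that assumption; `into_chunks` is a hypothetical helper name, not something from itertools or std. It moves each element out of the original vector, so no String is cloned.

```rust
/// Splits an owned vector into owned chunks of at most `size` elements,
/// moving each element instead of cloning it.
fn into_chunks<T>(items: Vec<T>, size: usize) -> Vec<Vec<T>> {
    assert!(size > 0, "chunk size must be non-zero");
    let mut chunks = Vec::new();
    let mut iter = items.into_iter().peekable();
    // Keep taking up to `size` elements until the iterator is exhausted.
    while iter.peek().is_some() {
        chunks.push(iter.by_ref().take(size).collect());
    }
    chunks
}
```

Calling `into_chunks(items, 2)` on the three-element vector above should give the same two chunks as the itertools version, with the last chunk holding the leftover element.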

kmdreko

Your issue here is that id_sets is a Vec of slices (&[String]) that borrow from identifiers. tokio::spawn requires the spawned future to be 'static, because the task may keep running after the function that spawned it has returned, so the async move block cannot capture a borrow of a local variable.

The solution to the immediate problem is to convert the Vec<&[String]> to a Vec<Vec<String>>, which clones the strings so that each task owns its chunk.

A way of accomplishing that would be:

    let id_sets: Vec<Vec<String>> = identifiers
        .chunks(2)
        .map(|x: &[String]| x.to_vec())
        .collect();
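
To show why owned chunks satisfy the 'static requirement, here is a small sketch that uses std::thread::spawn as a stand-in for tokio::spawn (an assumption made so the example needs no external crates; both require the spawned work to be 'static). The names `describe` and `process_chunks` are hypothetical, with `describe` playing the role of `nop` from the question.

```rust
use std::sync::Arc;
use std::thread;

// Hypothetical stand-in for the question's `nop`; it only borrows its inputs.
fn describe(person_ids: &[String], target: &str) -> String {
    format!("{}: {}", target, person_ids.join(","))
}

fn process_chunks(identifiers: &[String], target: Arc<String>) -> Vec<String> {
    // Copy each borrowed chunk into an owned Vec<String>.
    let id_sets: Vec<Vec<String>> = identifiers
        .chunks(2)
        .map(|x: &[String]| x.to_vec())
        .collect();

    let handles: Vec<_> = id_sets
        .into_iter()
        .map(|person_ids| {
            let target = Arc::clone(&target);
            // The closure owns `person_ids` and `target`, so it borrows
            // nothing from this function and is therefore 'static.
            thread::spawn(move || describe(&person_ids, &target))
        })
        .collect();

    // Collect results in the order the workers were spawned.
    handles
        .into_iter()
        .map(|handle| handle.join().expect("worker panicked"))
        .collect()
}
```

The same shape should carry over to the tokio version: move the owned `person_ids` into the `async move` block and pass `&person_ids` to the function that expects a slice.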
Amir Omidi