
There is an evident gap in my understanding of concurrent programming in Rust, unfortunately. This question stems from weeks of repeated struggles to solve a seemingly "trivial" problem.


Problem Domain

I am developing a Rust library named twistrs, a domain name permutation and enumeration library. The objective of the library is to take a root domain (e.g. google.com), generate permutations of that domain (e.g. guugle.com), and enrich each permutation (e.g. it resolves to 123.123.123.123).

One of its objectives is to perform substantially faster than its Python counterpart, most notably for network calls such as, but not limited to, DNS lookups.

Current Design Proposal

The idea behind the library (apart from being a learning ground) is to develop a very simple security library that can be implemented to meet various requirements. You (as a client) can choose to interact directly with the permutation or enrichment modules internally, or use the library's provided async/concurrent implementation.

[Diagram: Twistrs proposed architecture]

Note that there is no shared state internally. This is probably inefficient, but that is acceptable for the time being, as it avoids a whole class of issues.

Current Problem

Internally, the DNS lookup is done synchronously and blocks by nature. I'm having trouble turning this into concurrent code. The closest I could get was to use a tokio mpsc channel and spawn a single tokio task:

use twistrs::enrich::{Result, DomainMetadata};
use twistrs::permutate::Domain;

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let domain = Domain::new("google.com").unwrap();

    let _permutations = domain.all().unwrap().collect::<Vec<String>>();

    let (mut tx, mut rx) = mpsc::channel(1000);

    tokio::spawn(async move {
        for (i, v) in _permutations.into_iter().enumerate() {
            let domain_metadata = DomainMetadata::new(v.clone());

            let dns_resolution = domain_metadata.dns_resolvable();

            if let Err(_) = tx.send((i, dns_resolution)).await {
                println!("receiver dropped");
                return;
            }
        }
    });

    while let Some(i) = rx.recv().await {
        println!("got: {:?}", i);
    }
}

That said, an astute reader will immediately notice that this blocks, and effectively runs the DNS lookups sequentially anyway.

Trying to spawn a tokio task within the for-loop is not possible, because tx is moved into the first spawned task (and Sender does not implement Copy):

for (i, v) in _permutations.into_iter().enumerate() {
    tokio::spawn(async move {
        let domain_metadata = DomainMetadata::new(v.clone());

        let dns_resolution = domain_metadata.dns_resolvable();

        if let Err(_) = tx.send((i, dns_resolution)).await {
            println!("receiver dropped");
            return;
        }
    });
}

Removing the await will of course result in nothing happening, as the spawned task needs to be polled. How would I effectively wrap all those synchronous calls in async tasks that can run independently and eventually converge into a collection?

A similar Rust project I came across is batch_resolve, which does a tremendous job at this (!). However, I found the implementation exceptionally complicated for what I'm looking to achieve (maybe I'm wrong). Any help or insight into achieving this is greatly appreciated.

If you want a quick way to reproduce this, you can simply clone the project and update examples/twistrs-cli/main.rs with the first code snippet in this post.

Juxhin
    But why use synchronous DNS-lookups? Are you delegating to the OS for DNS? Otherwise you could async network request to a DNS server? – Jonas Aug 23 '20 at 17:44
  • If you use a synchronous dns request, you are limited to the number of worker threads used by Tokio, which is typically around 8. Any reason not use Tokio's [`lookup_host`](https://docs.rs/tokio/0.2/tokio/net/fn.lookup_host.html) for dns queries? That would allow you to use standard concurrency primitives to run a large number of them concurrently. – Alice Ryhl Aug 23 '20 at 17:48
  • Both good points, I'm actually looking into that which would potentially solve my problem. – Juxhin Aug 23 '20 at 18:03

1 Answer


Edit: I misinterpreted your question and didn't realize that the DNS resolution itself wasn't asynchronous. The following approach won't actually work with synchronous code and will just result in the executor stalling because of the blocking code, but I'll leave it up in case you switch to an asynchronous resolution method. I'd recommend using tokio's asynchronous lookup_host() if that fits your needs.


Async executors are designed to handle large numbers of parallel tasks, so you could try spawning a new task for every request, using a Semaphore to create an upper bound on the number of running tasks at once. The code for that might look like this:

use std::sync::Arc;
use tokio::sync::{mpsc, Semaphore};

let (mut tx, mut rx) = mpsc::channel(1000);
let semaphore = Arc::new(Semaphore::new(1000)); // allow up to 1000 tasks to run at once

for (i, v) in _permutations.into_iter().enumerate() {
    let domain_metadata = DomainMetadata::new(v.clone());
    let mut tx = tx.clone(); // every task will have its own copy of the sender
    let permit = semaphore.clone().acquire_owned().await; // clone the Arc so the loop keeps its handle, then wait for a permit

    let dns_resolution = domain_metadata.dns_resolvable();
    tokio::spawn(async move {
        if let Err(_) = tx.send((i, dns_resolution)).await {
            println!("receiver dropped");
            return;
        }
        drop(permit); // explicitly release the permit, to make sure it was moved into this task
    }); // note: task spawn results and handle dropped here
}

while let Some(i) = rx.recv().await {
    println!("got: {:?}", i);
}

If the overhead of the extra tasks proves too significant, you can try instead combining the tasks into a single future, using facilities like FuturesUnordered from the futures crate. This allows you to take an arbitrarily large list of futures and poll them all repeatedly within a single task.

apetranzilla
    Thank you @apetranzilla -- going to try this out and get back to you. Will look to move to `lookup_host()` as well as was suggested in the comments. – Juxhin Aug 23 '20 at 18:04
  • I managed to get `tokio::net::lookup_host` to compile, which is great. The only issue is that `domain_metadata.dns_resolvable()` now returns a `Future`, which is then pushed through `tx`. So when we get to `rx.recv().await`, all we have is a collection of futures (which also cannot be debugged/printed). – Juxhin Aug 23 '20 at 18:21
  • For better context: `impl std::future::Future cannot be formatted using {:?} because it doesn't implement std::fmt::Debug` – Juxhin Aug 23 '20 at 18:21
  • If it helps, this is how the `dns_resolvable()` is implemented - https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=2876f70a7da1890880a2ebb3d68bb3ee – Juxhin Aug 23 '20 at 18:27
  • You should be able to just add the `.await` keyword to it inside the new task, so that you instead have `dns_resolution.await` and send the `Result`s through the channel rather than the futures themselves. Since you're awaiting them within the newly spawned task, it won't cause the outer task to wait. – apetranzilla Aug 23 '20 at 18:30
  • That makes a lot more sense. The key insight is that the await would be handled inside the tokio task :-). The only thing left is a move issue with the semaphore. Despite being wrapped in an `Arc`, `let permit = semaphore.acquire_owned().await` is still moved as `Arc` doesn't impl `Copy` – Juxhin Aug 23 '20 at 18:38
  • Cloning might fix that, but I want to test whether or not it actually applies the upper-bound correctly. Unfortunately not much documentation to work with here, but it's my first use of semaphores so I thank you for that. – Juxhin Aug 23 '20 at 18:39
  • I can confirm that the `semaphore.clone().acquire_owned().await` sets the upper-bound which is great. One final thank you for the help. Few weird deadlock issues but that's something I'll spend the next couple of hours figuring out. :-) – Juxhin Aug 23 '20 at 18:49