0

I want to implement a concurrent TCP connector using Tokio. The function should initialize multiple outbound connections and only return the one that finishes the handshake first (and drop all other connections).

At first, I want to use select! but as far as I understand, select! does not apply to a vector of futures. Therefore I choose to implement a VecSelect structure that somehow acts as select!(Vec<Future>).

My code is as follows:

use tokio::net::{TcpStream};
use std::net::SocketAddr;
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};


// just a test
pub async fn my_tcp_connect(target: SocketAddr) -> std::io::Result<TcpStream> {
    let conn0 = TcpStream::connect(target);
    let conn1 = TcpStream::connect(target);
    let v = vec![conn0, conn1];
    VecSelect {
        futs: v,
    }.await
}

struct VecSelect<T: Future + Unpin> {
    futs: Vec<T>,
}

impl<T: Future + Unpin> Future for VecSelect<T> {
    type Output = T::Output;

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // check all futures
        for mut fut in self.futs.iter_mut() {
            if let Poll::Ready(val) = Pin::new(&mut fut).poll(cx) {
                return Poll::Ready(val);
            }
        }
       
        Poll::Pending
    }
}

And the following are the errors reported:

error[E0277]: `from_generator::GenFuture<[static generator@tokio::net::TcpStream::connect<std::net::SocketAddr>::{closure#0}]>` cannot be unpinned
   --> src/tcp_connector.rs:13:5
    |
13  |     VecSelect {
    |     ^^^^^^^^^ within `impl Future<Output = Result<tokio::net::TcpStream, std::io::Error>>`, the trait `Unpin` is not implemented for `from_generator::GenFuture<[static generator@tokio::net::TcpStream::connect<std::net::SocketAddr>::{closure#0}]>`
    |
   ::: /usr/local/cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/net/tcp/stream.rs:111:56
    |
111 |     pub async fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> {
    |                                                        --------------------- within this `impl Future<Output = Result<tokio::net::TcpStream, std::io::Error>>`
    |
    = note: consider using `Box::pin`
    = note: required because it appears within the type `impl Future<Output = [async output]>`
    = note: required because it appears within the type `impl Future<Output = Result<tokio::net::TcpStream, std::io::Error>>`
note: required by a bound in `VecSelect`
   --> src/tcp_connector.rs:18:30
    |
18  | struct VecSelect<T: Future + Unpin> {
    |                              ^^^^^ required by this bound in `VecSelect`

error[E0277]: `from_generator::GenFuture<[static generator@tokio::net::TcpStream::connect<std::net::SocketAddr>::{closure#0}]>` cannot be unpinned
   --> src/tcp_connector.rs:13:5
    |
13  | /     VecSelect {
14  | |         futs: v,
15  | |     }.await
    | |_____^ within `impl Future<Output = Result<tokio::net::TcpStream, std::io::Error>>`, the trait `Unpin` is not implemented for `from_generator::GenFuture<[static generator@tokio::net::TcpStream::connect<std::net::SocketAddr>::{closure#0}]>`
    |
   ::: /usr/local/cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/net/tcp/stream.rs:111:56
    |
111 |       pub async fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> {
    |                                                          --------------------- within this `impl Future<Output = Result<tokio::net::TcpStream, std::io::Error>>`
    |
    = note: consider using `Box::pin`
    = note: required because it appears within the type `impl Future<Output = [async output]>`
    = note: required because it appears within the type `impl Future<Output = Result<tokio::net::TcpStream, std::io::Error>>`
note: required by a bound in `VecSelect`
   --> src/tcp_connector.rs:18:30
    |
18  | struct VecSelect<T: Future + Unpin> {
    |                              ^^^^^ required by this bound in `VecSelect`

error[E0277]: `from_generator::GenFuture<[static generator@tokio::net::TcpStream::connect<std::net::SocketAddr>::{closure#0}]>` cannot be unpinned
   --> src/tcp_connector.rs:15:6
    |
15  |     }.await
    |      ^^^^^^ within `impl Future<Output = Result<tokio::net::TcpStream, std::io::Error>>`, the trait `Unpin` is not implemented for `from_generator::GenFuture<[static generator@tokio::net::TcpStream::connect<std::net::SocketAddr>::{closure#0}]>`
    |
   ::: /usr/local/cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.17.0/src/net/tcp/stream.rs:111:56
    |
111 |     pub async fn connect<A: ToSocketAddrs>(addr: A) -> io::Result<TcpStream> {
    |                                                        --------------------- within this `impl Future<Output = Result<tokio::net::TcpStream, std::io::Error>>`
    |
    = note: consider using `Box::pin`
    = note: required because it appears within the type `impl Future<Output = [async output]>`
    = note: required because it appears within the type `impl Future<Output = Result<tokio::net::TcpStream, std::io::Error>>`
note: required by a bound in `VecSelect`
   --> src/tcp_connector.rs:18:30
    |
18  | struct VecSelect<T: Future + Unpin> {
    |                              ^^^^^ required by this bound in `VecSelect`

As I'm new to Rust and async programming, I'm not sure if my approach is in general correct.

And I'm wondering whether Unpin is necessary for my implementation and why cannot the compiler derive Unpin automatically?

tavimori
  • 15
  • 3
  • You probably want to use [`select_all`](https://docs.rs/futures/latest/futures/future/fn.select_all.html) – Jmb Mar 30 '22 at 08:25
  • @Jmb thanks, seems to be exactly what I want. – tavimori Mar 30 '22 at 09:44
  • Update: I later used `select_ok` function, which saves me many codes. However, I got the same problem with `Unpin`. I finally solved the problem by wrapping `Result` type with a `Box::Pin()`. (Though I cannot explain why roughly I can understand the reason.) – tavimori Mar 31 '22 at 09:09

0 Answers0