
I'm not able to create a client that tries to connect to a server and:

  • if the server is down it has to try again in an infinite loop
  • if the server is up and connection is successful, when the connection is lost (i.e. server disconnects the client) the client has to restart the infinite loop to try to connect to the server

Here's the code to connect to a server; currently when the connection is lost the program exits. I'm not sure what the best way to implement it is; maybe I have to create a Future with an infinite loop?

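extern crate futures;
extern crate tokio_core;
extern crate tokio_line;

use std::io;

use futures::{Future, Stream};
// Assumption: `framed` comes from tokio_core::io::Io in early tokio-core 0.1
// releases; later releases moved it to the tokio-io crate.
use tokio_core::io::Io;
use tokio_core::net::TcpStream;
use tokio_core::reactor::{Core, Handle};
use tokio_line::LineCodec;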

fn get_connection(handle: &Handle) -> Box<Future<Item = (), Error = io::Error>> {
    let remote_addr = "127.0.0.1:9876".parse().unwrap();
    let tcp = TcpStream::connect(&remote_addr, handle);

    let client = tcp.and_then(|stream| {
        let (sink, from_server) = stream.framed(LineCodec).split();
        let reader = from_server.for_each(|message| {
            println!("{}", message);
            Ok(())
        });

        reader.map(|_| {
            println!("CLIENT DISCONNECTED");
            ()
        }).map_err(|err| err)
    });

    let client = client.map_err(|_| { panic!()});
    Box::new(client)
}

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let client = get_connection(&handle);

    let client = client.and_then(|c| {
        println!("Try to reconnect");
        get_connection(&handle);
        Ok(())
    });

    core.run(client).unwrap();
}

Add the tokio-line crate with:

tokio-line = { git = "https://github.com/tokio-rs/tokio-line" }
Shepmaster
Danilo Silva
  • If you have your own answer, you are encouraged to **post an answer** (yes, you can answer your own question). However, it needs to be complete and not just a link. – Shepmaster Mar 11 '17 at 19:15

1 Answer


The key question seems to be: how do I implement an infinite loop using Tokio? By answering this question, we can tackle the problem of reconnecting infinitely upon disconnection. From my experience writing asynchronous code, recursion seems to be a straightforward solution to this problem.

UPDATE: as pointed out by Shepmaster (and the folks of the Tokio Gitter), my original answer leaks memory since we build a chain of futures that grows on each iteration. Here follows a new one:

Updated answer: use loop_fn

There is a function in the futures crate that does exactly what you need. It is called loop_fn. You can use it by changing your main function to the following:

use futures::future::{self, Loop};

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let client = future::loop_fn((), |_| {
        // Run get_connection and loop again once it completes successfully;
        // an error is not mapped here and would end the loop
        get_connection(&handle).map(|_| -> Loop<(), ()> {
            Loop::Continue(())
        })
    });

    core.run(client).unwrap();
}

loop_fn resembles a loop whose body decides, on every iteration, whether to continue or break, here depending on the result of get_connection (see the documentation for the Loop enum). In this case we always choose to continue, so the client keeps reconnecting indefinitely.

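Note that your version of get_connection panics on any error (e.g. if the client cannot connect to the server). If you also want to retry after an error, remove the call to panic! and make sure the error never reaches loop_fn: map only runs on success, so an Err returned by get_connection ends the loop instead of continuing it. One way to do that is to turn errors into Ok(()) inside get_connection with or_else.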
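
To see why an error ends the loop, it can help to model loop_fn synchronously. The sketch below is not the futures crate: Loop is re-declared locally, Result stands in for a future, and loop_fn_sync, flaky_connect and the attempt counts are hypothetical names made up for illustration. It breaks out on the first success only so that the example terminates; the real client would always continue.

```rust
// Local stand-in for futures::future::Loop (assumption: same two variants).
enum Loop<T, S> {
    Break(T),
    Continue(S),
}

// Hypothetical synchronous model of loop_fn: `Result` plays the role of a future.
fn loop_fn_sync<S, T, E, F>(initial: S, mut body: F) -> Result<T, E>
where
    F: FnMut(S) -> Result<Loop<T, S>, E>,
{
    let mut state = initial;
    loop {
        // `?` mirrors how an error from the body ends the whole loop.
        match body(state)? {
            Loop::Break(value) => return Ok(value),
            Loop::Continue(next) => state = next,
        }
    }
}

// Hypothetical connection attempt: fails until `attempt` reaches `succeed_at`.
fn flaky_connect(attempt: u32, succeed_at: u32) -> Result<(), String> {
    if attempt < succeed_at {
        Err(format!("attempt {} refused", attempt))
    } else {
        Ok(())
    }
}

// Like `get_connection(..).map(..)`: the first error leaves the loop.
fn without_recovery() -> Result<u32, String> {
    loop_fn_sync(0u32, |attempt| {
        flaky_connect(attempt, 3).map(|_| Loop::Break(attempt))
    })
}

// Like recovering with `or_else`: an error becomes one more iteration.
fn with_recovery() -> Result<u32, String> {
    loop_fn_sync(0u32, |attempt| {
        Ok(match flaky_connect(attempt, 3) {
            Ok(()) => Loop::Break(attempt),
            Err(_) => Loop::Continue(attempt + 1),
        })
    })
}

fn main() {
    println!("{:?}", without_recovery()); // Err("attempt 0 refused")
    println!("{:?}", with_recovery()); // Ok(3)
}
```

The first variant gives up on the very first refused attempt, which matches the "program exits without retrying" behaviour reported in the comments; the second keeps going until an attempt succeeds.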


Old answer: use recursion

Here follows my old answer, in case anyone finds it interesting.

WARNING: using the code below results in unbounded memory growth.

Making get_connection loop infinitely

We want to call the get_connection function each time the client is disconnected, so that is exactly what we are going to do (see the comment inside the closure passed to reader.and_then):

fn get_connection(handle: &Handle) -> Box<Future<Item = (), Error = io::Error>> {
    let remote_addr = "127.0.0.1:9876".parse().unwrap();
    let tcp = TcpStream::connect(&remote_addr, handle);
    let handle_clone = handle.clone();

    let client = tcp.and_then(|stream| {
        let (sink, from_server) = stream.framed(LineCodec).split();
        let reader = from_server.for_each(|message| {
            println!("{}", message);
            Ok(())
        });

        reader.and_then(move |_| {
            println!("CLIENT DISCONNECTED");
            // Attempt to reconnect in the future
            get_connection(&handle_clone)
        })
    });

    let client = client.map_err(|_| { panic!()});
    Box::new(client)
}

Remember that get_connection is non-blocking. It just constructs a Box<Future>. This means that when calling it recursively, we still don't block. Instead, we get a new future, which we can link to the previous one by using and_then. This is different from normal recursion, since the stack doesn't grow on each iteration (the chain of futures on the heap does grow, however, which is the memory problem mentioned in the warning above).
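
The stack behaviour can be modelled without Tokio. In this sketch, which uses hypothetical names (Step, attempt, run) and plain boxed closures instead of futures, each call returns a description of the next step and exits before that step runs, so the driver loop keeps the stack flat. It models the stack only: unlike a chain of futures, each closure here is dropped once it has run, so it says nothing about heap usage.

```rust
// Either a final result or a deferred "next call", boxed on the heap.
enum Step {
    Done(u64),
    Next(Box<dyn FnOnce() -> Step>),
}

// Looks recursive, but only *describes* the next attempt and returns at once.
fn attempt(count: u64, limit: u64) -> Step {
    if count == limit {
        Step::Done(count)
    } else {
        Step::Next(Box::new(move || attempt(count + 1, limit)))
    }
}

// Driver loop: plays the role of the event loop that runs each step in turn.
fn run(mut step: Step) -> u64 {
    loop {
        match step {
            Step::Done(value) => return value,
            Step::Next(next) => step = next(),
        }
    }
}

fn main() {
    // One million "recursive" attempts without deepening the stack.
    println!("{}", run(attempt(0, 1_000_000)));
}
```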

Note that we need to clone the handle (see handle_clone), and move it into the closure passed to reader.and_then. This is necessary because the closure is going to live longer than the function (it will be contained in the future we are returning).
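
The same ownership rule can be shown in isolation. The sketch below uses Rc<String> as a stand-in for Handle and a hypothetical make_reconnect function; it is not Tokio code. A boxed closure returned like this must not contain short-lived borrows, so it cannot keep the &Rc<String> it was given and has to own a clone instead.

```rust
use std::rc::Rc;

// Stand-in for get_connection: returns a closure that outlives this call.
fn make_reconnect(handle: &Rc<String>) -> Box<dyn FnOnce() -> String> {
    // Clone the stand-in handle so the closure can own its own copy.
    let handle_clone = Rc::clone(handle);
    // `move` transfers `handle_clone` into the closure; capturing the
    // borrowed `handle` instead would be rejected by the compiler.
    Box::new(move || format!("reconnecting via {}", handle_clone))
}

fn main() {
    let handle = Rc::new(String::from("event loop"));
    let reconnect = make_reconnect(&handle);
    // Two owners now: `handle` and the clone stored inside the closure.
    println!("{}", Rc::strong_count(&handle));
    println!("{}", reconnect());
}
```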

Handling errors

The code you provided doesn't handle the case in which the client is unable to connect to the server (nor any other errors). Following the same principle shown above, we can handle errors by changing the end of get_connection to the following:

let handle_clone = handle.clone();
let client = client.or_else(move |err| {
    // Note: this code will infinitely retry, but you could pattern match on the error
    // to retry only on certain kinds of error
    println!("Error connecting to server: {}", err);
    get_connection(&handle_clone)
});
Box::new(client)

Note that or_else is like and_then, but it operates on the error produced by the future.
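
The standard library's Result has combinators with the same names and the same split between success and failure, which makes for a quick synchronous analogy. The connect function and its port numbers below are hypothetical; only Result::and_then and Result::or_else are real APIs.

```rust
// Hypothetical connection attempt: only port 9876 "accepts" connections.
fn connect(port: u16) -> Result<String, String> {
    if port == 9876 {
        Ok(format!("connected to {}", port))
    } else {
        Err(format!("port {} refused", port))
    }
}

fn main() {
    // and_then runs only on success; the error passes through untouched.
    let chained = connect(1).and_then(|msg| Ok(msg + ", then read"));
    println!("{:?}", chained); // Err("port 1 refused")

    // or_else runs only on failure, so it is the place to retry.
    let retried = connect(1).or_else(|_err| connect(9876));
    println!("{:?}", retried); // Ok("connected to 9876")
}
```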

Removing unnecessary code from main

Finally, it is not necessary to use and_then in the main function. You can replace your main with the following code:

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();
    let client = get_connection(&handle);
    core.run(client).unwrap();
}
aochagavia
  • It's interesting. This is like encoding a state-machine piecemeal in the future: each time you return a future it gives the "next action" to be taken, which can be to attempt to connect, attempt to reconnect, send a message, wait for a reply, ... – Matthieu M. Feb 08 '17 at 11:13
  • I learned this the hard way using Boost.Asio in C++. You need to be very careful to avoid dangling pointers, since the closures usually outlive their environment... Anyway, it is a pattern that seems very weird at first, if you are used to writing sequential code. – aochagavia Feb 08 '17 at 11:18
  • It might be worth noting that the *recursion* here isn't recursion in the classic sense of the function calling itself, thus accumulating stack frames. The "recursive" call to `get_connection` is inside the closure that will be executed only later, after `get_connection` has exited. – user4815162342 Feb 08 '17 at 11:59
  • @user4815162342 in other contexts, this is usually referred to as a [trampoline](https://en.wikipedia.org/wiki/Trampoline_(computing)#High-level_programming). – Shepmaster Feb 08 '17 at 13:49
  • @user4815162342 Just added a comment to clarify :) – aochagavia Feb 08 '17 at 15:34
  • Some [discussion in the Tokio gitter](https://gitter.im/tokio-rs/tokio?at=589b968ade50490822a3b9bc) suggests that while this doesn't accumulate stack frames, it does have unbounded memory usage. I don't know one way or the other; just relaying the message ^_^. – Shepmaster Feb 08 '17 at 22:59
  • The sample using `loop_fn` doesn't work, but thanks (again) for the suggestion; I will try to get it working. – Danilo Silva Feb 14 '17 at 13:04
  • what is the concrete problem? – aochagavia Feb 14 '17 at 14:54
  • It does not loop infinitely :) If the server is not listening, the program exits. If the server is listening, the client connects to it, but when I stop the server the program exits without retrying (P.S. I have removed the panic from the get_connection function). – Danilo Silva Feb 14 '17 at 15:27
  • That sounds pretty strange. Please post here if you figure out what the problem was, so I can update my answer – aochagavia Feb 14 '17 at 19:31
  • Sorry, my fault (obviously :)). I had removed the `panic!()` but returned an `Err` instead, and that doesn't work because `Loop::Continue(())` is returned only when `get_connection` doesn't return an error. Adding a `map_err` to `get_connection(..)` didn't work either; in the end I mapped errors to an Ok(()) inside `get_connection` with `let client = client.or_else(|_| Ok(()));`. Thank you so much! – Danilo Silva Feb 14 '17 at 19:56