I'm trying to create a system by which my application can receive streaming data from a Redis PubSub channel and process it. The Redis driver that I'm using, like every other Redis driver for Rust that I've seen, uses a blocking operation to get data from the channel, and that operation only returns once data has arrived:

let msg = match pubsub.get_message() {
    Ok(m) => m,
    Err(_) => panic!("Could not get message from pubsub!")
};
let payload: String = match msg.get_payload() {
    Ok(s) => s,
    Err(_) => panic!("Could not convert redis message to string!")
};

I wanted to use the futures-rs library to wrap this blocking function call in a future so that I can perform other tasks within my application while waiting for input.

I read the tutorial for futures and tried to create a Stream that would signal when data is received by the PubSub, but I can't figure out how to do so.

How can I create schedule and poll functions for the blocking pubsub.get_message() function?

Shepmaster
Ameo

1 Answer

Heavy caveat: I've never used this library before, and my low-level knowledge of some of the concepts is a bit... lacking. Mostly I'm reading through the tutorial. I'm pretty sure anyone who has done async work will read this and laugh, but it may be a useful starting point for other people. Caveat emptor!


Let's start with something a bit simpler, demonstrating how a Stream works. We can convert an iterator of Results into a stream:

extern crate futures;

use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let payloads: Vec<Result<String, ()>> = vec![Ok("a".into()), Ok("b".into())];
    let payloads = stream::iter(payloads.into_iter());

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
}

This shows us one way to consume the stream. We use and_then to do something to each payload (here just printing it out) and then for_each to convert the Stream back into a Future. We can then run the future by calling the strangely named forget method.
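As a rough synchronous analogy (not the futures API itself), the same "act on each payload, then collapse into one result" shape can be sketched with plain iterators from the standard library. The function name `consume` is hypothetical, and the sketch assumes that the first error ends processing, which is how I understand `for_each` on a stream to behave:

```rust
/// Hypothetical helper: handles each `Ok` payload in order and stops at the
/// first `Err`, returning the payloads that were handled.
fn consume(payloads: Vec<Result<String, ()>>) -> Result<Vec<String>, ()> {
    let mut seen = Vec::new();
    payloads
        .into_iter()
        .try_for_each(|item| -> Result<(), ()> {
            // Plays the role of `and_then`: only runs for successful items.
            let payload = item?;
            println!("{}", payload);
            seen.push(payload);
            Ok(())
        })?;
    Ok(seen)
}

fn main() {
    let payloads: Vec<Result<String, ()>> = vec![Ok("a".into()), Ok("b".into())];
    let handled = consume(payloads).expect("no payload should fail here");
    println!("handled {} payloads", handled.len());
}
```

The difference, of course, is that the iterator version runs to completion on the calling thread, while a stream only makes progress when something drives it.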


Next is to tie the Redis library into the mix, handling just one message. Since the get_message() method is blocking, we need to introduce some threads. It's not a good idea to perform a large amount of work inside this type of asynchronous system, as everything else will be blocked while it runs. For example:

Unless it is otherwise arranged to be so, it should be ensured that implementations of this function finish very quickly.

In an ideal world, the redis crate would be built atop a library like futures and expose all this natively.

extern crate redis;
extern crate futures;

use std::thread;
use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");

    let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
    pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");

    let (tx, payloads) = stream::channel();

    let redis_thread = thread::spawn(move || {
        let msg = pubsub.get_message().expect("Unable to get message");
        let payload: Result<String, _> = msg.get_payload();
        tx.send(payload).forget();
    });

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
    redis_thread.join().expect("unable to join to thread");
}

My understanding gets fuzzier here. In a separate thread, we block for the message and push it into the channel when we get it. What I don't understand is why we need to hold onto the thread's handle. I would expect that foo.forget would be blocking itself, waiting until the stream is empty.

In a telnet connection to the Redis server, send this:

publish rust awesome

And you will see that it works. Adding print statements shows that (for me) the foo.forget statement is run before the thread is spawned.
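The "block in a separate thread, push into a channel" pattern can also be tried without Redis or futures at all. The following is a minimal sketch using only the standard library's `std::sync::mpsc` channel in place of `stream::channel`; `blocking_get_message`, `spawn_listener` and `drain_without_blocking` are hypothetical names, and the sleep merely stands in for the blocking `pubsub.get_message()` call:

```rust
use std::sync::mpsc::{self, Receiver, TryRecvError};
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for the blocking `pubsub.get_message()` call.
fn blocking_get_message(n: u32) -> String {
    thread::sleep(Duration::from_millis(20));
    format!("message {}", n)
}

/// Spawns a worker that blocks for `count` messages and forwards each payload.
fn spawn_listener(count: u32) -> (thread::JoinHandle<()>, Receiver<String>) {
    let (tx, rx) = mpsc::channel();
    let handle = thread::spawn(move || {
        for n in 0..count {
            let payload = blocking_get_message(n);
            if tx.send(payload).is_err() {
                break; // the receiving side went away
            }
        }
        // `tx` is dropped here, which closes the channel.
    });
    (handle, rx)
}

/// Checks the channel without blocking; returns the payloads and the number
/// of times the channel was empty (where other work could have been done).
fn drain_without_blocking(rx: &Receiver<String>) -> (Vec<String>, u64) {
    let mut payloads = Vec::new();
    let mut idle_ticks = 0;
    loop {
        match rx.try_recv() {
            Ok(payload) => payloads.push(payload),
            Err(TryRecvError::Empty) => {
                idle_ticks += 1; // placeholder for "other tasks"
                thread::sleep(Duration::from_millis(1));
            }
            Err(TryRecvError::Disconnected) => break,
        }
    }
    (payloads, idle_ticks)
}

fn main() {
    let (handle, rx) = spawn_listener(3);
    let (payloads, idle_ticks) = drain_without_blocking(&rx);
    handle.join().expect("listener thread panicked");
    println!("{:?} after {} idle ticks", payloads, idle_ticks);
}
```

In this version the `join` happens only after the channel has closed, so it just reaps a thread that has already finished and does not hold up the consuming loop.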


Handling multiple messages is trickier. The Sender consumes itself to prevent the generating side from getting too far ahead of the consuming side. This is accomplished by returning another future from send! We need to shuttle the new Sender back out of that future so it can be reused for the next iteration of the loop:

extern crate redis;
extern crate futures;

use std::thread;
use std::sync::mpsc;

use futures::Future;
use futures::stream::{self, Stream};

fn main() {
    let client = redis::Client::open("redis://127.0.0.1/").expect("Unable to connect to Redis");

    let mut pubsub = client.get_pubsub().expect("Unable to create pubsub handle");
    pubsub.subscribe("rust").expect("Unable to subscribe to redis channel");

    let (tx, payloads) = stream::channel();

    let redis_thread = thread::spawn(move || {
        let mut tx = tx;

        while let Ok(msg) = pubsub.get_message() {
            let payload: Result<String, _> = msg.get_payload();

            let (next_tx_tx, next_tx_rx) = mpsc::channel();

            tx.send(payload).and_then(move |new_tx| {
                next_tx_tx.send(new_tx).expect("Unable to send successor channel tx");
                futures::finished(())
            }).forget();

            tx = next_tx_rx.recv().expect("Unable to receive successor channel tx");
        }
    });

    let foo = payloads
        .and_then(|payload| futures::finished(println!("{}", payload)))
        .for_each(|_| Ok(()));

    foo.forget();
    redis_thread.join().expect("unable to join to thread");
}
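The backpressure idea behind the self-consuming Sender (the producer must not run ahead of the consumer) can be illustrated with a different tool: the standard library's bounded `sync_channel`. This is only a sketch of the concept, not what the futures channel does internally; `run_bounded` and the payload strings are hypothetical:

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Sends `count` payloads through a channel that buffers at most `bound`
/// items and returns everything the consumer received, in order.
fn run_bounded(count: usize, bound: usize) -> Vec<String> {
    let (tx, rx) = sync_channel::<String>(bound);

    let producer = thread::spawn(move || {
        for n in 0..count {
            // `send` blocks while the buffer is full, so the producer can
            // never be more than `bound` items ahead of the consumer.
            tx.send(format!("payload {}", n)).expect("receiver hung up");
        }
        // `tx` is dropped here, which ends the iterator below.
    });

    let received: Vec<String> = rx.iter().collect();
    producer.join().expect("producer thread panicked");
    received
}

fn main() {
    for payload in run_bounded(5, 1) {
        println!("{}", payload);
    }
}
```

Here the blocking `send` provides the throttling directly, so there is no successor sender to hand back on each iteration.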

I'm sure that a richer ecosystem for this type of interoperation will appear as time goes on. For example, the futures-cpupool crate could probably be extended to support a use case similar to this one.
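For readers wondering what a hand-written poll function for a blocking call might look like, here is a minimal sketch. Note that it uses the `std::future::Future` trait from the standard library rather than the futures-rs API shown above, and every name in it (`Shared`, `BlockingCall`, `ThreadWaker`, `block_on`) is hypothetical. The closure passed to `spawn` stands in for the blocking `pubsub.get_message()` call:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread;
use std::time::Duration;

/// State shared between the worker thread and the future.
struct Shared {
    value: Option<String>,
    waker: Option<Waker>,
}

/// A future that resolves once a blocking closure has finished on its own thread.
struct BlockingCall {
    shared: Arc<Mutex<Shared>>,
}

impl BlockingCall {
    fn spawn<F>(work: F) -> Self
    where
        F: FnOnce() -> String + Send + 'static,
    {
        let shared = Arc::new(Mutex::new(Shared { value: None, waker: None }));
        let worker_shared = Arc::clone(&shared);
        thread::spawn(move || {
            let result = work(); // the blocking part
            let mut guard = worker_shared.lock().unwrap();
            guard.value = Some(result);
            // Tell whoever polled us last that polling again is worthwhile.
            if let Some(waker) = guard.waker.take() {
                waker.wake();
            }
        });
        BlockingCall { shared }
    }
}

impl Future for BlockingCall {
    type Output = String;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<String> {
        let mut guard = self.shared.lock().unwrap();
        match guard.value.take() {
            Some(value) => Poll::Ready(value),
            None => {
                // Not ready: remember the waker and return immediately.
                guard.waker = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

/// Waker that unparks the thread driving the future.
struct ThreadWaker(thread::Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Tiny single-future executor: polls, then parks until woken.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(),
        }
    }
}

fn main() {
    let call = BlockingCall::spawn(|| {
        thread::sleep(Duration::from_millis(50)); // pretend to wait for Redis
        String::from("awesome")
    });
    println!("{}", block_on(call));
}
```

The important property is that `poll` itself never blocks: the waiting happens on the worker thread, and `poll` only inspects shared state and registers a waker.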

Shepmaster
  • Thanks for the amazing answer! Just one question: doesn't joining the `redis_thread` negate the entire effort of making the result-reading process non-blocking? Perhaps there's something I'm not understanding. – Ameo Aug 12 '16 at 03:54
  • "I would expect that foo.forget would be blocking itself, waiting until the stream is empty" Actually, futures are not obligated to provide a "block until ready" method. `forget()`, as far as its description goes, is needed to prevent automatic cancellation when the future is dropped, but it is not related to waiting. In Scala, for example, there is no method on `Future` for that, but instead there is a separate `Await.ready`/`Await.result` pair of methods which wait until the future is ready within some timeout. – Vladimir Matveev Aug 12 '16 at 09:02
  • As far as I understand, in futures-rs it is possible to implement a similar thing with [`Future::select`](http://alexcrichton.com/futures-rs/futures/trait.Future.html#method.select), where the second future completes after a fixed timeout. – Vladimir Matveev Aug 12 '16 at 09:03