2

I have a function that takes a &mut io::Write, and I'd like to use it to send a streaming response from the actix-web server without buffering the whole response. The function "pushes" the data, and I can't change it (that's the whole premise of this question) to use async streams or any other kind of polling.

Currently I'm forced to pass a &mut Vec<u8> (which implements io::Write) to buffer the whole result and then send the Vec as the response body. However, the response may be large, so I'd rather stream it without buffering.

Is there some kind of adapter that would implement io::Write, with writes blocking as necessary in response to backpressure, and be compatible with types that actix-web can use for responses (e.g. futures::Stream)?

fn generate(output: &mut io::Write) {
    // ...
}

fn request_handler() -> Result<HttpResponse> {
    thread::spawn(|| generate(/*???*/));
    Ok(HttpResponse::Ok().body(/*???*/))
}

std::sync::mpsc has both ends blocking and futures::mpsc has both ends async, so it's not obvious how to use either of them as an adapter between a sync end and an async end.
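To make the "both ends blocking" case concrete, here is a minimal std-only sketch. The names ChannelWriter and spawn_generate are hypothetical, and the body of generate is a stand-in for the real function. The write side blocks under backpressure as desired, but the receiving side is a blocking iterator rather than a futures::Stream, which is exactly the missing piece.

```rust
use std::io::{self, Write};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::thread;

/// Hypothetical adapter: every `write` call sends one chunk into a bounded channel.
struct ChannelWriter(SyncSender<Vec<u8>>);

impl Write for ChannelWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // `send` blocks while the channel is full; that is the backpressure.
        self.0
            .send(buf.to_vec())
            .map(|()| buf.len())
            .map_err(|e| io::Error::new(io::ErrorKind::BrokenPipe, e))
    }

    fn flush(&mut self) -> io::Result<()> {
        // Nothing is buffered in the adapter itself.
        Ok(())
    }
}

// Stand-in for the push-style function that cannot be changed.
fn generate(output: &mut dyn Write) {
    for i in 0..3 {
        writeln!(output, "line {}", i).unwrap();
    }
}

/// Runs `generate` on its own thread and returns the receiving end of the channel.
fn spawn_generate(capacity: usize) -> Receiver<Vec<u8>> {
    let (tx, rx) = sync_channel(capacity);
    thread::spawn(move || generate(&mut ChannelWriter(tx)));
    rx
}

fn main() {
    // Blocking iteration over the chunks: fine on a plain thread,
    // but not something an async request handler should do.
    let body: Vec<u8> = spawn_generate(2).into_iter().flatten().collect();
    print!("{}", String::from_utf8_lossy(&body));
}
```

The channel capacity of 2 is arbitrary; a smaller capacity makes the producer block sooner.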

Shepmaster
Kornel
  • Is it a third-party function, which you can not alter? Otherwise you could add "checkpoints" to it, buffering a part of the response into a Vec then flushing it into the [response stream](https://docs.rs/actix-web/0.7.19/actix_web/enum.Body.html#variant.Streaming) at a "checkpoint". – chpio Apr 16 '19 at 13:28
  • The `BodyStream` is a wrapper around `futures::Stream`, so this still leaves a question of transforming push-based `io::Write` into pull-based `Stream`. – Kornel Apr 16 '19 at 16:54
  • Why? You could use a [MPSC](https://docs.rs/futures/0.1.25/futures/sync/mpsc/index.html), push your buffers in the mpsc, if it's full then pause the execution until it's ready to be pushed again. Again: this would only work if you're able to split that function into smaller parts. – chpio Apr 17 '19 at 08:42

2 Answers

4

It is possible. The key piece is futures::sink::Wait:

A sink combinator which converts an asynchronous sink to a blocking sink.

Created by the Sink::wait method, this function transforms any sink into a blocking version. This is implemented by blocking the current thread when a sink is otherwise unable to make progress.

All that is needed is to wrap this type in a struct that implements io::Write:

use futures::{
    sink::{Sink, Wait},
    sync::mpsc,
}; // 0.1.26
use std::{io, thread};

fn generate(_output: &mut io::Write) {
    // ...
}

struct MyWrite<T>(Wait<mpsc::Sender<T>>);

impl<T> io::Write for MyWrite<T>
where
    T: for<'a> From<&'a [u8]> + Send + Sync + 'static,
{
    fn write(&mut self, d: &[u8]) -> io::Result<usize> {
        let len = d.len();
        self.0
            .send(d.into())
            .map(|()| len)
            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
    }

    fn flush(&mut self) -> io::Result<()> {
        self.0
            .flush()
            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
    }
}

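fn foo() -> impl futures::Stream<Item = Vec<u8>, Error = ()> {
    // Bounded channel: once it is full, `send` blocks the generating thread.
    let (tx, rx) = mpsc::channel(5);

    // `wait()` turns the async `Sender` into a blocking sink.
    let mut w = MyWrite(tx.wait());

    // Run the blocking producer on its own thread.
    thread::spawn(move || generate(&mut w));

    // The receiver is the `Stream` to hand to the response body.
    rx
}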
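One design consideration: the adapter sends one channel message per write call, so a generator that makes many small writes produces many tiny chunks. Wrapping the writer in std::io::BufWriter should coalesce them. The following is a std-only sketch of that effect, not part of the original solution; ChunkRecorder is a hypothetical stand-in for the channel-backed writer that records one chunk per write call, and the generate body and the 8 KiB capacity are illustrative assumptions.

```rust
use std::io::{self, BufWriter, Write};

/// Hypothetical stand-in for a channel-backed writer:
/// records one chunk per `write` call instead of sending it.
#[derive(Debug)]
struct ChunkRecorder {
    chunks: Vec<Vec<u8>>,
}

impl Write for ChunkRecorder {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.chunks.push(buf.to_vec());
        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}

// Stand-in generator that makes many small writes.
fn generate(output: &mut dyn Write) {
    for i in 0..100 {
        write!(output, "{},", i).unwrap();
    }
}

/// Returns the number of chunks produced without and with buffering.
fn count_chunks() -> (usize, usize) {
    // Unbuffered: every small write becomes its own chunk.
    let mut direct = ChunkRecorder { chunks: Vec::new() };
    generate(&mut direct);

    // Buffered: small writes accumulate until the buffer fills or is flushed.
    let recorder = ChunkRecorder { chunks: Vec::new() };
    let mut buffered = BufWriter::with_capacity(8 * 1024, recorder);
    generate(&mut buffered);

    // `into_inner` flushes the remaining bytes and returns the recorder.
    let inner = buffered.into_inner().expect("flush failed");
    (direct.chunks.len(), inner.chunks.len())
}

fn main() {
    let (direct, buffered) = count_chunks();
    println!("unbuffered chunks: {}, buffered chunks: {}", direct, buffered);
}
```

With the adapter above, the equivalent would be passing &mut BufWriter::new(w) to generate; the buffer is flushed when the BufWriter is dropped, although errors during that final flush are ignored unless flush is called explicitly.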
Shepmaster
-1

It is not possible. Actix-web manages its own write buffer and socket.

Nikolay Kim
  • I believe you've misread the question (I've updated it in an attempt to be more clear). The OP doesn't want to have access to Actix' write buffer or socket. – Shepmaster Apr 19 '19 at 16:45