I am trying to build a service that pulls files from an SFTP server and uploads them to S3.

For the SFTP part, I am using async-ssh2, which gives me a file handle implementing futures::AsyncRead. Since these SFTP files may be quite large, I am trying to turn this File handle into a ByteStream that I can upload using Rusoto. It looks like a ByteStream can be initialized with a futures::Stream.

My plan was to implement Stream on the File object (based on the code here) to be compatible with Rusoto (code reproduced below for posterity):

use core::pin::Pin;
use core::task::{Context, Poll};
use futures::{ready, stream::Stream};

pub struct ByteStream<R>(R);

impl<R: tokio::io::AsyncRead + Unpin> Stream for ByteStream<R> {
    type Item = u8;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let mut buf = [0; 1];

        match ready!(Pin::new(&mut self.0).poll_read(cx, &mut buf)) {
            Ok(n) if n != 0 => Some(buf[0]).into(),
            _ => None.into(),
        }
    }
}

Would this be a good way to go about doing this? I saw this question, but it seems to be using tokio::io::AsyncRead. Is using tokio the canonical way to go about doing this? If so, is there a way to convert from futures_io::AsyncRead to tokio::io::AsyncRead?

Shepmaster
sonicxml

1 Answer

This is how I ended up doing the conversion. It is based on the code above, but it reads into a larger buffer (8 KB) to reduce the number of network calls and yields Bytes chunks instead of single bytes.

use bytes::Bytes;
use core::pin::Pin;
use core::task::{Context, Poll};
use futures::{ready, stream::Stream};
use futures_io::AsyncRead;
use rusoto_s3::StreamingBody;

const KB: usize = 1024;

struct ByteStream<R>(R);

impl<R: AsyncRead + Unpin> Stream for ByteStream<R> {
    type Item = Result<Bytes, std::io::Error>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
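        // Allocate a fresh buffer per poll; `Bytes` takes ownership of it below.
        let mut buf = vec![0_u8; 8 * KB];

        match ready!(Pin::new(&mut self.0).poll_read(cx, &mut buf[..])) {
            Ok(n) if n != 0 => {
                // Keep only the bytes actually read; a short read would
                // otherwise send trailing zeros as part of the chunk.
                buf.truncate(n);
                Some(Ok(Bytes::from(buf))).into()
            }
            // Zero bytes read means end of file.
            Ok(_) => None.into(),
            Err(e) => Some(Err(e)).into(),
        }
    }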
}

Allowing me to do this:

fn to_streamingbody(body: async_ssh2::File) -> Option<StreamingBody> {
    let stream = ByteStream(body);
    Some(StreamingBody::new(stream))
}

(Note that rusoto_s3::StreamingBody is a type alias for rusoto_core::ByteStream.)
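
One detail worth handling explicitly is the short read: a read may fill only part of the buffer, so each chunk should be cut down to the number of bytes actually read. The sketch below is a synchronous, std-only analogue of the stream above, not the Rusoto integration itself. It uses std::io::Read in place of futures_io::AsyncRead, and the names ChunkIter and CHUNK_SIZE are made up for illustration.

```rust
use std::io::{self, Read};

// Hypothetical chunk size for illustration; the answer above uses 8 KB.
const CHUNK_SIZE: usize = 8 * 1024;

/// Synchronous stand-in for the async `ByteStream`: yields owned chunks
/// read from any `std::io::Read`.
struct ChunkIter<R>(R);

impl<R: Read> Iterator for ChunkIter<R> {
    type Item = io::Result<Vec<u8>>;

    fn next(&mut self) -> Option<Self::Item> {
        let mut buf = vec![0_u8; CHUNK_SIZE];
        match self.0.read(&mut buf) {
            // Zero bytes read means end of input.
            Ok(0) => None,
            Ok(n) => {
                // Drop the unread tail so no padding zeros are emitted.
                buf.truncate(n);
                Some(Ok(buf))
            }
            Err(e) => Some(Err(e)),
        }
    }
}

fn main() -> io::Result<()> {
    // 10,000 bytes do not divide evenly into 8 KB chunks,
    // so the last chunk is a short read.
    let data = vec![7_u8; 10_000];
    let chunks = ChunkIter(io::Cursor::new(data)).collect::<io::Result<Vec<Vec<u8>>>>()?;
    let total: usize = chunks.iter().map(Vec::len).sum();
    println!("{} chunks, {} bytes", chunks.len(), total);
    Ok(())
}
```

Without the truncate call, the final chunk here would be padded to a full 8 KB and the uploaded object would be larger than the source file.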

sonicxml
  • Note that this will allocate and zero out an 8MB buffer every time `poll_next()` is called. It'd be good to avoid that expense, for instance by making `buf` a shared [thread local](https://doc.rust-lang.org/stable/std/macro.thread_local.html) buffer. Also, 8MB is [probably much larger than you need](https://stackoverflow.com/questions/2811006/what-is-a-good-buffer-size-for-socket-programming). – John Kugelman Jun 10 '20 at 22:50
  • Oh, cool! I had been trying to do that but didn't know how to get around the compiler error without an unsafe block. And thanks for the other link, will make those changes. – sonicxml Jun 11 '20 at 01:09
  • Updated to use an 8 KB buffer size; however, I am not sure how to get the thread_local working without copying (this might be close to necessitating a new question). `Bytes::from(buf: Vec)` needs ownership of the `Vec`, which we don't have when borrowing from a RefCell (so would need to clone). If we make the buf a `[u8]`, the only method I see in Bytes is `copy_from_slice` which again copies it. – sonicxml Jun 11 '20 at 02:29
  • On second thought it [may be impossible without GATs](http://smallcultfollowing.com/babysteps/blog/2019/12/10/async-interview-2-cramertj-part-2/): "The main concern that cramertj raised with [Stream] is that, like Iterator, it always gives ownership of each item back to its caller... In practice, many stream/iterator implementations would be more efficient if they could have some internal storage that they re-use over and over. For example, they might have an internal buffer, and when `poll_next` is called, they would give back (upon completion) a reference to that buffer." – John Kugelman Jun 11 '20 at 02:55
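
The ownership constraint described in the last comment can be illustrated without any async machinery. In this std-only sketch (the type ReusingReader and its method next_chunk are hypothetical, not part of futures or Rusoto), one buffer is reused across reads by handing out a borrowed slice. The returned slice borrows from `&mut self`, which is what `Iterator::next` and `Stream::poll_next` cannot express, because their `Item` type cannot borrow from the iterator on a per-call basis.

```rust
use std::io::{self, Read};

/// Hypothetical reader that reuses one internal buffer across calls.
struct ReusingReader<R> {
    inner: R,
    buf: Vec<u8>,
}

impl<R: Read> ReusingReader<R> {
    fn new(inner: R, capacity: usize) -> Self {
        Self { inner, buf: vec![0_u8; capacity] }
    }

    /// Returns a slice borrowed from the internal buffer, or `None` at end
    /// of input. The borrow must end before the next call, so callers have
    /// to consume or copy the data first.
    fn next_chunk(&mut self) -> io::Result<Option<&[u8]>> {
        let n = self.inner.read(&mut self.buf)?;
        if n == 0 {
            Ok(None)
        } else {
            Ok(Some(&self.buf[..n]))
        }
    }
}

fn main() -> io::Result<()> {
    let mut reader = ReusingReader::new(io::Cursor::new(b"hello world".to_vec()), 4);
    let mut total = 0;
    while let Some(chunk) = reader.next_chunk()? {
        total += chunk.len();
    }
    println!("read {} bytes with one 4-byte buffer", total);
    Ok(())
}
```

A stream handed to StreamingBody has to yield owned Bytes values, so this borrowing pattern does not carry over directly, which is consistent with the per-chunk allocation in the answer.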