Given a function that returns a stream in which each item can fail:

use std::{error, result};
use futures::Stream;

pub type Error = Box<dyn error::Error>;
pub type Result<A> = result::Result<A, Error>;
fn query_paginated_items(params: Query) -> impl Stream<Item = Result<Item>> {
  ...
}

I would like to flatten its result into a future that resolves to a result of a vector of items, like so:

use futures::stream::TryStreamExt;

async fn query_for_all_items(params: Query) -> Result<Vec<Item>> {
  query_paginated_items(params)
    .try_collect::<Vec<Item>>()
    .await
}

I would expect flattening a Stream into a Future to be a trivial task, but Rust's compiler stumps me with another arcane error:

[rustc] [E] future cannot be sent between threads safely

future created by async block is not `Send`

help: the trait `std::marker::Send` is not implemented for `(dyn std::error::Error + 'static)`
note: required for the cast to the object type `dyn futures::Future<Output = std::result::Result<std::vec::Vec<Item>, std::boxed::Box<(dyn std::error::Error + 'static)>>> + std::marker::Send`

I've tried avoiding try_collect and just looping over the results as they come in:

  // `next()` comes from `futures::stream::StreamExt`
  let mut stream = query_paginated_items(params);
  let mut items = Vec::new();
  while let Some(item) = stream.next().await {
    items.push(item?);
  }
  Ok(items)

But it complains that my stream is not pinned. If I try to pin my stream with:

  let mut stream = Box::pin(query_paginated_items(params));

then I get the "not sendable" error again.
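
The `help` line in the error points at the error type rather than at `try_collect` or at pinning: `Box<dyn error::Error>` is not `Send`, so a future that keeps such a value alive across an `.await` is not `Send` either. The following is a minimal std-only sketch of that idea, not the actual query code. It assumes the caller requires a `Send` future (as the `+ std::marker::Send` in the note suggests), and every name in it (`SendError`, `SendResult`, `PageError`, `failed`, `collect_all`, `run_send`) is hypothetical. A `Vec` of results stands in for the stream so the sketch does not depend on the `futures` crate.

```rust
use std::error::Error;
use std::fmt;
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical aliases: the question's `Error` with `Send + Sync` bounds added.
pub type SendError = Box<dyn Error + Send + Sync>;
pub type SendResult<A> = Result<A, SendError>;

// Hypothetical error type standing in for whatever a page query can return.
#[derive(Debug)]
pub struct PageError(pub u32);

impl fmt::Display for PageError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "page {} failed", self.0)
    }
}

impl Error for PageError {}

// Builds a failed item; the declared return type drives the coercion
// from `Box<PageError>` into `SendError`.
pub fn failed(page: u32) -> SendResult<u32> {
    Err(Box::new(PageError(page)))
}

// Stand-in for draining the stream: stops at the first error.
// The iterator over the remaining results stays alive across the `.await`,
// so this future is `Send` only if the error type is `Send`.
pub async fn collect_all(results: Vec<SendResult<u32>>) -> SendResult<Vec<u32>> {
    let mut items = Vec::new();
    for result in results {
        let item = std::future::ready(result?).await;
        items.push(item);
    }
    Ok(items)
}

// Waker that does nothing; enough for futures that never return `Pending`.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Tiny busy-polling runner that only accepts `Send` futures, mimicking the
// bound a multi-threaded executor would place on a spawned task.
pub fn run_send<F: Future + Send>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(output) = fut.as_mut().poll(&mut cx) {
            return output;
        }
    }
}
```

If the alias is changed back to `Box<dyn Error>`, a call such as `run_send(collect_all(...))` should be rejected with the same "cannot be sent between threads safely" error, because the pending results are held across the `.await`. Whether adding `Send + Sync` to the alias is acceptable depends on the error types the real stream produces, since each boxed error must then be `Send + Sync` itself.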
