I am trying to bridging the async functionality of the SDK with sync code. I need this to create a tokio_stream (a struct with Stream trait implemented.) The problem is quite suprising,
#[cfg(test)]
mod tests {
use aws_config::meta::region::RegionProviderChain;
use aws_sdk_kinesis::{Client, Region};
use aws_config::SdkConfig;
use aws_sdk_kinesis::error::ListShardsError;
use aws_sdk_kinesis::output::ListShardsOutput;
use aws_sdk_kinesis::types::SdkError;
use futures::executor::block_on;
use tokio::runtime::Runtime;
/// It passes this test, without a problem.
#[tokio::test]
async fn testing_kinesis_stream_connection() -> () {
let region_provider = RegionProviderChain::first_try(Region::new("eu-central-1".to_string()));
let sdk_config =
aws_config::from_env()
.region(region_provider)
.load().await;
let client = Client::new(&sdk_config);
let shard_request = client.list_shards().stream_name("datafusion-stream".to_string());
let shards = shard_request.send().await.unwrap();
println!("{:?}", shards);
}
/// It passes this test, without a problem.
#[test]
fn testing_kinesis_stream_connection_without_tokio() -> () {
let runtime = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
let region_provider = RegionProviderChain::first_try(Region::new("eu-central-1".to_string()));
let sdk_config =
runtime.block_on(aws_config::from_env()
.region(region_provider)
.load());
let client = Client::new(&sdk_config);
let shard_request = client.list_shards().stream_name("datafusion-stream".to_string());
let shards = runtime.block_on(shard_request.send()).unwrap();
println!("{:?}", shards);
}
fn this_needs_to_be_sync_1() -> SdkConfig {
let region_provider = RegionProviderChain::first_try(Region::new("eu-central-1".to_string()));
let sdk_config =
futures::executor::block_on(aws_config::from_env()
.region(region_provider)
.load());
sdk_config
}
fn this_needs_to_be_sync_2(client: Client) -> ListShardsOutput {
let shard_request = client.list_shards().stream_name("datafusion-stream".to_string());
let shards = futures::executor::block_on(shard_request.send()).unwrap();
shards
}
/// It hangs in the second block_on.
#[tokio::test]
async fn testing_kinesis_stream_connection_sync_inside_async() -> () {
// Passes this block_on
let sdk_config = this_needs_to_be_sync_1();
let client = Client::new(&sdk_config);
// Blocks here
let shards = this_needs_to_be_sync_2(client);
println!("{:?}", shards);
}
}
I could not come up with a solution since there is no error, only hanged process.