1

I am trying to bridging the async functionality of the SDK with sync code. I need this to create a tokio_stream (a struct with Stream trait implemented.) The problem is quite suprising,

#[cfg(test)]
mod tests {
    use aws_config::meta::region::RegionProviderChain;
    use aws_sdk_kinesis::{Client, Region};
    use aws_config::SdkConfig;
    use aws_sdk_kinesis::error::ListShardsError;
    use aws_sdk_kinesis::output::ListShardsOutput;
    use aws_sdk_kinesis::types::SdkError;
    use futures::executor::block_on;
    use tokio::runtime::Runtime;
    /// It passes this test, without a problem.
    #[tokio::test]
    async fn testing_kinesis_stream_connection() -> () {
        let region_provider = RegionProviderChain::first_try(Region::new("eu-central-1".to_string()));
        let sdk_config =
            aws_config::from_env()
                .region(region_provider)
                .load().await;
        let client = Client::new(&sdk_config);
        let shard_request = client.list_shards().stream_name("datafusion-stream".to_string());
        let shards = shard_request.send().await.unwrap();
        println!("{:?}", shards);
    }
    /// It passes this test, without a problem.
    #[test]
    fn testing_kinesis_stream_connection_without_tokio() -> () {
        let runtime = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap();
        let region_provider = RegionProviderChain::first_try(Region::new("eu-central-1".to_string()));
        let sdk_config =
            runtime.block_on(aws_config::from_env()
                .region(region_provider)
                .load());
        let client = Client::new(&sdk_config);
        let shard_request = client.list_shards().stream_name("datafusion-stream".to_string());
        let shards = runtime.block_on(shard_request.send()).unwrap();
        println!("{:?}", shards);
    }

    fn this_needs_to_be_sync_1() -> SdkConfig {
        let region_provider = RegionProviderChain::first_try(Region::new("eu-central-1".to_string()));
        let sdk_config =
            futures::executor::block_on(aws_config::from_env()
                .region(region_provider)
                .load());
        sdk_config
    }

    fn this_needs_to_be_sync_2(client: Client) -> ListShardsOutput {
        let shard_request = client.list_shards().stream_name("datafusion-stream".to_string());
        let shards = futures::executor::block_on(shard_request.send()).unwrap();
        shards
    }
    /// It hangs in the second block_on.
    #[tokio::test]
    async fn testing_kinesis_stream_connection_sync_inside_async() -> () {
        // Passes this block_on
        let sdk_config = this_needs_to_be_sync_1();
        let client = Client::new(&sdk_config);
        // Blocks here
        let shards = this_needs_to_be_sync_2(client);
        println!("{:?}", shards);
    }

}

I could not come up with a solution since there is no error, only hanged process.

fedonev
  • 20,327
  • 2
  • 25
  • 34

0 Answers0