5

I have problem, why do I get this error when sending request

Error: reqwest::Error {
    kind: Request, 
    url: Url { 
        scheme: "http", 
        cannot_be_a_base: false, 
        username: "", 
        password: None, 
        host: Some(Ipv4(127.0.0.1)), 
        port: Some(3000), 
        path: "/message", 
        query: None, 
        fragment: None 
    }, 
    source: hyper::Error(
        Connect, 
        ConnectError(
            "tcp connect error", 
            Os { 
                code: 10048, 
                kind: AddrInUse, 
                message: "Only one usage of each socket address (protocol/network address/port) is normally permitted." 
            }
        )
    )
}

Its occurring at "random" rates like 15-45s after cargo run. I use axum as server and reqwest as request client. Here is the source. The first piece of code is not multithreaded its just running in a loop (for message in channel), no (async) spawning, the second one is parsing the message and writing some of its context to the database, I'm using SQLite with sqlx as database. The error message is also very confusing, I'm not serving anything except of an app that writes to the database on port 3000, so how is that even possible. I'm also using reqwest client, pool_max_idle_per_host is set to 0 because it's a workaround for hyper::Error(IncompleteMessage): connection closed before message completed. that I was getting as described in this GitHub issue.

//sending at rate of ~1000 per second
pub async fn send_message_to_extractor(
    message: DiscordMessage,
    client: &reqwest::Client,
) -> Result<(), reqwest::Error> {
    //info!("sending message to extractor: {:?}", message);
    let mut message = message;
    message.guild_id = Some("1234".to_owned());
    let request = client
        .post(EXTRACTOR_URL)
        .json(&message)
        .send()
        .await;
    match request {
        Ok(_) => {
            // returned
            Ok(())
        },
        Err(e) => Err(e),
    }
}
pub async fn handle_discord_message(pool: Extension<SqlitePool>, Json(payload): Json<DiscordMessage>) -> impl IntoResponse {
    ...
    // Parsing message about ~400ms of work
    ...
    return StatusCode::OK;
}
    // Code that calls send_message_to_extractor functions which fails after 15-45 sec
    let reqwest_client = reqwest::Client::builder()
        .pool_max_idle_per_host(0)
        .build()?;
    while let Some(row) = stream.try_next().await? {
        let data: Vec<u8> = row.try_get("data")?;
        // turn data to string
        let data_string = if COMPRESSION {
            decode_reader(data)?
        } else {
            String::from_utf8(data)?
        };
        // turn string to discord message
        let discord_message: DiscordMessage = serde_json::from_str(&data_string)?;
        // send message to extractor
        //println!("sending message to extractor: {:?}", discord_message);
        send_message_to_extractor(
            discord_message,
            &reqwest_client
        ).await?;
        bar.inc(1);
        // Do something with the data vector...
    }

    Ok(())
// main.rs
    let pool = SqlitePoolOptions::new()
        .max_connections(50)
        .connect(&DATABASE_URL)
        .await?;

    let app = Router::new()
        .route("/message", post(routes::handle_discord_message))
        .layer(Extension(pool));

    let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
    tracing::info!("listening on {}", addr);
    axum::Server::bind(&addr)
        .serve(app.into_make_service())
        .await
        .unwrap();

    Ok(())

I have tried messing with keep-alive header and different client settings. The only thing that has changed was the time the program took to crash, around ±30s. I don't have anything running except of the axum app on port 3000.

cafce25
  • 15,907
  • 4
  • 25
  • 31
SildCave
  • 51
  • 5

0 Answers0